# assistance-engine/Docker/src/utils/classifier_export.py

"""
classifier_export.py — ADR-0008 Phase 1 + ADR-0010 automatic retraining trigger
Exports classify_history_store to JSONL when the session count reaches
EXPORT_THRESHOLD. Each exported record is a labeled (query, type) pair.
After a successful export, triggers the Champion/Challenger retraining pipeline
(ADR-0010) in a background thread if RETRAIN_ON_EXPORT=true.
Export format (one JSON object per line):
{"query": "60-char topic snippet", "type": "RETRIEVAL", "session_id": "abc"}
The store is flushed after export to prevent unbounded memory growth.
Files are written to CLASSIFIER_EXPORT_DIR (default: /data/classifier_labels/).
"""
import json
import logging
import os
import subprocess
import sys
import threading
from datetime import datetime, timezone
from pathlib import Path
logger = logging.getLogger(__name__)
# Number of sessions in classify_history_store that triggers an export.
EXPORT_THRESHOLD = int(os.getenv("CLASSIFIER_EXPORT_THRESHOLD", "100"))
# Destination directory for the timestamped JSONL label files.
EXPORT_DIR = Path(os.getenv("CLASSIFIER_EXPORT_DIR", "/data/classifier_labels"))
# When true, a successful export launches the retraining pipeline (ADR-0010).
RETRAIN_ON_EXPORT = os.getenv("RETRAIN_ON_EXPORT", "false").lower() == "true"
# Path of the Champion/Challenger retraining script run by _trigger_retrain().
RETRAIN_SCRIPT = Path(os.getenv("RETRAIN_SCRIPT_PATH",
    "/app/scripts/pipelines/classifier/retrain_pipeline.py"))
# Serializes export attempts across request-handler threads.
_export_lock = threading.Lock()
def maybe_export(classify_history_store: dict) -> bool:
    """Export labeled pairs once the session count reaches EXPORT_THRESHOLD.

    Intended to be called after every request; safe under concurrent
    callers. A successful export flushes the store. Returns True only
    when an export actually ran.
    """
    # Cheap unlocked pre-check keeps the hot path free of lock contention.
    if len(classify_history_store) < EXPORT_THRESHOLD:
        return False
    with _export_lock:
        # Double-checked: a racing thread may have exported (and flushed
        # the store) while we were waiting on the lock.
        still_over = len(classify_history_store) >= EXPORT_THRESHOLD
        if not still_over:
            return False
        return _do_export(classify_history_store)
def _do_export(classify_history_store: dict) -> bool:
    """Serialize the store to a timestamped JSONL file and flush it.

    Caller must hold _export_lock. Emits one JSON object per line with
    keys "query" (the entry's topic snippet), "type", and "session_id".

    The file is first written to a sibling ".tmp" path and then atomically
    renamed into place, so a crash mid-write never leaves a truncated
    file under the final name and a concurrent reader (the retraining
    pipeline) never observes a half-written export.

    Returns True on success. On failure the store is left intact, so the
    same data is retried on the next threshold crossing.
    """
    try:
        EXPORT_DIR.mkdir(parents=True, exist_ok=True)
        ts = datetime.now(timezone.utc).strftime("%Y%m%dT%H%M%SZ")
        path = EXPORT_DIR / f"classifier_labels_{ts}.jsonl"
        records = [
            {
                "query": entry.get("topic", ""),
                "type": entry.get("type", ""),
                "session_id": session_id,
            }
            for session_id, entries in classify_history_store.items()
            for entry in entries
        ]
        # Atomic publish: write to a temp file, then rename over the
        # final path (atomic on POSIX filesystems).
        tmp_path = path.with_suffix(".jsonl.tmp")
        with open(tmp_path, "w", encoding="utf-8") as f:
            for record in records:
                f.write(json.dumps(record, ensure_ascii=False) + "\n")
        tmp_path.replace(path)
        n_sessions = len(classify_history_store)
        n_records = len(records)
        # Flush only after the file is safely published under its final
        # name; a failure above leaves the data for the next attempt.
        classify_history_store.clear()
        logger.info(
            f"[classifier_export] exported {n_records} records "
            f"from {n_sessions} sessions → {path}"
        )
        if RETRAIN_ON_EXPORT:
            _trigger_retrain()
        return True
    except Exception as e:
        logger.error(f"[classifier_export] export failed: {e}", exc_info=True)
        return False
def _trigger_retrain() -> None:
    """Start retrain_pipeline.py on a daemon background thread.

    The pipeline runs as a subprocess so the gRPC server keeps serving
    requests during retraining. The freshly trained model is only picked
    up on the next engine restart — no live reload, to avoid serving
    inconsistent results mid-session.
    """
    def _worker():
        # Bail out early if the configured script is missing.
        if not RETRAIN_SCRIPT.exists():
            logger.warning(
                f"[classifier_export] retrain script not found at {RETRAIN_SCRIPT} "
                "— set RETRAIN_SCRIPT_PATH or disable RETRAIN_ON_EXPORT"
            )
            return
        try:
            logger.info(f"[classifier_export] launching retraining pipeline → {RETRAIN_SCRIPT}")
            completed = subprocess.run(
                [sys.executable, str(RETRAIN_SCRIPT)],
                capture_output=True,
                text=True,
                timeout=600,  # 10 min hard limit
            )
            if completed.returncode != 0:
                logger.error(
                    f"[classifier_export] retraining failed (exit {completed.returncode})\n"
                    f"{completed.stderr[-500:]}"
                )
            else:
                logger.info("[classifier_export] retraining completed — restart engine to load new model")
        except subprocess.TimeoutExpired:
            logger.error("[classifier_export] retraining timed out after 600s")
        except Exception as e:
            logger.error(f"[classifier_export] retraining error: {e}", exc_info=True)

    worker = threading.Thread(target=_worker, daemon=True, name="retrain-pipeline")
    worker.start()
def force_export(classify_history_store: dict) -> bool:
    """Export immediately regardless of threshold. Useful for shutdown hooks."""
    if classify_history_store:
        with _export_lock:
            return _do_export(classify_history_store)
    # Nothing buffered — skip the lock entirely.
    logger.info("[classifier_export] nothing to export")
    return False