From 81125eae2abb2f942e8e671c7e761dbbf7a32ebd Mon Sep 17 00:00:00 2001 From: rafa-ruiz Date: Sat, 11 Apr 2026 12:54:46 -0700 Subject: [PATCH] ADR0010 --- .gitignore | 4 + Docker/docker-compose.yaml | 10 +- Docker/protos/brunix.proto | 7 + Docker/requirements.txt | 3 + Docker/src/graph.py | 184 +++++++++- Docker/src/server.py | 26 +- Docker/src/utils/classifier_export.py | 138 ++++++++ ...8-adaptive-query-routing-intent-history.md | 27 +- .../ADR-0009-per-type-response-validation.md | 335 ++++++++++++++++++ ...R-0010-classifier-continuous-retraining.md | 156 ++++++++ docs/ARCHITECTURE.md | 83 +++-- .../PRD-0003-adaptive-query-routing.md | 21 +- scripts/pipelines/classifier/README.md | 208 +++++++++++ scripts/pipelines/classifier/requirements.txt | 4 + .../pipelines/classifier/retrain_pipeline.py | 271 ++++++++++++++ .../classifier/seed_classifier_dataset.jsonl | 95 +++++ .../pipelines/classifier/train_classifier.py | 204 +++++++++++ 17 files changed, 1716 insertions(+), 60 deletions(-) create mode 100644 Docker/src/utils/classifier_export.py create mode 100644 docs/ADR/ADR-0009-per-type-response-validation.md create mode 100644 docs/ADR/ADR-0010-classifier-continuous-retraining.md create mode 100644 scripts/pipelines/classifier/README.md create mode 100644 scripts/pipelines/classifier/requirements.txt create mode 100644 scripts/pipelines/classifier/retrain_pipeline.py create mode 100644 scripts/pipelines/classifier/seed_classifier_dataset.jsonl create mode 100644 scripts/pipelines/classifier/train_classifier.py diff --git a/.gitignore b/.gitignore index 3b1d3b3..916163b 100644 --- a/.gitignore +++ b/.gitignore @@ -122,6 +122,10 @@ celerybeat.pid # Environments .env .venv + +# Classifier — model binaries and production label exports +Docker/data/ +*.pkl env/ venv/ ENV/ diff --git a/Docker/docker-compose.yaml b/Docker/docker-compose.yaml index 7716065..25f4af7 100644 --- a/Docker/docker-compose.yaml +++ b/Docker/docker-compose.yaml @@ -7,6 +7,9 @@ services: ports: - 
"50052:50051" - "8000:8000" + volumes: + - ./data:/data + environment: ELASTICSEARCH_URL: ${ELASTICSEARCH_URL} ELASTICSEARCH_INDEX: ${ELASTICSEARCH_INDEX} @@ -16,9 +19,14 @@ services: LANGFUSE_SECRET_KEY: ${LANGFUSE_SECRET_KEY} OLLAMA_URL: ${OLLAMA_URL} OLLAMA_MODEL_NAME: ${OLLAMA_MODEL_NAME} + OLLAMA_MODEL_NAME_CONVERSATIONAL: ${OLLAMA_MODEL_NAME_CONVERSATIONAL} OLLAMA_EMB_MODEL_NAME: ${OLLAMA_EMB_MODEL_NAME} + CLASSIFIER_EXPORT_THRESHOLD: ${CLASSIFIER_EXPORT_THRESHOLD} + CLASSIFIER_EXPORT_DIR: ${CLASSIFIER_EXPORT_DIR} + CLASSIFIER_MODEL_PATH: ${CLASSIFIER_MODEL_PATH} + CLASSIFIER_CONFIDENCE_THRESHOLD: ${CLASSIFIER_CONFIDENCE_THRESHOLD} PROXY_THREAD_WORKERS: 10 - + extra_hosts: - "host.docker.internal:host-gateway" diff --git a/Docker/protos/brunix.proto b/Docker/protos/brunix.proto index 99b4487..cef3cb2 100644 --- a/Docker/protos/brunix.proto +++ b/Docker/protos/brunix.proto @@ -40,6 +40,13 @@ message AgentRequest { string extra_context = 5; string user_info = 6; + + // ── Caller-declared type (v3 — ADR-0008 Phase 3) ───────────────────────── + // Optional. When set by the caller, all three classifier layers are bypassed + // and the engine routes directly using this value. + // Valid values: RETRIEVAL | CODE_GENERATION | CONVERSATIONAL | PLATFORM + // Empty string (default) means: run the normal L1 → L2 → L3 pipeline. 
+ string query_type = 7; } message AgentResponse { diff --git a/Docker/requirements.txt b/Docker/requirements.txt index 5ff3ce9..c126884 100644 --- a/Docker/requirements.txt +++ b/Docker/requirements.txt @@ -95,6 +95,9 @@ joblib==1.5.3 # via # model2vec # nltk + # scikit-learn +scikit-learn>=1.4.0 + # via assistance-engine (ADR-0008 Layer 2 classifier) jsonpatch==1.33 # via langchain-core jsonpointer==3.0.0 diff --git a/Docker/src/graph.py b/Docker/src/graph.py index 3e558f2..1789277 100644 --- a/Docker/src/graph.py +++ b/Docker/src/graph.py @@ -1,11 +1,22 @@ import logging +import os +import re as _re from collections import defaultdict +from pathlib import Path + from elasticsearch import Elasticsearch from langchain_core.documents import Document from langchain_core.messages import AIMessage, SystemMessage, HumanMessage, BaseMessage from langgraph.graph import END, StateGraph from langgraph.graph.state import CompiledStateGraph +try: + import joblib + import numpy as _np + _LAYER2_AVAILABLE = True +except ImportError: + _LAYER2_AVAILABLE = False + from prompts import ( CLASSIFY_PROMPT_TEMPLATE, CODE_GENERATION_PROMPT, @@ -221,10 +232,118 @@ def _format_intent_history(classify_history: list) -> str: return "\n".join(lines) +_RC01_PREFIXES = ( + # System-injected prefixes — always PLATFORM (RC-01) + "you are a direct and concise assistant", +) + +_RC02_KEYWORDS = ( + # Account / subscription vocabulary (RC-02) + "usage percentage", + "project usage", + "account usage", + "your quota", + "your subscription", + "your plan", + "your account", + "your limits", + "your consumption", + "api calls", + "llamadas este mes", + "uso de la cuenta", + "plan contratado", + "límite de", + "limite de", +) + +# RC-02 structural signals: "N%" in context of usage/quota/consumption +_RC02_PERCENTAGE_RE = _re.compile( + r'\b\d+(\.\d+)?\s*%', # any percentage figure +) +_RC02_USAGE_CONTEXT_WORDS = ( + "usage", "quota", "consumption", "limit", "billing", + "uso", "cuota", "consumo", 
"límite", "facturación", + "project", "account", "plan", "subscription", +) + + def _is_platform_query(question: str) -> bool: - """Fast-path: skip LLM classifier for known platform prompt prefixes.""" + """Deterministic fast-path for PLATFORM classification. + + RC-01: known system-injected prefixes → always PLATFORM, no LLM call. + RC-02: messages containing account/metrics/usage signals → always PLATFORM, + regardless of conversation history or LLM classifier output. + + Both rules are hard-coded here, not in the prompt, so they cannot be + overridden by model behavior. + """ q = question.strip().lower() - return "you are a direct and concise assistant" in q + + # RC-01 — known platform-injected prefixes + if any(prefix in q for prefix in _RC01_PREFIXES): + return True + + # RC-02 — vocabulary signals + if any(kw in q for kw in _RC02_KEYWORDS): + return True + + # RC-02 — structural signal: percentage figure + usage context word + if _RC02_PERCENTAGE_RE.search(q): + if any(ctx in q for ctx in _RC02_USAGE_CONTEXT_WORDS): + return True + + return False + + +# ── Layer 2 — embedding classifier (ADR-0008 Phase 2) ───────────────────────── + +_LAYER2_MODEL_PATH = os.getenv("CLASSIFIER_MODEL_PATH", "/data/classifier_model.pkl") +_LAYER2_CONFIDENCE_THRESHOLD = float(os.getenv("CLASSIFIER_CONFIDENCE_THRESHOLD", "0.85")) +_layer2_model: dict | None = None + + +def _load_layer2_model() -> None: + """Load the serialized Layer 2 classifier from disk (called once at startup). + + Safe to call even if the model file does not exist yet — the engine falls + back to the LLM classifier (Layer 3) for every request. 
+ """ + global _layer2_model + if not _LAYER2_AVAILABLE: + logger.info("[classifier/L2] joblib/numpy not available — Layer 2 disabled") + return + p = Path(_LAYER2_MODEL_PATH) + if p.exists(): + _layer2_model = joblib.load(p) + meta = {k: v for k, v in _layer2_model.items() if k != "clf"} + logger.info(f"[classifier/L2] model loaded from {p} — {meta}") + else: + logger.info(f"[classifier/L2] model not found at {p} — using LLM fallback only") + + +def _classify_layer2(query: str, embeddings) -> str | None: + """Embed query and classify via Layer 2 model. + + Returns the predicted type if confidence >= threshold, else None (falls + through to the LLM classifier, Layer 3). + + Never raises — any error returns None to preserve the Layer 3 fallback. + """ + if _layer2_model is None or not _LAYER2_AVAILABLE: + return None + try: + vec = _np.array(embeddings.embed_query(query)).reshape(1, -1) + proba = _layer2_model["clf"].predict_proba(vec)[0] + conf = float(proba.max()) + if conf >= _LAYER2_CONFIDENCE_THRESHOLD: + label = _layer2_model["classes"][int(proba.argmax())] + logger.info(f"[classifier/L2] → {label} (conf={conf:.2f})") + return label + logger.info(f"[classifier/L2] low confidence ({conf:.2f}) → LLM fallback") + return None + except Exception as e: + logger.warning(f"[classifier/L2] error: {e} — falling through to LLM") + return None def _parse_query_type(raw: str) -> tuple[str, bool]: @@ -263,8 +382,20 @@ def build_graph(llm, embeddings, es_client, index_name, llm_conversational=None) topic_snippet = question.strip()[:60].replace("\n", " ") + # Phase 3 — caller-declared type: bypass all layers (ADR-0008 RC) + declared = state.get("query_type", "") + if declared in ("RETRIEVAL", "CODE_GENERATION", "CONVERSATIONAL", "PLATFORM"): + logger.info(f"[classify] caller-declared → {declared}") + entry: ClassifyEntry = {"type": declared, "topic": topic_snippet} + return { + "query_type": declared, + "use_editor_context": False, + "classify_history": classify_history + 
[entry], + } + + # Layer 1 — hard rules (RC-01, RC-02) if _is_platform_query(question): - logger.info(f"[classify] platform prefix detected -> PLATFORM") + logger.info("[classify] L1 → PLATFORM") entry: ClassifyEntry = {"type": "PLATFORM", "topic": topic_snippet} return { "query_type": "PLATFORM", @@ -272,13 +403,24 @@ def build_graph(llm, embeddings, es_client, index_name, llm_conversational=None) "classify_history": classify_history + [entry], } + # Layer 2 — embedding classifier (ADR-0008 Phase 2) + l2_type = _classify_layer2(question, embeddings) + if l2_type: + entry: ClassifyEntry = {"type": l2_type, "topic": topic_snippet} + return { + "query_type": l2_type, + "use_editor_context": False, + "classify_history": classify_history + [entry], + } + + # Layer 3 — LLM classifier (bootstrap fallback) intent_history_text = _format_intent_history(classify_history) prompt_content = _build_classify_prompt(question, intent_history_text, selected_text) resp = llm.invoke([SystemMessage(content=prompt_content)]) raw = resp.content.strip().upper() query_type, use_editor_ctx = _parse_query_type(raw) - logger.info(f"[classify] selected={bool(selected_text)} raw='{raw}' -> {query_type} editor={use_editor_ctx}") + logger.info(f"[classify] L3 selected={bool(selected_text)} raw='{raw}' -> {query_type} editor={use_editor_ctx}") entry: ClassifyEntry = {"type": query_type, "topic": topic_snippet} return { @@ -438,10 +580,23 @@ def build_prepare_graph(llm, embeddings, es_client, index_name): history_msgs = messages[:-1] selected_text = state.get("selected_text", "") + classify_history = state.get("classify_history") or [] + topic_snippet = question.strip()[:60].replace("\n", " ") + + # Phase 3 — caller-declared type: bypass all layers (ADR-0008 RC) + declared = state.get("query_type", "") + if declared in ("RETRIEVAL", "CODE_GENERATION", "CONVERSATIONAL", "PLATFORM"): + logger.info(f"[prepare/classify] caller-declared → {declared}") + entry: ClassifyEntry = {"type": declared, "topic": 
topic_snippet} + return { + "query_type": declared, + "use_editor_context": False, + "classify_history": classify_history + [entry], + } + + # Layer 1 — hard rules (RC-01, RC-02) if _is_platform_query(question): - logger.info(f"[prepare/classify] platform prefix detected -> PLATFORM") - classify_history = state.get("classify_history") or [] - topic_snippet = question.strip()[:60].replace("\n", " ") + logger.info("[prepare/classify] L1 → PLATFORM") entry: ClassifyEntry = {"type": "PLATFORM", "topic": topic_snippet} return { "query_type": "PLATFORM", @@ -449,15 +604,24 @@ def build_prepare_graph(llm, embeddings, es_client, index_name): "classify_history": classify_history + [entry], } - classify_history = state.get("classify_history") or [] - topic_snippet = question.strip()[:60].replace("\n", " ") + # Layer 2 — embedding classifier (ADR-0008 Phase 2) + l2_type = _classify_layer2(question, embeddings) + if l2_type: + entry: ClassifyEntry = {"type": l2_type, "topic": topic_snippet} + return { + "query_type": l2_type, + "use_editor_context": False, + "classify_history": classify_history + [entry], + } + + # Layer 3 — LLM classifier (bootstrap fallback) intent_history_text = _format_intent_history(classify_history) prompt_content = _build_classify_prompt(question, intent_history_text, selected_text) resp = llm.invoke([SystemMessage(content=prompt_content)]) raw = resp.content.strip().upper() query_type, use_editor_ctx = _parse_query_type(raw) - logger.info(f"[prepare/classify] selected={bool(selected_text)} raw='{raw}' -> {query_type} editor={use_editor_ctx}") + logger.info(f"[prepare/classify] L3 selected={bool(selected_text)} raw='{raw}' -> {query_type} editor={use_editor_ctx}") entry: ClassifyEntry = {"type": query_type, "topic": topic_snippet} return { "query_type": query_type, diff --git a/Docker/src/server.py b/Docker/src/server.py index e1ea744..0014edf 100644 --- a/Docker/src/server.py +++ b/Docker/src/server.py @@ -14,7 +14,8 @@ from langchain_core.messages 
import AIMessage from utils.llm_factory import create_chat_model from utils.emb_factory import create_embedding_model -from graph import build_graph, build_prepare_graph, build_final_messages, session_store, classify_history_store +from graph import build_graph, build_prepare_graph, build_final_messages, session_store, classify_history_store, _load_layer2_model +from utils.classifier_export import maybe_export, force_export from evaluate import run_evaluation @@ -87,6 +88,7 @@ class BrunixEngine(brunix_pb2_grpc.AssistanceEngineServicer): index_name = self.index_name, ) + _load_layer2_model() logger.info("Brunix Engine initialized.") @@ -113,11 +115,13 @@ class BrunixEngine(brunix_pb2_grpc.AssistanceEngineServicer): extra_context = "" logger.warning("[AskAgent] extra_context base64 decode failed") - user_info = request.user_info or "{}" + user_info = request.user_info or "{}" + query_type = request.query_type or "" logger.info( f"[AskAgent] session={session_id} " f"editor={bool(editor_content)} selected={bool(selected_text)} " + f"declared_type={query_type or 'none'} " f"query='{query[:80]}'" ) @@ -131,7 +135,7 @@ class BrunixEngine(brunix_pb2_grpc.AssistanceEngineServicer): "session_id": session_id, "reformulated_query": "", "context": "", - "query_type": "", + "query_type": query_type, "classify_history": classify_history, "editor_content": editor_content, @@ -148,6 +152,7 @@ class BrunixEngine(brunix_pb2_grpc.AssistanceEngineServicer): if session_id: classify_history_store[session_id] = final_state.get("classify_history", classify_history) + maybe_export(classify_history_store) logger.info(f"[AskAgent] query_type={final_state.get('query_type')} " f"answer='{result_text[:100]}'") @@ -189,11 +194,13 @@ class BrunixEngine(brunix_pb2_grpc.AssistanceEngineServicer): extra_context = "" logger.warning("[AskAgent] extra_context base64 decode failed") - user_info = request.user_info or "{}" + user_info = request.user_info or "{}" + query_type = request.query_type or "" 
logger.info( f"[AskAgentStream] session={session_id} " - f"editor={bool(editor_content)} selected={bool(selected_text)} context={extra_context} " + f"editor={bool(editor_content)} selected={bool(selected_text)} " + f"declared_type={query_type or 'none'} " f"query='{query[:80]}'" ) @@ -207,7 +214,7 @@ class BrunixEngine(brunix_pb2_grpc.AssistanceEngineServicer): "session_id": session_id, "reformulated_query": "", "context": "", - "query_type": "", + "query_type": query_type, "classify_history": classify_history, "editor_content": editor_content, @@ -243,6 +250,7 @@ class BrunixEngine(brunix_pb2_grpc.AssistanceEngineServicer): list(prepared["messages"]) + [AIMessage(content=complete_text)] ) classify_history_store[session_id] = prepared.get("classify_history", classify_history) + maybe_export(classify_history_store) logger.info( f"[AskAgentStream] done — " @@ -323,7 +331,11 @@ def serve(): server.add_insecure_port("[::]:50051") logger.info("[ENGINE] listen on 50051 (gRPC)") server.start() - server.wait_for_termination() + try: + server.wait_for_termination() + finally: + force_export(classify_history_store) + logger.info("[ENGINE] classifier labels flushed on shutdown") if __name__ == "__main__": diff --git a/Docker/src/utils/classifier_export.py b/Docker/src/utils/classifier_export.py new file mode 100644 index 0000000..079bc66 --- /dev/null +++ b/Docker/src/utils/classifier_export.py @@ -0,0 +1,138 @@ +""" +classifier_export.py — ADR-0008 Phase 1 + ADR-0010 automatic retraining trigger + +Exports classify_history_store to JSONL when the session count reaches +EXPORT_THRESHOLD. Each exported record is a labeled (query, type) pair. + +After a successful export, triggers the Champion/Challenger retraining pipeline +(ADR-0010) in a background thread if RETRAIN_ON_EXPORT=true. + +Export format (one JSON object per line): + {"query": "60-char topic snippet", "type": "RETRIEVAL", "session_id": "abc"} + +The store is flushed after export to prevent unbounded memory growth. 
+Files are written to CLASSIFIER_EXPORT_DIR (default: /data/classifier_labels/). +""" + +import json +import logging +import os +import subprocess +import sys +import threading +from datetime import datetime, timezone +from pathlib import Path + +logger = logging.getLogger(__name__) + +EXPORT_THRESHOLD = int(os.getenv("CLASSIFIER_EXPORT_THRESHOLD", "100")) +EXPORT_DIR = Path(os.getenv("CLASSIFIER_EXPORT_DIR", "/data/classifier_labels")) +RETRAIN_ON_EXPORT = os.getenv("RETRAIN_ON_EXPORT", "false").lower() == "true" +RETRAIN_SCRIPT = Path(os.getenv("RETRAIN_SCRIPT_PATH", + "/app/scripts/pipelines/classifier/retrain_pipeline.py")) + +_export_lock = threading.Lock() + + +def maybe_export(classify_history_store: dict) -> bool: + """Check session count and export if threshold is reached. + + Called after every request. Thread-safe. Returns True if export occurred. + Flushes the store after a successful export. + """ + if len(classify_history_store) < EXPORT_THRESHOLD: + return False + + with _export_lock: + # Re-check inside lock — another thread may have already exported + if len(classify_history_store) < EXPORT_THRESHOLD: + return False + + return _do_export(classify_history_store) + + +def _do_export(classify_history_store: dict) -> bool: + try: + EXPORT_DIR.mkdir(parents=True, exist_ok=True) + + ts = datetime.now(timezone.utc).strftime("%Y%m%dT%H%M%SZ") + path = EXPORT_DIR / f"classifier_labels_{ts}.jsonl" + + records = [] + for session_id, entries in classify_history_store.items(): + for entry in entries: + records.append({ + "query": entry.get("topic", ""), + "type": entry.get("type", ""), + "session_id": session_id, + }) + + with open(path, "w", encoding="utf-8") as f: + for record in records: + f.write(json.dumps(record, ensure_ascii=False) + "\n") + + n_sessions = len(classify_history_store) + n_records = len(records) + + classify_history_store.clear() + + logger.info( + f"[classifier_export] exported {n_records} records " + f"from {n_sessions} sessions → 
{path}" + ) + + if RETRAIN_ON_EXPORT: + _trigger_retrain() + + return True + + except Exception as e: + logger.error(f"[classifier_export] export failed: {e}", exc_info=True) + return False + + +def _trigger_retrain() -> None: + """Launch retrain_pipeline.py in a background thread. + + Runs as a subprocess so it does not block the gRPC server. The engine + continues serving requests while retraining happens. The new model is + loaded on the next engine restart — live reload is not performed to avoid + serving inconsistent results mid-session. + """ + def _run(): + if not RETRAIN_SCRIPT.exists(): + logger.warning( + f"[classifier_export] retrain script not found at {RETRAIN_SCRIPT} " + "— set RETRAIN_SCRIPT_PATH or disable RETRAIN_ON_EXPORT" + ) + return + try: + logger.info(f"[classifier_export] launching retraining pipeline → {RETRAIN_SCRIPT}") + result = subprocess.run( + [sys.executable, str(RETRAIN_SCRIPT)], + capture_output=True, + text=True, + timeout=600, # 10 min hard limit + ) + if result.returncode == 0: + logger.info("[classifier_export] retraining completed — restart engine to load new model") + else: + logger.error( + f"[classifier_export] retraining failed (exit {result.returncode})\n" + f"{result.stderr[-500:]}" + ) + except subprocess.TimeoutExpired: + logger.error("[classifier_export] retraining timed out after 600s") + except Exception as e: + logger.error(f"[classifier_export] retraining error: {e}", exc_info=True) + + threading.Thread(target=_run, daemon=True, name="retrain-pipeline").start() + + +def force_export(classify_history_store: dict) -> bool: + """Export immediately regardless of threshold. 
Useful for shutdown hooks.""" + if not classify_history_store: + logger.info("[classifier_export] nothing to export") + return False + with _export_lock: + return _do_export(classify_history_store) diff --git a/docs/ADR/ADR-0008-adaptive-query-routing-intent-history.md b/docs/ADR/ADR-0008-adaptive-query-routing-intent-history.md index 1cc0d44..f78dea4 100644 --- a/docs/ADR/ADR-0008-adaptive-query-routing-intent-history.md +++ b/docs/ADR/ADR-0008-adaptive-query-routing-intent-history.md @@ -1,7 +1,8 @@ # ADR-0008: Adaptive Query Routing — Taxonomy, Contract, and Classifier Strategy **Date:** 2026-04-09 -**Status:** Accepted +**Last updated:** 2026-04-10 +**Status:** Implemented **Deciders:** Rafael Ruiz (CTO) **Related ADRs:** ADR-0002 (Two-Phase Streaming), ADR-0003 (Hybrid Retrieval RRF) @@ -103,7 +104,11 @@ Adding a prefix requires updating `_PLATFORM_PATTERNS` and this list. If the query contains usage percentages, account metrics, consumption figures, quota data, or billing information, the output **MUST** be `PLATFORM` regardless of history or classifier confidence. -In the current bootstrap implementation this is enforced via ``. In the future discriminative classifier it should be a hard pre-filter in Layer 1. +**Implementation status: hard-coded in `_is_platform_query()` (2026-04-10).** Two detection mechanisms: +- Vocabulary signals: keywords like `"usage percentage"`, `"your quota"`, `"your account"`, `"api calls"`, etc. (see `_RC02_KEYWORDS` in `graph.py`) +- Structural signal: regex `\b\d+(\.\d+)?\s*%` (any percentage figure) combined with a usage context word (`usage`, `quota`, `consumption`, `limit`, `billing`, etc.) + +This rule is no longer enforced via `` in the prompt — it is deterministic code and cannot be overridden by model behavior. ### RC-03 — Intent history scoping (priority: medium) @@ -224,14 +229,16 @@ The LLM classifier is the **teacher**. 
The embedding classifier is the **student The platform generates `PLATFORM` prompts and knows the type at generation time. Adding `query_type` to `AgentRequest` (proto field 7) lets the caller declare the type explicitly, bypassing all three layers. This makes RC-01 and RC-02 redundant for platform-generated traffic. +**Implementation status: complete (2026-04-10).** `query_type` is proto field 7. When set, the classify node in both `build_graph` and `build_prepare_graph` returns the declared type immediately without invoking any layer. + ### Convergence path -| Phase | What changes | Layer 3 traffic | -|---|---|---| -| Now — bootstrap | LLM classifier for all unmatched queries | ~95% | -| Phase 1 | Collect labels via `classify_history_store` | ~95% | -| Phase 2 | Deploy embedding classifier (Layer 2) | ~10–20% | -| Phase 3 | Caller-declared type for platform prompts | <5% | -| Phase 4 | LLM classifier as anomaly handler only | <2% | +| Phase | What changes | Layer 3 traffic | Status | +|---|---|---|---| +| Bootstrap | LLM classifier for all unmatched queries | ~95% | ✅ Implemented | +| Phase 1 | Collect labels via `classify_history_store` + export JSONL | ~95% | ✅ Complete — `classifier_export.py` | +| Phase 2 | Deploy embedding classifier (Layer 2) | ~10–20% | ✅ Complete — bge-m3 + LogisticRegression, seed dataset, `_classify_layer2()` in `graph.py` | +| Phase 3 | Caller-declared type for platform prompts | <5% | ✅ Complete — proto field 7, bypasses all layers | +| Phase 4 | LLM classifier as anomaly handler only | <2% | ⏳ Production outcome — no code change required | -Phase 2 is the highest-leverage step: it replaces the dominant code path (LLM inference per request) with CPU-only inference, with no change to the routing contract or the downstream graph. +Phase 2 is the highest-leverage step: it replaces the dominant code path (LLM inference per request) with CPU-only inference, with no change to the routing contract or the downstream graph. 
Phase 4 is reached when the platform adopts Phase 3 and Layer 2 is retrained with production data. diff --git a/docs/ADR/ADR-0009-per-type-response-validation.md b/docs/ADR/ADR-0009-per-type-response-validation.md new file mode 100644 index 0000000..8877a12 --- /dev/null +++ b/docs/ADR/ADR-0009-per-type-response-validation.md @@ -0,0 +1,335 @@ +# ADR-0009: Per-Type Response Validation Layer + +**Date:** 2026-04-10 +**Status:** Accepted +**Deciders:** Rafael Ruiz (CTO) +**Related ADRs:** ADR-0007 (MSVL for RAG Evaluation), ADR-0008 (Adaptive Query Routing), ADR-0003 (Hybrid Retrieval RRF) + +--- + +## Context + +### Problem 1 — Syntactically invalid code reaches users + +ADR-0007 documents that 10–16% of `CODE_GENERATION` responses contain syntactically invalid AVAP code — foreign language injection (Go, Python, JavaScript) or hallucinated commands (`getSHA256`, `readParam`, `returnResult`). The LLM judge used in `EvaluateRAG` does not detect these failures because it evaluates semantic coherence, not syntactic validity. + +This problem exists in production today. A user receiving a `CODE_GENERATION` response has no indication that the generated code would fail on the PLATON kernel. + +The AVAP Parser gRPC service — established in ADR-0007 as a hard dependency of the evaluation pipeline — is already available in the stack. It returns not just `VALID / INVALID` but a **complete line-by-line execution trace** on failure: + +``` +Line 3: unknown command 'getSHA256' — expected known identifier +Line 7: unexpected construct 'for i in range(...)' — AVAP loop syntax required +Line 12: 'returnResult' not defined — did you mean 'addResult'? +``` + +This trace is structured, specific, and directly actionable by the LLM. A retry informed by the parser trace is fundamentally different from a blind retry — the model knows exactly what failed and where. 
+ +### Problem 2 — Context relevance is not evaluated pre-generation + +The engine retrieves 8 chunks from Elasticsearch for every `RETRIEVAL` and `CODE_GENERATION` query without checking whether those chunks actually answer the question. `CONFIDENCE_PROMPT_TEMPLATE` has been scaffolded in `prompts.py` since the initial implementation but is not wired into the graph. + +Undetected low-relevance retrieval produces responses that are semantically fluent but factually ungrounded — the model generates plausible-sounding AVAP explanations or code not supported by the retrieved documentation. + +### Why these are one ADR + +Both problems share the same architectural response: adding validation nodes to the production graph with type-specific logic and feedback-informed retry. The decision is one — **add a per-type validation layer** — and the implementation shares the same graph positions (post-retrieve, post-generate), the same retry contract (maximum 1 retry per request), and the same rationale (the engine must not silently return responses it has evidence to question). + +Splitting into separate ADRs would produce two documents that cannot be understood independently. + +--- + +## Decision + +Add a **Per-Type Response Validation Layer (PTVL)** to the production LangGraph pipeline. Each query type has a distinct validation strategy matching its failure modes. 
+ +### Validation contract by type + +| Type | When | What | Mechanism | +|---|---|---|---| +| `CODE_GENERATION` | Post-generation | Syntactic validity of generated AVAP code | AVAP Parser gRPC — deterministic | +| `RETRIEVAL` | Pre-generation | Relevance of retrieved context to the query | LLM relevance check — `CONFIDENCE_PROMPT_TEMPLATE` | +| `CONVERSATIONAL` | None | — | No retrieval, no code generated | +| `PLATFORM` | None | — | No retrieval, no code generated | + +--- + +### Decision 1 — CODE_GENERATION: parser validation with trace-guided retry + +#### Flow + +``` +generate_code node + │ + ▼ +[V1] AVAP Parser gRPC + │ + ├── VALID ──────────────────────────────► return response + │ + └── INVALID + line-by-line trace + │ + ▼ + [inject trace into retry prompt] + │ + ▼ + generate_code_retry node (1 attempt only) + │ + ▼ + [V2] AVAP Parser gRPC + │ + ├── VALID ──────────────────────► return response + │ + └── INVALID ────────────────────► return response + validation_status flag +``` + +#### Trace-guided retry + +The parser trace is injected into the generation prompt as a structured correction context: + +``` + +The previous attempt produced invalid AVAP code. Specific failures: + +Line 3: unknown command 'getSHA256' — expected known identifier +Line 7: unexpected construct 'for i in range(...)' — AVAP loop syntax required + +Correct these errors. Do not repeat the same constructs. + +``` + +This is not a blind retry. The LLM receives the exact failure points and can target its corrections. ADR-0007 documented the mapping between common hallucinated commands and their valid AVAP equivalents (`getSHA256` → `encodeSHA256`, `returnResult` → `addResult`, etc.) — the trace makes these corrections automatic without hardcoding the mapping. + +#### Parser SLA + +Inherited from ADR-0007: ≤2 seconds per call. **Silent fallback is permitted in production** (unlike evaluation, where ADR-0007 mandates abort). 
The distinction is that evaluation scores must be trustworthy; production responses degrade gracefully. + +#### Parser availability — circuit breaker + +A single timeout or connection error does not mean the parser is down. A sustained outage does. Hammering an unavailable gRPC service on every `CODE_GENERATION` request adds latency to every user request with zero benefit. + +The engine implements a **circuit breaker** with three states: + +``` +CLOSED ──[N consecutive failures]──► OPEN +OPEN ──[cooldown expires]────────► HALF-OPEN +HALF-OPEN ──[probe succeeds]────────► CLOSED +HALF-OPEN ──[probe fails]───────────► OPEN +``` + +| Parameter | Default | Env var | +|---|---|---| +| Failure threshold to open | 3 consecutive failures | `PARSER_CB_THRESHOLD` | +| Cooldown before half-open | 30 seconds | `PARSER_CB_COOLDOWN` | +| Timeout per call | 2 seconds | `AVAP_PARSER_TIMEOUT` | + +While the circuit is **OPEN**, `CODE_GENERATION` responses are returned immediately with `validation_status = PARSER_UNAVAILABLE` — no gRPC call is attempted. The cooldown prevents thundering-herd reconnection attempts. + +``` +[ptvl] circuit OPEN — skipping parser, returning unvalidated +[ptvl] circuit HALF-OPEN — probing parser availability +[ptvl] circuit CLOSED — parser reachable +``` + +Setting `AVAP_PARSER_TIMEOUT=0` permanently opens the circuit — disables parser validation entirely. Useful during development when the parser service is not deployed. + +#### New environment variables + +``` +AVAP_PARSER_URL=grpc://... 
# URL of AVAP Parser gRPC service +AVAP_PARSER_TIMEOUT=2 # seconds per call; 0 = disable validation +PARSER_CB_THRESHOLD=3 # consecutive failures before circuit opens +PARSER_CB_COOLDOWN=30 # seconds before circuit attempts half-open probe +``` + +--- + +### Decision 2 — RETRIEVAL: context relevance check with reformulation retry + +#### Flow + +``` +retrieve node + │ + ▼ +[V1] CONFIDENCE_PROMPT_TEMPLATE + YES / NO + │ + ├── YES ────────────────────────────────► generate node (normal path) + │ + └── NO + │ + ▼ + reformulate_with_hint node + [reformulate query signalling context was insufficient] + │ + ▼ + retrieve (retry) + │ + ▼ + generate node (regardless of second retrieval result) +``` + +On second retrieval, generation proceeds unconditionally. The retry is a single best-effort improvement, not a gate. The model generates with whatever context is available — the user receives a response, not a refusal. + +#### Relevance check + +`CONFIDENCE_PROMPT_TEMPLATE` evaluates whether the retrieved context contains at least one passage relevant to the question. It returns `YES` or `NO` — no graduated score. + +The relevance check is only applied to `RETRIEVAL` queries here. `CODE_GENERATION` has the parser as a stronger post-generation signal; the pre-generation relevance check would add latency with lower signal value than the parser provides. + +#### Reformulation hint + +When context is insufficient, the reformulation node receives a hint that standard context was not found. This produces a semantically different reformulation — broader synonyms, alternative phrasings — rather than a near-duplicate of the original query. + +``` +[CONTEXT_INSUFFICIENT] +The previous retrieval did not return relevant context for: "{original_query}" +Reformulate this query using broader terms or alternative phrasing. 
+``` + +--- + +## Graph changes + +### New nodes + +| Node | Graph | Trigger | +|---|---|---| +| `validate_code` | `build_graph` | After `generate_code` | +| `generate_code_retry` | `build_graph` | After `validate_code` when INVALID | +| `check_context_relevance` | `build_graph` + `build_prepare_graph` | After `retrieve`, before `generate` (RETRIEVAL only) | +| `reformulate_with_hint` | `build_graph` + `build_prepare_graph` | After `check_context_relevance` when NO | + +### Updated flow — `build_graph` + +```mermaid +flowchart TD + START([start]) --> CL[classify] + + CL -->|RETRIEVAL| RF[reformulate] + CL -->|CODE_GENERATION| RF + CL -->|CONVERSATIONAL| RC[respond_conversational] + CL -->|PLATFORM| RP[respond_platform] + + RF --> RT[retrieve] + + RT -->|CODE_GENERATION| GC[generate_code] + RT -->|RETRIEVAL| CR{check_context\nrelevance} + + CR -->|YES| GE[generate] + CR -->|NO| RH[reformulate_with_hint] + RH --> RT2[retrieve retry] + RT2 --> GE + + GC --> VC{validate_code\nParser gRPC} + VC -->|VALID| END([end]) + VC -->|INVALID + trace| GCR[generate_code_retry\ntrace-guided] + GCR --> VC2{validate_code\nParser gRPC} + VC2 -->|VALID| END + VC2 -->|INVALID| END + + GE --> END + RC --> END + RP --> END +``` + +--- + +## New AgentState fields + +```python +class AgentState(TypedDict): + ... 
+ # PTVL fields + parser_trace: str # raw parser trace from first validation attempt (empty if valid) + validation_status: str # see validation status values below + context_relevant: bool # result of CONFIDENCE_PROMPT check (RETRIEVAL only) +``` + +### Validation status values + +| Value | Meaning | When set | +|---|---|---| +| `""` (empty) | Valid — no issues detected | Parser returned VALID on first or second attempt | +| `INVALID_UNRESOLVED` | Parser ran, code failed both attempts | Two parser calls made, both returned INVALID | +| `PARSER_UNAVAILABLE` | Parser was unreachable or circuit is open | No parser call was made or all calls timed out | + +These are semantically distinct signals. `INVALID_UNRESOLVED` means the engine has evidence the code is wrong. `PARSER_UNAVAILABLE` means the engine has no evidence either way — the code may be correct. Clients must not treat them equivalently. + +`validation_status` is surfaced to the client via `AgentResponse`: + +```protobuf +message AgentResponse { + string text = 1; + string avap_code = 2; + bool is_final = 3; + string validation_status = 4; // "" | "INVALID_UNRESOLVED" | "PARSER_UNAVAILABLE" +} +``` + +Clients that do not read `validation_status` are unaffected — the field defaults to empty string. + +--- + +## Routing contract additions (RC-07, RC-08) + +These rules extend the contract defined in ADR-0008. + +### RC-07 — Parser validation gate (priority: high) + +Every `CODE_GENERATION` response **MUST** be submitted to the AVAP Parser before delivery to the client, unless `AVAP_PARSER_TIMEOUT=0` or the parser service is unreachable. + +``` +route(q) = CODE_GENERATION → parser_validate(response) before yield +``` + +A `CODE_GENERATION` response returned without parser validation due to parser unavailability **MUST** be logged as `[ptvl] parser unavailable — returning unvalidated`. + +### RC-08 — Retry budget (priority: medium) + +Each request has a maximum of **1 retry** regardless of type. 
A `CODE_GENERATION` request that fails parser validation twice returns the second attempt with `validation_status=INVALID_UNRESOLVED`.
**`RETRIEVAL` latency budget**: The +300–500ms from the relevance LLM call may be unacceptable for the VS Code extension use case where streaming latency is user-visible. A discriminative relevance model (embedding similarity between query vector and context vector, cosine threshold) would be ~1ms and eliminate this cost entirely. Deferred to a future amendment. + +2. **`validation_status` UX**: The proto field is defined but the client behavior is not specified. What should the VS Code extension or AVS Platform display when `validation_status=true`? Requires a product decision outside this ADR's scope. + +3. **Parser version pinning**: Inherited from ADR-0007 open question 2. Parser upgrades may alter what is considered valid AVAP. A policy for handling parser version changes in the production pipeline has not been defined. + +--- + +## Future Path + +The context relevance check for `RETRIEVAL` (Decision 2) uses a generative LLM for a discriminative task — the same pattern that ADR-0008 identified as tactical debt for the classifier. The correct steady-state implementation is a cosine similarity threshold between the query embedding vector and the average context embedding vector: + +``` +relevance_score = cosine(embed(query), mean(embed(chunks))) +if relevance_score < RELEVANCE_THRESHOLD: + reformulate_with_hint() +``` + +This runs in microseconds using the `bge-m3` embeddings already computed during retrieval. It replaces the `CONFIDENCE_PROMPT_TEMPLATE` LLM call entirely and eliminates the +300–500ms latency penalty on every RETRIEVAL request. + +**Trigger for this upgrade:** once the RETRIEVAL validation LLM call appears as a measurable latency contribution in Langfuse traces. 
diff --git a/docs/ADR/ADR-0010-classifier-continuous-retraining.md b/docs/ADR/ADR-0010-classifier-continuous-retraining.md new file mode 100644 index 0000000..9bdedcb --- /dev/null +++ b/docs/ADR/ADR-0010-classifier-continuous-retraining.md @@ -0,0 +1,156 @@ +# ADR-0010: Classifier Continuous Retraining — Champion/Challenger Pipeline + +**Date:** 2026-04-10 +**Status:** Accepted +**Deciders:** Rafael Ruiz (CTO) +**Related ADRs:** ADR-0008 (Adaptive Query Routing — Layer 2 classifier), ADR-0009 (Per-Type Response Validation) + +--- + +## Context + +ADR-0008 Phase 2 deployed a Layer 2 embedding classifier trained on a **seed dataset of 94 hand-crafted examples**. This model works well for the initial distribution of queries but has two structural limitations: + +1. **The seed dataset does not reflect production traffic.** Hand-crafted examples are idealized. Real users ask questions with typos, mixed languages, ambiguous phrasing, and domain-specific vocabulary that is not in the seed. + +2. **The model never improves without manual intervention.** The data flywheel (ADR-0008 Phase 1) accumulates labeled examples automatically via `classify_history_store`, but nothing uses them. Data piles up in `/data/classifier_labels/` and the model stays frozen at its initial accuracy. + +The consequence is that Layer 2 confidence degrades over time relative to the actual query distribution, pushing more requests to Layer 3 (LLM classifier) than necessary. + +--- + +## Decision + +Implement a **Champion/Challenger automatic retraining pipeline** that triggers every time a new batch of labeled data is exported. 
+ +### Core design + +```mermaid +flowchart TD + S[classify_history_store\naccumulates sessions] -->|100 sessions| EX[classifier_export.py\nexport JSONL] + EX -->|RETRAIN_ON_EXPORT=true| RT[retrain_pipeline.py\nbackground thread] + + RT --> LOAD[Load seed + all exports] + LOAD --> EMB[Embed with bge-m3] + EMB --> SPLIT[Stratified 80/20 split\ntrain / held-out] + + SPLIT --> CV[Cross-validate challenger\nStratifiedKFold] + CV -->|CV < 0.90| ABORT[Abort — do not deploy\nlog alert] + CV -->|CV ≥ 0.90| TRAIN[Train challenger\non full train split] + + TRAIN --> EVAL_CH[Evaluate challenger\non held-out set] + EVAL_CH --> EVAL_CP[Evaluate champion\non same held-out set] + + EVAL_CP --> DECISION{challenger ≥ champion?} + DECISION -->|yes| BACKUP[Backup champion] + BACKUP --> DEPLOY[Deploy challenger\noverwrite CLASSIFIER_MODEL_PATH] + DEPLOY --> ARCHIVE[Archive processed exports] + + DECISION -->|no| KEEP[Keep champion\nlog alert\ndiscard challenger] + KEEP --> ARCHIVE +``` + +### Champion/Challenger semantics + +The model currently in production is the **champion**. The newly trained model is the **challenger**. The challenger is only promoted if its accuracy on the held-out set is **greater than or equal to** the champion's accuracy on the same set. + +This guarantees that the production model never regresses. A retraining run triggered by noisy or unbalanced data will produce a challenger that loses the comparison and is discarded automatically. + +If no champion exists (first deployment), the challenger is promoted unconditionally provided CV accuracy ≥ 0.90. + +### Trigger + +The pipeline is triggered by `classifier_export.py` after every successful export — which happens every `CLASSIFIER_EXPORT_THRESHOLD` sessions (default: **100**). + +The pipeline runs in a **background daemon thread** inside the engine container. It does not block gRPC request handling. A 10-minute hard timeout prevents runaway retraining from consuming resources indefinitely. 
+ +### Model loading after promotion + +The engine does **not** hot-reload the model mid-operation. The new champion is written to `CLASSIFIER_MODEL_PATH` and loaded on the next engine restart. This is intentional: + +- Hot-reload would require locking around every inference call, adding latency. +- Mid-session model changes could produce inconsistent classification for the same user within a conversation. +- Docker container restarts are cheap and already part of the deployment workflow. + +The engine logs a clear message after promotion: + +``` +[classifier_export] retraining completed — restart engine to load new model +``` + +### Safety mechanisms + +| Mechanism | Purpose | +|---|---| +| CV accuracy gate (≥ 0.90) | Rejects challengers trained on insufficient or unbalanced data before the held-out comparison | +| Champion comparison on held-out | Prevents regression — challenger must equal or beat the current production model | +| Champion backup before overwrite | `classifier_model_backup_{timestamp}.pkl` — roll back manually if needed | +| Export archiving | Processed files moved to `CLASSIFIER_ARCHIVE_DIR` — prevents re-inclusion in future runs | +| 10-minute subprocess timeout | Prevents runaway retraining from blocking the engine indefinitely | +| `RETRAIN_ON_EXPORT=false` | Disables automatic retraining without code changes — useful in staging or during debugging | + +--- + +## Environment variables + +| Variable | Default | Purpose | +|---|---|---| +| `CLASSIFIER_EXPORT_THRESHOLD` | `100` | Sessions before export + retrain trigger | +| `RETRAIN_ON_EXPORT` | `true` | Enable/disable automatic retraining | +| `RETRAIN_SCRIPT_PATH` | `/app/scripts/pipelines/classifier/retrain_pipeline.py` | Path to retrain script inside container | +| `CLASSIFIER_ARCHIVE_DIR` | `/data/classifier_labels/archived` | Where processed exports are moved after retraining | +| `CLASSIFIER_SEED_DATASET` | `/app/scripts/pipelines/classifier/seed_classifier_dataset.jsonl` | Seed dataset 
always included in retraining | +| `CLASSIFIER_MIN_CV_ACCURACY` | `0.90` | Minimum CV accuracy for challenger to proceed | +| `CLASSIFIER_HELD_OUT_RATIO` | `0.20` | Fraction of merged dataset reserved for champion/challenger comparison | + +--- + +## Files + +| File | Role | +|---|---| +| `scripts/pipelines/classifier/retrain_pipeline.py` | Champion/Challenger training, evaluation, promotion, and archiving | +| `Docker/src/utils/classifier_export.py` | Export trigger — launches `retrain_pipeline.py` in background after export | +| `scripts/pipelines/classifier/seed_classifier_dataset.jsonl` | Always included in retraining — anchors the model on known-good examples | + +--- + +## Convergence behavior + +Each retraining cycle merges the seed dataset with all accumulated production exports. As production traffic grows, the model progressively reflects real user queries rather than the hand-crafted seed. + +```mermaid +flowchart LR + T0["Cycle 0\n94 seed examples\nCV 1.0 on seed"] --> + T1["Cycle 1\n94 + ~100 production\nreal query distribution"] --> + T2["Cycle 2\n94 + ~200 production\nincreasing coverage"] --> + TN["Cycle N\nseed becomes minority\nmodel reflects production traffic"] +``` + +The seed dataset is never removed — it acts as a regularizer that prevents the model from drifting entirely to production distribution edge cases. + +--- + +## Consequences + +### Positive + +- Layer 2 accuracy improves automatically with usage — no human intervention required after initial deployment. +- The champion/challenger gate prevents production regressions from noisy batches. +- `RETRAIN_ON_EXPORT=false` provides a complete off switch without code changes. +- Export archiving keeps `/data/classifier_labels/` clean — only unprocessed exports accumulate. +- The backup mechanism allows manual rollback in the rare case a promoted challenger performs unexpectedly in production. 
+ +### Negative / Trade-offs + +- **Retraining uses Ollama (bge-m3) on the host.** The retrain script runs inside the container but needs `OLLAMA_LOCAL_URL` reachable. If Ollama is down at retraining time, the pipeline fails and logs an error — the champion is unchanged. +- **The engine requires a restart to load the new model.** Promotions are invisible to users until restart. In low-traffic periods this is acceptable; high-traffic deployments may need an orchestrated restart strategy. +- **Accumulated exports grow unboundedly if retraining fails.** If the pipeline consistently fails (e.g., Ollama unreachable), exports accumulate in `/data/classifier_labels/` without being archived. A monitoring alert on directory size is recommended. + +### Open questions + +1. **Orchestrated restart policy:** Who triggers the engine restart after a promotion? Currently manual. Could be automated via a health-check endpoint that returns a `model_updated` flag, allowing the orchestrator to restart when traffic is low. + +2. **Held-out set stability:** The held-out set is resampled on every retraining cycle from the merged dataset. As the dataset grows, the held-out set changes between cycles, making champion accuracy scores not directly comparable across cycles. A fixed held-out set (frozen after the first N examples) would improve comparability. Deferred. + +3. **Class imbalance over time:** Production traffic may not be balanced across the four query types. If `CODE_GENERATION` is rare in production, its representation in the training set shrinks relative to the seed. The CV gate catches catastrophic imbalance but not gradual drift. A per-class recall threshold could be added to the gate. 
diff --git a/docs/ARCHITECTURE.md b/docs/ARCHITECTURE.md index c7afd61..c40522a 100644 --- a/docs/ARCHITECTURE.md +++ b/docs/ARCHITECTURE.md @@ -1,8 +1,8 @@ # Brunix Assistance Engine — Architecture Reference > **Audience:** Engineers contributing to this repository, architects reviewing the system design, and operators responsible for its deployment. -> **Last updated:** 2026-04-09 -> **Version:** 1.8.x +> **Last updated:** 2026-04-10 +> **Version:** 1.9.x > **Architect:** Rafael Ruiz (CTO, 101OBEX Corp) > **Related ADRs:** ADR-0001 · ADR-0002 · ADR-0003 · ADR-0004 · ADR-0005 · ADR-0006 · ADR-0007 · ADR-0008 > **Related PRDs:** PRD-0001 · PRD-0002 · PRD-0003 @@ -116,13 +116,15 @@ Langfuse is the exception — it has a public IP (`45.77.119.180`) and is access |---|---|---| | gRPC server | `server.py` | Entry point for all AI requests. Manages session store, model selection, and state initialization | | HTTP proxy | `openai_proxy.py` | OpenAI + Ollama compatible HTTP layer. Translates REST → gRPC | -| LangGraph orchestrator | `graph.py` | Builds and executes the agentic routing graph | +| LangGraph orchestrator | `graph.py` | Builds and executes the agentic routing graph. Hosts L1, L2, and L3 classifier layers | | Prompt definitions | `prompts.py` | All prompt templates in one place: classifier, reformulator, generators, platform | | Agent state | `state.py` | `AgentState` TypedDict shared across all graph nodes | | LLM factory | `utils/llm_factory.py` | Provider-agnostic model instantiation (Ollama, OpenAI, Anthropic, Bedrock) | | Embedding factory | `utils/emb_factory.py` | Provider-agnostic embedding model instantiation | +| Classifier export | `utils/classifier_export.py` | Exports `classify_history_store` to labeled JSONL when threshold is reached. 
Data flywheel for Layer 2 retraining | | Evaluation pipeline | `evaluate.py` | RAGAS evaluation with Claude as judge | | Proto contract | `protos/brunix.proto` | Source of truth for the gRPC API | +| Classifier training | `scripts/pipelines/classifier/train_classifier.py` | Offline script. Embeds labeled queries with bge-m3, trains LogisticRegression, serializes model | **Model slots:** @@ -199,21 +201,36 @@ The classifier is the most critical node. It determines the entire execution pat ### Classification pipeline +Three-layer pipeline. Each layer is only invoked if the previous one cannot produce a confident answer. + ```mermaid flowchart TD - Q[Incoming query] --> FP{Fast-path check\n_is_platform_query} - FP -->|known platform prefix| PLATFORM[PLATFORM\nno LLM call] - FP -->|no match| LLM[LLM Classifier\nqwen3:1.7b] + Q([Incoming query]) --> CD{Caller-declared\nquery_type?} + CD -->|proto field 7 set| DECL[Use declared type\nbypass all layers] + CD -->|empty| L1 - LLM --> IH[Intent history\nlast 6 entries as context] - IH --> OUT[Output: TYPE + EDITOR/NO_EDITOR] + L1{"Layer 1\nHard rules\nRC-01 · RC-02\nO(1) deterministic"} + L1 -->|match| R1[Classification result] + L1 -->|no match| L2 - OUT --> R[RETRIEVAL] - OUT --> C[CODE_GENERATION] - OUT --> V[CONVERSATIONAL] - OUT --> PL[PLATFORM] + L2{"Layer 2\nEmbedding classifier\nbge-m3 + LogisticRegression\n~1ms · CPU · no LLM"} + L2 -->|confidence ≥ 0.85| R1 + L2 -->|confidence < 0.85| L3 + + L3{"Layer 3\nLLM classifier\nqwen3:1.7b\nfallback for ambiguous queries"} + L3 --> R1 + + DECL --> R1 + R1 --> RT[RETRIEVAL] + R1 --> CG[CODE_GENERATION] + R1 --> CV[CONVERSATIONAL] + R1 --> PL[PLATFORM] ``` +**Caller-declared type (Phase 3):** When the calling system already knows the query type — for example, the AVS Platform generating a `PLATFORM` prompt — it sets `query_type` in the proto request. All three classifier layers are skipped entirely. 
+ +**Layer 2** is loaded from `CLASSIFIER_MODEL_PATH` (`/data/classifier_model.pkl`) at startup. If the file does not exist, the engine starts normally and uses Layer 3 only. + ### Intent history — solving anchoring bias The classifier does not receive raw conversation messages. It receives a compact trace of prior classifications: @@ -386,16 +403,23 @@ Implemented in PRD-0003. Queries about account, metrics, usage, or billing bypas ```mermaid flowchart TD - Q[Incoming query] --> FP{_is_platform_query?} - FP -->|yes — known prefix| SKIP[skip classifier LLM\nroute = PLATFORM] - FP -->|no| CL[LLM classifier] - CL -->|PLATFORM| ROUTE[route to respond_platform] - SKIP --> ROUTE + Q[Incoming query] --> CD{query_type\ndeclared in proto?} + CD -->|yes| DECL[bypass all layers\nroute = declared type] + CD -->|no| FP + + FP{Layer 1\n_is_platform_query?\nRC-01 · RC-02} + FP -->|yes| SKIP[skip L2 + L3\nroute = PLATFORM] + FP -->|no| L2[Layer 2 + Layer 3\nnormal classification] + L2 -->|PLATFORM| ROUTE + + DECL -->|PLATFORM| ROUTE + SKIP --> ROUTE[route to respond_platform] ROUTE --> PROMPT["PLATFORM_PROMPT\n+ extra_context injection\n+ user_info available"] PROMPT --> LLM[qwen3:0.6b\nconversational model slot] LLM --> RESP[response] + style DECL fill:#2d6a4f,color:#fff style SKIP fill:#2d6a4f,color:#fff style ROUTE fill:#2d6a4f,color:#fff ``` @@ -422,7 +446,7 @@ flowchart LR CHS --> CE ``` -**`classify_history_store` is also a data flywheel.** Every session generates labeled `(topic, type)` pairs automatically. When sufficient sessions accumulate (~500), this store can be exported to train the Layer 2 embedding classifier described in ADR-0008 Future Path — eliminating the need for the LLM classifier on the majority of requests. +**`classify_history_store` is the data flywheel for Layer 2 retraining.** Every session generates labeled `(topic, type)` pairs automatically. 
`utils/classifier_export.py` exports them to JSONL when `CLASSIFIER_EXPORT_THRESHOLD` sessions accumulate (default: 100 — see ADR-0010), and flushes on shutdown.
Must be retrained with production exports to reduce L3 fallback rate | ADR-0008 | -### Roadmap (ADR-0008 Future Path) +### ADR-0008 implementation status ```mermaid flowchart LR - P0["Now\nLLM classifier\n~95% of traffic"] --> - P1["Phase 1\nExport classify_history_store\nlabeled dataset"] --> - P2["Phase 2\nEmbedding classifier Layer 2\nbge-m3 + logistic regression\n~1ms CPU"] --> - P3["Phase 3\nCaller-declared query_type\nproto field 7"] --> - P4["Phase 4\nLLM classifier = anomaly handler\n<2% of traffic"] + P0["Bootstrap\nLLM classifier\n✅ Implemented"] --> + P1["Phase 1\nData flywheel\nclassifier_export.py\n✅ Complete"] --> + P2["Phase 2\nLayer 2 — bge-m3\n+ LogisticRegression\n✅ Complete"] --> + P3["Phase 3\nCaller-declared type\nproto field 7\n✅ Complete"] --> + P4["Phase 4\nLLM = anomaly handler\n<2% traffic\n⏳ Production outcome"] ``` -Target steady-state: the LLM classifier handles fewer than 2% of requests — only genuinely ambiguous queries that neither hard rules nor the trained embedding classifier can resolve with confidence. +Phase 4 is not a code deliverable — it is a production state reached when the platform client sends `query_type` for generated prompts and Layer 2 confidence improves with production data. 
Monitor with: + +```bash +docker logs brunix-assistance-engine 2>&1 | grep "classifier/L" | grep -c "L3" +``` diff --git a/docs/product/PRD-0003-adaptive-query-routing.md b/docs/product/PRD-0003-adaptive-query-routing.md index b5aea2e..35cfead 100644 --- a/docs/product/PRD-0003-adaptive-query-routing.md +++ b/docs/product/PRD-0003-adaptive-query-routing.md @@ -1,7 +1,8 @@ # PRD-0003: Adaptive Query Routing with Platform Intent and Model Specialization **Date:** 2026-04-09 -**Status:** Implemented +**Last updated:** 2026-04-10 +**Status:** Implemented (Phases 1–3 complete) **Requested by:** Rafael Ruiz (CTO) **Purpose:** Route platform queries correctly and reduce inference cost for non-RAG requests **Related ADR:** ADR-0008 (Adaptive Query Routing — Taxonomy, Contract, and Classifier Strategy) @@ -100,11 +101,15 @@ A user asks a general AVAP question, then `"en menos palabras"`. The engine clas - Select `active_llm` per request in `AskAgentStream` based on `query_type` - Add `classify_history` field to `AgentState` and `ClassifyEntry` type to `state.py` +**Also in scope (ADR-0008 Phases 1–3):** +- Export `classify_history_store` to labeled JSONL when threshold is reached (`classifier_export.py`) — data flywheel for Layer 2 retraining +- Embedding classifier Layer 2: bge-m3 + LogisticRegression trained on seed dataset, loaded at startup, intercepts queries before LLM with ≥0.85 confidence threshold +- Caller-declared `query_type`: proto field 7 in `AgentRequest` — when set, all three classifier layers are bypassed + **Out of scope:** - Changes to `EvaluateRAG` — golden dataset does not include platform queries - `user_info` consumption in graph logic — available in state, not yet acted upon - Sub-typing `PLATFORM` (e.g. 
`PLATFORM_METRICS` vs `PLATFORM_BILLING`) — deferred -- Proto changes to add caller-declared `query_type` field — deferred to Phase 3 of Future Path (ADR-0008) --- @@ -151,6 +156,10 @@ Classifier prompt additions: |---|---|---| | `OLLAMA_MODEL_NAME` | Main model: RETRIEVAL + CODE_GENERATION | required | | `OLLAMA_MODEL_NAME_CONVERSATIONAL` | Light model: CONVERSATIONAL + PLATFORM | falls back to `OLLAMA_MODEL_NAME` | +| `CLASSIFIER_EXPORT_THRESHOLD` | Sessions before auto-export of labeled data | `500` | +| `CLASSIFIER_EXPORT_DIR` | Directory for exported JSONL label files | `/data/classifier_labels` | +| `CLASSIFIER_MODEL_PATH` | Path to serialized Layer 2 model | `/data/classifier_model.pkl` | +| `CLASSIFIER_CONFIDENCE_THRESHOLD` | Minimum confidence for Layer 2 to classify (else fall through to LLM) | `0.85` | --- @@ -169,10 +178,14 @@ Classifier prompt additions: | Log line | Expected | |---|---| -| `[classify] platform prefix detected -> PLATFORM` | Fast-path firing for known prefixes | -| `[prepare/classify] ... -> PLATFORM` | LLM classifier correctly routing platform queries | +| `[classify] L1 → PLATFORM` | Layer 1 fast-path firing for known prefixes | +| `[classifier/L2] → PLATFORM (conf=0.97)` | Layer 2 classifying with high confidence | +| `[classifier/L2] low confidence (0.72) → LLM fallback` | Layer 2 falling through to Layer 3 | +| `[classify] caller-declared → PLATFORM` | Phase 3 — caller bypassing all layers | +| `[prepare/classify] L3 ... 
-> PLATFORM` | Layer 3 LLM classifier correctly routing | | `[AskAgentStream] query_type=PLATFORM context_len=0` | Zero retrieval for PLATFORM | | `[hybrid] RRF -> N final docs` | Must NOT appear for PLATFORM queries | +| `[classifier/L2] model loaded from /data/classifier_model.pkl` | Layer 2 loaded at startup | --- diff --git a/scripts/pipelines/classifier/README.md b/scripts/pipelines/classifier/README.md new file mode 100644 index 0000000..bc92500 --- /dev/null +++ b/scripts/pipelines/classifier/README.md @@ -0,0 +1,208 @@ +# Layer 2 Classifier — Training Pipeline + +**Author:** Rafael Ruiz (CTO, 101OBEX Corp) +**Related:** ADR-0008 — Adaptive Query Routing + +Part of **ADR-0008 Phase 2**: trains the embedding-based classifier that intercepts +queries before they reach the LLM (Layer 3), reducing per-request Ollama calls to +near zero for well-represented query types. + +--- + +## Overview + +The classifier embeds each query with **bge-m3** (already running in the stack), +trains a **LogisticRegression** on the resulting vectors, and serializes the model +with joblib. At engine startup, `graph.py` loads the model and uses it as Layer 2 +in the classification pipeline. + +```mermaid +flowchart TD + Q([Query]) --> L1 + + L1["Layer 1 — Hard rules\nRC-01 · RC-02\nO(1), deterministic"] + L1 -->|match| R([Classification result]) + L1 -->|no match| L2 + + L2["Layer 2 — Embedding classifier\nbge-m3 + LogisticRegression\n~1ms · CPU only · no LLM"] + L2 -->|confidence ≥ 0.85| R + L2 -->|confidence < 0.85| L3 + + L3["Layer 3 — LLM classifier\nOllama fallback\n~300–800ms"] + L3 --> R +``` + +If the model file does not exist, the engine starts normally and uses L3 only. 
+ +--- + +## Files + +| File | Purpose | +|---|---| +| `train_classifier.py` | Training script | +| `seed_classifier_dataset.jsonl` | Labeled dataset (seed, 100 examples) | +| `requirements.txt` | Python dependencies for the training venv | + +--- + +## Setup + +```bash +cd scripts/pipelines/classifier + +python3 -m venv .venv +source .venv/bin/activate +pip install -r requirements.txt +``` + +--- + +## Running the training + +```bash +python train_classifier.py +``` + +Default behaviour (no flags needed if running from this directory): +- **data**: `seed_classifier_dataset.jsonl` in the same folder +- **ollama**: `http://localhost:11434` (or `$OLLAMA_LOCAL_URL`) +- **output**: `/data/classifier_model.pkl` (or `$CLASSIFIER_MODEL_PATH`) +- **min CV accuracy**: 0.90 + +The script exits with code 1 if CV accuracy is below the threshold — the model is +**not saved** in that case. + +### All arguments + +| Argument | Default | Description | +|---|---|---| +| `--data` | `seed_classifier_dataset.jsonl` | Path to labeled JSONL dataset | +| `--output` | `/data/classifier_model.pkl` | Output path for serialized model | +| `--ollama` | `http://localhost:11434` | Ollama base URL | +| `--min-cv-accuracy` | `0.90` | Minimum CV accuracy to save the model | + +### Custom output path (recommended when running from host) + +```bash +python train_classifier.py \ + --output ../../../Docker/data/classifier_model.pkl +``` + +--- + +## Deploying the model to Docker + +```mermaid +sequenceDiagram + participant Host + participant Docker + + Host->>Host: python train_classifier.py --output ./Docker/data/classifier_model.pkl + Host->>Docker: docker cp classifier_model.pkl brunix-assistance-engine:/data/ + Host->>Docker: docker restart brunix-assistance-engine + Docker-->>Host: [classifier/L2] model loaded from /data/classifier_model.pkl +``` + +On startup you will see in the logs: + +``` +[classifier/L2] model loaded from /data/classifier_model.pkl — {'classes': [...], 'n_train': 100, 
'cv_mean': 0.96, ...} +``` + +If the model is missing: + +``` +[classifier/L2] model not found at /data/classifier_model.pkl — using LLM fallback only +``` + +--- + +## Dataset format + +Every line is a JSON object with two required fields: + +```json +{"query": "What is addVar in AVAP?", "type": "RETRIEVAL"} +``` + +Valid types: `RETRIEVAL`, `CODE_GENERATION`, `CONVERSATIONAL`, `PLATFORM`. + +Lines with missing `query` or `type`, or invalid JSON, are skipped with a warning. + +### Merging production exports with the seed + +```mermaid +flowchart LR + S[seed_classifier_dataset.jsonl] --> M + E["/data/classifier_labels/\nclassifier_labels_*.jsonl\n(production exports)"] --> M + M([merge]) --> D[merged_dataset.jsonl] + D --> T[train_classifier.py] + T --> P[classifier_model.pkl] + P --> E2[engine restart] +``` + +```bash +cat seed_classifier_dataset.jsonl \ + /data/classifier_labels/classifier_labels_*.jsonl \ + > merged_dataset.jsonl + +python train_classifier.py --data merged_dataset.jsonl +``` + +The engine exports labeled data automatically to `CLASSIFIER_EXPORT_DIR` +(default `/data/classifier_labels/`) once `CLASSIFIER_EXPORT_THRESHOLD` sessions +accumulate. Those files use the same format and can be merged directly. + +--- + +## Expected output + +``` +[1/4] Loading data from seed_classifier_dataset.jsonl + 100 examples loaded + Distribution: {'RETRIEVAL': 25, 'CODE_GENERATION': 25, 'CONVERSATIONAL': 25, 'PLATFORM': 25} + +[2/4] Embedding with bge-m3 via http://localhost:11434 + Embedding batch 1/4 (32 queries)... + Embedding batch 2/4 (32 queries)... + Embedding batch 3/4 (32 queries)... + Embedding batch 4/4 (4 queries)... 
+ Embedding matrix: (100, 1024) + +[3/4] Training LogisticRegression (C=1.0) with 5-fold CV + CV accuracy: 0.970 ± 0.021 (folds: [0.95, 1.0, 0.95, 0.95, 1.0]) + + Per-class report: + precision recall f1-score support + CODE_GENERATION 1.00 0.96 0.98 25 + CONVERSATIONAL 0.96 1.00 0.98 25 + PLATFORM 1.00 1.00 1.00 25 + RETRIEVAL 0.96 0.96 0.96 25 + +[4/4] Saving model to ../../../Docker/data/classifier_model.pkl + Model saved → ../../../Docker/data/classifier_model.pkl + +Done. Classes: ['CODE_GENERATION', 'CONVERSATIONAL', 'PLATFORM', 'RETRIEVAL'] +``` + +--- + +## Troubleshooting + +**`CV accuracy below threshold`** — Add more examples to the underperforming class +(check the per-class recall column). 5–10 extra examples per class usually suffice. + +**`langchain-ollama not installed`** — Run `pip install -r requirements.txt` inside +the venv. + +**Ollama connection error** — Verify Ollama is running and bge-m3 is pulled: + +```bash +curl http://localhost:11434/api/tags | grep bge-m3 +# if missing: +ollama pull bge-m3 +``` + +**Container not picking up the model** — The engine loads the model once at startup. +A `docker restart` is required after every `docker cp`. diff --git a/scripts/pipelines/classifier/requirements.txt b/scripts/pipelines/classifier/requirements.txt new file mode 100644 index 0000000..006ba99 --- /dev/null +++ b/scripts/pipelines/classifier/requirements.txt @@ -0,0 +1,4 @@ +scikit-learn>=1.4.0 +joblib>=1.3.0 +numpy>=1.26.0 +langchain-ollama>=0.2.0 diff --git a/scripts/pipelines/classifier/retrain_pipeline.py b/scripts/pipelines/classifier/retrain_pipeline.py new file mode 100644 index 0000000..a4e42f6 --- /dev/null +++ b/scripts/pipelines/classifier/retrain_pipeline.py @@ -0,0 +1,271 @@ +#!/usr/bin/env python3 +""" +retrain_pipeline.py — Automatic Champion/Challenger retraining (ADR-0010). + +Triggered automatically by classifier_export.py when RETRAIN_THRESHOLD new +sessions accumulate. Can also be run manually. + +Flow: + 1. 
Merge all JSONL exports in EXPORT_DIR with the seed dataset + 2. Split into train (80%) and held-out (20%) — stratified + 3. Train challenger model + 4. Load champion model (current production model at CLASSIFIER_MODEL_PATH) + 5. Evaluate both on held-out set + 6. If challenger accuracy >= champion accuracy: deploy challenger → champion + 7. If challenger < champion: discard challenger, keep champion, log alert + 8. Archive processed export files to EXPORT_ARCHIVE_DIR + +Usage: + python retrain_pipeline.py # auto mode (uses env vars) + python retrain_pipeline.py --force # skip champion comparison, always deploy + python retrain_pipeline.py --dry-run # evaluate only, do not deploy +""" + +import argparse +import json +import logging +import os +import shutil +from collections import Counter +from datetime import datetime, timezone +from pathlib import Path + +import joblib +import numpy as np +from sklearn.linear_model import LogisticRegression +from sklearn.metrics import accuracy_score +from sklearn.model_selection import StratifiedShuffleSplit, cross_val_score, StratifiedKFold + +logging.basicConfig(level=logging.INFO, format="%(asctime)s [retrain] %(message)s") +logger = logging.getLogger("retrain_pipeline") + +# ── Configuration ────────────────────────────────────────────────────────────── + +EXPORT_DIR = Path(os.getenv("CLASSIFIER_EXPORT_DIR", "/data/classifier_labels")) +EXPORT_ARCHIVE = Path(os.getenv("CLASSIFIER_ARCHIVE_DIR", "/data/classifier_labels/archived")) +CHAMPION_PATH = Path(os.getenv("CLASSIFIER_MODEL_PATH", "/data/classifier_model.pkl")) +CHALLENGER_PATH = CHAMPION_PATH.parent / "classifier_model_challenger.pkl" +SEED_DATASET = Path(os.getenv("CLASSIFIER_SEED_DATASET", + str(Path(__file__).parent / "seed_classifier_dataset.jsonl"))) +OLLAMA_URL = os.getenv("OLLAMA_LOCAL_URL", "http://localhost:11434") +MIN_CV_ACCURACY = float(os.getenv("CLASSIFIER_MIN_CV_ACCURACY", "0.90")) +HELD_OUT_RATIO = float(os.getenv("CLASSIFIER_HELD_OUT_RATIO", "0.20")) 
# ── Data loading ─────────────────────────────────────────────────────────────

def load_jsonl(path: Path) -> tuple[list[str], list[str]]:
    """Parse one JSONL dataset file into parallel (queries, labels) lists.

    Each line must be a JSON object with non-empty string values under
    "query" and "type". Lines that are blank, invalid JSON, not objects, or
    whose fields are missing / null / non-string are skipped silently —
    previously a JSON null made ``.strip()`` raise AttributeError and abort
    the entire load.
    """
    queries, labels = [], []
    with open(path, encoding="utf-8") as f:
        for line in f:
            line = line.strip()
            if not line:
                continue
            try:
                rec = json.loads(line)
            except json.JSONDecodeError:
                continue
            if not isinstance(rec, dict):
                continue
            q = rec.get("query")
            t = rec.get("type")
            # Robustness fix: only accept real strings — a null or numeric
            # value used to crash the whole load via AttributeError.
            if isinstance(q, str) and isinstance(t, str):
                q, t = q.strip(), t.strip()
                if q and t:
                    queries.append(q)
                    labels.append(t)
    return queries, labels


def load_all_data(export_dir: Path, seed_path: Path) -> tuple[list[str], list[str], list[Path]]:
    """Load seed dataset + all unarchived export files.

    Returns (queries, labels, export_files); export_files lists the
    production JSONL exports that were merged in, so the caller can archive
    them once the retrain succeeds.
    """
    queries, labels = [], []

    if seed_path.exists():
        seed_q, seed_y = load_jsonl(seed_path)
        queries.extend(seed_q)
        labels.extend(seed_y)
        logger.info(f"Seed dataset: {len(seed_q)} examples from {seed_path}")
    else:
        logger.warning(f"Seed dataset not found at {seed_path}")

    export_files = sorted(export_dir.glob("classifier_labels_*.jsonl"))
    for export_file in export_files:
        file_q, file_y = load_jsonl(export_file)
        queries.extend(file_q)
        labels.extend(file_y)
        logger.info(f"Export file: {len(file_q)} examples from {export_file.name}")

    logger.info(f"Total dataset: {len(queries)} examples — {dict(Counter(labels))}")
    return queries, labels, export_files


# ── Embedding ────────────────────────────────────────────────────────────────

def embed_queries(queries: list[str], labels: list[str], base_url: str) -> tuple[np.ndarray, list[str]]:
    """Embed with bge-m3, one at a time to handle NaN vectors gracefully.

    bge-m3 via Ollama occasionally emits NaN vectors; embedding per-query lets
    us drop those (and transport failures) while keeping labels aligned with
    the surviving vectors. Returns (vectors, kept_labels).
    Raises ImportError when langchain-ollama is not installed.
    """
    try:
        from langchain_ollama import OllamaEmbeddings
    except ImportError:
        logger.error("langchain-ollama not installed")
        raise

    emb = OllamaEmbeddings(model="bge-m3", base_url=base_url)
    vectors, kept_labels = [], []
    skipped = 0

    for i, (query, label) in enumerate(zip(queries, labels)):
        try:
            vec = emb.embed_query(query)
            if any(v != v for v in vec):  # NaN check: NaN never equals itself
                skipped += 1
                continue
            vectors.append(vec)
            kept_labels.append(label)
        except Exception as e:
            logger.warning(f"Embedding failed for query {i}: {e}")
            skipped += 1
        if (i + 1) % 20 == 0:
            logger.info(f"  Embedded {i+1}/{len(queries)}...")

    if skipped:
        logger.warning(f"Skipped {skipped} queries due to NaN or embedding errors")

    return np.array(vectors), kept_labels
# ── Training ─────────────────────────────────────────────────────────────────

def _fresh_classifier() -> LogisticRegression:
    """Deterministically-seeded classifier with the project's fixed hyperparameters."""
    return LogisticRegression(max_iter=1000, C=1.0, random_state=42)


def train_model(X: np.ndarray, y: list[str]) -> LogisticRegression:
    """Fit the challenger LogisticRegression on the full training split."""
    model = _fresh_classifier()
    model.fit(X, y)
    return model


def cross_validate(X: np.ndarray, y: list[str]) -> float:
    """Mean stratified k-fold CV accuracy, with k = min(5, smallest class size).

    Returns 0.0 when any class has fewer than 2 examples, since stratified
    splitting is impossible in that case.
    """
    class_sizes = Counter(y)
    fold_count = min(5, min(class_sizes.values()))
    if fold_count < 2:
        logger.warning("Too few examples per class for cross-validation")
        return 0.0

    splitter = StratifiedKFold(n_splits=fold_count, shuffle=True, random_state=42)
    scores = cross_val_score(_fresh_classifier(), X, y, cv=splitter, scoring="accuracy")
    logger.info(f"CV accuracy: {scores.mean():.3f} ± {scores.std():.3f} (folds: {scores.round(3).tolist()})")
    return float(scores.mean())
# ── Champion evaluation ──────────────────────────────────────────────────────

def evaluate_champion(champion_path: Path, X_held: np.ndarray, y_held: list[str]) -> float:
    """Score the current production (champion) model on the held-out split.

    Returns 0.0 when no champion exists yet, which guarantees the very first
    trained challenger is always promoted.
    """
    if not champion_path.exists():
        logger.info("No champion model found — challenger will be deployed unconditionally")
        return 0.0

    bundle = joblib.load(champion_path)
    predictions = bundle["clf"].predict(X_held)
    acc = accuracy_score(y_held, predictions)
    logger.info(f"Champion accuracy on held-out set: {acc:.3f} (trained on {bundle.get('n_train', '?')} examples)")
    return acc


# ── Archive ──────────────────────────────────────────────────────────────────

def archive_exports(export_files: list[Path], archive_dir: Path) -> None:
    """Move processed export files into the archive directory (created on demand)."""
    archive_dir.mkdir(parents=True, exist_ok=True)
    for export_file in export_files:
        shutil.move(str(export_file), str(archive_dir / export_file.name))
        logger.info(f"Archived {export_file.name} → {archive_dir}")
# ── Main ─────────────────────────────────────────────────────────────────────

def main():
    """Run one Champion/Challenger retraining cycle (ADR-0010).

    Steps: merge data → embed → CV-gate the challenger → train → compare
    against the champion on a shared held-out split → promote or keep →
    archive the processed exports.

    Exits with status 1 when the cycle aborts (too little data, or challenger
    CV accuracy below the floor) so that automated callers — cron or the
    classifier_export trigger — can detect the failure. Previously these
    paths returned normally with exit code 0, masking the abort.
    """
    parser = argparse.ArgumentParser(description="Champion/Challenger classifier retraining (ADR-0010)")
    parser.add_argument("--ollama", default=OLLAMA_URL)
    parser.add_argument("--force", action="store_true", help="Deploy challenger without comparing to champion")
    parser.add_argument("--dry-run", action="store_true", help="Evaluate only — do not deploy or archive")
    parser.add_argument("--min-cv", type=float, default=MIN_CV_ACCURACY)
    args = parser.parse_args()

    ts = datetime.now(timezone.utc).strftime("%Y%m%dT%H%M%SZ")
    logger.info(f"=== Retraining pipeline started {ts} ===")

    # ── 1. Load data ─────────────────────────────────────────────────────────
    queries, labels, export_files = load_all_data(EXPORT_DIR, SEED_DATASET)

    if len(queries) < 20:
        logger.error("Too few examples (< 20). Aborting.")
        raise SystemExit(1)  # non-zero so automation sees the abort

    # ── 2. Embed ─────────────────────────────────────────────────────────────
    logger.info(f"Embedding {len(queries)} queries via bge-m3 at {args.ollama}...")
    X, labels = embed_queries(queries, labels, args.ollama)
    logger.info(f"Embedding matrix: {X.shape}")

    # ── 3. Stratified train / held-out split ─────────────────────────────────
    sss = StratifiedShuffleSplit(n_splits=1, test_size=HELD_OUT_RATIO, random_state=42)
    train_idx, held_idx = next(sss.split(X, labels))
    X_train, X_held = X[train_idx], X[held_idx]
    y_train = [labels[i] for i in train_idx]
    y_held = [labels[i] for i in held_idx]
    logger.info(f"Train: {len(y_train)} | Held-out: {len(y_held)}")

    # ── 4. Cross-validate challenger ─────────────────────────────────────────
    logger.info("Cross-validating challenger...")
    cv_acc = cross_validate(X_train, y_train)

    if cv_acc < args.min_cv:
        logger.error(f"Challenger CV accuracy {cv_acc:.3f} below minimum {args.min_cv:.2f}. Aborting.")
        raise SystemExit(1)  # non-zero so automation sees the abort

    # ── 5. Train challenger on full train split ──────────────────────────────
    challenger_clf = train_model(X_train, y_train)
    challenger_acc = accuracy_score(y_held, challenger_clf.predict(X_held))
    logger.info(f"Challenger accuracy on held-out: {challenger_acc:.3f}")

    # ── 6. Evaluate champion on same held-out set ────────────────────────────
    champion_acc = evaluate_champion(CHAMPION_PATH, X_held, y_held)

    # ── 7. Promotion decision ────────────────────────────────────────────────
    # Ties promote the challenger: it is trained on strictly newer data.
    promote = args.force or (challenger_acc >= champion_acc)

    if promote:
        reason = "forced" if args.force else f"challenger ({challenger_acc:.3f}) >= champion ({champion_acc:.3f})"
        logger.info(f"PROMOTING challenger — {reason}")

        if not args.dry_run:
            # Back up champion before overwriting
            if CHAMPION_PATH.exists():
                backup = CHAMPION_PATH.parent / f"classifier_model_backup_{ts}.pkl"
                shutil.copy(str(CHAMPION_PATH), str(backup))
                logger.info(f"Champion backed up → {backup.name}")

            joblib.dump(
                {
                    "clf": challenger_clf,
                    "classes": challenger_clf.classes_.tolist(),
                    "n_train": len(y_train),
                    "cv_mean": round(cv_acc, 4),
                    "held_acc": round(challenger_acc, 4),
                    "champion_acc": round(champion_acc, 4),
                    "retrained_at": ts,
                },
                CHAMPION_PATH,
            )
            logger.info(f"New champion saved → {CHAMPION_PATH}")
            logger.info("Restart brunix-assistance-engine to load the new model.")
        else:
            logger.info("[dry-run] Promotion skipped")
    else:
        logger.warning(
            f"KEEPING champion — challenger ({challenger_acc:.3f}) < champion ({champion_acc:.3f}). "
            "Consider adding more labeled data."
        )

    # ── 8. Archive processed exports ─────────────────────────────────────────
    if not args.dry_run and export_files:
        archive_exports(export_files, EXPORT_ARCHIVE)

    logger.info(f"=== Retraining pipeline finished {datetime.now(timezone.utc).strftime('%Y%m%dT%H%M%SZ')} ===")
Archive processed exports ────────────────────────────────────────── + if not args.dry_run and export_files: + archive_exports(export_files, EXPORT_ARCHIVE) + + logger.info(f"=== Retraining pipeline finished {datetime.now(timezone.utc).strftime('%Y%m%dT%H%M%SZ')} ===") + + +if __name__ == "__main__": + main() diff --git a/scripts/pipelines/classifier/seed_classifier_dataset.jsonl b/scripts/pipelines/classifier/seed_classifier_dataset.jsonl new file mode 100644 index 0000000..409331b --- /dev/null +++ b/scripts/pipelines/classifier/seed_classifier_dataset.jsonl @@ -0,0 +1,95 @@ +{"query": "What is addVar in AVAP?", "type": "RETRIEVAL"} +{"query": "How do I declare a variable in AVAP Cloud?", "type": "RETRIEVAL"} +{"query": "What does the pipeline node do?", "type": "RETRIEVAL"} +{"query": "Explain the difference between addVar and addObject", "type": "RETRIEVAL"} +{"query": "What is a virtual API in AVAP?", "type": "RETRIEVAL"} +{"query": "How does the routing engine work in AVAP?", "type": "RETRIEVAL"} +{"query": "What is the purpose of the command node?", "type": "RETRIEVAL"} +{"query": "How do I handle errors in an AVAP flow?", "type": "RETRIEVAL"} +{"query": "What is the difference between a dataset and a variable?", "type": "RETRIEVAL"} +{"query": "How does AVAP handle authentication in API calls?", "type": "RETRIEVAL"} +{"query": "What are the available data types in AVAP?", "type": "RETRIEVAL"} +{"query": "Explain how the loop node works", "type": "RETRIEVAL"} +{"query": "What is addHeader used for?", "type": "RETRIEVAL"} +{"query": "How do I connect to an external REST API from AVAP?", "type": "RETRIEVAL"} +{"query": "What is the AVAP Cloud execution model?", "type": "RETRIEVAL"} +{"query": "How does conditional branching work in AVAP?", "type": "RETRIEVAL"} +{"query": "What is the difference between addRow and addObject?", "type": "RETRIEVAL"} +{"query": "How do I define a reusable function in AVAP?", "type": "RETRIEVAL"} +{"query": "What does the return 
node do in a pipeline?", "type": "RETRIEVAL"} +{"query": "How does AVAP handle JSON transformations?", "type": "RETRIEVAL"} +{"query": "What is the catalog in AVAP Cloud?", "type": "RETRIEVAL"} +{"query": "How do I paginate results in an AVAP API?", "type": "RETRIEVAL"} +{"query": "What logging options are available in AVAP?", "type": "RETRIEVAL"} +{"query": "How are environment variables managed in AVAP Cloud?", "type": "RETRIEVAL"} +{"query": "What is a connector in AVAP?", "type": "RETRIEVAL"} +{"query": "Write an API endpoint in AVAP that returns a list of users", "type": "CODE_GENERATION"} +{"query": "Generate AVAP code to call an external REST API and return the result", "type": "CODE_GENERATION"} +{"query": "Create a pipeline that reads from a dataset and filters by field", "type": "CODE_GENERATION"} +{"query": "Write a loop in AVAP that iterates over a JSON array", "type": "CODE_GENERATION"} +{"query": "Generate code to authenticate with OAuth2 and call a protected endpoint", "type": "CODE_GENERATION"} +{"query": "Write an AVAP function that transforms a JSON response", "type": "CODE_GENERATION"} +{"query": "Create an API endpoint that accepts POST and validates input", "type": "CODE_GENERATION"} +{"query": "Generate AVAP code to merge two datasets", "type": "CODE_GENERATION"} +{"query": "Write a pipeline that handles errors and returns a custom message", "type": "CODE_GENERATION"} +{"query": "Create a reusable AVAP function that formats dates", "type": "CODE_GENERATION"} +{"query": "Write code to call a database query node and return paginated results", "type": "CODE_GENERATION"} +{"query": "Generate an AVAP endpoint that proxies another API", "type": "CODE_GENERATION"} +{"query": "Write a pipeline that conditionally routes requests based on a field value", "type": "CODE_GENERATION"} +{"query": "Create AVAP code to parse and validate a JWT token", "type": "CODE_GENERATION"} +{"query": "Generate code for a webhook handler in AVAP", "type": 
"CODE_GENERATION"} +{"query": "Write an AVAP pipeline that aggregates data from multiple API calls", "type": "CODE_GENERATION"} +{"query": "Create a function that maps one JSON schema to another", "type": "CODE_GENERATION"} +{"query": "Write code to implement rate limiting in an AVAP API", "type": "CODE_GENERATION"} +{"query": "Generate an AVAP endpoint with custom response headers", "type": "CODE_GENERATION"} +{"query": "Create a pipeline that retries a failed API call up to 3 times", "type": "CODE_GENERATION"} +{"query": "Can you explain that again more simply?", "type": "CONVERSATIONAL"} +{"query": "What did you mean by that last part?", "type": "CONVERSATIONAL"} +{"query": "Can you summarize what you just said?", "type": "CONVERSATIONAL"} +{"query": "I didn't understand the second point", "type": "CONVERSATIONAL"} +{"query": "Can you give me a shorter version?", "type": "CONVERSATIONAL"} +{"query": "What was the example you mentioned before?", "type": "CONVERSATIONAL"} +{"query": "Can you be more specific about that?", "type": "CONVERSATIONAL"} +{"query": "Repeat the last answer but focus on the part about variables", "type": "CONVERSATIONAL"} +{"query": "That's not what I asked, can you try again?", "type": "CONVERSATIONAL"} +{"query": "OK but what does that mean in practice?", "type": "CONVERSATIONAL"} +{"query": "Can you expand on that?", "type": "CONVERSATIONAL"} +{"query": "I still don't get it, try a different explanation", "type": "CONVERSATIONAL"} +{"query": "What was the difference you mentioned earlier?", "type": "CONVERSATIONAL"} +{"query": "Can you give me an analogy?", "type": "CONVERSATIONAL"} +{"query": "That makes sense, and what about the second case?", "type": "CONVERSATIONAL"} +{"query": "Are you sure about that?", "type": "CONVERSATIONAL"} +{"query": "Say it in one sentence", "type": "CONVERSATIONAL"} +{"query": "Put that in simpler terms", "type": "CONVERSATIONAL"} +{"query": "What was the first thing you said about pipelines?", "type": 
"CONVERSATIONAL"} +{"query": "OK, now compare both options you described", "type": "CONVERSATIONAL"} +{"query": "Can you elaborate on the error handling part?", "type": "CONVERSATIONAL"} +{"query": "Go back to what you said about connectors", "type": "CONVERSATIONAL"} +{"query": "Give me a bullet point summary of your previous answer", "type": "CONVERSATIONAL"} +{"query": "I need the same explanation but focused on performance", "type": "CONVERSATIONAL"} +{"query": "Can you rephrase that as a step-by-step guide?", "type": "CONVERSATIONAL"} +{"query": "You have a project usage percentage of 20%. Provide an insight.", "type": "PLATFORM"} +{"query": "Your account has consumed 3000 API calls this month", "type": "PLATFORM"} +{"query": "You are a direct and concise assistant. Your quota is at 80%.", "type": "PLATFORM"} +{"query": "Your subscription plan allows 10000 requests per day", "type": "PLATFORM"} +{"query": "Your account usage is at 95% of the monthly limit", "type": "PLATFORM"} +{"query": "You have used 45% of your available API quota", "type": "PLATFORM"} +{"query": "Your current plan is Basic with 5000 API calls remaining", "type": "PLATFORM"} +{"query": "Your project has exceeded 90% of its allocated resources", "type": "PLATFORM"} +{"query": "How many API calls do I have left this month?", "type": "PLATFORM"} +{"query": "What is my current plan and its limits?", "type": "PLATFORM"} +{"query": "Show me my account usage statistics", "type": "PLATFORM"} +{"query": "How much of my quota have I consumed?", "type": "PLATFORM"} +{"query": "When does my subscription renew?", "type": "PLATFORM"} +{"query": "What are my billing details?", "type": "PLATFORM"} +{"query": "How many projects can I create on my current plan?", "type": "PLATFORM"} +{"query": "Am I close to my API limit?", "type": "PLATFORM"} +{"query": "What happens if I exceed my quota?", "type": "PLATFORM"} +{"query": "Can I upgrade my current subscription?", "type": "PLATFORM"} +{"query": "Your free 
trial expires in 3 days", "type": "PLATFORM"} +{"query": "Your account is currently suspended due to overuse", "type": "PLATFORM"} +{"query": "You have 150 API calls remaining in your current billing cycle", "type": "PLATFORM"} +{"query": "Your consumption this week is 2.5x higher than last week", "type": "PLATFORM"} +{"query": "What is my current monthly spend on AVAP Cloud?", "type": "PLATFORM"} +{"query": "How do I add more API capacity to my plan?", "type": "PLATFORM"} +{"query": "Your project usage percentage is critically high at 98%", "type": "PLATFORM"} diff --git a/scripts/pipelines/classifier/train_classifier.py b/scripts/pipelines/classifier/train_classifier.py new file mode 100644 index 0000000..7cb1731 --- /dev/null +++ b/scripts/pipelines/classifier/train_classifier.py @@ -0,0 +1,204 @@ +#!/usr/bin/env python3 +""" +Phase 2 — Train Layer 2 embedding classifier (ADR-0008). + +Reads labeled (query, type) pairs from a JSONL file, embeds them with bge-m3 +via Ollama, trains a LogisticRegression classifier, and serializes the model +with joblib. + +Usage: + python train_classifier.py + python train_classifier.py --data path/to/dataset.jsonl --output /data/classifier_model.pkl + +The output file is loaded at engine startup by graph.py (_load_layer2_model). +Any JSONL file produced by classifier_export.py is compatible as additional +training data — merge with the seed dataset before retraining. 
def load_data(path: str) -> tuple[list[str], list[str]]:
    """Read labeled (query, type) pairs from a JSONL file.

    Returns parallel lists (queries, labels). Records that are not valid JSON
    objects, or whose "query"/"type" values are missing, empty, null, or not
    strings, are counted and skipped with a warning — previously a JSON null
    made ``.strip()`` raise AttributeError and abort the entire load.
    """
    queries, labels = [], []
    skipped = 0
    with open(path, encoding="utf-8") as f:
        for i, line in enumerate(f, 1):
            line = line.strip()
            if not line:
                continue
            try:
                rec = json.loads(line)
            except json.JSONDecodeError as e:
                print(f"  [WARN] line {i} skipped — JSON error: {e}", file=sys.stderr)
                skipped += 1
                continue
            q = rec.get("query") if isinstance(rec, dict) else None
            t = rec.get("type") if isinstance(rec, dict) else None
            # Robustness fix: only real strings survive — null/numeric values
            # used to crash the whole load via AttributeError on .strip().
            if not isinstance(q, str) or not isinstance(t, str):
                skipped += 1
                continue
            q, t = q.strip(), t.strip()
            if not q or not t:
                skipped += 1
                continue
            queries.append(q)
            labels.append(t)
    if skipped:
        print(f"  [WARN] {skipped} records skipped (missing query/type or invalid JSON)")
    return queries, labels
def embed_queries(queries: list[str], labels: list[str], base_url: str) -> tuple[np.ndarray, list[str]]:
    """Embed each query individually with bge-m3 through Ollama.

    Per-query embedding (rather than one batch call) lets us detect and drop
    the occasional NaN vector that bge-m3 emits under Ollama, instead of
    failing the whole run; the returned labels stay aligned with the vectors
    that survive.

    Returns (vectors, kept_labels). Exits with status 1 when langchain-ollama
    is not installed.
    """
    try:
        from langchain_ollama import OllamaEmbeddings
    except ImportError:
        print("[ERROR] langchain-ollama not installed. Run: pip install langchain-ollama", file=sys.stderr)
        sys.exit(1)

    embedder = OllamaEmbeddings(model="bge-m3", base_url=base_url)
    rows: list[list[float]] = []
    aligned_labels: list[str] = []
    dropped = 0

    for idx, (text, tag) in enumerate(zip(queries, labels)):
        try:
            vector = embedder.embed_query(text)
            if any(component != component for component in vector):  # NaN never equals itself
                print(f"  [WARN] query {idx+1} produced NaN vector, skipped: '{text[:60]}'", file=sys.stderr)
                dropped += 1
                continue
            rows.append(vector)
            aligned_labels.append(tag)
        except Exception as exc:
            print(f"  [WARN] query {idx+1} embedding failed ({exc}), skipped: '{text[:60]}'", file=sys.stderr)
            dropped += 1
            continue

        if (idx + 1) % 10 == 0 or (idx + 1) == len(queries):
            print(f"  Embedded {idx+1}/{len(queries)}...", end="\r")

    print()
    if dropped:
        print(f"  [WARN] {dropped} queries skipped due to NaN or embedding errors")

    return np.array(rows), aligned_labels
def main():
    """Train, cross-validate, and (if good enough) serialize the L2 classifier.

    Pipeline: load JSONL → embed with bge-m3 → stratified k-fold CV →
    per-class report → save with joblib. Exits with status 1 when the dataset
    is too small, a class is under-represented, or CV accuracy falls below
    --min-cv-accuracy (the model is NOT saved in that case).
    """
    parser = argparse.ArgumentParser(description="Train Layer 2 classifier for ADR-0008")
    parser.add_argument(
        "--data",
        default=str(Path(__file__).parent / "seed_classifier_dataset.jsonl"),
        help="Path to labeled JSONL dataset",
    )
    parser.add_argument(
        "--output",
        default=os.getenv("CLASSIFIER_MODEL_PATH", "/data/classifier_model.pkl"),
        help="Output path for serialized model",
    )
    parser.add_argument(
        "--ollama",
        default=os.getenv("OLLAMA_LOCAL_URL", "http://localhost:11434"),
        help="Ollama base URL",
    )
    parser.add_argument(
        "--min-cv-accuracy",
        type=float,
        default=0.90,
        help="Minimum cross-validation accuracy to proceed with saving (default: 0.90)",
    )
    args = parser.parse_args()

    # ── Load data ────────────────────────────────────────────────────────────
    print(f"\n[1/4] Loading data from {args.data}")
    queries, labels = load_data(args.data)
    print(f"    {len(queries)} examples loaded")

    dist = Counter(labels)
    print(f"    Distribution: {dict(dist)}")

    if len(queries) < 20:
        print("[ERROR] Too few examples (< 20). Add more to the dataset.", file=sys.stderr)
        sys.exit(1)

    if min(dist.values()) < 2:
        print("[ERROR] At least one class has fewer than 2 examples. Add more data.", file=sys.stderr)
        sys.exit(1)

    # ── Embed ────────────────────────────────────────────────────────────────
    print(f"\n[2/4] Embedding with bge-m3 via {args.ollama}")
    vectors, labels = embed_queries(queries, labels, args.ollama)
    print(f"    Embedding matrix: {vectors.shape} ({len(labels)} examples kept)")

    # Fix: recompute the class distribution AFTER embedding. embed_queries may
    # drop examples (NaN vectors, transport errors), so the fold count must be
    # derived from the surviving labels or StratifiedKFold can fail.
    dist = Counter(labels)
    if not labels or min(dist.values()) < 2:
        print("[ERROR] At least one class has fewer than 2 examples. Add more data.", file=sys.stderr)
        sys.exit(1)
    n_splits = min(5, min(dist.values()))

    # ── Train + cross-validate ───────────────────────────────────────────────
    print(f"\n[3/4] Training LogisticRegression (C=1.0) with {n_splits}-fold CV")
    clf = LogisticRegression(max_iter=1000, C=1.0, random_state=42)
    cv = StratifiedKFold(n_splits=n_splits, shuffle=True, random_state=42)

    scores = cross_val_score(clf, vectors, labels, cv=cv, scoring="accuracy")
    cv_mean = scores.mean()
    cv_std = scores.std()
    print(f"    CV accuracy: {cv_mean:.3f} ± {cv_std:.3f} (folds: {scores.round(3).tolist()})")

    # Per-class report via cross-validated predictions
    y_pred = cross_val_predict(clf, vectors, labels, cv=cv)
    print("\n    Per-class report:")
    report = classification_report(labels, y_pred, zero_division=0)
    for line in report.splitlines():
        print(f"    {line}")

    if cv_mean < args.min_cv_accuracy:
        print(
            f"\n[FAIL] CV accuracy {cv_mean:.3f} is below threshold {args.min_cv_accuracy:.2f}. "
            "Add more examples to the dataset before deploying.",
            file=sys.stderr,
        )
        sys.exit(1)

    print(f"\n    CV accuracy {cv_mean:.3f} ≥ {args.min_cv_accuracy:.2f} — proceeding to save.")

    # ── Fit final model on full dataset ──────────────────────────────────────
    clf.fit(vectors, labels)

    # ── Save ─────────────────────────────────────────────────────────────────
    print(f"\n[4/4] Saving model to {args.output}")
    out_path = Path(args.output)
    out_path.parent.mkdir(parents=True, exist_ok=True)
    joblib.dump(
        {
            "clf": clf,
            "classes": clf.classes_.tolist(),
            # Fix: record the number of examples actually trained on (after
            # NaN-filtering), not the raw pre-filter dataset size.
            "n_train": len(labels),
            "cv_mean": round(cv_mean, 4),
            "cv_std": round(cv_std, 4),
        },
        out_path,
    )
    print(f"    Model saved → {out_path}")
    print(f"\nDone. Classes: {clf.classes_.tolist()}")


if __name__ == "__main__":
    main()