#!/usr/bin/env python3
"""
AVAP Dataset Generator v2 — MAP-Elites Quality-Diversity Pipeline
==================================================================
View reference

Generates an MBPP-style benchmark dataset of AVAP code problems by driving an
LLM through a MAP-Elites quality-diversity loop: each "cell" is a combination
of AVAP constructs, and the loop fills/improves cells until the target dataset
size is reached.  An optional ConstructPrior weights cell selection toward
construct combinations seen in production code.
"""
import argparse
import json
import math
import os
import random
import sys
import time
from collections import defaultdict
from itertools import combinations
from pathlib import Path

import anthropic
import requests

from construct_prior import ConstructPrior, AVAP_NODE_NAMES

# Keyword patterns for the source-level fallback detector (used when the
# remote AVAP parser is unavailable).  Maps node-type name -> substrings
# whose presence in AVAP source signals that construct.
AVAP_NODE_TYPES = {
    "addParam": ["addParam("],
    "addResult": ["addResult("],
    "_status": ["_status"],
    "addVar": ["addVar("],
    "getListLen": ["getListLen("],
    "getQueryParamList": ["getQueryParamList("],
    "itemFromList": ["itemFromList("],
    "replace": ["replace("],
    "randomString": ["randomString("],
    "if_mode1": ["if("],
    "if_mode2": ["if(None, None,"],
    "else": ["else()"],
    "end": ["end()"],
    "startLoop": ["startLoop("],
    "endLoop": ["endLoop()"],
    "try": ["try()"],
    "exception": ["exception()"],
    "return": ["return("],
    "go": ["go("],
    "gather": ["gather("],
    "avapConnector": ["avapConnector("],
    "ormCheckTable": ["ormCheckTable("],
    "ormDirect": ["ormDirect("],
    "ormAccessSelect": ["ormAccessSelect("],
    "ormAccessInsert": ["ormAccessInsert("],
    "ormAccessUpdate": ["ormAccessUpdate("],
    "variableFromJSON": ["variableFromJSON("],
    "AddVariableToJSON": ["AddVariableToJSON("],
    "encodeSHA256": ["encodeSHA256("],
    "encodeMD5": ["encodeMD5("],
    "getTimeStamp": ["getTimeStamp("],
    "getDateTime": ["getDateTime("],
    "stampToDatetime": ["stampToDatetime("],
    "RequestGet": ["RequestGet("],
    "RequestPost": ["RequestPost("],
    "function": ["function "],
    "import": ["import "],
    "include": ["include("],
}

# Canonical list of node-type names shared with construct_prior.
NODE_TYPE_NAMES = AVAP_NODE_NAMES

# Default minimum prior weight assigned to tail (rarely-seen) cells.
_PRIOR_EPSILON = 0.05


class CellValidator:
    """Validates generated AVAP code and scores it against a target cell.

    Uses a remote AVAP parser when reachable; falls back to keyword-based
    construct detection once a connection error marks the parser as down.
    """

    def __init__(self, parser_url: str, parser_timeout: int = 5):
        self.parser_url = parser_url.rstrip("/")
        self.parser_timeout = parser_timeout
        # Flipped to False permanently after the first ConnectionError.
        self._parser_available = True

    def parse(self, code: str) -> tuple[bool | None, dict, str]:
        """Parse *code* with the remote parser.

        Returns (valid, ast, error):
        - (True, ast, "")           parser accepted the code
        - (False, {}, error)        parser rejected the code or request failed
        - (None, {}, "parser_unavailable")  parser unreachable — caller
          should fall back to keyword detection
        """
        if not self._parser_available:
            return None, {}, "parser_unavailable"
        try:
            resp = requests.post(
                f"{self.parser_url}/parse",
                json={"code": code},
                timeout=self.parser_timeout,
            )
            data = resp.json()
            if data.get("valid", False):
                return True, data.get("ast", {}), ""
            return False, {}, data.get("error", "parse error")
        except requests.exceptions.ConnectionError:
            # Parser process is down — don't retry on every call.
            self._parser_available = False
            return None, {}, "parser_unavailable"
        except Exception as e:
            return False, {}, str(e)

    def detect_constructs(self, code: str, ast: dict) -> set:
        """Return the set of construct names present in *code*.

        Prefers the AST (exact) and falls back to substring matching.
        """
        if ast:
            return self._from_ast(ast)
        return self._from_source(code)

    def _from_ast(self, ast: dict) -> set:
        """Recursively collect every "type" value from the parser's AST."""
        found = set()
        if isinstance(ast, dict):
            if "type" in ast:
                found.add(ast["type"])
            for v in ast.values():
                found |= self._from_ast(v)
        elif isinstance(ast, list):
            for item in ast:
                found |= self._from_ast(item)
        return found

    def _from_source(self, code: str) -> set:
        """Keyword-based fallback detector (no AST available)."""
        found = set()
        # if() modes are disambiguated first: mode-2's pattern is a superset
        # of mode-1's, so only one of the two is ever reported per snippet.
        if "if(None, None," in code:
            found.add("if_mode2")
        elif "if(" in code:
            found.add("if_mode1")
        for name, patterns in AVAP_NODE_TYPES.items():
            if name in ("if_mode1", "if_mode2"):
                continue  # already handled
            for pat in patterns:
                if pat in code:
                    found.add(name)
                    break
        return found

    def cell_quality(
        self,
        code: str,
        ast: dict,
        test_list: list,
        cell: frozenset,
        alpha: float = 0.3,
        beta: float = 0.2,
        gamma: float = 0.1,
    ) -> tuple[float, dict]:
        """Score *code* against target *cell*.

        quality = fidelity + alpha*bonus_ratio + beta*test_quality
                  + gamma*richness, where:
        - fidelity:    fraction of the cell's required constructs present
        - bonus_ratio: extra (non-required) constructs, normalized by how
          many non-required constructs exist
        - test_quality: fraction of tests that are non-trivial re.match() calls
        - richness:    non-blank line count, capped at 30 lines = 1.0

        Returns (quality, components) where components is a JSON-friendly
        breakdown of each term.
        """
        detected = self.detect_constructs(code, ast)
        all_types = set(NODE_TYPE_NAMES)
        cell_constructs = set(cell)

        present_required = cell_constructs & detected
        fidelity = len(present_required) / max(len(cell_constructs), 1)

        extra = detected - cell_constructs
        bonus_ratio = len(extra) / max(len(all_types) - len(cell_constructs), 1)

        tq = sum(
            1
            for t in test_list
            if isinstance(t, str) and "re.match(" in t and len(t.strip()) > 10
        ) / max(len(test_list), 1)

        lines = [l.strip() for l in code.split("\n") if l.strip()]
        richness = min(len(lines) / 30.0, 1.0)  # cap at 30 lines = 1.0

        quality = fidelity + alpha * bonus_ratio + beta * tq + gamma * richness
        return quality, {
            "fidelity": round(fidelity, 3),
            "bonus_ratio": round(bonus_ratio, 3),
            "test_quality": round(tq, 3),
            "richness": round(richness, 3),
            "quality": round(quality, 3),
            "detected": sorted(detected),
            "cell": sorted(cell),
            "extra": sorted(extra),
        }


class CoverageMap:
    """MAP-Elites archive: one elite example per construct-combination cell."""

    def __init__(self, cell_size: int = 3):
        self.cell_size = cell_size
        # cell -> (best example, its quality, its quality components)
        self._map: dict[frozenset, tuple[dict, float, dict]] = {}
        # cell -> number of generation attempts (for UCB exploration)
        self._attempts: dict[frozenset, int] = defaultdict(int)
        self._all_cells = self._build_cells()

    def _build_cells(self) -> list[frozenset]:
        """Enumerate every construct combination of size 2..cell_size."""
        cells = []
        for size in range(2, self.cell_size + 1):
            for combo in combinations(NODE_TYPE_NAMES, size):
                cells.append(frozenset(combo))
        return cells

    @property
    def total_cells(self) -> int:
        return len(self._all_cells)

    @property
    def filled_cells(self) -> int:
        return len(self._map)

    @property
    def fill_rate(self) -> float:
        return self.filled_cells / max(self.total_cells, 1)

    def update(
        self,
        cell: frozenset,
        example: dict,
        quality: float,
        components: dict,
    ) -> bool:
        """Record an attempt; keep *example* if it beats the cell's elite.

        Returns True when the cell's elite was replaced (or first filled).
        """
        self._attempts[cell] += 1
        current = self._map.get(cell)
        if current is None or quality > current[1]:
            self._map[cell] = (example, quality, components)
            return True
        return False

    def get_empty_cells(self) -> list[frozenset]:
        return [c for c in self._all_cells if c not in self._map]

    def get_low_quality_cells(self, threshold: float = 0.7) -> list[frozenset]:
        return [c for c, (_, q, _) in self._map.items() if q < threshold]

    def get_example(self, cell: frozenset) -> dict | None:
        entry = self._map.get(cell)
        return entry[0] if entry else None

    def all_examples(self) -> list[dict]:
        return [ex for ex, _, _ in self._map.values()]

    def node_type_frequency(self) -> dict[str, int]:
        """Count how many filled cells reference each node type."""
        freq = defaultdict(int)
        for cell in self._map:
            for nt in cell:
                freq[nt] += 1
        return dict(freq)

    def distribution_entropy(self) -> float:
        """Shannon entropy (bits) of the node-type frequency distribution."""
        freq = self.node_type_frequency()
        total = sum(freq.values())
        if total == 0:
            return 0.0
        entropy = 0.0
        for count in freq.values():
            p = count / total
            if p > 0:
                entropy -= p * math.log2(p)
        return round(entropy, 3)

    def fill_summary(self) -> str:
        """One-line human-readable coverage summary."""
        empty = len(self.get_empty_cells())
        low = len(self.get_low_quality_cells())
        entropy = self.distribution_entropy()
        return (
            f"Cells: {self.filled_cells}/{self.total_cells} filled "
            f"({100*self.fill_rate:.1f}%) | "
            f"Low quality: {low} | "
            f"Empty: {empty} | "
            f"Entropy: {entropy:.2f} bits"
        )


class CellSelector:
    """Chooses the next cell to target: empty first, then low-quality,
    then UCB exploration over the whole map."""

    def __init__(
        self,
        coverage_map: CoverageMap,
        quality_threshold: float = 0.80,
        ucb_c: float = 1.0,
    ):
        self.map = coverage_map
        self.quality_threshold = quality_threshold
        self.ucb_c = ucb_c
        self._total_calls = 0
        # Fixed seed for reproducible cell-selection order.
        self._rng = random.Random(42)

    def select(self) -> frozenset:
        self._total_calls += 1
        empty = self.map.get_empty_cells()
        if empty:
            return self._rng.choice(empty)
        low = self.map.get_low_quality_cells(self.quality_threshold)
        if low:
            return self._rng.choice(low)
        return self._ucb_select()

    def _ucb_select(self) -> frozenset:
        """UCB1: quality + c * sqrt(ln(total_calls) / attempts)."""
        best_cell = None
        best_score = -float("inf")
        total = max(self._total_calls, 1)
        for cell in self.map._all_cells:
            attempts = max(self.map._attempts.get(cell, 0), 1)
            entry = self.map._map.get(cell)
            quality = entry[1] if entry else 0.0
            score = quality + self.ucb_c * math.sqrt(math.log(total) / attempts)
            if score > best_score:
                best_score = score
                best_cell = cell
        return best_cell


class CellSelectorPrior(CellSelector):
    """CellSelector variant that weights selection by a ConstructPrior,
    biasing generation toward construct combinations seen in production."""

    def __init__(
        self,
        coverage_map: CoverageMap,
        prior: ConstructPrior,
        quality_threshold: float = 0.80,
        ucb_c: float = 1.0,
        phase3_threshold: float = 0.70,
    ):
        super().__init__(coverage_map, quality_threshold, ucb_c)
        self.prior = prior
        self.phase3_threshold = phase3_threshold
        self._tail_cells: set[frozenset] = set()
        self._phase3_active = False

    def select(self) -> frozenset:
        self._total_calls += 1
        empty = self.map.get_empty_cells()
        if empty:
            # Prefer empty cells with clearly above-floor prior weight.
            high_prior_empty = [
                c for c in empty
                if self.prior.cell_weight(c) > self.prior.epsilon * 1.5
            ]
            if high_prior_empty:
                return self._weighted_sample(high_prior_empty)
            return self._weighted_sample(empty)
        low = self.map.get_low_quality_cells(self.quality_threshold)
        if low:
            return self._ucb_prior_select(low)
        return self._ucb_prior_select(self.map._all_cells)

    def _weighted_sample(self, cells: list[frozenset]) -> frozenset:
        """Sample one cell with probability proportional to its prior weight."""
        weights = [self.prior.cell_weight(c) for c in cells]
        total = sum(weights)
        if total == 0:
            return self._rng.choice(cells)
        r = self._rng.random() * total
        cumsum = 0.0
        for cell, w in zip(cells, weights):
            cumsum += w
            if r <= cumsum:
                return cell
        return cells[-1]  # guard against float rounding at the tail

    def _ucb_prior_select(self, cells) -> frozenset:
        """UCB1 scaled by prior weight: prior * (quality + exploration term)."""
        best_cell = None
        best_score = -float("inf")
        total = max(self._total_calls, 1)
        for cell in cells:
            attempts = max(self.map._attempts.get(cell, 0), 1)
            entry = self.map._map.get(cell)
            quality = entry[1] if entry else 0.0
            prior_w = self.prior.cell_weight(cell)
            ucb_term = self.ucb_c * math.sqrt(math.log(total) / attempts)
            score = prior_w * (quality + ucb_term)
            if score > best_score:
                best_score = score
                best_cell = cell
        return best_cell


SYSTEM_PROMPT = """Eres un experto en el lenguaje AVAP. Se te proporciona el Language Reference Manual (LRM) completo de AVAP.

Tu tarea es generar UN problema de benchmark estilo MBPP para evaluar modelos de lenguaje en su capacidad de generar código AVAP correcto.

REGLAS ESTRICTAS para el código AVAP generado:
1. Una instrucción por línea. EOL es el terminador absoluto.
2. Sin indentación significativa (es solo decorativa).
3. Bloques: if()...else()...end(), startLoop()...endLoop(), try()...exception()...end()
4. Funciones: function name(args) { ... return(val) }
5. if() Modo 1: if(var_o_literal, var_o_literal, "operador")
6. if() Modo 2: if(None, None, `expresion_completa_como_string`)
7. _status se asigna con: addVar(_status, 404)
8. ormAccessSelect firma: ormAccessSelect(campos, "tabla", selector, varTarget)
9. ormCheckTable firma: ormCheckTable(nombre_tabla, varTarget)
10. ormDirect firma: ormDirect("SELECT ... %s" % var, varTarget)
11. getQueryParamList firma: getQueryParamList(param_name, varTarget)
12. NUNCA uses registerEndpoint(), NUNCA uses mainHandler().
13. El código se ejecuta DIRECTAMENTE, línea a línea.

FORMATO DE SALIDA: responde ÚNICAMENTE con UN objeto JSON válido (no array).
Sin texto adicional, sin bloques de código markdown.
{
  "task_id": 1,
  "text": "",
  "code": "",
  "test_inputs": { "": },
  "test_list": ["re.match(r'', )", ...]
}
test_list: USA ÚNICAMENTE re.match(). NUNCA comparaciones directas (==, !=).
"""


def build_cell_prompt(
    lrm: str,
    cell: frozenset,
    existing_example: dict | None,
    map_summary: str,
) -> str:
    """Build the user prompt asking for one example covering *cell*.

    When *existing_example* is given, the prompt asks for a distinct,
    more complex example that beats the current elite for that cell.
    """
    constructs_list = ", ".join(f"`{c}`" for c in sorted(cell))
    improvement_note = ""
    if existing_example:
        improvement_note = f"""
El siguiente ejemplo YA existe para esta combinación con calidad mejorable.
Genera algo DISTINTO y MÁS COMPLEJO que lo supere:
```
{existing_example.get('code', '')}
```
"""
    return f"""# LRM AVAP — Language Reference Manual

{lrm}

---

# ESTADO DEL MAPA DE COBERTURA

{map_summary}

---

# TAREA — ESPECIFICACIÓN OBLIGATORIA

Genera UN ejemplo AVAP que use OBLIGATORIAMENTE TODOS estos constructs:

**{constructs_list}**

El ejemplo DEBE contener todos los constructs listados arriba. Si tu código no los usa todos, la tarea fracasa.

Adicionalmente:
- Combina los constructs requeridos en un escenario realista de microservicio HTTP
- Añade constructs adicionales donde sea natural (aumenta la puntuación)
- Código complejo y rico — no ejemplos triviales de 3 líneas
- 2-3 aserciones re.match() en test_list
{improvement_note}
Responde ÚNICAMENTE con el objeto JSON. Sin texto antes ni después.
"""


def call_api(
    client: anthropic.Anthropic,
    lrm: str,
    cell: frozenset,
    task_id: int,
    existing_example: dict | None,
    map_summary: str,
    retries: int = 3,
) -> dict | None:
    """Request one benchmark problem from the model, with retries.

    Strips optional markdown fencing, parses the JSON, and validates the
    required fields.  Returns the problem dict with ``task_id`` set, or
    None after *retries* failed attempts.
    """
    for attempt in range(1, retries + 1):
        try:
            message = client.messages.create(
                model="claude-sonnet-4-20250514",
                max_tokens=4000,
                system=SYSTEM_PROMPT,
                messages=[{
                    "role": "user",
                    "content": build_cell_prompt(
                        lrm, cell, existing_example, map_summary
                    ),
                }],
            )
            raw = message.content[0].text.strip()
            # Strip a ``` fence if the model added one despite instructions.
            if raw.startswith("```"):
                lines = raw.splitlines()
                raw = "\n".join(
                    lines[1:-1] if lines[-1].strip() == "```" else lines[1:]
                )
            problem = json.loads(raw)
            if not isinstance(problem, dict):
                raise ValueError("Response is not a JSON object")
            for field in ("text", "code", "test_list"):
                if field not in problem:
                    raise ValueError(f"Missing field '{field}'")
            if "test_inputs" not in problem:
                problem["test_inputs"] = {}
            problem["task_id"] = task_id
            return problem
        except (json.JSONDecodeError, ValueError) as e:
            print(f"\n    Attempt {attempt}/{retries} — parse error: {e}")
            if attempt < retries:
                time.sleep(2 ** attempt)  # exponential backoff
        except anthropic.RateLimitError:
            wait = 30 * attempt
            print(f"\n    Rate limit — waiting {wait}s...")
            time.sleep(wait)
        except anthropic.APIError as e:
            print(f"\n    API error at attempt {attempt}: {e}")
            if attempt < retries:
                time.sleep(5)
    return None


def run_map_elites(args, client, lrm, output_path):
    """Candidate E: MAP-Elites loop with uniform cell weighting.

    Returns (dataset, coverage_map, valid_count, call_count).
    """
    validator = CellValidator(parser_url=args.parser)
    cmap = CoverageMap(cell_size=args.cell_size)
    selector = CellSelector(cmap, quality_threshold=args.quality_threshold)

    dataset = []
    task_id = 1
    call_count = 0
    valid_count = 0
    cell_updates = 0

    print(f"\n  MAP-Elites mode | cells: {cmap.total_cells} | target: {args.problems} examples")
    print(f"  Cell size: {args.cell_size} | Quality threshold: {args.quality_threshold}")
    print("─" * 65)

    # Hard cap on API calls so a stubborn cell can't loop forever.
    max_calls = args.problems * 4
    while len(dataset) < args.problems and call_count < max_calls:
        cell = selector.select()
        existing = cmap.get_example(cell)
        call_count += 1
        print(
            f"  [{call_count:04d}] Cell {sorted(cell)} "
            f"| filled={cmap.filled_cells}/{cmap.total_cells} "
            f"| dataset={len(dataset)} ... ",
            end="",
            flush=True,
        )

        problem = call_api(
            client,
            lrm,
            cell,
            task_id,
            existing_example=existing,
            map_summary=cmap.fill_summary(),
        )
        if problem is None:
            print("SKIP (generation failed)")
            continue

        code = problem["code"]
        test_list = problem.get("test_list", [])
        is_valid, ast, error_msg = validator.parse(code)
        if is_valid is None:
            # Parser unreachable: accept the code and rely on keyword
            # detection for construct checks.
            is_valid, ast = True, {}
            if call_count == 1:
                print("\n  Parser unavailable — using keyword fallback", flush=True)
        if is_valid is False:
            print(f"INVALID ({error_msg[:40]})")
            problem["_validation"] = {"valid": False, "error": error_msg}
            continue
        valid_count += 1

        # Compute cell quality
        quality, components = validator.cell_quality(
            code,
            ast,
            test_list,
            cell,
            alpha=args.alpha,
            beta=args.beta,
            gamma=args.gamma,
        )
        problem["_cell"] = sorted(cell)
        problem["_quality"] = components

        # Reject outright when any required construct is missing.
        if components["fidelity"] < 1.0:
            missing = set(cell) - set(components["detected"])
            print(f"MISSING constructs: {sorted(missing)}")
            continue

        # NOTE: the example joins the dataset even when it does not beat
        # the cell's current elite; the map only tracks the best per cell.
        updated = cmap.update(cell, problem, quality, components)
        if updated:
            cell_updates += 1
        dataset.append(problem)
        task_id += 1
        print(
            f"OK quality={quality:.3f} "
            f"fidelity={components['fidelity']:.2f} "
            f"extra={len(components['extra'])}"
        )

        if len(dataset) % 50 == 0:
            _save(dataset, output_path, cmap)
            freq = cmap.node_type_frequency()
            print(f"\n  ── Checkpoint ──────────────────────────────────")
            print(f"  Dataset: {len(dataset)} | Valid: {valid_count}/{call_count}")
            print(f"  {cmap.fill_summary()}")
            print(f"  Top-5 most frequent: {sorted(freq, key=freq.get, reverse=True)[:5]}")
            print(f"  Top-5 least frequent: {sorted(freq, key=freq.get)[:5]}")
            print(f"  ────────────────────────────────────────────────\n")

        time.sleep(0.5)

    _save(dataset, output_path, cmap)
    return dataset, cmap, valid_count, call_count


def run_map_elites_prior(args, client, lrm, output_path):
    """Candidate F: MAP-Elites loop with ConstructPrior-weighted selection.

    Returns (dataset, coverage_map, valid_count, call_count, prior).
    """
    print("\n  Loading ConstructPrior...", flush=True)
    prior_map = getattr(args, "prior_map", "construct_map.yaml")
    epsilon = getattr(args, "prior_epsilon", _PRIOR_EPSILON)
    yaml_path = Path(prior_map)
    if yaml_path.exists():
        prior = ConstructPrior.from_yaml(yaml_path, epsilon=epsilon)
    else:
        # Fallback: yaml not found — use static prior and warn
        print(f"  [WARN] construct_map.yaml not found at '{yaml_path}'.")
        print(f"  [WARN] Using static fallback prior. Generate the real prior with:")
        print(f"  [WARN]   python construct_prior.py --generate-map --github-token TOKEN")
        prior = ConstructPrior.from_static_fallback(epsilon=epsilon)
    print(f"  {prior.coverage_summary()}")

    validator = CellValidator(parser_url=args.parser)
    cmap = CoverageMap(cell_size=args.cell_size)
    selector = CellSelectorPrior(
        cmap,
        prior,
        quality_threshold=args.quality_threshold,
        phase3_threshold=getattr(args, "prior_phase3_threshold", 0.70),
    )

    dataset = []
    task_id = 1
    call_count = 0
    valid_count = 0
    cell_updates = 0

    print(f"\n  MAP-Elites+Prior mode | cells: {cmap.total_cells} | target: {args.problems} examples")
    print(f"  Cell size: {args.cell_size} | Quality threshold: {args.quality_threshold}")
    print("─" * 65)

    # Hard cap on API calls so a stubborn cell can't loop forever.
    max_calls = args.problems * 4
    while len(dataset) < args.problems and call_count < max_calls:
        cell = selector.select()
        existing = cmap.get_example(cell)
        prior_w = prior.cell_weight(cell)
        call_count += 1
        print(
            f"  [{call_count:04d}] Cell {sorted(cell)} "
            f"| prior={prior_w:.3f} "
            f"| filled={cmap.filled_cells}/{cmap.total_cells} "
            f"| dataset={len(dataset)} ... ",
            end="",
            flush=True,
        )

        problem = call_api(
            client,
            lrm,
            cell,
            task_id,
            existing_example=existing,
            map_summary=cmap.fill_summary(),
        )
        if problem is None:
            print("SKIP (generation failed)")
            continue

        code = problem["code"]
        test_list = problem.get("test_list", [])
        is_valid, ast, error_msg = validator.parse(code)
        if is_valid is None:
            # Parser unreachable: accept the code and rely on keyword
            # detection for construct checks.
            is_valid, ast = True, {}
            if call_count == 1:
                print("\n  Parser unavailable — using keyword fallback", flush=True)
        if is_valid is False:
            print(f"INVALID ({error_msg[:40]})")
            problem["_validation"] = {"valid": False, "error": error_msg}
            continue
        valid_count += 1

        quality, components = validator.cell_quality(
            code,
            ast,
            test_list,
            cell,
            alpha=args.alpha,
            beta=args.beta,
            gamma=args.gamma,
        )
        problem["_cell"] = sorted(cell)
        problem["_prior_weight"] = round(prior_w, 4)
        problem["_quality"] = components

        # Reject outright when any required construct is missing.
        if components["fidelity"] < 1.0:
            missing = set(cell) - set(components["detected"])
            print(f"MISSING constructs: {sorted(missing)}")
            continue

        # NOTE: the example joins the dataset even when it does not beat
        # the cell's current elite; the map only tracks the best per cell.
        updated = cmap.update(cell, problem, quality, components)
        if updated:
            cell_updates += 1
        dataset.append(problem)
        task_id += 1
        print(
            f"OK quality={quality:.3f} "
            f"fidelity={components['fidelity']:.2f} "
            f"prior={prior_w:.3f} "
            f"extra={len(components['extra'])}"
        )

        if len(dataset) % 50 == 0:
            _save(dataset, output_path, cmap, prior=prior)
            freq = cmap.node_type_frequency()
            kl = prior.kl_divergence(freq)
            print(f"\n  ── Checkpoint ──────────────────────────────────")
            print(f"  Dataset: {len(dataset)} | Valid: {valid_count}/{call_count}")
            print(f"  {cmap.fill_summary()}")
            print(f"  KL(dataset ‖ prior): {kl:.4f} (lower = closer to production patterns)")
            print(f"  Top-5 most frequent: {sorted(freq, key=freq.get, reverse=True)[:5]}")
            print(f"  Top-5 least frequent: {sorted(freq, key=freq.get)[:5]}")
            print(f"  ────────────────────────────────────────────────\n")

        time.sleep(0.5)

    _save(dataset, output_path, cmap, prior=prior)
    return dataset, cmap, valid_count, call_count, prior


def _save(dataset: list, path: Path, cmap: CoverageMap, prior: ConstructPrior = None):
    """Write the dataset JSON plus a sibling *_coverage_stats.json file."""
    with open(path, "w", encoding="utf-8") as f:
        json.dump(dataset, f, ensure_ascii=False, indent=2)

    # Save coverage map statistics alongside dataset
    stats_path = path.with_name(path.stem + "_coverage_stats.json")
    freq = cmap.node_type_frequency()
    stats = {
        "total_cells": cmap.total_cells,
        "filled_cells": cmap.filled_cells,
        "fill_rate": round(cmap.fill_rate, 4),
        "distribution_entropy": cmap.distribution_entropy(),
        "node_type_frequency": freq,
        "low_quality_cells": len(cmap.get_low_quality_cells()),
        "empty_cells": len(cmap.get_empty_cells()),
    }
    if prior is not None:
        stats["kl_divergence_dataset_vs_prior"] = prior.kl_divergence(freq)
        stats["prior_summary"] = prior.coverage_summary()
    with open(stats_path, "w", encoding="utf-8") as f:
        json.dump(stats, f, ensure_ascii=False, indent=2)


def main():
    """CLI entry point: parse arguments, run the selected pipeline, report."""
    parser = argparse.ArgumentParser(
        description="AVAP Dataset Generator v2 — MAP-Elites Quality-Diversity Pipeline"
    )
    parser.add_argument("--lrm", default="avap.md")
    parser.add_argument("--output", default="output/mbpp_avap_v2.json")
    parser.add_argument("--problems", type=int, default=5000)
    parser.add_argument("--parser", default="http://localhost:8080",
                        help="AVAP parser URL")
    parser.add_argument("--cell-size", type=int, default=3,
                        help="Max constructs per cell: 2=pairs, 3=pairs+trios (default: 3)")
    parser.add_argument("--quality-threshold", type=float, default=0.80,
                        help="Min quality to consider a cell 'good' (default: 0.80)")
    parser.add_argument("--alpha", type=float, default=0.30,
                        help="Weight for bonus constructs in cell quality (default: 0.30)")
    parser.add_argument("--beta", type=float, default=0.20,
                        help="Weight for test quality in cell quality (default: 0.20)")
    parser.add_argument("--gamma", type=float, default=0.10,
                        help="Weight for code richness in cell quality (default: 0.10)")
    parser.add_argument(
        "--mode",
        choices=["map-elites-prior", "map-elites", "reward"],
        default="map-elites-prior",
        help=(
            "map-elites-prior: Candidate F — MAP-Elites + ConstructPrior (default)\n"
            "map-elites: Candidate E — MAP-Elites, uniform cell weighting\n"
            "reward: Candidate A — CW-Reward pool (comparison baseline)"
        ),
    )
    parser.add_argument(
        "--prior-map",
        default="construct_map.yaml",
        metavar="FILE",
        help=(
            "Path to construct_map.yaml generated by construct_prior.py.\n"
            "Generate it first: python construct_prior.py --generate-map\n"
            "Default: construct_map.yaml (in current directory)"
        ),
    )
    parser.add_argument(
        "--prior-epsilon",
        type=float,
        default=_PRIOR_EPSILON,
        help=f"Minimum prior weight for tail cells (default: {_PRIOR_EPSILON})",
    )
    parser.add_argument(
        "--prior-phase3-threshold",
        type=float,
        default=0.70,
        help=(
            "Quality threshold above which Phase 2 ends and tail (low-prior) "
            "cells become the focus. Default: 0.70"
        ),
    )
    parser.add_argument("--api-key", default=None)
    args = parser.parse_args()

    api_key = args.api_key or os.environ.get("ANTHROPIC_API_KEY")
    if not api_key:
        sys.exit("ERROR: ANTHROPIC_API_KEY not set.")

    lrm_path = Path(args.lrm)
    if not lrm_path.exists():
        sys.exit(f"ERROR: LRM '{lrm_path}' not found.")
    lrm = lrm_path.read_text(encoding="utf-8")

    output_path = Path(args.output)
    output_path.parent.mkdir(parents=True, exist_ok=True)

    client = anthropic.Anthropic(api_key=api_key)

    mode_label = {
        "map-elites-prior": "Candidate F — MAP-Elites + ConstructPrior",
        "map-elites": "Candidate E — MAP-Elites (uniform)",
        "reward": "Candidate A — CW-Reward pool",
    }[args.mode]

    print("=" * 65)
    print("  AVAP Dataset Generator v2 — MAP-Elites Pipeline")
    print("=" * 65)
    print(f"  Mode           : {mode_label}")
    print(f"  LRM            : {lrm_path}")
    print(f"  Output         : {output_path}")
    print(f"  Target examples: {args.problems}")
    print(f"  Parser URL     : {args.parser}")
    print(f"  Cell size      : {args.cell_size}")
    print(f"  Quality thresh : {args.quality_threshold}")
    if args.mode == "map-elites-prior":
        yaml_exists = Path(args.prior_map).exists()
        print(f"  Prior map      : {args.prior_map} ({'✓ found' if yaml_exists else '✗ not found — will use static fallback'})")
        print(f"  Prior epsilon  : {args.prior_epsilon}")
    print("=" * 65)

    prior = None
    if args.mode == "map-elites-prior":
        result = run_map_elites_prior(args, client, lrm, output_path)
        dataset, cmap, valid_count, call_count, prior = result
    elif args.mode == "map-elites":
        dataset, cmap, valid_count, call_count = run_map_elites(args, client, lrm, output_path)
    else:
        sys.exit("ERROR: --mode reward (Candidate A) is not yet implemented in v2. "
                 "Use generate_mbap.py for the v1 reward baseline.")

    # Final report
    freq = cmap.node_type_frequency()
    entropy = cmap.distribution_entropy()
    print("\n" + "=" * 65)
    print("  Pipeline complete")
    print(f"  Mode                 : {mode_label}")
    print(f"  Total API calls      : {call_count}")
    print(f"  Valid examples       : {valid_count} ({100*valid_count/max(call_count,1):.1f}%)")
    print(f"  Dataset size         : {len(dataset)}")
    print(f"  {cmap.fill_summary()}")
    print(f"  Distribution entropy : {entropy:.3f} bits (max={math.log2(len(NODE_TYPE_NAMES)):.2f})")
    if prior is not None:
        kl = prior.kl_divergence(freq)
        print(f"  KL(dataset ‖ prior)  : {kl:.4f} (0 = perfect alignment with production code)")
    print(f"  Most covered         : {sorted(freq, key=freq.get, reverse=True)[:5]}")
    print(f"  Least covered        : {sorted(freq, key=freq.get)[:5]}")
    print(f"  Output               : {output_path}")
    print("=" * 65)


if __name__ == "__main__":
    main()