diff --git a/docs/ADR/ADR-0006-reward-algorithm-dataset-synthesis.md b/docs/ADR/ADR-0006-reward-algorithm-dataset-synthesis.md new file mode 100644 index 0000000..fafe5c0 --- /dev/null +++ b/docs/ADR/ADR-0006-reward-algorithm-dataset-synthesis.md @@ -0,0 +1,363 @@ +# ADR-0006: Reward Algorithm for Self-Improving Dataset Synthesis + +**Date:** 2026-03-25 +**Status:** Under Evaluation — Primary comparison: Candidate A vs Candidate E vs Candidate F +**Deciders:** Rafael Ruiz (CTO), MrHouston Engineering (AI Team) +**Research lead:** Ivar Zapata + +--- + +## Context + +The AVAP dataset synthesis pipeline (Track A) generates AVAP code examples using a large language model, filtered by a three-stage quality pipeline: parser validation (Stage 1), Execution Coverage Score (Stage 2), and semantic novelty (Stage 3). The current pipeline has two structural limitations that the reward mechanism must address. + +### Limitation 1 — Static generation + +Each batch is generated from the same static prompt (LRM + category description). The generator has no memory of what it has already produced and no model of what "good" looks like for the constructs it hasn't explored yet. + +### Limitation 2 — Distribution bias (the fundamental problem) + +The generator (Claude Sonnet) has its own internal distribution over what AVAP code "looks like", derived from its training on mainstream languages. It naturally gravitates toward the simplest patterns — linear code, basic conditionals, single-construct examples — because those are closest to what it knows. Any reward mechanism based on selecting the best from what the model spontaneously produces and feeding those back as few-shots **amplifies this bias**: the pool fills with what the model does easily, and the model never explores what it does poorly.
+ +This is not model collapse in the classical sense (weights are not updated), but it is **cumulative distribution bias** — the effective generation distribution narrows toward the model's comfort zone with each iteration. + +### The correct framing + +The solution is not to reward what the model produces spontaneously. It is to **specify externally what must be produced** and evaluate quality relative to that specification. Coverage of the DSL's grammar space must be guaranteed by construction, not hoped for through probabilistic exploration. + +--- + +## Decision + +**Conduct a primary comparative evaluation of Candidate A (CW-Reward, reward-driven pool), Candidate E (MAP-Elites, externally-specified coverage cells), and Candidate F (MAP-Elites with ConstructPrior transfer from real production code)** before selecting the production algorithm. Candidates B, C, D are secondary alternatives evaluated only if none of A, E, or F meets quality thresholds. + +The fundamental research question has two layers: +1. **Does forced external specification of construct combinations produce a less biased, higher-quality dataset than reward-driven spontaneous exploration?** (A vs E) +2. **Does seeding cell selection with real production code co-occurrence distributions further improve coverage quality and downstream RAG performance over blind MAP-Elites?** (E vs F) + +--- + +## Candidate Analysis + +### Candidate A — CW-Reward (Composite Weighted Reward) + +**Algorithm class:** In-context reward — no parameter updates. + +**Mechanism:** A composite reward is computed for each parser-valid example: + +``` +reward(e) = w_ecs · ECS(e) + w_novelty · Jaccard_novelty(e, Pool) + w_tests · test_quality(e) +``` + +High-reward examples enter a GoldPool (top-K). The pool is injected as few-shot context in subsequent generation calls. Coverage summary steers the prompt toward underrepresented constructs. 
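The CW-Reward mechanism above can be sketched in Python. This is a minimal illustration, not the production implementation: the `Example` fields, the `GoldPool` class name, and the weight values (config A1 from the weight grid later in this ADR) are assumptions of the sketch.

```python
import heapq
from dataclasses import dataclass

# Illustrative weights — config A1 (baseline) from the weight grid.
W_ECS, W_NOVELTY, W_TESTS = 0.50, 0.35, 0.15

@dataclass
class Example:
    code: str
    ecs: float            # Execution Coverage Score, in [0, 1]
    novelty: float        # Jaccard novelty vs. the current pool, in [0, 1]
    test_quality: float   # quality of test assertions, in [0, 1]

def reward(e: Example) -> float:
    """Composite weighted reward for a parser-valid example."""
    return W_ECS * e.ecs + W_NOVELTY * e.novelty + W_TESTS * e.test_quality

class GoldPool:
    """Keeps the top-K examples by reward; injected as few-shot context."""
    def __init__(self, k: int = 16):
        self.k = k
        self._heap: list[tuple[float, int, Example]] = []  # min-heap
        self._counter = 0  # tiebreaker so Example objects are never compared

    def offer(self, e: Example) -> None:
        self._counter += 1
        item = (reward(e), self._counter, e)
        if len(self._heap) < self.k:
            heapq.heappush(self._heap, item)
        elif item[0] > self._heap[0][0]:
            heapq.heapreplace(self._heap, item)  # evict current minimum

    def few_shots(self) -> list[Example]:
        """Pool contents, best-first, for prompt injection."""
        return [e for _, _, e in sorted(self._heap, reverse=True)]
```

The min-heap keeps `offer` at O(log K) per example; the bias risk discussed below is a property of what enters the pool, not of this bookkeeping.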
+ +**Known bias risk:** The pool amplifies the model's natural generation distribution. Examples that are easy for the model (simple patterns, single constructs) tend to enter the pool first and persist. The Jaccard novelty metric penalises structural similarity but cannot detect semantic simplicity — two examples with different node type sets can both be trivially shallow. + +**Appropriate when:** The base LLM has strong prior knowledge of the target language (mainstream languages). For AVAP, where the model has zero prior knowledge, the bias risk is materially higher. + +--- + +### Candidate E — MAP-Elites with Externally-Defined Coverage Cells (Proposed Primary) + +**Algorithm class:** Quality-Diversity algorithm — no parameter updates, coverage guaranteed by construction. + +**Core insight:** Instead of rewarding the best examples from spontaneous generation, define the coverage space externally from the grammar and direct the generator to fill specific cells. The model's distribution bias is neutralised because it is never asked to "explore freely" — it is always given a precise specification. + +**Coverage space definition:** + +The behavior space is defined over **pairs and trios of AVAP node types** drawn from the full grammar vocabulary. Each cell represents a construct combination that must be represented in the dataset: + +``` +Cell key = frozenset of 2 or 3 AVAP node types +Cell value = (best_example_so_far, quality_score) + +Example cells: + {"startLoop", "ormAccessSelect"} → best example using both + {"try", "go", "RequestPost"} → best example using all three + {"function", "if_mode2", "encodeSHA256"} → best example using all three +``` + +**Space size:** +- Pairs: C(38, 2) = 703 cells +- Trios: C(38, 3) = 8,436 cells +- Total: 9,139 cells + +With 5,000 examples targeted, average coverage is ~0.55 examples per cell — statistical coverage of pairwise and triadic construct combinations is achievable with focused cell selection strategy. 
Full coverage of high-prior cells is expected within budget; tail cells are addressed in Phase 3. + +**Generation protocol:** + +``` +1. SELECT target cell: + - Empty cells first (exploration phase) + - Then lowest-quality cells (exploitation phase) + - Interleave: every 10 calls, select a cell adjacent to a + recently improved cell (local neighborhood search) + +2. SPECIFY in the prompt: + "Generate an AVAP example that MUST use ALL of these constructs: + {cell_constructs}. Use additional constructs where natural." + +3. VALIDATE: + a. Parser: syntactically valid? (Stage 1) + b. Construct presence: all cell constructs in AST? (cell gate) + c. If both pass → compute cell quality score + +4. UPDATE cell: + If quality > current cell quality → replace cell entry +``` + +**Cell quality score:** + +``` +cell_quality(e, cell) = + construct_fidelity(e, cell) # fraction of cell constructs actually present + + α · bonus_constructs(e, cell) # extra constructs beyond cell specification + + β · test_quality(e) # quality of test assertions + + γ · code_length_norm(e) # normalised code length (longer = richer) +``` + +`construct_fidelity` is the primary gate: an example that does not contain all cell constructs scores 0 regardless of other criteria. + +**Why this eliminates distribution bias:** + +The model is never asked what it "wants" to generate. It receives a precise specification: "you must use these three constructs." If it produces something that satisfies the specification, it enters the map. If not, it is discarded and the cell remains available for the next attempt. The coverage trajectory is determined by the cell selection strategy, not by the model's natural distribution. + +The only residual bias is the model's ability to satisfy arbitrary construct specifications — some cells may be harder to fill than others. This is empirically measurable (fill rate per cell) and is itself a research finding about the generator's capabilities. 
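The cell map, quality gate, and update rule described above can be sketched as follows. The `CellEntry` structure and the α/β/γ values are illustrative assumptions, not tuned parameters.

```python
from dataclasses import dataclass
from itertools import combinations
from typing import Optional

# Illustrative weights for the quality bonus terms (assumed, not tuned).
ALPHA, BETA, GAMMA = 0.2, 0.3, 0.1

@dataclass
class CellEntry:
    code: str
    quality: float

def build_cells(node_types: list[str]) -> dict[frozenset, Optional[CellEntry]]:
    """All pair and trio cells over the grammar vocabulary, initially empty."""
    return {frozenset(combo): None
            for r in (2, 3)
            for combo in combinations(node_types, r)}

def cell_quality(ast_nodes: set[str], cell: frozenset,
                 test_quality: float, code_length_norm: float) -> float:
    """construct_fidelity is a hard gate: missing any cell construct scores 0."""
    if not cell <= ast_nodes:
        return 0.0
    bonus = len(ast_nodes - cell) / max(len(ast_nodes), 1)  # extra constructs
    return 1.0 + ALPHA * bonus + BETA * test_quality + GAMMA * code_length_norm

def update_cell(cells: dict, cell: frozenset, code: str, quality: float) -> bool:
    """Replace the cell entry only if the new example is strictly better."""
    entry = cells[cell]
    if quality > 0 and (entry is None or quality > entry.quality):
        cells[cell] = CellEntry(code, quality)
        return True
    return False
```

With the 38-name grammar vocabulary, `build_cells` yields exactly the 9,139 pair and trio cells quoted above.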
+ +**Appropriate when:** The target language is novel or partially unknown to the generator. The external specification mechanism compensates for the model's lack of prior knowledge. + +--- + +### Candidate F — MAP-Elites with ConstructPrior Transfer (Proposed Disruptive Extension) + +**Algorithm class:** Quality-Diversity algorithm with informed cell selection — no parameter updates, coverage guaranteed by construction. + +**Core insight:** Candidate E specifies *which* constructs must appear but treats all cells as equally valuable. Real production code does not use constructs uniformly: some combinations (e.g., `ormAccessSelect` + `try`) appear in virtually every real API endpoint; others (e.g., `encodeSHA256` + `startLoop`) appear rarely. A golden dataset that mirrors production code distributions will retrieve more relevant examples for real developer queries. The ConstructPrior module transfers this knowledge from large public codebases to weight MAP-Elites cell selection. + +**ConstructPrior design:** + +``` +ConstructPrior = weighted combination of 4 domain sources: + + Source 1 — The Stack (BigCode, 50% weight) + Filter: paths matching /api/, /routes/, /handlers/, /endpoints/ + Languages: Python, Go, JavaScript/TypeScript, Java + Process: extract function-level code blocks → map language constructs + to AVAP semantic equivalents → compute co-occurrence frequency + per (construct_a, construct_b) and (construct_a, construct_b, construct_c) + Rationale: real microservice API code; largest and most representative source + + Source 2 — CodeSearchNet (30% weight) + Filter: semantic search for "api endpoint", "http handler", "database query" + Languages: Python, Go, Java, JavaScript + Process: same mapping pipeline as Source 1 + Rationale: function-docstring pairs provide semantic context for mapping quality + + Source 3 — HumanEval-X Go (10% weight) + Filter: problems using goroutines, channels, wait groups + Process: map Go concurrency primitives → AVAP {go, 
gather, startLoop} + Rationale: AVAP's concurrency model mirrors Go's; coverage of concurrent patterns + + Source 4 — Spider SQL Dataset (10% weight) + Filter: multi-table joins, aggregations, nested queries + Process: map SQL operations → AVAP {ormAccessSelect, ormAccessInsert, ormAccessUpdate} + Rationale: AVAP ORM constructs semantically equivalent to SQL clauses +``` + +**Construct mapping table (AVAP ← source constructs):** + +| AVAP construct | Python equivalent | Go equivalent | SQL equivalent | +|---|---|---|---| +| `ormAccessSelect` | `cursor.fetchall()`, `session.query()` | `db.Query()`, `rows.Scan()` | `SELECT` | +| `ormAccessInsert` | `session.add()`, `cursor.execute(INSERT)` | `db.Exec(INSERT)` | `INSERT INTO` | +| `ormAccessUpdate` | `session.merge()`, `cursor.execute(UPDATE)` | `db.Exec(UPDATE)` | `UPDATE` | +| `RequestGet` | `requests.get()`, `httpx.get()` | `http.Get()`, `client.Get()` | — | +| `RequestPost` | `requests.post()`, `httpx.post()` | `http.Post()`, `client.Post()` | — | +| `startLoop` | `for item in list:` | `for _, v := range` | `CURSOR LOOP` | +| `go` + `gather` | `asyncio.gather()`, `ThreadPoolExecutor` | `go func()`, `sync.WaitGroup` | — | +| `try` + `exception` | `try: except:` | `if err != nil` | — | +| `encodeSHA256` | `hashlib.sha256()` | `sha256.New()` | — | +| `function` | `def func():` | `func name()` | `CREATE FUNCTION` | + +**Cell weighting formula:** + +``` +cell_prior_weight(cell) = + Σ_{s ∈ Sources} weight_s · freq_s(cell_constructs) + + where freq_s(cell) = co-occurrence frequency of the construct set in source s, + normalized to [0, 1] within each source. + + Cells with prior_weight = 0 (no source coverage) receive a minimum weight ε = 0.05 + to ensure all cells remain reachable. +``` + +**Modified cell selection with ConstructPrior:** + +``` +PHASE 1 (exploration): + Select empty cells, weighted by cell_prior_weight. + High-prior cells filled first — these are patterns real developers use. 
+ +PHASE 2 (exploitation): + Select lowest-quality filled cells, UCB-weighted, + also weighted by cell_prior_weight. + High-prior, low-quality cells prioritized for improvement. + +PHASE 3 (tail coverage): + Cells with prior_weight = ε are visited last, after all + production-relevant cells reach quality > 0.7. + Ensures complete mathematical coverage without wasting + early generation budget on rare combinations. +``` + +**Why this is disruptive:** + +1. **First formal connection between DSL dataset synthesis and production code distributions.** Prior dataset synthesis work (MBPP, HumanEval, APPS) uses human-authored problems or scrapes competitive programming sites. For novel DSLs with no prior human authors, this approach provides the first principled method to bootstrap coverage from semantically equivalent languages. + +2. **Eliminates the uniform sampling assumption.** Standard Quality-Diversity algorithms treat all niches as equally valuable. The ConstructPrior breaks this assumption: cells that correspond to real production patterns are assigned higher value, producing a dataset whose distribution mirrors real developer usage rather than mathematical combinatorial completeness. + +3. **Zero human annotation required.** The prior is derived automatically from public datasets under permissive licenses (The Stack: Apache 2.0; CodeSearchNet: MIT; HumanEval-X: MIT; Spider: CC BY-SA 4.0). + +4. **Residual bias is semantic, not structural.** Candidate E's residual bias is the model's ability to satisfy arbitrary construct specifications (some cells may be hard to fill). Candidate F's residual bias is the construct mapping quality (how faithfully Python/Go/SQL constructs map to AVAP equivalents). The latter is measurable, improvable, and fully transparent.
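The prior weighting and the Phase 1 weighted cell selection can be sketched as below. The source keys and the layout of the frequency tables are assumptions of the sketch, not the `construct_prior.py` API.

```python
import random

EPSILON = 0.05  # minimum weight so zero-prior cells stay reachable

# Per-source weights from the ConstructPrior design (source keys assumed).
SOURCE_WEIGHTS = {"the_stack": 0.50, "codesearchnet": 0.30,
                  "humaneval_x_go": 0.10, "spider": 0.10}

def cell_prior_weight(cell: frozenset,
                      freqs: dict[str, dict[frozenset, float]]) -> float:
    """Weighted sum of per-source co-occurrence frequencies, floored at ε.

    `freqs[source][cell]` is the co-occurrence frequency of the cell's
    construct set in that source, normalized to [0, 1] within the source.
    """
    w = sum(SOURCE_WEIGHTS[s] * freqs.get(s, {}).get(cell, 0.0)
            for s in SOURCE_WEIGHTS)
    return max(w, EPSILON)

def select_empty_cell(empty_cells: list, freqs: dict, rng=random) -> frozenset:
    """Phase 1: sample an empty cell with probability proportional to its prior."""
    weights = [cell_prior_weight(c, freqs) for c in empty_cells]
    return rng.choices(empty_cells, weights=weights, k=1)[0]
```

Sampling proportionally (rather than greedily taking the highest-prior cell) keeps tail cells reachable throughout Phase 1 while still filling production-relevant cells first on average.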
+ +**Expected improvement over Candidate E:** + +- RAGAS Composite: +0.03–0.08 (hypothesis: production-weighted cells retrieve more relevant examples for real queries) +- Distribution entropy: similar or slightly lower than E (intentionally non-uniform — mirrors production distribution) +- Downstream task success: +5–15% on held-out real developer queries (hypothesis: high-prior cells produce examples that match actual query patterns) + +**Appropriate when:** Target DSL has identifiable semantic equivalents in mainstream languages, and a production-weighted dataset is preferred over a mathematically uniform one. + +--- + +### Out of Scope — Fine-tuning Approaches (GRPO, DPO) + +Gradient-based approaches (GRPO, DPO) address a **different problem**: fine-tuning the inference model after the dataset is built. This ADR concerns dataset synthesis algorithm design. Fine-tuning the inference model is a separate architectural decision, tracked separately, and is not evaluated here. + +Per-iteration fine-tuning of the generator (training the generator on its own outputs between batches) is explicitly rejected as a design choice. Iteratively training a model on its own outputs produces cumulative distribution narrowing. The generator (Claude API) and any future inference model must be trained on separate, independently validated datasets. + +--- + +### Candidate D — UCB Bandit over Coverage Regions + +**Algorithm class:** Multi-armed bandit. + +Coverage regions are arms. UCB selects which region to target via exploration-exploitation tradeoff. Theoretically well-understood convergence guarantees but does not provide construct-level specification — it targets regions, not specific combinations. Less precise than Candidate E. + +**Superseded by Candidate E** for the same computational cost with stronger guarantees. 
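Candidate D's region selection (and the UCB weighting referenced in Phase 2 of Candidate F) corresponds to standard UCB1. A minimal sketch, where the arm bookkeeping and the exploration constant are illustrative assumptions:

```python
import math

def ucb1_select(arms: dict[str, tuple[float, int]], total_pulls: int,
                c: float = 1.4) -> str:
    """Pick the arm maximizing mean reward plus an exploration bonus.

    `arms` maps region name -> (cumulative_reward, pull_count).
    Arms never pulled are selected first, so every region is tried once.
    """
    best, best_score = None, float("-inf")
    for name, (cum_reward, pulls) in arms.items():
        if pulls == 0:
            return name
        score = cum_reward / pulls + c * math.sqrt(math.log(total_pulls) / pulls)
        if score > best_score:
            best, best_score = name, score
    return best
```

As the ADR notes, the arms here are coarse coverage regions; the bandit cannot express "must use constructs {A, B, C}", which is why Candidate E's cell-level specification supersedes it.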
+ +--- + +## Comparative Summary + +| Property | A: CW-Reward | E: MAP-Elites | F: MAP-Elites+Prior | +|---|---|---|---| +| Distribution bias risk | **High** | **None** | **None** | +| Coverage guarantee | Probabilistic | **By construction** | **By construction** | +| Production code alignment | None | None | **Yes (weighted)** | +| LLM parameter updates | No | No | No | +| GPU requirement | None | None | None | +| Works with API-only LLM | Yes | Yes | Yes | +| Interpretability | High | **Very high** | **Very high** | +| Implementation complexity | Low | Medium | **Medium-High** | +| Convergence guarantee | No | **Yes (fill rate)** | **Yes (fill rate)** | +| Residual bias | Model distribution | Cell fill difficulty | Mapping quality | +| External data required | No | No | Yes (public, free) | +| Novel contribution | Low | Medium | **High** | + +--- + +## Evaluation Protocol + +### Phase 1 — Candidate A vs Candidate E vs Candidate F + +Run all three candidates for 500 generated examples each, same LRM, same parser, same Stage 1 filter. Fixed random seed for reproducibility. + +**Primary metrics:** + +| Metric | Definition | Expected winner | +|---|---|---| +| Cell fill rate | Fraction of 9,139 cells with ≥1 example (E/F only) | E≈F by construction | +| Coverage breadth | Distinct node types covered / total | E≈F | +| Distribution uniformity | Entropy of node type frequency distribution | E (flatter = better) | +| Production alignment | KL divergence between dataset and ConstructPrior distribution | **F** (by design) | +| Mean cell quality | Average quality score across filled cells | TBD empirically | +| Parser pass rate trend | Pass rate across iterations | A (if few-shots help) | +| Downstream RAGAS | RAGAS Composite on 50 held-out AVAP queries | **Primary decision signal** | + +**Distribution uniformity** is the key metric for bias detection (A vs E). Plot node type frequency as a histogram. Candidate A will show a long-tail distribution. 
Candidate E should show a near-uniform distribution. Candidate F will show a production-weighted distribution (intentionally non-uniform — this is a feature, not a bug). + +**Production alignment** is the key metric for F vs E. A dataset with low KL divergence from ConstructPrior produces examples that match real developer usage patterns. If RAGAS(F) > RAGAS(E), this validates the transfer prior hypothesis. + +**Selection criterion:** +- A vs E: Candidate E wins if entropy > 3.0 bits AND RAGAS(E) ≥ RAGAS(A). +- E vs F: Candidate F wins if RAGAS(F) > RAGAS(E) by margin ≥ 0.02. +- If F wins both comparisons, F is the production algorithm. +- Fallback: if RAGAS margin F vs E < 0.02, use E (simpler, no external data dependency). + +--- + +## Weight and Hyperparameter Grids + +### Candidate A weight grid + +| Config | w_ecs | w_novelty | w_tests | Hypothesis | +|---|---|---|---|---| +| A1 | 0.50 | 0.35 | 0.15 | Balanced (baseline) | +| A2 | 0.70 | 0.20 | 0.10 | Coverage-heavy | +| A3 | 0.30 | 0.60 | 0.10 | Novelty-heavy | +| A4 | 0.85 | 0.00 | 0.15 | No novelty (ablation) | + +A4 is the critical ablation: does novelty weighting reduce distribution bias, or is ECS alone sufficient? + +### Candidate E hyperparameter grid + +| Config | Cell size | Selection strategy | α (bonus constructs) | +|---|---|---|---| +| E1 | Pairs only | Empty-first | 0.2 | +| E2 | Pairs + Trios | Empty-first | 0.2 | +| E3 | Pairs + Trios | UCB-weighted | 0.2 | +| E4 | Pairs + Trios | Empty-first | 0.5 | + +E2 is the baseline. E3 tests whether UCB cell selection improves quality over simple empty-first ordering. E4 tests whether a higher bonus for extra constructs produces richer examples. 
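The distribution uniformity and production alignment metrics above can be computed as sketched below. The smoothing constant in the KL computation is an assumption of the sketch, added to keep the divergence finite when a construct is absent from one of the distributions.

```python
import math
from collections import Counter

def entropy_bits(node_type_counts: Counter) -> float:
    """Shannon entropy of the node-type frequency distribution, in bits."""
    total = sum(node_type_counts.values())
    return -sum((n / total) * math.log2(n / total)
                for n in node_type_counts.values() if n > 0)

def kl_divergence_bits(p: dict[str, float], q: dict[str, float],
                       smooth: float = 1e-9) -> float:
    """KL(p || q) in bits; `smooth` (an assumption) avoids log of zero."""
    keys = set(p) | set(q)
    ps = {k: p.get(k, 0.0) + smooth for k in keys}
    qs = {k: q.get(k, 0.0) + smooth for k in keys}
    zp, zq = sum(ps.values()), sum(qs.values())
    return sum((ps[k] / zp) * math.log2((ps[k] / zp) / (qs[k] / zq))
               for k in keys)
```

For calibration: a perfectly uniform distribution over 8 node types has entropy 3.0 bits, so the 3.0-bit threshold over 38 node types (maximum ≈ 5.25 bits) tolerates substantial non-uniformity.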
+ +### Candidate F hyperparameter grid + +| Config | Prior sources | Phase 3 threshold | ε (tail minimum) | Mapping strictness | +|---|---|---|---|---| +| F1 | All 4 sources (50/30/10/10) | q > 0.7 | 0.05 | Lenient (keyword match) | +| F2 | All 4 sources (50/30/10/10) | q > 0.7 | 0.05 | Strict (AST-level match) | +| F3 | Stack only (100%) | q > 0.7 | 0.05 | Lenient | +| F4 | All 4 sources (50/30/10/10) | q > 0.5 | 0.10 | Lenient | + +F1 is the baseline. F2 tests whether strict construct mapping (requiring AST-level evidence vs keyword presence) improves prior quality. F3 is the ablation: does the multi-source mixture add value over The Stack alone? F4 tests earlier phase transition and higher minimum tail weight. + +--- + +## Open Questions for the Scientific Team + +1. **Cell selection with difficulty weighting:** Some cells may be intrinsically hard to fill (e.g., combining `go` + `avapConnector` + `ormAccessSelect` in a single coherent example). Should the cell selection strategy account for historical fill difficulty, or treat all cells equally? + +2. **Cross-cell quality:** An example generated for cell {A, B} may also be a high-quality example for cell {A, C} if it happens to use C as well. Should examples be indexed against all cells they satisfy, or only the cell they were generated for? + +3. **Minimum example length per cell:** Short examples (3–5 lines) can technically satisfy a cell specification with minimal semantic content. Should a minimum code complexity threshold (e.g., minimum AST depth, minimum number of statements) be required for cell admission? + +4. **Cell retirement:** Once a cell reaches quality score > 0.90, should it be retired from the selection pool to focus generation effort on harder cells? + +5. **Generalisation to KCL:** The KCL grammar has different node types. 
Does the MAP-Elites cell space need to be redefined per language, or can a universal cell structure be derived from shared construct categories (type_definition, validation, control_flow, io)? + +6. **ConstructPrior mapping quality:** The construct mapping (e.g., Python `session.query()` → AVAP `ormAccessSelect`) is heuristic. Should mapping quality be validated against a small manually annotated equivalence set before running the full generation pipeline? If the mapping is noisy, the prior weights may be misleading — a high-frequency Python pattern that maps incorrectly to a rare AVAP pattern would over-weight a non-representative cell. + +7. **Prior refresh cadence:** The Stack and CodeSearchNet are static snapshots. If AVAP adoption grows and native AVAP code becomes available, should the ConstructPrior be retrained on AVAP-native data, effectively transitioning from transfer learning to self-supervised learning? Define the minimum corpus size threshold at which native data supersedes the cross-language prior. + +--- + +## Consequences + +- `generate_mbap_v2.py` is rewritten to implement Candidate F (MAP-Elites + ConstructPrior) as the primary algorithm. Candidate E (MAP-Elites without prior) is available via `--mode map-elites`. Candidate A (CW-Reward) is available via `--mode reward`. All three modes use identical parser, stage filters, and cell definitions to ensure fair comparison. +- A `ConstructPrior` module (`construct_prior.py`) handles multi-source data download, construct extraction, language-to-AVAP mapping, and co-occurrence matrix construction. This module is isolated from the core MAP-Elites loop and can be updated independently. +- The construct mapping table (language construct → AVAP equivalent) is maintained as a versioned configuration file (`construct_map.yaml`) and must not be modified after generation begins for a given dataset version. +- Results must be documented in `research/reward/` before this ADR is closed. 
Required artefacts: entropy histograms for A/E/F, KL divergence plots, RAGAS Composite comparison table, cell fill rate heatmaps. +- Any change to cell definitions, quality metrics, or the construct mapping table requires full dataset regeneration. +- Per-iteration fine-tuning of the generator is rejected and will not be re-evaluated without new evidence addressing the distribution narrowing risk. diff --git a/scripts/pipelines/samples_generator/construct_prior.py b/scripts/pipelines/samples_generator/construct_prior.py new file mode 100644 index 0000000..32d81ec --- /dev/null +++ b/scripts/pipelines/samples_generator/construct_prior.py @@ -0,0 +1,1048 @@ +#!/usr/bin/env python3 + + +import argparse +import ast as pyast +import base64 +import json +import math +import os +import sys +import time +from collections import defaultdict +from datetime import datetime, timezone +from itertools import combinations +from pathlib import Path + +try: + import yaml +except ImportError: + print("ERROR: pyyaml not installed. Run: pip install pyyaml") + sys.exit(1) + +try: + import requests +except ImportError: + print("ERROR: requests not installed. 
Run: pip install requests") + sys.exit(1) + + +AVAP_NODE_NAMES: list[str] = [ + "addParam", "addResult", "addVar", "_status", + "getListLen", "getQueryParamList", "itemFromList", + "replace", "randomString", + "if_mode1", "if_mode2", "else", "end", + "startLoop", "endLoop", + "try", "exception", + "return", + "go", "gather", + "avapConnector", + "ormCheckTable", "ormDirect", + "ormAccessSelect", "ormAccessInsert", "ormAccessUpdate", + "variableFromJSON", "AddVariableToJSON", + "encodeSHA256", "encodeMD5", + "getTimeStamp", "getDateTime", "stampToDatetime", + "RequestGet", "RequestPost", + "function", + "import", "include", +] + +LANGUAGE_MAPPINGS: dict[str, dict] = { + "ormAccessSelect": { + "description": "ORM read/query operation — SELECT semantics", + "python_ast_calls": [ + ".fetchall", ".fetchone", ".fetchmany", + ".query", ".filter", ".filter_by", ".all", ".first", + ".execute", ".select", + ], + "go_keywords": ["db.Query(", "rows.Scan("], + "sql_keywords": ["SELECT ", "JOIN ", "WHERE "], + }, + "ormAccessInsert": { + "description": "ORM write operation — INSERT semantics", + "python_ast_calls": [".add", ".insert", ".bulk_insert_mappings", ".create"], + "go_keywords": ['db.Exec("INSERT'], + "sql_keywords": ["INSERT INTO"], + }, + "ormAccessUpdate": { + "description": "ORM update/delete operation — UPDATE/DELETE semantics", + "python_ast_calls": [".update", ".merge", ".save", ".commit"], + "go_keywords": ['db.Exec("UPDATE'], + "sql_keywords": ["UPDATE ", "DELETE FROM"], + }, + "ormCheckTable": { + "description": "Check if a table exists before operating on it", + "python_ast_calls": [ + "inspect.has_table", "engine.dialect.has_table", + "inspector.get_table_names", + ], + "go_keywords": ["db.QueryRow(\"SELECT EXISTS"], + "sql_keywords": ["SHOW TABLES", "information_schema.tables"], + }, + "ormDirect": { + "description": "Raw/direct SQL execution — bypasses ORM abstraction", + "python_ast_calls": [ + "cursor.execute", "connection.execute", + "db.execute", 
"session.execute", + ], + "go_keywords": ["db.Exec(", "db.QueryRow("], + "sql_keywords": ["EXECUTE ", "CALL "], + }, + "RequestGet": { + "description": "HTTP GET request", + "python_ast_calls": [ + "requests.get", "httpx.get", "session.get", "client.get", + "aiohttp.ClientSession.get", + ], + "go_keywords": ["http.Get(", "client.Get("], + "sql_keywords": [], + }, + "RequestPost": { + "description": "HTTP POST request", + "python_ast_calls": [ + "requests.post", "httpx.post", "session.post", "client.post", + "aiohttp.ClientSession.post", + ], + "go_keywords": ["http.Post(", "client.Post("], + "sql_keywords": [], + }, + "try": { + "description": "Exception handling block — try", + "python_ast_node": "ast.Try", + "go_keywords": ["if err != nil"], + "sql_keywords": ["BEGIN TRY"], + }, + "exception": { + "description": "Exception handler — except/catch clause", + "python_ast_node": "ast.ExceptHandler", + "go_keywords": ["if err != nil"], + "sql_keywords": ["BEGIN CATCH"], + }, + "startLoop": { + "description": "Loop / iteration construct", + "python_ast_node": "ast.For / ast.AsyncFor", + "go_keywords": ["for _, v := range", "for i, "], + "sql_keywords": ["CURSOR LOOP"], + }, + "endLoop": { + "description": "End of loop block (AVAP explicit close)", + "python_ast_node": "end of ast.For scope", + "go_keywords": [], + "sql_keywords": [], + }, + "function": { + "description": "Function definition", + "python_ast_node": "ast.FunctionDef / ast.AsyncFunctionDef", + "go_keywords": ["func "], + "sql_keywords": ["CREATE FUNCTION", "CREATE PROCEDURE"], + }, + "return": { + "description": "Return statement", + "python_ast_node": "ast.Return", + "go_keywords": ["return "], + "sql_keywords": ["RETURN "], + }, + "if_mode1": { + "description": "Conditional — if(var, comparison, operator) form", + "python_ast_node": "ast.If", + "go_keywords": ["if "], + "sql_keywords": ["IF ", "CASE WHEN"], + }, + "if_mode2": { + "description": "Conditional — if(None, None, expression) form", + 
"python_ast_node": "ast.If (complex condition)", + "go_keywords": [], + "sql_keywords": [], + }, + "else": { + "description": "Else branch of a conditional", + "python_ast_node": "ast.If.orelse", + "go_keywords": ["} else {"], + "sql_keywords": ["ELSE"], + }, + "end": { + "description": "Block terminator (AVAP explicit close)", + "python_ast_node": "end of ast.If scope", + "go_keywords": [], + "sql_keywords": ["END IF", "END"], + }, + "go": { + "description": "Async/concurrent task launch", + "python_ast_calls": [ + "asyncio.create_task", "asyncio.ensure_future", + "ThreadPoolExecutor", "executor.submit", + ], + "go_keywords": ["go func(", "go "], + "sql_keywords": [], + }, + "gather": { + "description": "Wait for concurrent tasks to complete", + "python_ast_calls": [ + "asyncio.gather", "asyncio.wait", + "executor.map", "wg.Wait", + ], + "go_keywords": ["sync.WaitGroup", "wg.Wait()"], + "sql_keywords": [], + }, + "avapConnector": { + "description": "AVAP connector — external service integration point", + "python_ast_calls": [], + "go_keywords": [], + "sql_keywords": [], + "note": "No direct mainstream language equivalent. 
Rare in co-occurrence.", + }, + "encodeSHA256": { + "description": "SHA-256 hashing", + "python_ast_calls": ["hashlib.sha256", "sha256", ".hexdigest"], + "go_keywords": ["sha256.New()", "sha256.Sum256("], + "sql_keywords": ["SHA2(", "HASHBYTES('SHA2_256'"], + }, + "encodeMD5": { + "description": "MD5 hashing", + "python_ast_calls": ["hashlib.md5", "md5", ".hexdigest"], + "go_keywords": ["md5.New()", "md5.Sum("], + "sql_keywords": ["MD5(", "HASHBYTES('MD5'"], + }, + "variableFromJSON": { + "description": "Parse JSON string into variable", + "python_ast_calls": ["json.loads", "json.load", "orjson.loads", "ujson.loads"], + "go_keywords": ["json.Unmarshal("], + "sql_keywords": ["JSON_VALUE(", "JSON_EXTRACT("], + }, + "AddVariableToJSON": { + "description": "Serialize variable to JSON string", + "python_ast_calls": ["json.dumps", "json.dump", "orjson.dumps", "ujson.dumps"], + "go_keywords": ["json.Marshal("], + "sql_keywords": ["JSON_OBJECT(", "FOR JSON"], + }, + "getDateTime": { + "description": "Get current date and time", + "python_ast_calls": [ + "datetime.now", "datetime.utcnow", "datetime.today", "date.today", + ], + "go_keywords": ["time.Now()"], + "sql_keywords": ["NOW()", "GETDATE()", "CURRENT_TIMESTAMP"], + }, + "getTimeStamp": { + "description": "Get current Unix timestamp", + "python_ast_calls": ["time.time", "time.monotonic"], + "go_keywords": ["time.Now().Unix()"], + "sql_keywords": ["UNIX_TIMESTAMP()", "EXTRACT(EPOCH"], + }, + "stampToDatetime": { + "description": "Convert Unix timestamp to datetime", + "python_ast_calls": ["datetime.fromtimestamp", "datetime.utcfromtimestamp"], + "go_keywords": ["time.Unix("], + "sql_keywords": ["FROM_UNIXTIME(", "DATEADD"], + }, + "randomString": { + "description": "Generate random string or token", + "python_ast_calls": [ + "secrets.token_hex", "secrets.token_urlsafe", + "uuid.uuid4", "uuid.uuid1", + "random.choices", "random.randbytes", + ], + "go_keywords": ["rand.Read(", "uuid.New("], + "sql_keywords": ["NEWID()", 
"UUID()"], + }, + "replace": { + "description": "String replacement operation", + "python_ast_calls": [".replace", "re.sub", "str.replace"], + "go_keywords": ["strings.Replace(", "strings.ReplaceAll("], + "sql_keywords": ["REPLACE("], + }, + "addParam": { + "description": "Declare/receive input parameter", + "python_ast_node": "function argument / request.args.get", + "go_keywords": ["r.URL.Query().Get("], + "sql_keywords": [], + }, + "addResult": { + "description": "Declare output/result variable", + "python_ast_node": "variable assignment for return value", + "go_keywords": [], + "sql_keywords": [], + }, + "addVar": { + "description": "Declare intermediate variable", + "python_ast_node": "ast.Assign", + "go_keywords": [":=", "var "], + "sql_keywords": ["DECLARE "], + }, + "_status": { + "description": "HTTP status code variable", + "python_ast_calls": ["response.status_code", ".status"], + "go_keywords": ["resp.StatusCode", "w.WriteHeader("], + "sql_keywords": [], + }, + "getListLen": { + "description": "Get length of a list", + "python_ast_calls": ["len("], + "go_keywords": ["len("], + "sql_keywords": ["COUNT("], + }, + "getQueryParamList": { + "description": "Get multiple values for a query parameter (list form)", + "python_ast_calls": ["request.args.getlist", "request.GET.getlist"], + "go_keywords": ["r.URL.Query()"], + "sql_keywords": [], + }, + "itemFromList": { + "description": "Get item by index from a list", + "python_ast_calls": [], + "python_ast_node": "ast.Subscript on list", + "go_keywords": [], + "sql_keywords": [], + }, + "import": { + "description": "Module/package import", + "python_ast_node": "ast.Import / ast.ImportFrom", + "go_keywords": ["import (", 'import "'], + "sql_keywords": [], + }, + "include": { + "description": "Include another AVAP file/module", + "python_ast_calls": ["importlib.import_module"], + "go_keywords": [], + "sql_keywords": [], + }, +} + +# ───────────────────────────────────────────────────────────────────────────── +# 
GITHUB SEARCH QUERIES +# Each query targets a specific capability domain. +# Queries use GitHub Code Search syntax: +# language:python — only Python files +# path:api — files in directories named "api" (common microservice layout) +# The coverage across queries ensures we sample diverse API patterns. +# ───────────────────────────────────────────────────────────────────────────── + +_GITHUB_QUERIES: list[str] = [ + # ORM + error handling (most common microservice pattern) + "language:python path:api session.query try except", + "language:python path:routes db.query fetchall", + "language:python path:handlers cursor.execute SELECT", + # HTTP clients in API endpoints + "language:python path:api requests.get requests.post", + "language:python path:api httpx.get httpx.post", + # Authentication / crypto + "language:python path:api hashlib.sha256", + "language:python path:api hashlib.md5 token", + # Async / concurrency patterns + "language:python path:api asyncio.gather asyncio.create_task", + "language:python path:api ThreadPoolExecutor executor.submit", + # JSON handling + "language:python path:api json.loads json.dumps", + # DateTime + "language:python path:api datetime.now time.time", + # UUID / token generation + "language:python path:api uuid.uuid4 secrets.token", + # ORM insert/update patterns + "language:python path:api session.add session.commit", + "language:python path:routes db.execute INSERT UPDATE", + # Flask / FastAPI endpoint patterns (ensure we get realistic API structure) + "language:python path:api @app.route @router", + "language:python path:api @app.get @app.post fastapi", +] + + +# ───────────────────────────────────────────────────────────────────────────── +# PYTHON AST DETECTOR +# Parses Python files with the standard ast module and maps AST nodes to +# AVAP semantic equivalents. This is AST-level, not keyword scanning — +# no false positives from variable names, strings, or comments. 
+# ───────────────────────────────────────────────────────────────────────────── + +class PythonASTDetector: + """Detects AVAP semantic equivalents in a Python file using the ast module.""" + + # Suffix patterns for ORM call detection + _ORM_SELECT_SUFFIXES = frozenset([ + ".fetchall", ".fetchone", ".fetchmany", + ".query", ".filter", ".filter_by", ".all", ".first", + ".execute", ".select", + ]) + _ORM_INSERT_SUFFIXES = frozenset([".add", ".insert", ".bulk_insert_mappings", ".create"]) + _ORM_UPDATE_SUFFIXES = frozenset([".update", ".merge", ".save", ".commit"]) + + def detect(self, code: str) -> set[str]: + """ + Parse Python source and return set of AVAP command names detected. + Falls back to keyword scanning if file has syntax errors. + """ + try: + tree = pyast.parse(code) + except SyntaxError: + return self._keyword_fallback(code) + + detected: set[str] = set() + + for node in pyast.walk(tree): + + # ── Structural nodes ────────────────────────────────────────── + if isinstance(node, (pyast.FunctionDef, pyast.AsyncFunctionDef)): + detected.add("function") + # Count function parameters as addParam proxies + if node.args.args: + detected.add("addParam") + + elif isinstance(node, pyast.Return) and node.value is not None: + detected.add("return") + + elif isinstance(node, (pyast.For, pyast.AsyncFor)): + detected.add("startLoop") + + elif isinstance(node, pyast.If): + detected.add("if_mode1") + if node.orelse: + detected.add("else") + + elif isinstance(node, (pyast.Import, pyast.ImportFrom)): + detected.add("import") + + elif isinstance(node, pyast.Try): + detected.add("try") + if node.handlers: + detected.add("exception") + + elif isinstance(node, pyast.Assign): + detected.add("addVar") + + elif isinstance(node, pyast.Subscript): + # list[index] → itemFromList proxy + if isinstance(node.ctx, pyast.Load): + detected.add("itemFromList") + + # ── Call nodes ──────────────────────────────────────────────── + elif isinstance(node, pyast.Call): + 
self._analyse_call(node, detected) + + return detected + + def _analyse_call(self, node: pyast.Call, detected: set[str]): + """Analyse a Call AST node and add matching AVAP constructs.""" + try: + callee = pyast.unparse(node.func) + except Exception: + return + + # ORM + if any(callee.endswith(s) for s in self._ORM_SELECT_SUFFIXES): + detected.add("ormAccessSelect") + if any(callee.endswith(s) for s in self._ORM_INSERT_SUFFIXES): + detected.add("ormAccessInsert") + if any(callee.endswith(s) for s in self._ORM_UPDATE_SUFFIXES): + detected.add("ormAccessUpdate") + + # ORM — raw SQL (cursor.execute / db.execute) + if any(callee.endswith(p) for p in ("cursor.execute", "connection.execute", + "db.execute", "session.execute")): + detected.add("ormDirect") + + # ORM — table inspection + if any(p in callee for p in ("has_table", "get_table_names", "inspector")): + detected.add("ormCheckTable") + + # HTTP GET + if callee in ("requests.get", "httpx.get") or ( + callee.endswith(".get") and self._has_url_arg(node) + ): + detected.add("RequestGet") + + # HTTP POST + if callee in ("requests.post", "httpx.post") or ( + callee.endswith(".post") and self._has_url_arg(node) + ): + detected.add("RequestPost") + + # Async concurrency + if callee in ("asyncio.gather", "asyncio.wait"): + detected.add("go") + detected.add("gather") + if callee in ("asyncio.create_task", "asyncio.ensure_future"): + detected.add("go") + if any(callee.endswith(p) for p in ("executor.submit", "executor.map", + "ThreadPoolExecutor")): + detected.add("go") + + # Crypto + if any(p in callee for p in ("sha256", "sha_256", "SHA256")): + detected.add("encodeSHA256") + if any(p in callee for p in ("md5", "MD5")) and "hmac" not in callee: + detected.add("encodeMD5") + + # JSON + if callee in ("json.loads", "json.load", "orjson.loads", "ujson.loads"): + detected.add("variableFromJSON") + if callee in ("json.dumps", "json.dump", "orjson.dumps", "ujson.dumps"): + detected.add("AddVariableToJSON") + + # DateTime + if 
any(p in callee for p in ("datetime.now", "datetime.utcnow", + "datetime.today", "date.today")): + detected.add("getDateTime") + if callee in ("time.time", "time.monotonic", "time.time_ns"): + detected.add("getTimeStamp") + if any(p in callee for p in ("fromtimestamp", "utcfromtimestamp")): + detected.add("stampToDatetime") + + # Random / UUID + if any(p in callee for p in ("secrets.token", "uuid.uuid", "random.choice", + "random.randbytes")): + detected.add("randomString") + + # String replace + if callee.endswith(".replace"): + detected.add("replace") + + # len() → getListLen + if callee == "len": + detected.add("getListLen") + + # Query param list (Flask/FastAPI specific) + if any(p in callee for p in ("getlist", "get_list")): + detected.add("getQueryParamList") + + # Status code + if any(p in callee for p in ("status_code", "WriteHeader", "status")): + detected.add("_status") + + def _has_url_arg(self, node: pyast.Call) -> bool: + """Heuristic: first argument looks like a URL or URL variable.""" + if not node.args: + return False + first = node.args[0] + if isinstance(first, pyast.Constant) and isinstance(first.value, str): + v = first.value + return v.startswith(("http", "/", "https")) or "{" in v + if isinstance(first, pyast.Name): + return first.id.lower() in ("url", "endpoint", "uri", "base_url") + if isinstance(first, pyast.Attribute): + try: + return "url" in pyast.unparse(first).lower() + except Exception: + return False + return False + + def _keyword_fallback(self, code: str) -> set[str]: + """Keyword-based fallback for files that fail Python parsing.""" + detected: set[str] = set() + checks = { + "def ": "function", "return ": "return", + "for ": "startLoop", "if ": "if_mode1", + "try:": "try", "except ": "exception", + "import ": "import", + "requests.get(": "RequestGet", "requests.post(": "RequestPost", + "httpx.get(": "RequestGet", "httpx.post(": "RequestPost", + ".fetchall(": "ormAccessSelect", ".query(": "ormAccessSelect", + ".add(": 
"ormAccessInsert", ".execute(": "ormDirect", + "json.loads(": "variableFromJSON", "json.dumps(": "AddVariableToJSON", + "hashlib.sha256": "encodeSHA256", "hashlib.md5": "encodeMD5", + "asyncio.gather": "gather", "asyncio.create_task": "go", + "datetime.now": "getDateTime", "time.time()": "getTimeStamp", + "uuid.uuid4": "randomString", + } + for pattern, avap in checks.items(): + if pattern in code: + detected.add(avap) + return detected + + +# ───────────────────────────────────────────────────────────────────────────── +# GITHUB CODEBASE FETCHER +# Queries GitHub Code Search API and downloads Python files. +# Logs every file fetched so the user can verify real codebases are queried. +# ───────────────────────────────────────────────────────────────────────────── + +class GitHubFetcher: + + SEARCH_URL = "https://api.github.com/search/code" + + def __init__(self, token: str = None, max_files: int = 100, verbose: bool = True): + self.token = token or os.environ.get("GITHUB_TOKEN") + self.max_files = max_files + self.verbose = verbose + self._files_fetched: list[dict] = [] # [{repo, path, url, avap_constructs}] + + @property + def headers(self) -> dict: + h = {"Accept": "application/vnd.github.v3+json"} + if self.token: + h["Authorization"] = f"Bearer {self.token}" + return h + + def fetch_all(self) -> list[dict]: + """ + Execute all GitHub search queries, download files, return list of + {repo, path, url, code} dicts. Logs every file to stdout. 
+ """ + print(f"\n{'─'*60}") + print(f" GitHub Codebase Extraction") + print(f" Token: {'✓ authenticated (30 req/min)' if self.token else '✗ anonymous (10 req/min)'}") + print(f" Max files: {self.max_files}") + print(f" Queries: {len(_GITHUB_QUERIES)}") + print(f"{'─'*60}\n") + + fetched: list[dict] = [] + urls_seen: set[str] = set() + rate_sleep = 2.0 if self.token else 6.5 # seconds between queries + + for q_idx, query in enumerate(_GITHUB_QUERIES): + if len(fetched) >= self.max_files: + break + + print(f" [{q_idx+1:02d}/{len(_GITHUB_QUERIES)}] Query: {query[:70]}") + + try: + resp = requests.get( + self.SEARCH_URL, + params={"q": query, "per_page": 10}, + headers=self.headers, + timeout=15, + ) + except requests.exceptions.RequestException as e: + print(f" ⚠ Network error: {e}") + time.sleep(5) + continue + + if resp.status_code == 403: + reset_ts = int(resp.headers.get("X-RateLimit-Reset", time.time() + 60)) + wait_sec = max(reset_ts - int(time.time()), 10) + print(f" ⚠ Rate limit — waiting {wait_sec}s...") + time.sleep(wait_sec) + # Retry once + try: + resp = requests.get( + self.SEARCH_URL, + params={"q": query, "per_page": 10}, + headers=self.headers, + timeout=15, + ) + except Exception: + continue + + if resp.status_code != 200: + print(f" ⚠ HTTP {resp.status_code}") + time.sleep(2) + continue + + items = resp.json().get("items", []) + print(f" → {len(items)} results from GitHub") + + for item in items: + if len(fetched) >= self.max_files: + break + + raw_url = ( + item.get("html_url", "") + .replace("https://github.com/", "https://raw.githubusercontent.com/") + .replace("/blob/", "/") + ) + if not raw_url or raw_url in urls_seen: + continue + urls_seen.add(raw_url) + + repo = item.get("repository", {}).get("full_name", "?") + path = item.get("path", "?") + + try: + content_resp = requests.get(raw_url, timeout=10) + if content_resp.status_code != 200: + continue + code = content_resp.text + except Exception as e: + print(f" ⚠ Download error 
({repo}/{path}): {e}") + continue + + fetched.append({"repo": repo, "path": path, "url": raw_url, "code": code}) + print(f" ✓ {repo} / {path} ({len(code):,} chars)") + + time.sleep(rate_sleep) + + print(f"\n {'─'*40}") + print(f" Total files fetched from GitHub: {len(fetched)}") + print(f" {'─'*40}\n") + self._files_fetched = fetched + return fetched + + @property + def fetch_log(self) -> list[dict]: + """Returns log of all fetched files (without code content).""" + return [ + {"repo": f["repo"], "path": f["path"], "url": f["url"]} + for f in self._files_fetched + ] + + +# ───────────────────────────────────────────────────────────────────────────── +# COOCCURRENCE EXTRACTOR +# Processes fetched files through the AST detector and builds +# pair/trio co-occurrence counts and normalized weights. +# ───────────────────────────────────────────────────────────────────────────── + +class CooccurrenceExtractor: + + def __init__(self): + self.detector = PythonASTDetector() + self._pair_counts: dict[tuple, int] = defaultdict(int) + self._trio_counts: dict[tuple, int] = defaultdict(int) + self._file_results: list[dict] = [] # per-file detection results + + def process_files(self, files: list[dict]) -> None: + """Process a list of {repo, path, code} dicts and accumulate counts.""" + print(f" Processing {len(files)} files through Python AST detector...\n") + + for i, f in enumerate(files): + code = f.get("code", "") + detected = self.detector.detect(code) + + if len(detected) < 2: + print(f" [{i+1:03d}] {f['repo']}/{f['path']} — " + f"{len(detected)} constructs (skipped, need ≥2)") + continue + + sorted_d = sorted(detected) + pairs = list(combinations(sorted_d, 2)) + trios = list(combinations(sorted_d, 3)) + + for p in pairs: + self._pair_counts[p] += 1 + for t in trios: + self._trio_counts[t] += 1 + + self._file_results.append({ + "repo": f["repo"], + "path": f["path"], + "constructs": sorted_d, + "pairs": len(pairs), + "trios": len(trios), + }) + + if self.verbose_log(i): + 
print(f" [{i+1:03d}] {f['repo']}/{f['path']}") + print(f" Constructs ({len(detected)}): {', '.join(sorted_d)}") + print(f" Pairs: {len(pairs)} Trios: {len(trios)}") + + print(f"\n ─────────────────────────────────────────────────────") + print(f" Files with ≥2 constructs: {len(self._file_results)}") + print(f" Unique pair co-occurrences: {len(self._pair_counts)}") + print(f" Unique trio co-occurrences: {len(self._trio_counts)}") + print(f" ─────────────────────────────────────────────────────\n") + + def verbose_log(self, idx: int) -> bool: + """Log every file for full traceability.""" + return True # Always show — the user needs to verify real files are analyzed + + def normalized_pair_weights(self) -> dict[str, float]: + """Normalize pair counts to [0, 1]. Key = 'a+b' sorted alphabetically.""" + if not self._pair_counts: + return {} + max_c = max(self._pair_counts.values()) + return { + "+".join(k): round(v / max_c, 6) + for k, v in sorted( + self._pair_counts.items(), key=lambda x: -x[1] + ) + } + + def normalized_trio_weights(self) -> dict[str, float]: + """Normalize trio counts to [0, 1]. 
Key = 'a+b+c' sorted alphabetically."""
+        if not self._trio_counts:
+            return {}
+        max_c = max(self._trio_counts.values())
+        return {
+            "+".join(k): round(v / max_c, 6)
+            for k, v in sorted(
+                self._trio_counts.items(), key=lambda x: -x[1]
+            )
+        }
+
+    def top_pairs(self, n: int = 20) -> list[tuple]:
+        """Return top-n pairs by count."""
+        return sorted(self._pair_counts.items(), key=lambda x: -x[1])[:n]
+
+    def top_trios(self, n: int = 20) -> list[tuple]:
+        """Return top-n trios by count."""
+        return sorted(self._trio_counts.items(), key=lambda x: -x[1])[:n]
+
+    def per_file_results(self) -> list[dict]:
+        return self._file_results
+
+
+def generate_construct_map(
+    extractor: CooccurrenceExtractor,
+    fetcher: GitHubFetcher,
+    output_path: Path,
+) -> None:
+    pair_weights = extractor.normalized_pair_weights()
+    trio_weights = extractor.normalized_trio_weights()
+
+    doc = {
+        "meta": {
+            "description": (
+                "Auto-generated by construct_prior.py. "
+                "Weights derived from real GitHub codebases via Python AST analysis. 
" + "DO NOT EDIT MANUALLY — regenerate with: python construct_prior.py --generate-map" + ), + "generated_at": datetime.now(timezone.utc).isoformat(), + "generator_version": "2.0", + "avap_node_count": len(AVAP_NODE_NAMES), + "avap_node_names": AVAP_NODE_NAMES, + "source_stats": { + "github_files_analyzed": len(extractor.per_file_results()), + "github_files_fetched": len(fetcher.fetch_log), + "total_pair_cooccurrences": len(pair_weights), + "total_trio_cooccurrences": len(trio_weights), + }, + }, + "language_mappings": LANGUAGE_MAPPINGS, + "fetch_log": fetcher.fetch_log, + "pair_weights": pair_weights, + "trio_weights": trio_weights, + } + + output_path.parent.mkdir(parents=True, exist_ok=True) + with open(output_path, "w", encoding="utf-8") as f: + yaml.dump(doc, f, default_flow_style=False, allow_unicode=True, + sort_keys=False, width=120) + + print(f"construct_map.yaml written to: {output_path}") + print(f"Pair weights: {len(pair_weights)}") + print(f"Trio weights: {len(trio_weights)}") + print() + + print("Top-20 construct pairs by co-occurrence frequency:") + for (a, b), count in extractor.top_pairs(20): + w = pair_weights.get(f"{a}+{b}", pair_weights.get(f"{b}+{a}", 0)) + print(f" {w:.4f} {a} + {b} (n={count})") + print() + print("Top-10 construct trios by co-occurrence frequency:") + for trio, count in extractor.top_trios(10): + w = trio_weights.get("+".join(trio), 0) + print(f" {w:.4f} {' + '.join(trio)} (n={count})") + + +_DEFAULT_EPSILON = 0.05 + +class ConstructPrior: + + + def __init__(self, weights: dict[frozenset, float], epsilon: float = _DEFAULT_EPSILON, + source_stats: dict = None): + self._weights = weights + self.epsilon = epsilon + self._source_stats = source_stats or {} + self._propagate_subset_weights() + #print(self._source_stats ) + @classmethod + def from_yaml(cls, path: Path, epsilon: float = _DEFAULT_EPSILON) -> "ConstructPrior": + if not path.exists(): + raise FileNotFoundError( + f"construct_map.yaml not found at {path}.\n" + f"Generate 
it first: python construct_prior.py --generate-map"
+            )
+        with open(path, encoding="utf-8") as f:
+            doc = yaml.safe_load(f)
+
+        weights: dict[frozenset, float] = {}
+
+        for key_str, w in doc.get("pair_weights", {}).items():
+            parts = key_str.split("+")
+            if len(parts) == 2:
+                weights[frozenset(parts)] = float(w)
+
+        for key_str, w in doc.get("trio_weights", {}).items():
+            parts = key_str.split("+")
+            if len(parts) == 3:
+                weights[frozenset(parts)] = float(w)
+
+        source_stats = doc.get("meta", {}).get("source_stats", {})
+        print(f"ConstructPrior loaded from {path.name}")
+        print(f"  Files analyzed: {source_stats.get('github_files_analyzed', '?')}")
+        print(f"  Pair weights: {len([k for k in weights if len(k) == 2])}")
+        print(f"  Trio weights: {len([k for k in weights if len(k) == 3])}")
+
+        return cls(weights=weights, epsilon=epsilon, source_stats=source_stats)
+
+    @classmethod
+    def from_static_fallback(cls, epsilon: float = _DEFAULT_EPSILON) -> "ConstructPrior":
+        print("[WARN] Using static fallback prior (no construct_map.yaml).")
+        print("Run: python construct_prior.py --generate-map")
+        static: list[tuple[tuple, float]] = [
+            (("try", "exception"), 1.00),
+            (("function", "return"), 0.98),
+            (("function", "try", "return"), 0.95),
+            (("ormAccessSelect", "try"), 0.90),
+            (("ormAccessSelect", "try", "exception"), 0.88),
+            (("RequestGet", "try"), 0.85),
+            (("RequestPost", "try"), 0.84),
+            (("if_mode1", "return"), 0.82),
+            (("function", "if_mode1", "return"), 0.80),
+            (("ormAccessSelect", "return"), 0.78),
+            (("ormAccessInsert", "try"), 0.75),
+            (("ormAccessUpdate", "try"), 0.72),
+            (("RequestGet", "variableFromJSON"), 0.70),
+            (("RequestPost", "variableFromJSON"), 0.68),
+            (("variableFromJSON", "return"), 0.65),
+            (("startLoop", "return"), 0.62),
+            (("startLoop", "ormAccessSelect"), 0.60),
+            (("function", "import"), 0.58),
+            (("if_mode1", "ormAccessSelect"), 0.55),
+            (("ormAccessSelect", "ormAccessInsert"), 0.52),
+            (("go", "gather"), 0.48),
+            (("go", "RequestGet"), 0.45),
+            (("go", "RequestPost"), 0.43),
+            (("go", "gather", "return"), 0.42),
+            (("encodeSHA256", "return"), 0.40),
+            (("encodeSHA256", "if_mode1"), 0.38),
+            (("encodeMD5", "return"), 0.36),
+            (("AddVariableToJSON", "return"), 0.35),
+            (("variableFromJSON", "AddVariableToJSON"), 0.33),
+            (("getDateTime", "ormAccessInsert"), 0.30),
+            (("getTimeStamp", "return"), 0.28),
+            (("startLoop", "if_mode1"), 0.27),
+            (("startLoop", "if_mode1", "return"), 0.25),
+            (("randomString", "return"), 0.22),
+            (("randomString", "encodeSHA256"), 0.20),
+            (("replace", "return"), 0.16),
+            (("avapConnector", "try"), 0.14),
+            (("go", "ormAccessSelect"), 0.10),
+            (("go", "gather", "ormAccessSelect"), 0.09),
+        ]
+        weights = {frozenset(k): v for k, v in static}
+        return cls(weights=weights, epsilon=epsilon,
+                   source_stats={"mode": "static_fallback"})
+
+    def cell_weight(self, cell: frozenset) -> float:
+        return max(self._weights.get(cell, self.epsilon), self.epsilon)
+
+    def kl_divergence(self, dataset_freq: dict[str, int]) -> float:
+        total_d = sum(dataset_freq.values())
+        total_p = sum(self._weights.values())
+        if total_d == 0 or total_p == 0:
+            return float("inf")
+        kl = 0.0
+        for nt in AVAP_NODE_NAMES:
+            p = dataset_freq.get(nt, 0) / total_d
+            q_raw = sum(w for cell, w in self._weights.items() if nt in cell)
+            q = max(q_raw / total_p, 1e-9)
+            if p > 0:
+                kl += p * math.log2(p / q)
+        return round(kl, 4)
+
+    def coverage_summary(self) -> str:
+        n = len(self._weights)
+        avg = sum(self._weights.values()) / max(n, 1)
+        stats = " ".join(f"{k}={v}" for k, v in self._source_stats.items())
+        return (
+            f"ConstructPrior: {n} cells | mean={avg:.3f} | epsilon={self.epsilon}"
+            + (f" | {stats}" if stats else "")
+        )
+
+    def _propagate_subset_weights(self):
+        pairs = [(cell, w) for cell, w in self._weights.items() if len(cell) == 2]
+        for pair_cell, pair_w in pairs:
+            inherited = pair_w * 0.60
+            for trio_cell in list(self._weights):
+                if len(trio_cell) == 3 and 
pair_cell.issubset(trio_cell):
+                    if inherited > self._weights.get(trio_cell, 0):
+                        self._weights[trio_cell] = inherited
+
+
+def main():
+    parser = argparse.ArgumentParser(
+        description="Generate construct_map.yaml from real GitHub codebases",
+        formatter_class=argparse.RawDescriptionHelpFormatter,
+        epilog="""
+Examples:
+    # Fetch 200 files for a richer prior:
+    python construct_prior.py --generate-map --max-files 200 --github-token ghp_...
+    """,
+    )
+    parser.add_argument(
+        "--generate-map", action="store_true",
+        help="Fetch real codebases from GitHub and generate construct_map.yaml",
+    )
+    parser.add_argument(
+        "--verify", action="store_true",
+        help="Load and print stats for an existing construct_map.yaml",
+    )
+    parser.add_argument(
+        "--github-token", default=None,
+        help="GitHub personal access token (or set GITHUB_TOKEN env var)",
+    )
+    parser.add_argument(
+        "--max-files", type=int, default=100,
+        help="Maximum number of GitHub files to analyze (default: 100)",
+    )
+    parser.add_argument(
+        "--output", default="construct_map.yaml",
+        help="Output path for construct_map.yaml (default: construct_map.yaml)",
+    )
+    parser.add_argument(
+        "--map", default="construct_map.yaml",
+        help="Path to existing construct_map.yaml (for --verify)",
+    )
+    args = parser.parse_args()
+
+    if args.generate_map:
+        print("\n====================================================")
+        print(" ConstructPrior — Codebase Extraction")
+        print(" Querying REAL GitHub codebases. 
No hardcoded data.")
+        print("====================================================")
+
+        fetcher = GitHubFetcher(
+            token=args.github_token,
+            max_files=args.max_files,
+            verbose=True,
+        )
+        files = fetcher.fetch_all()
+
+        if not files:
+            print("\nERROR: No files fetched from GitHub.")
+            print("Check your internet connection and GitHub token.")
+            sys.exit(1)
+
+        extractor = CooccurrenceExtractor()
+        extractor.process_files(files)
+
+        output_path = Path(args.output)
+        generate_construct_map(extractor, fetcher, output_path)
+
+        print("\n==================================================")
+        print(" construct_map.yaml generated successfully.")
+        print(f" Files analyzed from real GitHub codebases: {len(extractor.per_file_results())}")
+        print(f" Use in generator: --prior-map {output_path}")
+        print("==================================================\n")
+
+    elif args.verify:
+        map_path = Path(args.map)
+        try:
+            prior = ConstructPrior.from_yaml(map_path)
+            print(f"\n{prior.coverage_summary()}")
+
+            with open(map_path, encoding="utf-8") as f:
+                doc = yaml.safe_load(f)
+            meta = doc.get("meta", {})
+            stats = meta.get("source_stats", {})
+            print(f"\n  Generated: {meta.get('generated_at', '?')}")
+            print(f"  AVAP commands: {meta.get('avap_node_count', '?')}")
+            print(f"  Files analyzed: {stats.get('github_files_analyzed', '?')}")
+            print(f"  Pair weights: {stats.get('total_pair_cooccurrences', '?')}")
+            print(f"  Trio weights: {stats.get('total_trio_cooccurrences', '?')}")
+
+        except FileNotFoundError as e:
+            print(f" ERROR: {e}")
+            sys.exit(1)
+
+    else:
+        parser.print_help()
+
+
+if __name__ == "__main__":
+    main()
diff --git a/scripts/pipelines/samples_generator/generate_mbap_v2.py b/scripts/pipelines/samples_generator/generate_mbap_v2.py
new file mode 100644
index 0000000..bb77055
--- /dev/null
+++ b/scripts/pipelines/samples_generator/generate_mbap_v2.py
@@ -0,0 +1,884 @@
+#!/usr/bin/env python3
+"""
+AVAP Dataset Generator v2 — MAP-Elites Quality-Diversity Pipeline 
+==================================================================
+"""
+
+import argparse
+import json
+import math
+import os
+import sys
+import time
+from collections import defaultdict
+from itertools import combinations
+from pathlib import Path
+
+import anthropic
+import requests
+
+from construct_prior import ConstructPrior, AVAP_NODE_NAMES
+
+AVAP_NODE_TYPES = {
+    "addParam": ["addParam("],
+    "addResult": ["addResult("],
+    "_status": ["_status"],
+    "addVar": ["addVar("],
+    "getListLen": ["getListLen("],
+    "getQueryParamList": ["getQueryParamList("],
+    "itemFromList": ["itemFromList("],
+    "replace": ["replace("],
+    "randomString": ["randomString("],
+    "if_mode1": ["if("],
+    "if_mode2": ["if(None, None,"],
+    "else": ["else()"],
+    "end": ["end()"],
+    "startLoop": ["startLoop("],
+    "endLoop": ["endLoop()"],
+    "try": ["try()"],
+    "exception": ["exception()"],
+    "return": ["return("],
+    "go": ["go("],
+    "gather": ["gather("],
+    "avapConnector": ["avapConnector("],
+    "ormCheckTable": ["ormCheckTable("],
+    "ormDirect": ["ormDirect("],
+    "ormAccessSelect": ["ormAccessSelect("],
+    "ormAccessInsert": ["ormAccessInsert("],
+    "ormAccessUpdate": ["ormAccessUpdate("],
+    "variableFromJSON": ["variableFromJSON("],
+    "AddVariableToJSON": ["AddVariableToJSON("],
+    "encodeSHA256": ["encodeSHA256("],
+    "encodeMD5": ["encodeMD5("],
+    "getTimeStamp": ["getTimeStamp("],
+    "getDateTime": ["getDateTime("],
+    "stampToDatetime": ["stampToDatetime("],
+    "RequestGet": ["RequestGet("],
+    "RequestPost": ["RequestPost("],
+    "function": ["function "],
+    "import": ["import "],
+    "include": ["include("],
+}
+
+NODE_TYPE_NAMES = AVAP_NODE_NAMES
+_PRIOR_EPSILON = 0.05
+
+
+class CellValidator:
+
+    def __init__(self, parser_url: str, parser_timeout: int = 5):
+        self.parser_url = parser_url.rstrip("/")
+        self.parser_timeout = parser_timeout
+        self._parser_available = True
+
+    def parse(self, code: str) -> tuple[bool | None, dict, str]:
+        if not self._parser_available:
+            return 
None, {}, "parser_unavailable" + try: + resp = requests.post( + f"{self.parser_url}/parse", + json={"code": code}, + timeout=self.parser_timeout, + ) + data = resp.json() + if data.get("valid", False): + return True, data.get("ast", {}), "" + return False, {}, data.get("error", "parse error") + except requests.exceptions.ConnectionError: + self._parser_available = False + return None, {}, "parser_unavailable" + except Exception as e: + return False, {}, str(e) + def detect_constructs(self, code: str, ast: dict) -> set: + if ast: + return self._from_ast(ast) + return self._from_source(code) + + def _from_ast(self, ast: dict) -> set: + found = set() + if isinstance(ast, dict): + if "type" in ast: + found.add(ast["type"]) + for v in ast.values(): + found |= self._from_ast(v) + elif isinstance(ast, list): + for item in ast: + found |= self._from_ast(item) + return found + + def _from_source(self, code: str) -> set: + found = set() + if "if(None, None," in code: + found.add("if_mode2") + elif "if(" in code: + found.add("if_mode1") + for name, patterns in AVAP_NODE_TYPES.items(): + if name in ("if_mode1", "if_mode2"): + continue # already handled + for pat in patterns: + if pat in code: + found.add(name) + break + return found + + def cell_quality( + self, + code: str, + ast: dict, + test_list: list, + cell: frozenset, + alpha: float = 0.3, + beta: float = 0.2, + gamma: float = 0.1, + ) -> tuple[float, dict]: + + detected = self.detect_constructs(code, ast) + all_types = set(NODE_TYPE_NAMES) + + cell_constructs = set(cell) + present_required = cell_constructs & detected + fidelity = len(present_required) / max(len(cell_constructs), 1) + + extra = detected - cell_constructs + bonus_ratio = len(extra) / max(len(all_types) - len(cell_constructs), 1) + + tq = sum( + 1 for t in test_list + if isinstance(t, str) and "re.match(" in t and len(t.strip()) > 10 + ) / max(len(test_list), 1) + + lines = [l.strip() for l in code.split("\n") if l.strip()] + richness = min(len(lines) / 
30.0, 1.0)  # cap at 30 lines = 1.0
+
+        quality = fidelity + alpha * bonus_ratio + beta * tq + gamma * richness
+
+        return quality, {
+            "fidelity": round(fidelity, 3),
+            "bonus_ratio": round(bonus_ratio, 3),
+            "test_quality": round(tq, 3),
+            "richness": round(richness, 3),
+            "quality": round(quality, 3),
+            "detected": sorted(detected),
+            "cell": sorted(cell),
+            "extra": sorted(extra),
+        }
+
+
+class CoverageMap:
+    """Quality-diversity archive: at most one elite example per cell.
+
+    A cell is a frozenset of 2..cell_size construct (node-type) names.
+    An incoming example replaces the stored elite only if its quality
+    is strictly higher.
+    """
+
+    def __init__(self, cell_size: int = 3):
+        self.cell_size = cell_size
+        self._map: dict[frozenset, tuple[dict, float, dict]] = {}
+        self._attempts: dict[frozenset, int] = defaultdict(int)
+        self._all_cells = self._build_cells()
+
+    def _build_cells(self) -> list[frozenset]:
+        cells = []
+        for size in range(2, self.cell_size + 1):
+            for combo in combinations(NODE_TYPE_NAMES, size):
+                cells.append(frozenset(combo))
+        return cells
+
+    @property
+    def total_cells(self) -> int:
+        return len(self._all_cells)
+
+    @property
+    def filled_cells(self) -> int:
+        return len(self._map)
+
+    @property
+    def fill_rate(self) -> float:
+        return self.filled_cells / max(self.total_cells, 1)
+
+    def update(
+        self,
+        cell: frozenset,
+        example: dict,
+        quality: float,
+        components: dict,
+    ) -> bool:
+        """Store the example as the cell's elite if it beats the incumbent."""
+        self._attempts[cell] += 1
+        current = self._map.get(cell)
+        if current is None or quality > current[1]:
+            self._map[cell] = (example, quality, components)
+            return True
+        return False
+
+    def get_empty_cells(self) -> list[frozenset]:
+        return [c for c in self._all_cells if c not in self._map]
+
+    def get_low_quality_cells(self, threshold: float = 0.7) -> list[frozenset]:
+        return [
+            c for c, (_, q, _) in self._map.items()
+            if q < threshold
+        ]
+
+    def get_example(self, cell: frozenset) -> dict | None:
+        entry = self._map.get(cell)
+        return entry[0] if entry else None
+
+    def all_examples(self) -> list[dict]:
+        return [ex for ex, _, _ in self._map.values()]
+
+    def node_type_frequency(self) -> dict[str, int]:
+        """Count how many filled cells include each node type."""
+        freq = defaultdict(int)
+        for cell in self._map:
+            for nt in cell:
+                freq[nt] += 1
+        return dict(freq)
+
+    def distribution_entropy(self) -> float:
+        """Shannon entropy (bits) of the node-type frequency distribution."""
+        freq = self.node_type_frequency()
+        total = sum(freq.values())
+        if total == 0:
+            return 0.0
+        entropy = 0.0
+        for count in freq.values():
+            p = count / total
+            if p > 0:
+                entropy -= p * math.log2(p)
+        return round(entropy, 3)
+
+    def fill_summary(self) -> str:
+        empty = len(self.get_empty_cells())
+        low = len(self.get_low_quality_cells())
+        entropy = self.distribution_entropy()
+        return (
+            f"Cells: {self.filled_cells}/{self.total_cells} filled "
+            f"({100*self.fill_rate:.1f}%) | "
+            f"Low quality: {low} | "
+            f"Empty: {empty} | "
+            f"Entropy: {entropy:.2f} bits"
+        )
+
+
+class CellSelector:
+    """Three-phase cell selection: empty cells first, then low-quality
+    cells, then UCB over all cells once the map is filled."""
+
+    def __init__(
+        self,
+        coverage_map: CoverageMap,
+        quality_threshold: float = 0.80,
+        ucb_c: float = 1.0,
+    ):
+        self.map = coverage_map
+        self.quality_threshold = quality_threshold
+        self.ucb_c = ucb_c
+        self._total_calls = 0
+        import random  # local import; seeded for reproducible cell sampling
+        self._rng = random.Random(42)
+
+    def select(self) -> frozenset:
+        self._total_calls += 1
+        empty = self.map.get_empty_cells()
+        if empty:
+            return self._rng.choice(empty)
+
+        low = self.map.get_low_quality_cells(self.quality_threshold)
+        if low:
+            return self._rng.choice(low)
+
+        return self._ucb_select()
+
+    def _ucb_select(self) -> frozenset:
+        """Pick the cell maximizing quality + c * sqrt(ln(total) / attempts)."""
+        best_cell = None
+        best_score = -float("inf")
+        total = max(self._total_calls, 1)
+
+        for cell in self.map._all_cells:
+            attempts = max(self.map._attempts.get(cell, 0), 1)
+            entry = self.map._map.get(cell)
+            quality = entry[1] if entry else 0.0
+            score = quality + self.ucb_c * math.sqrt(math.log(total) / attempts)
+            if score > best_score:
+                best_score = score
+                best_cell = cell
+
+        return best_cell
+
+
+class CellSelectorPrior(CellSelector):
+    """CellSelector variant whose choices are weighted by a ConstructPrior
+    derived from real production code (Candidate F)."""
+
+    def __init__(
+        self,
+        coverage_map: CoverageMap,
+        prior: ConstructPrior,
+        quality_threshold: float = 0.80,
+        ucb_c: float = 1.0,
+        phase3_threshold: float = 0.70,
+    ):
+        super().__init__(coverage_map, quality_threshold, ucb_c)
+        self.prior = prior
+        self.phase3_threshold = phase3_threshold
+        # Phase-3 state (tail-cell focus): initialized here but not yet
+        # consulted by select() below.
+        self._tail_cells: set[frozenset] = set()
+        self._phase3_active = False
+
+    def select(self) -> frozenset:
+        """Prefer empty high-prior cells, then any empty cell (weighted
+        sample), then low-quality cells, then prior-weighted UCB."""
+        self._total_calls += 1
+        empty = self.map.get_empty_cells()
+
+        if empty:
+            high_prior_empty = [
+                c for c in empty
+                if self.prior.cell_weight(c) > self.prior.epsilon * 1.5
+            ]
+            if high_prior_empty:
+                return self._weighted_sample(high_prior_empty)
+            return self._weighted_sample(empty)
+
+        low = self.map.get_low_quality_cells(self.quality_threshold)
+        if low:
+            return self._ucb_prior_select(low)
+
+        return self._ucb_prior_select(self.map._all_cells)
+
+    def _weighted_sample(self, cells: list[frozenset]) -> frozenset:
+        """Roulette-wheel sample proportional to prior cell weights."""
+        weights = [self.prior.cell_weight(c) for c in cells]
+        total = sum(weights)
+        if total == 0:
+            return self._rng.choice(cells)
+        r = self._rng.random() * total
+        cumsum = 0.0
+        for cell, w in zip(cells, weights):
+            cumsum += w
+            if r <= cumsum:
+                return cell
+        return cells[-1]
+
+    def _ucb_prior_select(self, cells: list[frozenset]) -> frozenset:
+        """UCB where each cell's score is scaled by its prior weight."""
+        best_cell = None
+        best_score = -float("inf")
+        total = max(self._total_calls, 1)
+
+        for cell in cells:
+            attempts = max(self.map._attempts.get(cell, 0), 1)
+            entry = self.map._map.get(cell)
+            quality = entry[1] if entry else 0.0
+            prior_w = self.prior.cell_weight(cell)
+            ucb_term = self.ucb_c * math.sqrt(math.log(total) / attempts)
+            score = prior_w * (quality + ucb_term)
+            if score > best_score:
+                best_score = score
+                best_cell = cell
+
+        return best_cell
+
+
+SYSTEM_PROMPT = """You are an expert in the AVAP language.
+You are given the complete AVAP Language Reference Manual (LRM).
+Your task is to generate ONE MBPP-style benchmark problem for evaluating
+language models on their ability to generate correct AVAP code.
+
+STRICT RULES for the generated AVAP code:
+1. One instruction per line. EOL is the absolute terminator.
+2. No significant indentation (it is purely decorative).
+3. Blocks: if()...else()...end(), startLoop()...endLoop(), try()...exception()...end()
+4. Functions: function name(args) { ... return(val) }
+5. if() Mode 1: if(var_or_literal, var_or_literal, "operator")
+6. if() Mode 2: if(None, None, `full_expression_as_string`)
+7. _status is assigned via: addVar(_status, 404)
+8. ormAccessSelect signature: ormAccessSelect(fields, "table", selector, varTarget)
+9. ormCheckTable signature: ormCheckTable(table_name, varTarget)
+10. ormDirect signature: ormDirect("SELECT ... %s" % var, varTarget)
+11. getQueryParamList signature: getQueryParamList(param_name, varTarget)
+12. NEVER use registerEndpoint(), NEVER use mainHandler().
+13. The code is executed DIRECTLY, line by line.
+
+OUTPUT FORMAT: respond ONLY with ONE valid JSON object (not an array).
+No extra text, no markdown code blocks.
+{
+    "task_id": 1,
+    "text": "<problem statement>",
+    "code": "<AVAP code>",
+    "test_inputs": { "<param_name>": <value> },
+    "test_list": ["re.match(r'<regex>', <var>)", ...]
+}
+
+test_list: USE re.match() ONLY. NEVER direct comparisons (==, !=).
+"""
+
+
+def build_cell_prompt(
+    lrm: str,
+    cell: frozenset,
+    existing_example: dict | None,
+    map_summary: str,
+) -> str:
+    constructs_list = ", ".join(f"`{c}`" for c in sorted(cell))
+
+    improvement_note = ""
+    if existing_example:
+        improvement_note = f"""
+The following example ALREADY exists for this combination, with room for
+quality improvement. Generate something DIFFERENT and MORE COMPLEX that beats it:
+
+```
+{existing_example.get('code', '')}
+```
+"""
+
+    return f"""# AVAP LRM — Language Reference Manual
+
+{lrm}
+
+---
+
+# COVERAGE MAP STATE
+
+{map_summary}
+
+---
+
+# TASK — MANDATORY SPECIFICATION
+
+Generate ONE AVAP example that MANDATORILY uses ALL of these constructs:
+
+**{constructs_list}**
+
+The example MUST contain every construct listed above.
+If your code does not use all of them, the task fails.
+
+Additionally:
+- Combine the required constructs in a realistic HTTP microservice scenario
+- Add extra constructs wherever natural (this raises the score)
+- Rich, complex code (no trivial 3-line examples)
+- 2-3 re.match() assertions in test_list
+{improvement_note}
+Respond ONLY with the JSON object. No text before or after.
+"""
+
+
+def call_api(
+    client: anthropic.Anthropic,
+    lrm: str,
+    cell: frozenset,
+    task_id: int,
+    existing_example: dict | None,
+    map_summary: str,
+    retries: int = 3,
+) -> dict | None:
+    """Request one example for `cell`; return the parsed problem dict,
+    or None once retries are exhausted."""
+    for attempt in range(1, retries + 1):
+        try:
+            message = client.messages.create(
+                model="claude-sonnet-4-20250514",
+                max_tokens=4000,
+                system=SYSTEM_PROMPT,
+                messages=[{
+                    "role": "user",
+                    "content": build_cell_prompt(lrm, cell, existing_example, map_summary),
+                }],
+            )
+            raw = message.content[0].text.strip()
+
+            # Strip a markdown code fence if the model wrapped its JSON anyway
+            if raw.startswith("```"):
+                lines = raw.splitlines()
+                raw = "\n".join(lines[1:-1] if lines[-1].strip() == "```" else lines[1:])
+
+            problem = json.loads(raw)
+            if not isinstance(problem, dict):
+                raise ValueError("Response is not a JSON object")
+            for field in ("text", "code", "test_list"):
+                if field not in problem:
+                    raise ValueError(f"Missing field '{field}'")
+            if "test_inputs" not in problem:
+                problem["test_inputs"] = {}
+            problem["task_id"] = task_id
+            return problem
+
+        except (json.JSONDecodeError, ValueError) as e:
+            print(f"\n    Attempt {attempt}/{retries} — parse error: {e}")
+            if attempt < retries:
+                time.sleep(2 ** attempt)
+        except anthropic.RateLimitError:
+            wait = 30 * attempt
+            print(f"\n    Rate limit — waiting {wait}s...")
+            time.sleep(wait)
+        except anthropic.APIError as e:
+            print(f"\n    API error at attempt {attempt}: {e}")
+            if attempt < retries:
+                time.sleep(5)
+
+    return None
+
+
+def run_map_elites(args, client, lrm, output_path):
+    """Candidate E loop: fill the coverage map with uniform cell weighting."""
+    validator = CellValidator(parser_url=args.parser)
+    cmap = CoverageMap(cell_size=args.cell_size)
+    selector = CellSelector(cmap, 
quality_threshold=args.quality_threshold)
+    dataset = []
+    task_id = 1
+    call_count = 0
+    valid_count = 0
+    cell_updates = 0
+
+    print(f"\n MAP-Elites mode | cells: {cmap.total_cells} | target: {args.problems} examples")
+    print(f" Cell size: {args.cell_size} | Quality threshold: {args.quality_threshold}")
+    print("─" * 65)
+
+    max_calls = args.problems * 4  # hard budget: 4 API calls per requested example
+
+    while len(dataset) < args.problems and call_count < max_calls:
+        cell = selector.select()
+        existing = cmap.get_example(cell)
+        call_count += 1
+
+        print(
+            f" [{call_count:04d}] Cell {sorted(cell)} "
+            f"| filled={cmap.filled_cells}/{cmap.total_cells} "
+            f"| dataset={len(dataset)} ... ",
+            end="", flush=True,
+        )
+
+        problem = call_api(
+            client, lrm, cell, task_id,
+            existing_example=existing,
+            map_summary=cmap.fill_summary(),
+        )
+
+        if problem is None:
+            print("SKIP (generation failed)")
+            continue
+
+        code = problem["code"]
+        test_list = problem.get("test_list", [])
+
+        is_valid, ast, error_msg = validator.parse(code)
+
+        if is_valid is None:
+            is_valid, ast = True, {}
+            if call_count == 1:
+                print("\n Parser unavailable — using keyword fallback", flush=True)
+
+        if is_valid is False:
+            print(f"INVALID ({error_msg[:40]})")
+            problem["_validation"] = {"valid": False, "error": error_msg}
+            continue
+
+        valid_count += 1
+
+        # Compute cell quality
+        quality, components = validator.cell_quality(
+            code, ast, test_list, cell,
+            alpha=args.alpha, beta=args.beta, gamma=args.gamma,
+        )
+        problem["_cell"] = sorted(cell)
+        problem["_quality"] = components
+
+        if components["fidelity"] < 1.0:
+            missing = set(cell) - set(components["detected"])
+            print(f"MISSING constructs: {sorted(missing)}")
+            continue
+
+        updated = cmap.update(cell, problem, quality, components)
+        if updated:
+            cell_updates += 1
+
+        dataset.append(problem)
+        task_id += 1
+
+        print(
+            f"OK quality={quality:.3f} "
+            f"fidelity={components['fidelity']:.2f} "
+            f"extra={len(components['extra'])}"
+        )
+
+        if len(dataset) % 50 == 0:
+            _save(dataset, output_path, cmap)
+            freq = cmap.node_type_frequency()
+            print("\n ── Checkpoint ──────────────────────────────────")
+            print(f" Dataset: {len(dataset)} | Valid: {valid_count}/{call_count}")
+            print(f" {cmap.fill_summary()}")
+            print(f" Top-5 most frequent: {sorted(freq, key=freq.get, reverse=True)[:5]}")
+            print(f" Top-5 least frequent: {sorted(freq, key=freq.get)[:5]}")
+            print(" ────────────────────────────────────────────────\n")
+
+        time.sleep(0.5)
+
+    _save(dataset, output_path, cmap)
+    return dataset, cmap, valid_count, call_count
+
+
+def run_map_elites_prior(args, client, lrm, output_path):
+    """Candidate F loop: cell selection weighted by a ConstructPrior
+    mined from real AVAP production code."""
+    print("\n Loading ConstructPrior...", flush=True)
+    prior_map = getattr(args, "prior_map", "construct_map.yaml")
+    epsilon = getattr(args, "prior_epsilon", _PRIOR_EPSILON)
+    yaml_path = Path(prior_map)
+
+    if yaml_path.exists():
+        prior = ConstructPrior.from_yaml(yaml_path, epsilon=epsilon)
+    else:
+        # Fallback: yaml not found, so use the static prior and warn
+        print(f" [WARN] construct_map.yaml not found at '{yaml_path}'.")
+        print(" [WARN] Using static fallback prior. Generate the real prior with:")
+        print(" [WARN]   python construct_prior.py --generate-map --github-token TOKEN")
+        prior = ConstructPrior.from_static_fallback(epsilon=epsilon)
+
+    print(f" {prior.coverage_summary()}")
+
+    validator = CellValidator(parser_url=args.parser)
+    cmap = CoverageMap(cell_size=args.cell_size)
+    selector = CellSelectorPrior(
+        cmap, prior,
+        quality_threshold=args.quality_threshold,
+        phase3_threshold=getattr(args, "prior_phase3_threshold", 0.70),
+    )
+    dataset = []
+    task_id = 1
+    call_count = 0
+    valid_count = 0
+    cell_updates = 0
+
+    print(f"\n MAP-Elites+Prior mode | cells: {cmap.total_cells} | target: {args.problems} examples")
+    print(f" Cell size: {args.cell_size} | Quality threshold: {args.quality_threshold}")
+    print("─" * 65)
+
+    max_calls = args.problems * 4  # hard budget: 4 API calls per requested example
+
+    while len(dataset) < args.problems and call_count < max_calls:
+        cell = selector.select()
+        existing = cmap.get_example(cell)
+        prior_w = prior.cell_weight(cell)
+        call_count += 1
+
+        print(
+            f" [{call_count:04d}] Cell {sorted(cell)} "
+            f"| prior={prior_w:.3f} "
+            f"| filled={cmap.filled_cells}/{cmap.total_cells} "
+            f"| dataset={len(dataset)} ... 
", + end="", flush=True, + ) + + problem = call_api( + client, lrm, cell, task_id, + existing_example=existing, + map_summary=cmap.fill_summary(), + ) + + if problem is None: + print("SKIP (generation failed)") + continue + + code = problem["code"] + test_list = problem.get("test_list", []) + + is_valid, ast, error_msg = validator.parse(code) + + if is_valid is None: + is_valid, ast = True, {} + if call_count == 1: + print(f"\n Parser unavailable — using keyword fallback", flush=True) + + if is_valid is False: + print(f"INVALID ({error_msg[:40]})") + problem["_validation"] = {"valid": False, "error": error_msg} + continue + + valid_count += 1 + + quality, components = validator.cell_quality( + code, ast, test_list, cell, + alpha=args.alpha, beta=args.beta, gamma=args.gamma, + ) + problem["_cell"] = sorted(cell) + problem["_prior_weight"] = round(prior_w, 4) + problem["_quality"] = components + + if components["fidelity"] < 1.0: + missing = set(cell) - set(components["detected"]) + print(f"MISSING constructs: {sorted(missing)}") + continue + + updated = cmap.update(cell, problem, quality, components) + if updated: + cell_updates += 1 + + dataset.append(problem) + task_id += 1 + + print( + f"OK quality={quality:.3f} " + f"fidelity={components['fidelity']:.2f} " + f"prior={prior_w:.3f} " + f"extra={len(components['extra'])}" + ) + + if len(dataset) % 50 == 0: + _save(dataset, output_path, cmap, prior=prior) + freq = cmap.node_type_frequency() + entropy = cmap.distribution_entropy() + kl = prior.kl_divergence(freq) + print(f"\n ── Checkpoint ──────────────────────────────────") + print(f" Dataset: {len(dataset)} | Valid: {valid_count}/{call_count}") + print(f" {cmap.fill_summary()}") + print(f" KL(dataset ‖ prior): {kl:.4f} (lower = closer to production patterns)") + print(f" Top-5 most frequent: {sorted(freq, key=freq.get, reverse=True)[:5]}") + print(f" Top-5 least frequent: {sorted(freq, key=freq.get)[:5]}") + print(f" 
────────────────────────────────────────────────\n") + + time.sleep(0.5) + + _save(dataset, output_path, cmap, prior=prior) + return dataset, cmap, valid_count, call_count, prior + + +def _save(dataset: list, path: Path, cmap: CoverageMap, prior: ConstructPrior = None): + with open(path, "w", encoding="utf-8") as f: + json.dump(dataset, f, ensure_ascii=False, indent=2) + + # Save coverage map statistics alongside dataset + stats_path = path.with_name(path.stem + "_coverage_stats.json") + freq = cmap.node_type_frequency() + stats = { + "total_cells": cmap.total_cells, + "filled_cells": cmap.filled_cells, + "fill_rate": round(cmap.fill_rate, 4), + "distribution_entropy": cmap.distribution_entropy(), + "node_type_frequency": freq, + "low_quality_cells": len(cmap.get_low_quality_cells()), + "empty_cells": len(cmap.get_empty_cells()), + } + if prior is not None: + stats["kl_divergence_dataset_vs_prior"] = prior.kl_divergence(freq) + stats["prior_summary"] = prior.coverage_summary() + with open(stats_path, "w", encoding="utf-8") as f: + json.dump(stats, f, ensure_ascii=False, indent=2) + +def main(): + parser = argparse.ArgumentParser( + description="AVAP Dataset Generator v2 — MAP-Elites Quality-Diversity Pipeline" + ) + parser.add_argument("--lrm", default="avap.md") + parser.add_argument("--output", default="output/mbpp_avap_v2.json") + parser.add_argument("--problems", type=int, default=5000) + parser.add_argument("--parser", default="http://localhost:8080", + help="AVAP parser URL") + parser.add_argument("--cell-size", type=int, default=3, + help="Max constructs per cell: 2=pairs, 3=pairs+trios (default: 3)") + parser.add_argument("--quality-threshold", type=float, default=0.80, + help="Min quality to consider a cell 'good' (default: 0.80)") + parser.add_argument("--alpha", type=float, default=0.30, + help="Weight for bonus constructs in cell quality (default: 0.30)") + parser.add_argument("--beta", type=float, default=0.20, + help="Weight for test quality in cell 
quality (default: 0.20)") + parser.add_argument("--gamma", type=float, default=0.10, + help="Weight for code richness in cell quality (default: 0.10)") + parser.add_argument( + "--mode", + choices=["map-elites-prior", "map-elites", "reward"], + default="map-elites-prior", + help=( + "map-elites-prior: Candidate F — MAP-Elites + ConstructPrior (default)\n" + "map-elites: Candidate E — MAP-Elites, uniform cell weighting\n" + "reward: Candidate A — CW-Reward pool (comparison baseline)" + ), + ) + parser.add_argument( + "--prior-map", + default="construct_map.yaml", + metavar="FILE", + help=( + "Path to construct_map.yaml generated by construct_prior.py.\n" + "Generate it first: python construct_prior.py --generate-map\n" + "Default: construct_map.yaml (in current directory)" + ), + ) + parser.add_argument( + "--prior-epsilon", + type=float, + default=_PRIOR_EPSILON, + help=f"Minimum prior weight for tail cells (default: {_PRIOR_EPSILON})", + ) + parser.add_argument( + "--prior-phase3-threshold", + type=float, + default=0.70, + help=( + "Quality threshold above which Phase 2 ends and tail (low-prior) " + "cells become the focus. 
Default: 0.70" + ), + ) + parser.add_argument("--api-key", default=None) + args = parser.parse_args() + + api_key = args.api_key or os.environ.get("ANTHROPIC_API_KEY") + if not api_key: + sys.exit("ERROR: ANTHROPIC_API_KEY not set.") + + lrm_path = Path(args.lrm) + if not lrm_path.exists(): + sys.exit(f"ERROR: LRM '{lrm_path}' not found.") + lrm = lrm_path.read_text(encoding="utf-8") + + output_path = Path(args.output) + output_path.parent.mkdir(parents=True, exist_ok=True) + + client = anthropic.Anthropic(api_key=api_key) + + mode_label = { + "map-elites-prior": "Candidate F — MAP-Elites + ConstructPrior", + "map-elites": "Candidate E — MAP-Elites (uniform)", + "reward": "Candidate A — CW-Reward pool", + }[args.mode] + + print("=" * 65) + print(" AVAP Dataset Generator v2 — MAP-Elites Pipeline") + print("=" * 65) + print(f" Mode : {mode_label}") + print(f" LRM : {lrm_path}") + print(f" Output : {output_path}") + print(f" Target examples: {args.problems}") + print(f" Parser URL : {args.parser}") + print(f" Cell size : {args.cell_size}") + print(f" Quality thresh : {args.quality_threshold}") + if args.mode == "map-elites-prior": + yaml_exists = Path(args.prior_map).exists() + print(f" Prior map : {args.prior_map} ({'✓ found' if yaml_exists else '✗ not found — will use static fallback'})") + print(f" Prior epsilon : {args.prior_epsilon}") + print("=" * 65) + + prior = None + + if args.mode == "map-elites-prior": + result = run_map_elites_prior(args, client, lrm, output_path) + dataset, cmap, valid_count, call_count, prior = result + elif args.mode == "map-elites": + dataset, cmap, valid_count, call_count = run_map_elites(args, client, lrm, output_path) + else: + sys.exit("ERROR: --mode reward (Candidate A) is not yet implemented in v2. 
" + "Use generate_mbap.py for the v1 reward baseline.") + + # Final report + freq = cmap.node_type_frequency() + entropy = cmap.distribution_entropy() + + print("\n" + "=" * 65) + print(" Pipeline complete") + print(f" Mode : {mode_label}") + print(f" Total API calls : {call_count}") + print(f" Valid examples : {valid_count} ({100*valid_count/max(call_count,1):.1f}%)") + print(f" Dataset size : {len(dataset)}") + print(f" {cmap.fill_summary()}") + print(f" Distribution entropy : {entropy:.3f} bits (max={math.log2(len(NODE_TYPE_NAMES)):.2f})") + if prior is not None: + kl = prior.kl_divergence(freq) + print(f" KL(dataset ‖ prior) : {kl:.4f} (0 = perfect alignment with production code)") + print(f" Most covered : {sorted(freq, key=freq.get, reverse=True)[:5]}") + print(f" Least covered : {sorted(freq, key=freq.get)[:5]}") + print(f" Output : {output_path}") + print("=" * 65) + + +if __name__ == "__main__": + main()