#!/usr/bin/env python3
"""
AVAP Dataset Generator v2 — MAP-Elites Quality-Diversity Pipeline
==================================================================

Reference: the AVAP Language Reference Manual (LRM) describes the
language whose constructs this pipeline targets.
"""
|
|
|
|
import argparse
|
|
import json
|
|
import math
|
|
import os
|
|
import sys
|
|
import time
|
|
from collections import defaultdict
|
|
from itertools import combinations
|
|
from pathlib import Path
|
|
|
|
import anthropic
|
|
import requests
|
|
|
|
from construct_prior import ConstructPrior, AVAP_NODE_NAMES
|
|
from dotenv import load_dotenv
|
|
load_dotenv()
|
|
|
|
# Keyword patterns used to detect each AVAP construct ("node type") in raw
# source text when no AST is available (see CellValidator._from_source).
# Keys are construct names; a construct counts as present when any of its
# substring patterns occurs in the code.  "if_mode2"'s pattern is a superset
# of "if_mode1"'s ("if(None, None," contains "if("), so the detector must
# treat those two specially.
AVAP_NODE_TYPES = {
    "addParam": ["addParam("],
    "addResult": ["addResult("],
    "_status": ["_status"],
    "addVar": ["addVar("],
    "getListLen": ["getListLen("],
    "getQueryParamList": ["getQueryParamList("],
    "itemFromList": ["itemFromList("],
    "replace": ["replace("],
    "randomString": ["randomString("],
    "if_mode1": ["if("],
    "if_mode2": ["if(None, None,"],
    "else": ["else()"],
    "end": ["end()"],
    "startLoop": ["startLoop("],
    "endLoop": ["endLoop()"],
    "try": ["try()"],
    "exception": ["exception()"],
    "return": ["return("],
    "go": ["go("],
    "gather": ["gather("],
    "avapConnector": ["avapConnector("],
    "ormCheckTable": ["ormCheckTable("],
    "ormDirect": ["ormDirect("],
    "ormAccessSelect": ["ormAccessSelect("],
    "ormAccessInsert": ["ormAccessInsert("],
    "ormAccessUpdate": ["ormAccessUpdate("],
    "variableFromJSON": ["variableFromJSON("],
    "AddVariableToJSON": ["AddVariableToJSON("],
    "encodeSHA256": ["encodeSHA256("],
    "encodeMD5": ["encodeMD5("],
    "getTimeStamp": ["getTimeStamp("],
    "getDateTime": ["getDateTime("],
    "stampToDatetime": ["stampToDatetime("],
    "RequestGet": ["RequestGet("],
    "RequestPost": ["RequestPost("],
    "function": ["function "],
    "import": ["import "],
    "include": ["include("],
}

# Canonical construct-name list shared with construct_prior so that both
# modules agree on the MAP-Elites cell space.
NODE_TYPE_NAMES = AVAP_NODE_NAMES

# Default floor weight for "tail" cells (construct combinations rarely seen
# in production code) so the prior never starves them completely.
_PRIOR_EPSILON = 0.05
|
|
|
|
class CellValidator:
|
|
|
|
def __init__(self, parser_url: str, parser_timeout: int = 5):
|
|
self.parser_url = parser_url.rstrip("/")
|
|
self.parser_timeout = parser_timeout
|
|
self._parser_available = True
|
|
|
|
|
|
def parse(self, code: str) -> tuple[bool, dict, str]:
|
|
|
|
if not self._parser_available:
|
|
return None, {}, "parser_unavailable"
|
|
try:
|
|
resp = requests.post(
|
|
f"{self.parser_url}/parse",
|
|
json={"code": code},
|
|
timeout=self.parser_timeout,
|
|
)
|
|
data = resp.json()
|
|
if data.get("valid", False):
|
|
return True, data.get("ast", {}), ""
|
|
return False, {}, data.get("error", "parse error")
|
|
except requests.exceptions.ConnectionError:
|
|
self._parser_available = False
|
|
return None, {}, "parser_unavailable"
|
|
except Exception as e:
|
|
return False, {}, str(e)
|
|
def detect_constructs(self, code: str, ast: dict) -> set:
|
|
if ast:
|
|
return self._from_ast(ast)
|
|
return self._from_source(code)
|
|
|
|
def _from_ast(self, ast: dict) -> set:
|
|
found = set()
|
|
if isinstance(ast, dict):
|
|
if "type" in ast:
|
|
found.add(ast["type"])
|
|
for v in ast.values():
|
|
found |= self._from_ast(v)
|
|
elif isinstance(ast, list):
|
|
for item in ast:
|
|
found |= self._from_ast(item)
|
|
return found
|
|
|
|
def _from_source(self, code: str) -> set:
|
|
found = set()
|
|
if "if(None, None," in code:
|
|
found.add("if_mode2")
|
|
elif "if(" in code:
|
|
found.add("if_mode1")
|
|
for name, patterns in AVAP_NODE_TYPES.items():
|
|
if name in ("if_mode1", "if_mode2"):
|
|
continue # already handled
|
|
for pat in patterns:
|
|
if pat in code:
|
|
found.add(name)
|
|
break
|
|
return found
|
|
|
|
def cell_quality(
|
|
self,
|
|
code: str,
|
|
ast: dict,
|
|
test_list: list,
|
|
cell: frozenset,
|
|
alpha: float = 0.3,
|
|
beta: float = 0.2,
|
|
gamma: float = 0.1,
|
|
) -> tuple[float, dict]:
|
|
|
|
detected = self.detect_constructs(code, ast)
|
|
all_types = set(NODE_TYPE_NAMES)
|
|
|
|
cell_constructs = set(cell)
|
|
present_required = cell_constructs & detected
|
|
fidelity = len(present_required) / max(len(cell_constructs), 1)
|
|
|
|
extra = detected - cell_constructs
|
|
bonus_ratio = len(extra) / max(len(all_types) - len(cell_constructs), 1)
|
|
|
|
tq = sum(
|
|
1 for t in test_list
|
|
if isinstance(t, str) and "re.match(" in t and len(t.strip()) > 10
|
|
) / max(len(test_list), 1)
|
|
|
|
lines = [l.strip() for l in code.split("\n") if l.strip()]
|
|
richness = min(len(lines) / 30.0, 1.0) # cap at 30 lines = 1.0
|
|
|
|
quality = fidelity + alpha * bonus_ratio + beta * tq + gamma * richness
|
|
|
|
return quality, {
|
|
"fidelity": round(fidelity, 3),
|
|
"bonus_ratio": round(bonus_ratio, 3),
|
|
"test_quality": round(tq, 3),
|
|
"richness": round(richness, 3),
|
|
"quality": round(quality, 3),
|
|
"detected": sorted(detected),
|
|
"cell": sorted(cell),
|
|
"extra": sorted(extra),
|
|
}
|
|
|
|
|
|
class CoverageMap:
|
|
|
|
|
|
def __init__(self, cell_size: int = 3):
|
|
|
|
self.cell_size = cell_size
|
|
self._map: dict[frozenset, tuple[dict, float, dict]] = {}
|
|
self._attempts: dict[frozenset, int] = defaultdict(int)
|
|
self._all_cells = self._build_cells()
|
|
|
|
def _build_cells(self) -> list[frozenset]:
|
|
cells = []
|
|
for size in range(2, self.cell_size + 1):
|
|
for combo in combinations(NODE_TYPE_NAMES, size):
|
|
cells.append(frozenset(combo))
|
|
return cells
|
|
|
|
@property
|
|
def total_cells(self) -> int:
|
|
return len(self._all_cells)
|
|
|
|
@property
|
|
def filled_cells(self) -> int:
|
|
return len(self._map)
|
|
|
|
@property
|
|
def fill_rate(self) -> float:
|
|
return self.filled_cells / max(self.total_cells, 1)
|
|
|
|
def update(
|
|
self,
|
|
cell: frozenset,
|
|
example: dict,
|
|
quality: float,
|
|
components: dict,
|
|
) -> bool:
|
|
self._attempts[cell] += 1
|
|
current = self._map.get(cell)
|
|
if current is None or quality > current[1]:
|
|
self._map[cell] = (example, quality, components)
|
|
return True
|
|
return False
|
|
|
|
def get_empty_cells(self) -> list[frozenset]:
|
|
return [c for c in self._all_cells if c not in self._map]
|
|
|
|
def get_low_quality_cells(self, threshold: float = 0.7) -> list[frozenset]:
|
|
return [
|
|
c for c, (_, q, _) in self._map.items()
|
|
if q < threshold
|
|
]
|
|
|
|
def get_example(self, cell: frozenset) -> dict | None:
|
|
entry = self._map.get(cell)
|
|
return entry[0] if entry else None
|
|
|
|
def all_examples(self) -> list[dict]:
|
|
return [ex for ex, _, _ in self._map.values()]
|
|
|
|
def node_type_frequency(self) -> dict[str, int]:
|
|
|
|
freq = defaultdict(int)
|
|
for cell in self._map:
|
|
for nt in cell:
|
|
freq[nt] += 1
|
|
return dict(freq)
|
|
|
|
def distribution_entropy(self) -> float:
|
|
|
|
freq = self.node_type_frequency()
|
|
total = sum(freq.values())
|
|
if total == 0:
|
|
return 0.0
|
|
entropy = 0.0
|
|
for count in freq.values():
|
|
p = count / total
|
|
if p > 0:
|
|
entropy -= p * math.log2(p)
|
|
return round(entropy, 3)
|
|
|
|
def fill_summary(self) -> str:
|
|
empty = len(self.get_empty_cells())
|
|
low = len(self.get_low_quality_cells())
|
|
entropy = self.distribution_entropy()
|
|
return (
|
|
f"Cells: {self.filled_cells}/{self.total_cells} filled "
|
|
f"({100*self.fill_rate:.1f}%) | "
|
|
f"Low quality: {low} | "
|
|
f"Empty: {empty} | "
|
|
f"Entropy: {entropy:.2f} bits"
|
|
)
|
|
|
|
class CellSelector:
    """Chooses the next cell to target: empty cells first, then filled
    cells below the quality threshold, then a UCB exploit/explore pass
    over the whole archive."""

    def __init__(
        self,
        coverage_map: CoverageMap,
        quality_threshold: float = 0.80,
        ucb_c: float = 1.0,
    ):
        self.map = coverage_map
        self.quality_threshold = quality_threshold
        self.ucb_c = ucb_c
        self._total_calls = 0
        # Fixed-seed RNG so runs are reproducible.
        import random
        self._rng = random.Random(42)

    def select(self) -> frozenset:
        """Return the next cell to attempt."""
        self._total_calls += 1
        # Priority 1: any still-empty cell; priority 2: any filled cell
        # whose elite is below the quality threshold.
        for pool in (
            self.map.get_empty_cells(),
            self.map.get_low_quality_cells(self.quality_threshold),
        ):
            if pool:
                return self._rng.choice(pool)
        # Otherwise trade off quality vs. under-exploration.
        return self._ucb_select()

    def _ucb_select(self) -> frozenset:
        """Pick the cell maximising quality + UCB exploration bonus."""
        calls = max(self._total_calls, 1)
        log_calls = math.log(calls)
        winner = None
        winner_score = -float("inf")
        for candidate in self.map._all_cells:
            tries = max(self.map._attempts.get(candidate, 0), 1)
            stored = self.map._map.get(candidate)
            exploit = stored[1] if stored else 0.0
            score = exploit + self.ucb_c * math.sqrt(log_calls / tries)
            if score > winner_score:
                winner_score = score
                winner = candidate
        return winner
|
|
|
|
class CellSelectorPrior(CellSelector):
    """CellSelector variant that biases selection toward cells with a high
    production-code prior weight (ConstructPrior)."""

    def __init__(
        self,
        coverage_map: CoverageMap,
        prior: ConstructPrior,
        quality_threshold: float = 0.80,
        ucb_c: float = 1.0,
        phase3_threshold: float = 0.70,
    ):
        super().__init__(coverage_map, quality_threshold, ucb_c)
        self.prior = prior
        self.phase3_threshold = phase3_threshold
        # NOTE(review): the phase-3 fields below are initialised but never
        # referenced in this file's visible code — presumably reserved for a
        # tail-cell phase that is not implemented yet; confirm before removal.
        self._tail_cells: set[frozenset] = set()
        self._phase3_active = False

    def select(self) -> frozenset:
        """Return the next cell, weighting choices by prior probability."""
        self._total_calls += 1
        empty = self.map.get_empty_cells()

        if empty:
            # Prefer empty cells whose prior clearly exceeds the epsilon
            # floor; otherwise sample among all empty cells.
            cutoff = self.prior.epsilon * 1.5
            preferred = [c for c in empty if self.prior.cell_weight(c) > cutoff]
            return self._weighted_sample(preferred or empty)

        low = self.map.get_low_quality_cells(self.quality_threshold)
        if low:
            return self._ucb_prior_select(low)

        return self._ucb_prior_select(self.map._all_cells)

    def _weighted_sample(self, cells: list[frozenset]) -> frozenset:
        """Roulette-wheel sample from *cells* proportional to prior weight."""
        weights = [self.prior.cell_weight(c) for c in cells]
        mass = sum(weights)
        if mass == 0:
            return self._rng.choice(cells)
        target = self._rng.random() * mass
        running = 0.0
        for chosen, w in zip(cells, weights):
            running += w
            if target <= running:
                return chosen
        # Floating-point slack: fall back to the final cell.
        return cells[-1]

    def _ucb_prior_select(self, cells) -> frozenset:
        """Pick the cell maximising prior_weight * (quality + UCB bonus)."""
        calls = max(self._total_calls, 1)
        log_calls = math.log(calls)
        winner = None
        winner_score = -float("inf")
        for candidate in cells:
            tries = max(self.map._attempts.get(candidate, 0), 1)
            stored = self.map._map.get(candidate)
            exploit = stored[1] if stored else 0.0
            bonus = self.ucb_c * math.sqrt(log_calls / tries)
            score = self.prior.cell_weight(candidate) * (exploit + bonus)
            if score > winner_score:
                winner_score = score
                winner = candidate
        return winner
|
|
|
|
SYSTEM_PROMPT = """Eres un experto en el lenguaje AVAP.
|
|
Se te proporciona el Language Reference Manual (LRM) completo de AVAP.
|
|
Tu tarea es generar UN problema de benchmark estilo MBPP para evaluar
|
|
modelos de lenguaje en su capacidad de generar código AVAP correcto.
|
|
|
|
REGLAS ESTRICTAS para el código AVAP generado:
|
|
1. Una instrucción por línea. EOL es el terminador absoluto.
|
|
2. Sin indentación significativa (es solo decorativa).
|
|
3. Bloques: if()...else()...end(), startLoop()...endLoop(), try()...exception()...end()
|
|
4. Funciones: function name(args) { ... return(val) }
|
|
5. if() Modo 1: if(var_o_literal, var_o_literal, "operador")
|
|
6. if() Modo 2: if(None, None, `expresion_completa_como_string`)
|
|
7. _status se asigna con: addVar(_status, 404)
|
|
8. ormAccessSelect firma: ormAccessSelect(campos, "tabla", selector, varTarget)
|
|
9. ormCheckTable firma: ormCheckTable(nombre_tabla, varTarget)
|
|
10. ormDirect firma: ormDirect("SELECT ... %s" % var, varTarget)
|
|
11. getQueryParamList firma: getQueryParamList(param_name, varTarget)
|
|
12. NUNCA uses registerEndpoint(), NUNCA uses mainHandler().
|
|
13. El código se ejecuta DIRECTAMENTE, línea a línea.
|
|
|
|
FORMATO DE SALIDA: responde ÚNICAMENTE con UN objeto JSON válido (no array).
|
|
Sin texto adicional, sin bloques de código markdown.
|
|
{
|
|
"task_id": 1,
|
|
"text": "<enunciado del problema en español>",
|
|
"code": "<código AVAP con saltos de línea como \\n>",
|
|
"test_inputs": { "<param1>": <valor1> },
|
|
"test_list": ["re.match(r'<patrón>', <variable>)", ...]
|
|
}
|
|
|
|
test_list: USA ÚNICAMENTE re.match(). NUNCA comparaciones directas (==, !=).
|
|
"""
|
|
|
|
|
|
def build_cell_prompt(
    lrm: str,
    cell: frozenset,
    existing_example: dict | None,
    map_summary: str,
) -> str:
    """Build the user prompt asking the model to cover *cell*.

    Embeds the full LRM, the current coverage-map summary, the mandatory
    construct list for the cell and — when the cell already has an elite —
    that elite's code with an instruction to beat it.  The prompt text is
    runtime data consumed by the LLM.
    """
    constructs_list = ", ".join(f"`{c}`" for c in sorted(cell))

    improvement_note = ""
    if existing_example:
        # An elite already exists for this cell: ask for something better.
        improvement_note = f"""
El siguiente ejemplo YA existe para esta combinación con calidad mejorable.
Genera algo DISTINTO y MÁS COMPLEJO que lo supere:

```
{existing_example.get('code', '')}
```
"""

    return f"""# LRM AVAP — Language Reference Manual

{lrm}

---

# ESTADO DEL MAPA DE COBERTURA

{map_summary}

---

# TAREA — ESPECIFICACIÓN OBLIGATORIA

Genera UN ejemplo AVAP que use OBLIGATORIAMENTE TODOS estos constructs:

**{constructs_list}**

El ejemplo DEBE contener todos los constructs listados arriba.
Si tu código no los usa todos, la tarea fracasa.

Adicionalmente:
- Combina los constructs requeridos en un escenario realista de microservicio HTTP
- Añade constructs adicionales donde sea natural (aumenta la puntuación)
- Código complejo y rico — no ejemplos triviales de 3 líneas
- 2-3 aserciones re.match() en test_list
{improvement_note}
Responde ÚNICAMENTE con el objeto JSON. Sin texto antes ni después.
"""
|
|
|
|
|
|
def call_api(
    client: anthropic.Anthropic,
    lrm: str,
    cell: frozenset,
    task_id: int,
    existing_example: dict | None,
    map_summary: str,
    retries: int = 3,
) -> dict | None:
    """Request one benchmark problem targeting *cell* from the model.

    Retries up to *retries* times: exponential backoff on JSON/validation
    errors, linear backoff on rate limits, fixed backoff on other API
    errors.  Returns the parsed problem dict (with ``task_id`` overwritten
    and ``test_inputs`` defaulted) or None when all attempts fail.
    """
    for attempt in range(1, retries + 1):
        try:
            message = client.messages.create(
                model="claude-sonnet-4-20250514",
                max_tokens=4000,
                system=SYSTEM_PROMPT,
                messages=[{
                    "role": "user",
                    "content": build_cell_prompt(lrm, cell, existing_example, map_summary),
                }],
            )
            raw = message.content[0].text.strip()

            # Strip a markdown code fence if the model added one despite the
            # instructions: drop the opening ``` line, and the closing one
            # when present.
            if raw.startswith("```"):
                lines = raw.splitlines()
                raw = "\n".join(lines[1:-1] if lines[-1].strip() == "```" else lines[1:])

            # Validate shape: must be a JSON object with the core fields.
            problem = json.loads(raw)
            if not isinstance(problem, dict):
                raise ValueError("Response is not a JSON object")
            for field in ("text", "code", "test_list"):
                if field not in problem:
                    raise ValueError(f"Missing field '{field}'")
            if "test_inputs" not in problem:
                problem["test_inputs"] = {}
            # The caller owns task numbering; ignore the model's value.
            problem["task_id"] = task_id
            return problem

        except (json.JSONDecodeError, ValueError) as e:
            print(f"\n  Attempt {attempt}/{retries} — parse error: {e}")
            if attempt < retries:
                time.sleep(2 ** attempt)
        except anthropic.RateLimitError:
            # Rate-limit hits still consume one of the attempts.
            wait = 30 * attempt
            print(f"\n  Rate limit — waiting {wait}s...")
            time.sleep(wait)
        except anthropic.APIError as e:
            print(f"\n  API error at attempt {attempt}: {e}")
            if attempt < retries:
                time.sleep(5)

    # All attempts exhausted.
    return None
|
|
|
|
|
|
def run_map_elites(args, client, lrm, output_path):
    """Candidate E: MAP-Elites loop with uniform cell weighting.

    Repeatedly selects a cell, asks the model for an example, validates and
    scores it, and keeps it both in the archive (if it beats the incumbent)
    and in the flat dataset (always, once fidelity is perfect).  Stops when
    the dataset target is reached or the call budget (4x target) runs out.
    Returns ``(dataset, cmap, valid_count, call_count)``.
    """
    validator = CellValidator(parser_url=args.parser)
    cmap = CoverageMap(cell_size=args.cell_size)
    selector = CellSelector(cmap, quality_threshold=args.quality_threshold)
    dataset = []
    task_id = 1
    call_count = 0
    valid_count = 0
    # NOTE(review): cell_updates is tallied but never read afterwards.
    cell_updates = 0

    print(f"\n  MAP-Elites mode | cells: {cmap.total_cells} | target: {args.problems} examples")
    print(f"  Cell size: {args.cell_size} | Quality threshold: {args.quality_threshold}")
    print("─" * 65)

    # Hard budget: at most 4 API calls per requested example.
    max_calls = args.problems * 4

    while len(dataset) < args.problems and call_count < max_calls:

        cell = selector.select()
        existing = cmap.get_example(cell)
        call_count += 1

        print(
            f"  [{call_count:04d}] Cell {sorted(cell)} "
            f"| filled={cmap.filled_cells}/{cmap.total_cells} "
            f"| dataset={len(dataset)} ... ",
            end="", flush=True,
        )

        problem = call_api(
            client, lrm, cell, task_id,
            existing_example=existing,
            map_summary=cmap.fill_summary(),
        )

        if problem is None:
            print("SKIP (generation failed)")
            continue

        code = problem["code"]
        test_list = problem.get("test_list", [])

        # None means the parser service is unreachable (not a rejection).
        is_valid, ast, error_msg = validator.parse(code)

        if is_valid is None:
            # Fall back to keyword-based construct detection (empty AST).
            is_valid, ast = True, {}
            if call_count == 1:
                print(f"\n  Parser unavailable — using keyword fallback", flush=True)

        if is_valid is False:
            print(f"INVALID ({error_msg[:40]})")
            # NOTE(review): the annotated problem is discarded by the
            # continue below; "_validation" is never persisted.
            problem["_validation"] = {"valid": False, "error": error_msg}
            continue

        valid_count += 1

        # Compute cell quality
        quality, components = validator.cell_quality(
            code, ast, test_list, cell,
            alpha=args.alpha, beta=args.beta, gamma=args.gamma,
        )
        problem["_cell"] = sorted(cell)
        problem["_quality"] = components

        # Reject examples missing any required construct.
        if components["fidelity"] < 1.0:
            missing = set(cell) - set(components["detected"])
            print(f"MISSING constructs: {sorted(missing)}")
            continue

        updated = cmap.update(cell, problem, quality, components)
        if updated:
            cell_updates += 1

        dataset.append(problem)
        task_id += 1

        print(
            f"OK quality={quality:.3f} "
            f"fidelity={components['fidelity']:.2f} "
            f"extra={len(components['extra'])}"
        )

        # Checkpoint every 50 accepted examples.
        if len(dataset) % 50 == 0:
            _save(dataset, output_path, cmap)
            freq = cmap.node_type_frequency()
            # NOTE(review): entropy is computed but unused here;
            # fill_summary() already reports it.
            entropy = cmap.distribution_entropy()
            print(f"\n  ── Checkpoint ──────────────────────────────────")
            print(f"  Dataset: {len(dataset)} | Valid: {valid_count}/{call_count}")
            print(f"  {cmap.fill_summary()}")
            print(f"  Top-5 most frequent: {sorted(freq, key=freq.get, reverse=True)[:5]}")
            print(f"  Top-5 least frequent: {sorted(freq, key=freq.get)[:5]}")
            print(f"  ────────────────────────────────────────────────\n")

        # Light pacing between API calls.
        time.sleep(0.5)

    _save(dataset, output_path, cmap)
    return dataset, cmap, valid_count, call_count
|
|
|
|
def run_map_elites_prior(args, client, lrm, output_path):
    """Candidate F: MAP-Elites loop biased by a ConstructPrior.

    Same structure as ``run_map_elites`` but selects cells through
    ``CellSelectorPrior``, records each example's prior weight, and reports
    KL(dataset ‖ prior) at checkpoints.  Returns
    ``(dataset, cmap, valid_count, call_count, prior)``.
    """
    print("\n  Loading ConstructPrior...", flush=True)
    prior_map = getattr(args, "prior_map","construct_map.yaml")
    epsilon = getattr(args, "prior_epsilon", _PRIOR_EPSILON)
    yaml_path = Path(prior_map)

    if yaml_path.exists():
        prior = ConstructPrior.from_yaml(yaml_path, epsilon=epsilon)
    else:
        # Fallback: yaml not found — use static prior and warn
        print(f"  [WARN] construct_map.yaml not found at '{yaml_path}'.")
        print(f"  [WARN] Using static fallback prior. Generate the real prior with:")
        print(f"  [WARN]   python construct_prior.py --generate-map --github-token TOKEN")
        prior = ConstructPrior.from_static_fallback(epsilon=epsilon)

    print(f"  {prior.coverage_summary()}")

    validator = CellValidator(parser_url=args.parser)
    cmap = CoverageMap(cell_size=args.cell_size)
    selector = CellSelectorPrior(
        cmap, prior,
        quality_threshold=args.quality_threshold,
        phase3_threshold=getattr(args, "prior_phase3_threshold", 0.70),
    )
    dataset = []
    task_id = 1
    call_count = 0
    valid_count = 0
    # NOTE(review): cell_updates is tallied but never read afterwards.
    cell_updates = 0

    print(f"\n  MAP-Elites+Prior mode | cells: {cmap.total_cells} | target: {args.problems} examples")
    print(f"  Cell size: {args.cell_size} | Quality threshold: {args.quality_threshold}")
    print("─" * 65)

    # Hard budget: at most 4 API calls per requested example.
    max_calls = args.problems * 4

    while len(dataset) < args.problems and call_count < max_calls:

        cell = selector.select()
        existing = cmap.get_example(cell)
        prior_w = prior.cell_weight(cell)
        call_count += 1

        print(
            f"  [{call_count:04d}] Cell {sorted(cell)} "
            f"| prior={prior_w:.3f} "
            f"| filled={cmap.filled_cells}/{cmap.total_cells} "
            f"| dataset={len(dataset)} ... ",
            end="", flush=True,
        )

        problem = call_api(
            client, lrm, cell, task_id,
            existing_example=existing,
            map_summary=cmap.fill_summary(),
        )

        if problem is None:
            print("SKIP (generation failed)")
            continue

        code = problem["code"]
        test_list = problem.get("test_list", [])

        # None means the parser service is unreachable (not a rejection).
        is_valid, ast, error_msg = validator.parse(code)

        if is_valid is None:
            # Fall back to keyword-based construct detection (empty AST).
            is_valid, ast = True, {}
            if call_count == 1:
                print(f"\n  Parser unavailable — using keyword fallback", flush=True)

        if is_valid is False:
            print(f"INVALID ({error_msg[:40]})")
            # NOTE(review): the annotated problem is discarded by the
            # continue below; "_validation" is never persisted.
            problem["_validation"] = {"valid": False, "error": error_msg}
            continue

        valid_count += 1

        quality, components = validator.cell_quality(
            code, ast, test_list, cell,
            alpha=args.alpha, beta=args.beta, gamma=args.gamma,
        )
        problem["_cell"] = sorted(cell)
        problem["_prior_weight"] = round(prior_w, 4)
        problem["_quality"] = components

        # Reject examples missing any required construct.
        if components["fidelity"] < 1.0:
            missing = set(cell) - set(components["detected"])
            print(f"MISSING constructs: {sorted(missing)}")
            continue

        updated = cmap.update(cell, problem, quality, components)
        if updated:
            cell_updates += 1

        dataset.append(problem)
        task_id += 1

        print(
            f"OK quality={quality:.3f} "
            f"fidelity={components['fidelity']:.2f} "
            f"prior={prior_w:.3f} "
            f"extra={len(components['extra'])}"
        )

        # Checkpoint every 50 accepted examples.
        if len(dataset) % 50 == 0:
            _save(dataset, output_path, cmap, prior=prior)
            freq = cmap.node_type_frequency()
            # NOTE(review): entropy is computed but unused here;
            # fill_summary() already reports it.
            entropy = cmap.distribution_entropy()
            kl = prior.kl_divergence(freq)
            print(f"\n  ── Checkpoint ──────────────────────────────────")
            print(f"  Dataset: {len(dataset)} | Valid: {valid_count}/{call_count}")
            print(f"  {cmap.fill_summary()}")
            print(f"  KL(dataset ‖ prior): {kl:.4f} (lower = closer to production patterns)")
            print(f"  Top-5 most frequent: {sorted(freq, key=freq.get, reverse=True)[:5]}")
            print(f"  Top-5 least frequent: {sorted(freq, key=freq.get)[:5]}")
            print(f"  ────────────────────────────────────────────────\n")

        # Light pacing between API calls.
        time.sleep(0.5)

    _save(dataset, output_path, cmap, prior=prior)
    return dataset, cmap, valid_count, call_count, prior
|
|
|
|
|
|
def _save(dataset: list, path: Path, cmap: CoverageMap, prior: ConstructPrior | None = None):
    """Persist *dataset* as JSON at *path* and write coverage statistics to
    a sibling ``<stem>_coverage_stats.json`` file.

    When *prior* is given, the stats also include the KL divergence between
    the dataset's construct distribution and the prior, plus the prior's
    own coverage summary.  (Annotation fixed: *prior* is optional.)
    """
    with open(path, "w", encoding="utf-8") as f:
        json.dump(dataset, f, ensure_ascii=False, indent=2)

    # Save coverage map statistics alongside dataset
    stats_path = path.with_name(path.stem + "_coverage_stats.json")
    freq = cmap.node_type_frequency()
    stats = {
        "total_cells": cmap.total_cells,
        "filled_cells": cmap.filled_cells,
        "fill_rate": round(cmap.fill_rate, 4),
        "distribution_entropy": cmap.distribution_entropy(),
        "node_type_frequency": freq,
        "low_quality_cells": len(cmap.get_low_quality_cells()),
        "empty_cells": len(cmap.get_empty_cells()),
    }
    if prior is not None:
        stats["kl_divergence_dataset_vs_prior"] = prior.kl_divergence(freq)
        stats["prior_summary"] = prior.coverage_summary()
    with open(stats_path, "w", encoding="utf-8") as f:
        json.dump(stats, f, ensure_ascii=False, indent=2)
|
|
|
|
def main():
    """CLI entry point: parse arguments, run the selected pipeline mode and
    print a final coverage report."""
    parser = argparse.ArgumentParser(
        description="AVAP Dataset Generator v2 — MAP-Elites Quality-Diversity Pipeline"
    )
    parser.add_argument("--lrm", default="avap.md")
    parser.add_argument("--output", default="output/mbpp_avap_v2.json")
    parser.add_argument("--problems", type=int, default=5000)
    parser.add_argument("--parser", default="http://localhost:8080",
                        help="AVAP parser URL")
    parser.add_argument("--cell-size", type=int, default=3,
                        help="Max constructs per cell: 2=pairs, 3=pairs+trios (default: 3)")
    parser.add_argument("--quality-threshold", type=float, default=0.80,
                        help="Min quality to consider a cell 'good' (default: 0.80)")
    # alpha/beta/gamma are the quality-component weights used by
    # CellValidator.cell_quality.
    parser.add_argument("--alpha", type=float, default=0.30,
                        help="Weight for bonus constructs in cell quality (default: 0.30)")
    parser.add_argument("--beta", type=float, default=0.20,
                        help="Weight for test quality in cell quality (default: 0.20)")
    parser.add_argument("--gamma", type=float, default=0.10,
                        help="Weight for code richness in cell quality (default: 0.10)")
    parser.add_argument(
        "--mode",
        choices=["map-elites-prior", "map-elites", "reward"],
        default="map-elites-prior",
        help=(
            "map-elites-prior: Candidate F — MAP-Elites + ConstructPrior (default)\n"
            "map-elites: Candidate E — MAP-Elites, uniform cell weighting\n"
            "reward: Candidate A — CW-Reward pool (comparison baseline)"
        ),
    )
    parser.add_argument(
        "--prior-map",
        default="construct_map.yaml",
        metavar="FILE",
        help=(
            "Path to construct_map.yaml generated by construct_prior.py.\n"
            "Generate it first: python construct_prior.py --generate-map\n"
            "Default: construct_map.yaml (in current directory)"
        ),
    )
    parser.add_argument(
        "--prior-epsilon",
        type=float,
        default=_PRIOR_EPSILON,
        help=f"Minimum prior weight for tail cells (default: {_PRIOR_EPSILON})",
    )
    parser.add_argument(
        "--prior-phase3-threshold",
        type=float,
        default=0.70,
        help=(
            "Quality threshold above which Phase 2 ends and tail (low-prior) "
            "cells become the focus. Default: 0.70"
        ),
    )
    parser.add_argument("--api-key", default=None)
    args = parser.parse_args()

    # API key: CLI flag wins, then the environment (.env loaded at import).
    api_key = args.api_key or os.environ.get("ANTHROPIC_API_KEY")
    if not api_key:
        sys.exit("ERROR: ANTHROPIC_API_KEY not set.")

    lrm_path = Path(args.lrm)
    if not lrm_path.exists():
        sys.exit(f"ERROR: LRM '{lrm_path}' not found.")
    lrm = lrm_path.read_text(encoding="utf-8")

    output_path = Path(args.output)
    output_path.parent.mkdir(parents=True, exist_ok=True)

    client = anthropic.Anthropic(api_key=api_key)

    mode_label = {
        "map-elites-prior": "Candidate F — MAP-Elites + ConstructPrior",
        "map-elites": "Candidate E — MAP-Elites (uniform)",
        "reward": "Candidate A — CW-Reward pool",
    }[args.mode]

    print("=" * 65)
    print("  AVAP Dataset Generator v2 — MAP-Elites Pipeline")
    print("=" * 65)
    print(f"  Mode           : {mode_label}")
    print(f"  LRM            : {lrm_path}")
    print(f"  Output         : {output_path}")
    print(f"  Target examples: {args.problems}")
    print(f"  Parser URL     : {args.parser}")
    print(f"  Cell size      : {args.cell_size}")
    print(f"  Quality thresh : {args.quality_threshold}")
    if args.mode == "map-elites-prior":
        yaml_exists = Path(args.prior_map).exists()
        print(f"  Prior map      : {args.prior_map} ({'✓ found' if yaml_exists else '✗ not found — will use static fallback'})")
        print(f"  Prior epsilon  : {args.prior_epsilon}")
    print("=" * 65)

    prior = None

    if args.mode == "map-elites-prior":
        result = run_map_elites_prior(args, client, lrm, output_path)
        dataset, cmap, valid_count, call_count, prior = result
    elif args.mode == "map-elites":
        dataset, cmap, valid_count, call_count = run_map_elites(args, client, lrm, output_path)
    else:
        # Candidate A lives in the v1 script; fail fast instead of silently
        # falling back.
        sys.exit("ERROR: --mode reward (Candidate A) is not yet implemented in v2. "
                 "Use generate_mbap.py for the v1 reward baseline.")

    # Final report
    freq = cmap.node_type_frequency()
    entropy = cmap.distribution_entropy()

    print("\n" + "=" * 65)
    print("  Pipeline complete")
    print(f"  Mode                 : {mode_label}")
    print(f"  Total API calls      : {call_count}")
    print(f"  Valid examples       : {valid_count} ({100*valid_count/max(call_count,1):.1f}%)")
    print(f"  Dataset size         : {len(dataset)}")
    print(f"  {cmap.fill_summary()}")
    print(f"  Distribution entropy : {entropy:.3f} bits (max={math.log2(len(NODE_TYPE_NAMES)):.2f})")
    if prior is not None:
        kl = prior.kl_divergence(freq)
        print(f"  KL(dataset ‖ prior)  : {kl:.4f} (0 = perfect alignment with production code)")
    print(f"  Most covered         : {sorted(freq, key=freq.get, reverse=True)[:5]}")
    print(f"  Least covered        : {sorted(freq, key=freq.get)[:5]}")
    print(f"  Output               : {output_path}")
    print("=" * 65)
|
|
|
|
|
|
if __name__ == "__main__":
|
|
main()
|