# assistance-engine/scratches/pseco/synthetic_dataset/generate_parser_analysis.py
import io
import json
from pathlib import Path
import requests
import typer
from loguru import logger
from src.config import settings
# Typer application object exposing this module's CLI commands.
app = typer.Typer()

# Dataset files (relative to the project root) that are analyzed when the
# caller passes no explicit dataset paths on the command line.
DEFAULT_DATASETS = [
    "synthetic_datasets/golden_dataset_parser_validation.json",
]
def load_tasks(dataset_path: Path) -> list[dict]:
    """Read a synthetic dataset JSON file and return its task list.

    Args:
        dataset_path: Path to the JSON file containing tasks.

    Returns:
        List of task dictionaries.
    """
    raw_text = dataset_path.read_text(encoding="utf-8")
    tasks: list[dict] = json.loads(raw_text)
    logger.info(f"Loaded {len(tasks)} tasks from {dataset_path.name}")
    return tasks
def execute_task(task: dict, api_url: str, timeout: int) -> dict:
    """Send a single task to the parser API and return the result.

    The task is wrapped in a one-element JSON array and uploaded as a
    multipart file, matching the API's expected upload format.

    Args:
        task: Task dictionary with code and test data.
        api_url: URL of the parser/validation API endpoint.
        timeout: Timeout in seconds for the request.

    Returns:
        Parsed response dictionary from the API.

    Raises:
        requests.HTTPError: If the API responds with a 4xx/5xx status.
            HTTPError subclasses requests.RequestException, so existing
            callers that catch RequestException are unaffected.
    """
    payload = json.dumps([task]).encode("utf-8")
    file_obj = io.BytesIO(payload)
    response = requests.post(
        api_url,
        files={"file": ("task.json", file_obj, "application/json")},
        timeout=timeout,
    )
    # Fail fast on HTTP error statuses instead of attempting to JSON-parse
    # an error page body (which would surface as a confusing parse failure).
    response.raise_for_status()
    return _parse_response(response.text)
def _parse_response(raw: str) -> dict:
"""Parse the raw API response into a structured dict.
Args:
raw: Raw response text from the API.
Returns:
Parsed result dictionary.
"""
raw = raw.strip()
if not raw:
return {"success": False, "error": "Empty response from API"}
decoder = json.JSONDecoder()
objects: list[dict] = []
idx = 0
while idx < len(raw):
try:
obj, end_idx = decoder.raw_decode(raw, idx)
objects.append(obj)
idx = end_idx
except json.JSONDecodeError:
idx += 1
while idx < len(raw) and raw[idx] in " \t\n\r":
idx += 1
if not objects:
return {"success": False, "error": f"Could not parse response: {raw[:200]}"}
for obj in objects:
if not obj.get("success"):
return obj
if "result_sequence" in obj and obj["result_sequence"]:
return obj["result_sequence"][0]
return objects[0]
def build_analysis_record(
    source_file: str,
    task: dict,
    result: dict,
) -> dict:
    """Build a consolidated analysis record for a single task.

    Args:
        source_file: Name of the source JSON file.
        task: Original task dictionary.
        result: Parsed API response for the task.

    Returns:
        Consolidated record with task data and execution results.
    """
    # A task passes when the API reported success and the assertion check
    # (treated as passing when absent) did not fail.
    success_flag = result.get("success", False)
    assertion_flag = result.get("assertion_result", True)
    passed = success_flag and assertion_flag
    error = "" if passed else result.get("error", "")
    return {
        "source_file": source_file,
        "task_id": task.get("task_id"),
        "text": task.get("text", ""),
        "code": task.get("code", ""),
        "test_inputs": task.get("test_inputs", {}),
        "test_list": task.get("test_list", []),
        "execution_message": result,
        "passed": passed,
        "error": error,
        "Local_Language_Server_Execution": "",
    }
def analyze_datasets(
    dataset_paths: list[Path],
    api_url: str,
    timeout: int,
) -> list[dict]:
    """Run parser analysis on every task across multiple dataset files.

    Args:
        dataset_paths: List of resolved paths to dataset JSON files.
        api_url: URL of the parser/validation API endpoint.
        timeout: Timeout in seconds per request.

    Returns:
        Consolidated list of analysis records.
    """
    records: list[dict] = []
    failures: list[str] = []
    for path in dataset_paths:
        source_file = path.name
        for position, task in enumerate(load_tasks(path)):
            # Fall back to the positional index when the task has no id.
            task_id = task.get("task_id", position)
            try:
                outcome = execute_task(task, api_url, timeout)
            except requests.RequestException as exc:
                # Record the failed request as a non-passing result so the
                # task still appears in the consolidated report.
                records.append(
                    build_analysis_record(
                        source_file, task, {"success": False, "error": str(exc)}
                    )
                )
                msg = f"[{source_file}] Task {task_id}: Request failed — {exc}"
                failures.append(msg)
                logger.error(msg)
            else:
                record = build_analysis_record(source_file, task, outcome)
                records.append(record)
                status = "PASSED" if record["passed"] else "FAILED"
                logger.info(f"[{source_file}] Task {task_id}: {status}")
    passed_count = sum(1 for r in records if r["passed"])
    logger.info(f"Analysis complete: {passed_count}/{len(records)} tasks passed")
    if failures:
        logger.error(
            f"\n{'=' * 60}\n"
            f"ERROR SUMMARY — {len(failures)} task(s) failed:\n"
            + "\n".join(f" - {e}" for e in failures)
            + f"\n{'=' * 60}"
        )
    return records
def save_analysis(records: list[dict], output_path: Path) -> None:
    """Write the consolidated analysis to a JSON file.

    Args:
        records: List of analysis record dicts.
        output_path: Destination file path.
    """
    # Ensure the destination directory exists before writing.
    output_path.parent.mkdir(parents=True, exist_ok=True)
    serialized = json.dumps(records, ensure_ascii=False, indent=2)
    output_path.write_text(serialized, encoding="utf-8")
    logger.info(f"Saved parser analysis to {output_path}")
@app.command()
def generate_parser_analysis(
    dataset_paths: list[str] = typer.Argument(
        None,
        help="Paths to dataset JSON files (relative to project root). "
        "Defaults to all files in synthetic_datasets/.",
    ),
    output_path: str = typer.Option(
        "synthetic_datasets/validated_golden_synthetic_dataset_v2.json",
        help="Output path for the consolidated analysis JSON.",
    ),
    api_url: str = typer.Option(
        settings.parser_url,
        help="URL of the parser/validation API endpoint.",
    ),
    timeout: int = typer.Option(
        120,
        help="Timeout in seconds for each API request.",
    ),
) -> None:
    """Run parser analysis on one or more synthetic dataset files.

    Sends each task to the parser API, collects execution results,
    and writes a consolidated JSON report with per-task outcomes
    and a blank Local_Language_Server_Execution field.

    Example usage:
        python generate_parser_analysis.py \\
            synthetic_datasets/mbap_avap_A.json \\
            synthetic_datasets/mbpp_avap.json \\
            --output-path output/parser_analysis.json
    """
    # Fall back to the default dataset list when none were supplied.
    selected = dataset_paths if dataset_paths else DEFAULT_DATASETS
    resolved_inputs = [settings.proj_root / entry for entry in selected]
    analysis = analyze_datasets(resolved_inputs, api_url, timeout)
    save_analysis(analysis, settings.proj_root / output_path)
if __name__ == "__main__":
    # Top-level CLI boundary: log the full traceback via loguru, then
    # re-raise so the process still exits with a non-zero status.
    try:
        app()
    except Exception as exc:
        logger.exception(exc)
        raise