"""Run parser/validation analysis over synthetic dataset files.

Sends each task in the given dataset JSON files to the parser API and
writes a consolidated per-task JSON report.
"""

import io
import json
from pathlib import Path

import requests
import typer
from loguru import logger

from src.config import settings

app = typer.Typer()

DEFAULT_DATASETS = [
    "synthetic_datasets/golden_dataset_parser_validation.json",
]


def load_tasks(dataset_path: Path) -> list[dict]:
    """Load tasks from a synthetic dataset JSON file.

    Args:
        dataset_path: Path to the JSON file containing tasks.

    Returns:
        List of task dictionaries.
    """
    with dataset_path.open("r", encoding="utf-8") as f:
        tasks: list[dict] = json.load(f)
    logger.info(f"Loaded {len(tasks)} tasks from {dataset_path.name}")
    return tasks


def execute_task(task: dict, api_url: str, timeout: int) -> dict:
    """Send a single task to the parser API and return the result.

    Args:
        task: Task dictionary with code and test data.
        api_url: URL of the parser/validation API endpoint.
        timeout: Timeout in seconds for the request.

    Returns:
        Parsed response dictionary from the API.
    """
    # The API expects a JSON *file* upload containing a list of tasks,
    # so wrap the single task in a one-element list and upload it as an
    # in-memory file object.
    payload = json.dumps([task]).encode("utf-8")
    file_obj = io.BytesIO(payload)
    response = requests.post(
        api_url,
        files={"file": ("task.json", file_obj, "application/json")},
        timeout=timeout,
    )
    return _parse_response(response.text)


def _parse_response(raw: str) -> dict:
    """Parse the raw API response into a structured dict.

    The API may return several concatenated JSON objects in one body;
    scan the text and decode every object found.

    Args:
        raw: Raw response text from the API.

    Returns:
        Parsed result dictionary.
    """
    raw = raw.strip()
    if not raw:
        return {"success": False, "error": "Empty response from API"}

    decoder = json.JSONDecoder()
    objects: list[dict] = []
    idx = 0
    while idx < len(raw):
        try:
            # raw_decode returns (object, index-after-object), letting us
            # walk through back-to-back JSON documents.
            obj, end_idx = decoder.raw_decode(raw, idx)
            objects.append(obj)
            idx = end_idx
        except json.JSONDecodeError:
            # Not valid JSON at this position — advance one char and retry.
            idx += 1
        # Skip inter-object whitespace so raw_decode starts on content
        # (raw_decode does not tolerate leading whitespace).
        while idx < len(raw) and raw[idx] in " \t\n\r":
            idx += 1

    if not objects:
        return {"success": False, "error": f"Could not parse response: {raw[:200]}"}

    # Surface the first failure object, otherwise the first embedded
    # result_sequence entry; fall back to the first object decoded.
    for obj in objects:
        if not obj.get("success"):
            return obj
        if "result_sequence" in obj and obj["result_sequence"]:
            return obj["result_sequence"][0]
    return objects[0]


def build_analysis_record(
    source_file: str,
    task: dict,
    result: dict,
) -> dict:
    """Build a consolidated analysis record for a single task.

    Args:
        source_file: Name of the source JSON file.
        task: Original task dictionary.
        result: Parsed API response for the task.

    Returns:
        Consolidated record with task data and execution results.
    """
    # FIX: `and` returns its second operand, not a boolean, so a truthy
    # non-bool assertion_result (e.g. 1 or a string) would leak into the
    # report's "passed" field. Coerce to a real bool.
    passed = bool(
        result.get("success", False) and result.get("assertion_result", True)
    )
    error = "" if passed else result.get("error", "")
    return {
        "source_file": source_file,
        "task_id": task.get("task_id"),
        "text": task.get("text", ""),
        "code": task.get("code", ""),
        "test_inputs": task.get("test_inputs", {}),
        "test_list": task.get("test_list", []),
        "execution_message": result,
        "passed": passed,
        "error": error,
        "Local_Language_Server_Execution": "",
    }


def analyze_datasets(
    dataset_paths: list[Path],
    api_url: str,
    timeout: int,
) -> list[dict]:
    """Run parser analysis on every task across multiple dataset files.

    Args:
        dataset_paths: List of resolved paths to dataset JSON files.
        api_url: URL of the parser/validation API endpoint.
        timeout: Timeout in seconds per request.

    Returns:
        Consolidated list of analysis records.
    """
    records: list[dict] = []
    errors: list[str] = []

    for dataset_path in dataset_paths:
        source_file = dataset_path.name
        tasks = load_tasks(dataset_path)
        for idx, task in enumerate(tasks):
            # Fall back to the list position when a task has no task_id.
            task_id = task.get("task_id", idx)
            try:
                result = execute_task(task, api_url, timeout)
                record = build_analysis_record(source_file, task, result)
                records.append(record)
                status = "PASSED" if record["passed"] else "FAILED"
                logger.info(f"[{source_file}] Task {task_id}: {status}")
            except requests.RequestException as exc:
                # Network/HTTP failures are recorded as failed tasks rather
                # than aborting the whole run.
                error_result = {"success": False, "error": str(exc)}
                record = build_analysis_record(source_file, task, error_result)
                records.append(record)
                msg = f"[{source_file}] Task {task_id}: Request failed — {exc}"
                errors.append(msg)
                logger.error(msg)

    passed_count = sum(1 for r in records if r["passed"])
    total = len(records)
    logger.info(f"Analysis complete: {passed_count}/{total} tasks passed")
    if errors:
        logger.error(
            f"\n{'=' * 60}\n"
            f"ERROR SUMMARY — {len(errors)} task(s) failed:\n"
            + "\n".join(f"  - {e}" for e in errors)
            + f"\n{'=' * 60}"
        )
    return records


def save_analysis(records: list[dict], output_path: Path) -> None:
    """Write the consolidated analysis to a JSON file.

    Args:
        records: List of analysis record dicts.
        output_path: Destination file path.
    """
    output_path.parent.mkdir(parents=True, exist_ok=True)
    with output_path.open("w", encoding="utf-8") as f:
        json.dump(records, f, ensure_ascii=False, indent=2)
    logger.info(f"Saved parser analysis to {output_path}")


@app.command()
def generate_parser_analysis(
    dataset_paths: list[str] = typer.Argument(
        None,
        help="Paths to dataset JSON files (relative to project root). "
        "Defaults to all files in synthetic_datasets/.",
    ),
    output_path: str = typer.Option(
        "synthetic_datasets/validated_golden_synthetic_dataset_v2.json",
        help="Output path for the consolidated analysis JSON.",
    ),
    api_url: str = typer.Option(
        settings.parser_url,
        help="URL of the parser/validation API endpoint.",
    ),
    timeout: int = typer.Option(
        120,
        help="Timeout in seconds for each API request.",
    ),
) -> None:
    """Run parser analysis on one or more synthetic dataset files.

    Sends each task to the parser API, collects execution results, and
    writes a consolidated JSON report with per-task outcomes and a blank
    Local_Language_Server_Execution field.

    Example usage:
        python generate_parser_analysis.py \\
            synthetic_datasets/mbap_avap_A.json \\
            synthetic_datasets/mbpp_avap.json \\
            --output-path output/parser_analysis.json
    """
    if not dataset_paths:
        dataset_paths = DEFAULT_DATASETS
    resolved_paths = [settings.proj_root / p for p in dataset_paths]
    resolved_output = settings.proj_root / output_path

    records = analyze_datasets(resolved_paths, api_url, timeout)
    save_analysis(records, resolved_output)


if __name__ == "__main__":
    try:
        app()
    except Exception as exc:
        logger.exception(exc)
        raise