# CLI for running parser analysis over synthetic dataset files.
import io
import json
from pathlib import Path

import requests
import typer
from loguru import logger

from src.config import settings
app = typer.Typer()
|
|
|
|
DEFAULT_DATASETS = [
|
|
"output/candidate_A_reward_10_v3.json",
|
|
]
|
|
|
|
|
|
def load_tasks(dataset_path: Path) -> list[dict]:
    """Read a synthetic dataset JSON file and return its task list.

    Args:
        dataset_path: Path to the JSON file containing tasks.

    Returns:
        List of task dictionaries.
    """
    raw_text = dataset_path.read_text(encoding="utf-8")
    tasks: list[dict] = json.loads(raw_text)
    logger.info(f"Loaded {len(tasks)} tasks from {dataset_path.name}")
    return tasks
def execute_task(task: dict, api_url: str, timeout: int) -> dict:
    """Send a single task to the parser API and return the result.

    Args:
        task: Task dictionary with code and test data.
        api_url: URL of the parser/validation API endpoint.
        timeout: Timeout in seconds for the request.

    Returns:
        Parsed response dictionary from the API.
    """
    # The API expects a JSON file upload holding a list of tasks, so the
    # single task is wrapped in a one-element list before serialization.
    body = io.BytesIO(json.dumps([task]).encode("utf-8"))
    upload = {"file": ("task.json", body, "application/json")}
    response = requests.post(api_url, files=upload, timeout=timeout)
    return _parse_response(response.text)
def _parse_response(raw: str) -> dict:
|
|
"""Parse the raw API response into a structured dict.
|
|
|
|
Args:
|
|
raw: Raw response text from the API.
|
|
|
|
Returns:
|
|
Parsed result dictionary.
|
|
"""
|
|
raw = raw.strip()
|
|
if not raw:
|
|
return {"success": False, "error": "Empty response from API"}
|
|
|
|
decoder = json.JSONDecoder()
|
|
objects: list[dict] = []
|
|
idx = 0
|
|
while idx < len(raw):
|
|
try:
|
|
obj, end_idx = decoder.raw_decode(raw, idx)
|
|
objects.append(obj)
|
|
idx = end_idx
|
|
except json.JSONDecodeError:
|
|
idx += 1
|
|
while idx < len(raw) and raw[idx] in " \t\n\r":
|
|
idx += 1
|
|
|
|
if not objects:
|
|
return {"success": False, "error": f"Could not parse response: {raw[:200]}"}
|
|
|
|
for obj in objects:
|
|
if not obj.get("success"):
|
|
return obj
|
|
if "result_sequence" in obj and obj["result_sequence"]:
|
|
return obj["result_sequence"][0]
|
|
|
|
return objects[0]
|
|
|
|
|
|
def build_analysis_record(
    source_file: str,
    task: dict,
    result: dict,
) -> dict:
    """Build a consolidated analysis record for a single task.

    Args:
        source_file: Name of the source JSON file.
        task: Original task dictionary.
        result: Parsed API response for the task.

    Returns:
        Consolidated record with task data and execution results.
    """
    # A task passes when the API reported success and (if present) the
    # assertion check did not fail; assertion_result defaults to True.
    passed = result.get("success", False) and result.get("assertion_result", True)
    if passed:
        error = ""
    else:
        error = result.get("error", "")

    record = {
        "source_file": source_file,
        "task_id": task.get("task_id"),
        "text": task.get("text", ""),
        "code": task.get("code", ""),
        "test_inputs": task.get("test_inputs", {}),
        "test_list": task.get("test_list", []),
        "execution_message": result,
        "passed": passed,
        "error": error,
        # Placeholder column filled in by a later, separate workflow step.
        "Local_Language_Server_Execution": "",
    }
    return record
def analyze_datasets(
    dataset_paths: list[Path],
    api_url: str,
    timeout: int,
) -> list[dict]:
    """Run parser analysis on every task across multiple dataset files.

    Args:
        dataset_paths: List of resolved paths to dataset JSON files.
        api_url: URL of the parser/validation API endpoint.
        timeout: Timeout in seconds per request.

    Returns:
        Consolidated list of analysis records.
    """
    records: list[dict] = []
    failures: list[str] = []

    for path in dataset_paths:
        file_name = path.name
        for position, task in enumerate(load_tasks(path)):
            task_id = task.get("task_id", position)
            try:
                outcome = execute_task(task, api_url, timeout)
            except requests.RequestException as exc:
                # Record the transport failure as a failed task instead of
                # aborting the whole run.
                records.append(
                    build_analysis_record(
                        file_name, task, {"success": False, "error": str(exc)}
                    )
                )
                message = f"[{file_name}] Task {task_id}: Request failed — {exc}"
                failures.append(message)
                logger.error(message)
                continue
            record = build_analysis_record(file_name, task, outcome)
            records.append(record)
            status = "PASSED" if record["passed"] else "FAILED"
            logger.info(f"[{file_name}] Task {task_id}: {status}")

    passed_count = sum(1 for record in records if record["passed"])
    total = len(records)
    logger.info(f"Analysis complete: {passed_count}/{total} tasks passed")

    if failures:
        divider = "=" * 60
        bullet_lines = "\n".join(f"  - {failure}" for failure in failures)
        logger.error(
            f"\n{divider}\n"
            f"ERROR SUMMARY — {len(failures)} task(s) failed:\n"
            f"{bullet_lines}\n{divider}"
        )

    return records
def save_analysis(records: list[dict], output_path: Path) -> None:
    """Write the consolidated analysis to a JSON file.

    Args:
        records: List of analysis record dicts.
        output_path: Destination file path.
    """
    # Create the destination directory tree before writing.
    output_path.parent.mkdir(parents=True, exist_ok=True)
    serialized = json.dumps(records, ensure_ascii=False, indent=2)
    output_path.write_text(serialized, encoding="utf-8")
    logger.info(f"Saved parser analysis to {output_path}")
@app.command()
def generate_parser_analysis(
    dataset_paths: list[str] = typer.Argument(
        None,
        help="Paths to dataset JSON files (relative to project root). "
        "Defaults to all files in synthetic_datasets/.",
    ),
    output_path: str = typer.Option(
        "output/parser_analysis_candidate_A_v3.json",
        help="Output path for the consolidated analysis JSON.",
    ),
    api_url: str = typer.Option(
        settings.parser_url,
        help="URL of the parser/validation API endpoint.",
    ),
    timeout: int = typer.Option(
        120,
        help="Timeout in seconds for each API request.",
    ),
) -> None:
    """Run parser analysis on one or more synthetic dataset files.

    Sends each task to the parser API, collects execution results,
    and writes a consolidated JSON report with per-task outcomes
    and a blank Local_Language_Server_Execution field.

    Example usage:
        python generate_parser_analysis.py \\
            synthetic_datasets/mbap_avap_A.json \\
            synthetic_datasets/mbpp_avap.json \\
            --output-path output/parser_analysis.json
    """
    # Fall back to the module-level defaults when no paths were given,
    # then anchor every path at the project root.
    selected = dataset_paths or DEFAULT_DATASETS
    resolved_inputs = [settings.proj_root / entry for entry in selected]
    destination = settings.proj_root / output_path

    results = analyze_datasets(resolved_inputs, api_url, timeout)
    save_analysis(results, destination)
if __name__ == "__main__":
    try:
        app()
    except Exception as exc:
        # Log the full traceback via loguru before propagating, so CLI
        # failures reach the log sink as well as stderr.
        logger.exception(exc)
        raise