import json from enum import Enum from typing import Optional from datasets import load_dataset import boto3 import typer from loguru import logger from botocore.config import Config from pathlib import Path from langchain_core.messages import SystemMessage, HumanMessage from src.utils.llm_factory import create_chat_model from scripts.pipelines.tasks.prompts import ( get_prompt_mbpp, get_prompt_human_eval, ) # System prompt from generate_mbap.py, optimized for both Claude and Bedrock SYSTEM_PROMPT = """Eres un experto en el lenguaje AVAP. Se te proporciona el Language Reference Manual (LRM) completo de AVAP. Tu tarea es generar problemas de benchmark estilo MBPP para evaluar modelos de lenguaje en su capacidad de generar código AVAP correcto. REGLAS ESTRICTAS para el código AVAP generado: 1. Una instrucción por línea. EOL es el terminador absoluto. 2. Sin indentación significativa (es solo decorativa). 3. Bloques de control: if()...else()...end(), startLoop()...endLoop(), try()...exception()...end() 4. Funciones: function name(args) { ... return(val) } 5. if() Modo 1: if(var_o_literal, var_o_literal, "operador") — los argumentos NO pueden ser expresiones de acceso como dict['key']; hay que extraer el valor a una variable propia primero. 6. if() Modo 2: if(None, None, `expresion_completa_como_string`) 7. _status se asigna con: addVar(_status, 404) 8. ormAccessSelect firma: ormAccessSelect(campos, "tabla", selector, varTarget) — selector puede ser cadena vacía. 9. Acceso a campos de dict: val = dict['campo'] (línea propia, luego se usa val). 10. Genera ÚNICAMENTE código AVAP válido según el LRM. Sin Python, sin pseudocódigo. MODO DE EJECUCIÓN — MUY IMPORTANTE: - El código se ejecuta DIRECTAMENTE, línea a línea, sin servidor ni registro de endpoints. - NUNCA uses registerEndpoint(), NUNCA uses mainHandler(), NUNCA envuelvas el código en funciones solo para ejecutarlo. - El código correcto es simplemente las instrucciones en línea, por ejemplo: result = "Hello World" addResult(result) - Si el problema requiere una función auxiliar reutilizable, defínela con function...{} y llámala directamente después: function double(n) { return(n * 2) } addParam("n", n) result = double(n) addResult(result) - NUNCA termines el código con registerEndpoint ni con ninguna llamada de registro. FORMATO DE SALIDA: responde ÚNICAMENTE con un array JSON válido. Sin texto adicional, sin bloques de código markdown, sin explicaciones. Estructura exacta de cada elemento: { "task_id": , "text": "", "code": "", "test_inputs": { "": , "": }, "test_list": ["", ""] } FORMATO DE test_inputs — MUY IMPORTANTE: - Es un objeto JSON con un valor fijo para cada variable que el código recibe via addParam(). - Los nombres de las claves deben coincidir EXACTAMENTE con el nombre de variable usado en addParam(). - Los valores deben ser concretos y representativos del problema (no genéricos como "test" o 123). - Si el código no tiene ningún addParam(), el campo test_inputs debe ser un objeto vacío: {} - Estos valores son los que el evaluador inyectará en el stack antes de ejecutar el código, de modo que las aserciones de test_list puedan validar las variables de salida resultantes. Ejemplo con addParam: código: addParam("password", password)\\nencodeSHA256(password, hashed)\\naddResult(hashed) test_inputs: { "password": "secret123" } test_list: ["re.match(r'^[a-f0-9]{64}$', hashed)"] Ejemplo sin addParam: código: randomString(16, token)\\naddResult(token) test_inputs: {} test_list: ["re.match(r'^[a-zA-Z0-9]{16}$', token)"] FORMATO DE test_list — MUY IMPORTANTE: Cada aserción debe ser una expresión Python con re.match() evaluable directamente sobre las variables del stack AVAP (disponibles como variables Python locales). El módulo 're' está siempre disponible. La expresión debe devolver un match object (truthy) si el test pasa. Reglas estrictas: - USA ÚNICAMENTE re.match(r'', ) - NO combines expresiones re.match en una aserción, cada asercion tiene que ser un unico re.match(r'', ) - Convierte a string si es necesario: re.match(r'^\\d+$', str(result)) - Puedes encadenar con 'and': re.match(r'^[a-zA-Z0-9]{32}$', token) and re.match(r'.{32}', token) - Las variables referenciadas deben existir en el stack tras ejecutar el código. - NUNCA uses comparaciones directas (==, !=, >, <). - NUNCA uses isinstance(), len(), assert, ni texto descriptivo. - NUNCA uses nada que no sea re.match(). Ejemplos correctos de test_list: "re.match(r'^[a-f0-9]{64}$', hashed)" "re.match(r'^[a-zA-Z0-9]{32}$', token)" "re.match(r'^\\d{4}-\\d{2}-\\d{2}$', date_str)" "re.match(r'^-?\\d+(\\.\\d+)?$', str(result))" "re.match(r'^(par|impar)$', result)" "re.match(r'^40[134]$', str(_status))" "re.match(r'^\\d+$', str(length))" """ app = typer.Typer() class Provider(str, Enum): bedrock = "bedrock" openai = "openai" ollama = "ollama" class Dataset(str, Enum): mbpp = "mbpp" human_eval = "openai_humaneval" def build_generation_prompt(lrm: str, num_problems: int, problems_per_category: int = 10) -> str: """Build user prompt for generating new problems from scratch.""" return f"""# LRM AVAP — Language Reference Manual {lrm} --- # TAREA Genera exactamente {num_problems} problemas de benchmark MBPP-AVAP nuevos. Requisitos: - Los task_id deben comenzar en 1 y ser consecutivos. - Distribuye los problemas entre diferentes categorías de funcionalidad AVAP: * HTTP params / addParam / addResult / _status * Variables y strings / addVar / replace / randomString * Condicionales / if() Modo 1 y Modo 2 / else() / end() * Bucles y listas / startLoop / itemFromList / getListLen * JSON / variableFromJSON / AddVariableToJSON * ORM / ormAccessSelect / ormAccessInsert / ormAccessUpdate * Criptografía / encodeSHA256 / encodeMD5 * Fechas / getTimeStamp / getDateTime / stampToDatetime * Conectores externos / avapConnector + métodos dinámicos * Concurrencia / go + gather * Funciones y scope / function / return() * Manejo de errores / try() / exception() * HTTP externo / RequestGet / RequestPost * Modularidad / import / include + casos de uso complejos - Cada problema debe cubrir un aspecto distinto de AVAP. - Dificultad variada: algunos simples, algunos intermedios, alguno avanzado. - El código debe ser realista como endpoint de microservicio HTTP en AVAP. - Incluye 2-3 aserciones descriptivas en test_list por problema. Responde ÚNICAMENTE con el array JSON. Sin texto antes ni después. """ def parse_response(raw: str) -> list[dict]: """Parse LLM response as JSON, handling markdown code blocks.""" text = raw.strip() if text.startswith("```"): lines = text.splitlines() inner = lines[1:] if inner and inner[-1].strip() == "```": inner = inner[:-1] text = "\n".join(inner).strip() problems = json.loads(text) if not isinstance(problems, list): raise ValueError("response is not a JSON array") for p in problems: for field in ("task_id", "text", "code", "test_list"): if field not in p: raise ValueError(f"Field missing '{field}' in task_id={p.get('task_id','?')}.") if "test_inputs" not in p: p["test_inputs"] = {} if not isinstance(p["test_inputs"], dict): raise ValueError(f"'test_inputs' must be a JSON Object (task_id={p.get('task_id','?')}).") return problems @app.command() def generate_synthetic_dataset( provider: Provider = Provider.bedrock, model: str = "global.anthropic.claude-sonnet-4-6", temperature: float = 0.0, num_samples: int = 30, seed: int = 67, context_docs_path: str = "docs/LRM/avap.md", synthetic_output_path: str = "synthetic_datasets", dataset: Optional[Dataset] = None, problems_per_category: int = 10, ) -> None: """ Generate synthetic AVAP dataset. Modes: - With --dataset {mbpp|human_eval}: Translate existing dataset to AVAP - Without --dataset: Generate new problems from scratch using the prompt """ logger.info("🚀 Starting synthetic dataset generation pipeline") logger.info( f"Configuration - Provider: {provider}, Model: {model}, Temperature: {temperature}, " f"Samples: {num_samples}, Seed: {seed}, Dataset: {dataset or 'generation mode'}" ) config = Config( connect_timeout=10, read_timeout=600, ) client = boto3.client("bedrock-runtime", config=config) logger.info("✓ Bedrock client initialized successfully") # Create LLM instance with specified parameters logger.debug(f"Creating LLM instance with provider: {provider}") llm = create_chat_model( provider=provider, client=client, model=model, temperature=temperature, ) logger.info(f"✓ LLM initialized: {model}") # Load AVAP documentation logger.debug(f"Loading AVAP documentation from {context_docs_path}") with open(context_docs_path, "r") as f: avap_docs = f.read() logger.info(f"✓ AVAP documentation loaded ({len(avap_docs)} characters)") # Choose mode: translation or generation if dataset: logger.info(f"🔄 Translation mode: Converting {dataset.value} dataset to AVAP") _generate_from_dataset( llm=llm, avap_docs=avap_docs, dataset_name=dataset.value, num_samples=num_samples, seed=seed, output_path=synthetic_output_path, provider=provider.value, ) else: logger.info("✨ Generation mode: Creating new problems from prompt") _generate_from_prompt( llm=llm, avap_docs=avap_docs, num_samples=num_samples, output_path=synthetic_output_path, provider=provider.value, problems_per_category=problems_per_category, ) def _generate_from_dataset( llm, avap_docs: str, dataset_name: str, num_samples: int, seed: int, output_path: str, provider: str, ) -> None: """Generate by translating an existing dataset to AVAP.""" # Load dataset logger.debug(f"Loading {dataset_name} dataset") dataset_full = load_dataset(dataset_name) logger.info(f"✓ {dataset_name} dataset loaded successfully") # Select random test samples logger.debug(f"Selecting {num_samples} random samples from {dataset_name}") random_test_samples = dataset_full["test"].shuffle(seed=seed).select( range(min(num_samples, len(dataset_full["test"]))) ) logger.info(f"✓ Selected {len(random_test_samples)} samples") # Prepare samples dictionary logger.debug("Preparing samples dictionary") if dataset_name == "mbpp": test_samples_dict = { str(i): {"text": sample["text"], "code": sample["code"]} for i, sample in enumerate(random_test_samples) } prompt_func = get_prompt_mbpp(avap_docs=avap_docs) output_suffix = "mbpp" elif dataset_name == "openai_humaneval": test_samples_dict = { str(sample["task_id"]): { "task_id": sample["task_id"], "prompt": sample["prompt"], "canonical_solution": sample["canonical_solution"], "test": sample["test"], "entry_point": sample["entry_point"], } for i, sample in enumerate(random_test_samples) } prompt_func = get_prompt_human_eval(avap_docs=avap_docs) output_suffix = "human_eval" else: raise ValueError(f"Unsupported dataset: {dataset_name}") logger.info(f"✓ Prepared {len(test_samples_dict)} samples for processing") # Generate prompt logger.debug("Generating prompt with AVAP context") logger.debug("✓ Prompt generated successfully") # Invoke LLM logger.info("Invoking LLM to generate synthetic dataset...") llm_response = llm.invoke( [prompt_func, HumanMessage(content=str(test_samples_dict))] ) logger.info("✓ LLM response received") logger.debug(f"LLM Response: {llm_response.content}") # Parse JSON response logger.debug("Parsing LLM response as JSON") json_str = ( llm_response.content.removeprefix("```json") .removesuffix("```") .strip() ) logger.debug(f"JSON string: {json_str}") synthetic_data = json.loads(json_str) logger.info( f"✓ Successfully parsed synthetic data with {len(synthetic_data)} samples" ) # Save output output_dir = Path(output_path) output_dir.mkdir(parents=True, exist_ok=True) output_file = output_dir / f"synthetic_data_{output_suffix}_{provider}.json" with output_file.open("w", encoding="utf-8") as f: json.dump(synthetic_data, f, ensure_ascii=False, indent=2) logger.info(f"✓ Output saved to {output_file}") logger.info( f"Pipeline completed successfully! Generated {len(synthetic_data)} synthetic samples" ) def _generate_from_prompt( llm, avap_docs: str, num_samples: int, output_path: str, provider: str, problems_per_category: int = 10, ) -> None: """Generate new problems from scratch using the generation prompt. Uses the same system prompt as generate_mbap.py to ensure consistency, but invokes via the LLM factory method to support Bedrock and other providers. """ logger.debug("Generating prompt for problem generation") user_prompt = build_generation_prompt( lrm=avap_docs, num_problems=num_samples, problems_per_category=problems_per_category, ) logger.debug("✓ Prompt generated successfully") # Invoke LLM with SystemMessage and HumanMessage (compatible with LangChain factory) logger.info("Invoking LLM to generate new problems...") llm_response = llm.invoke( [SystemMessage(content=SYSTEM_PROMPT), HumanMessage(content=user_prompt)] ) logger.info("✓ LLM response received") logger.debug(f"LLM Response: {llm_response.content}") # Parse JSON response logger.debug("Parsing LLM response as JSON") try: synthetic_data = parse_response(llm_response.content) logger.info( f"✓ Successfully parsed synthetic data with {len(synthetic_data)} samples" ) except (json.JSONDecodeError, ValueError) as e: logger.error(f"Failed to parse LLM response: {e}") raise # Save output output_dir = Path(output_path) output_dir.mkdir(parents=True, exist_ok=True) output_file = output_dir / f"synthetic_data_generated_{provider}.json" with output_file.open("w", encoding="utf-8") as f: json.dump(synthetic_data, f, ensure_ascii=False, indent=2) logger.info(f"✓ Output saved to {output_file}") logger.info( f"Pipeline completed successfully! Generated {len(synthetic_data)} synthetic samples" ) if __name__ == "__main__": logger.info("=" * 50) logger.info("Synthetic Dataset Generation Pipeline") logger.info("=" * 50) app()