# ---------------------------------------------------------------------------
# Imports and environment configuration for the AVAP code-indexing notebook.
# ---------------------------------------------------------------------------
import os
import re
import uuid
from pathlib import Path
from datetime import datetime

import nltk
from elasticsearch import Elasticsearch
from lark import Lark, Transformer
from langchain_core.documents import Document
from langchain_elasticsearch import ElasticsearchStore
from langchain_ollama import OllamaEmbeddings
from transformers import AutoConfig

from src.config import PROJ_ROOT, DATA_DIR

# Sentence tokenizer data; quiet=True keeps the download message out of the output.
nltk.download("punkt", quiet=True)

# --- Environment configuration ---------------------------------------------
ELASTICSEARCH_URL = os.getenv("ELASTICSEARCH_LOCAL_URL")
ELASTICSEARCH_DOCS_INDEX = os.getenv("ELASTICSEARCH_DOCS_INDEX")
ELASTICSEARCH_CODE_INDEX = os.getenv("ELASTICSEARCH_CODE_INDEX")
HF_EMB_MODEL_NAME = os.getenv("HF_EMB_MODEL_NAME")
OLLAMA_URL = os.getenv("OLLAMA_URL")
OLLAMA_LOCAL_URL = os.getenv("OLLAMA_LOCAL_URL")
OLLAMA_MODEL_NAME = os.getenv("OLLAMA_MODEL_NAME")
OLLAMA_EMB_MODEL_NAME = os.getenv("OLLAMA_EMB_MODEL_NAME")

# Fail fast with a clear message: AutoConfig.from_pretrained(None) otherwise
# raises a confusing error from deep inside `transformers`.
if not HF_EMB_MODEL_NAME:
    raise RuntimeError("HF_EMB_MODEL_NAME environment variable is not set")

config = AutoConfig.from_pretrained(HF_EMB_MODEL_NAME)
embedding_dim = config.hidden_size  # dimension of the embedding vectors

# --- Load the BNF grammar and the example code snippets from disk -----------
grammar = (DATA_DIR / "raw" / "code" / "BNF_v3.txt").read_text(
    encoding="utf-8"
)
code = (DATA_DIR / "raw" / "code" / "Code_Snippets_v1.txt").read_text(
    encoding="utf-8"
)
class BnfToDict(Transformer):
    """Lark transformer that flattens a parsed BNF rule tree into plain dicts.

    Terminal tokens are unwrapped to bare strings; each ``rule`` node becomes
    ``{"lhs": <non-terminal>, "alternatives": [[token, ...], ...]}``.
    """

    def NONTERMINAL(self, token):
        # Drop the surrounding angle brackets, then any padding whitespace.
        inner = str(token)[1:-1]
        return inner.strip()

    def QUOTED(self, token):
        # Drop only the surrounding quote characters.
        return str(token)[1:-1]

    def IDENT(self, token):
        return str(token)

    def SYMBOL(self, token):
        return str(token)

    def alternative(self, items):
        # One alternative is a flat list of token strings.
        return [str(entry) for entry in items]

    def expansion(self, items):
        return items

    def rule(self, items):
        lhs, rhs = items[0], items[1]
        return {"lhs": lhs, "alternatives": rhs}

    def start(self, items):
        return items


def load_bnf_rules_for_rag(grammar, code_path: Path) -> list[dict]:
    """Parse a BNF rule file line by line into rule dicts for RAG indexing.

    Lines that are blank, start with ``#`` or lack ``::=`` are ignored;
    lines that fail to parse are collected and reported, never raised.
    """
    parser = Lark(grammar, parser="lalr")
    transformer = BnfToDict()
    rules: list[dict] = []
    skipped: list[str] = []

    for candidate in code_path.read_text(encoding="utf-8").splitlines():
        line = candidate.strip()
        if not line or line.startswith("#") or "::=" not in line:
            continue
        try:
            rules.extend(transformer.transform(parser.parse(line)))
        except Exception:
            skipped.append(line)

    print(f"Parsed rules: {len(rules)}")
    print(f"Skipped lines: {len(skipped)}")
    return rules


def bnf_rules_to_documents(rules: list[dict]) -> list[Document]:
    """Render each parsed rule back to ``lhs ::= alt | alt`` text as a Document."""
    docs: list[Document] = []
    for rule in rules:
        lhs = rule["lhs"]
        alternatives = rule["alternatives"]
        rendered = [" ".join(alt) if alt else "" for alt in alternatives]
        body = f"{lhs} ::= " + " | ".join(rendered)
        metadata = {
            "source": "BNF.txt",
            "type": "grammar_rule",
            "non_terminal": lhs,
            "alternatives": len(alternatives),
        }
        docs.append(Document(id=str(uuid.uuid4()), page_content=body, metadata=metadata))
    return docs
def extract_rag_chunks(source_code, parser):
    """Split example source text into BNF-validated statement chunks.

    Comment lines (starting with ``###``) and blank lines are dropped; an
    optional ``label:`` prefix is removed from each line, and only lines the
    Lark parser accepts are kept.

    NOTE(review): near-duplicate of `extraer_codigo_puro` and of the
    `procesar_snippet_*` functions below — consider keeping only one pipeline.
    """
    # Drop example commentary so the parser only sees candidate statements.
    lines = [l for l in source_code.split('\n') if not l.startswith('###') and l.strip()]

    chunks = []
    for line in lines:
        # Remove the example label prefix (e.g. "1: "), keeping the statement.
        clean_line = line.split(':', 1)[-1] if ':' in line else line
        try:
            parser.parse(clean_line.strip())  # validation only; the tree is unused
        except Exception:
            continue  # line does not conform to the BNF — skip it
        chunks.append({
            "code": clean_line.strip(),
            "type": "statement",
            # NOTE(review): length is measured on the *unstripped* remainder of
            # the line — confirm this is intentional before relying on it.
            "length": len(clean_line)
        })
    return chunks


def extraer_codigo_puro(code, parser):
    """Extract Markdown ``` code fences and keep only BNF-valid lines.

    Returns one joined string per fence that had at least one valid line.
    """
    # Content inside Markdown code fences ``` ... ```
    bloques_codigo = re.findall(r'```(.*?)```', code, re.DOTALL)

    chunks_validados = []
    for bloque in bloques_codigo:
        lineas_limpias = []
        for linea in bloque.strip().split('\n'):
            linea = linea.strip()
            if not linea:
                continue
            try:
                parser.parse(linea)  # validate each line against the BNF
                lineas_limpias.append(linea)
            except Exception:
                print(f"⚠️ Línea saltada (no cumple BNF): {linea}")

        if lineas_limpias:
            chunks_validados.append("\n".join(lineas_limpias))

    return chunks_validados
def procesar_snippet_con_metadata(code, parser):
    """Split a numbered Markdown snippet file into Elasticsearch doc dicts.

    Each ``N. **Title**`` block is matched, its ``` code fence extracted,
    every line validated against the BNF grammar, and the surviving lines
    packed into a plain dict ready for bulk indexing.

    NOTE(review): `procesar_snippet_a_documentos` below is a near-duplicate
    that emits langchain Documents instead — keep only one of the two.
    """
    # 1. Split on the numbered headers (e.g. "1. **Name**"); the capturing
    #    group makes re.split interleave titles and block bodies.
    patron_bloque = re.compile(r'\d+\.\s+\*\*(.*?)\*\*')
    bloques = patron_bloque.split(code)[1:]

    documentos_elastic = []

    # After the split, items come in (title, raw content) pairs.
    for i in range(0, len(bloques), 2):
        titulo = bloques[i].strip()
        contenido_bruto = bloques[i + 1]

        # 2. Extract the code inside the triple-backtick fence.
        codigo_match = re.search(r'```(.*?)```', contenido_bruto, re.DOTALL)
        if not codigo_match:
            continue
        codigo_bloque = codigo_match.group(1).strip()

        # 3. Validate each line with Lark; keep only lines that parse.
        lineas_validas = []
        for linea in codigo_bloque.split('\n'):
            linea_clean = linea.strip()
            if linea_clean:
                try:
                    parser.parse(linea_clean)
                    lineas_validas.append(linea_clean)
                except Exception:  # was a bare `except:` — don't swallow KeyboardInterrupt
                    print(f"⚠️ Error sintáctico (BNF) en: {linea_clean}")

        # 4. Build the Elasticsearch document payload.
        if lineas_validas:
            codigo_final = "\n".join(lineas_validas)
            documentos_elastic.append({
                "_id": str(uuid.uuid4()),  # unique ID to avoid collisions
                "title": titulo,
                "content": codigo_final,  # main retrieval field for the RAG
                "metadata": {
                    "lenguaje": "AVAP",
                    "line_count": len(lineas_validas),
                    "char_count": len(codigo_final),
                    # "ingested_at": datetime.now().isoformat()
                },
            })

    return documentos_elastic
def procesar_snippet_a_documentos(code, parser):
    """Split numbered Markdown snippet blocks into langchain Documents.

    Same pipeline as `procesar_snippet_con_metadata` (split on
    ``N. **Title**`` headers, extract the ``` fence, validate each line
    with Lark) but emits `Document` objects with id and metadata attached.
    """
    # 1. Split on numbered headers; the capturing group interleaves
    #    titles and block bodies in the result.
    patron_bloque = re.compile(r'\d+\.\s+\*\*(.*?)\*\*')
    bloques = patron_bloque.split(code)[1:]

    documentos_finales = []

    # After the split, items come in (title, raw content) pairs.
    for i in range(0, len(bloques), 2):
        titulo = bloques[i].strip()
        contenido_bruto = bloques[i + 1]

        # 2. Extract the code inside the triple-backtick fence.
        codigo_match = re.search(r'```(.*?)```', contenido_bruto, re.DOTALL)
        if not codigo_match:
            continue
        codigo_bloque = codigo_match.group(1).strip()

        # 3. Keep only the lines the BNF parser accepts.
        lineas_validas = []
        for linea in codigo_bloque.split('\n'):
            linea_clean = linea.strip()
            if linea_clean:
                try:
                    parser.parse(linea_clean)
                    lineas_validas.append(linea_clean)
                except Exception:  # was a bare `except:` — don't swallow KeyboardInterrupt
                    print(f"⚠️ Error BNF: {linea_clean}")

        # 4. Build the Document directly, with id and metadata integrated.
        if lineas_validas:
            codigo_final = "\n".join(lineas_validas)
            documentos_finales.append(
                Document(
                    id=str(uuid.uuid4()),
                    page_content=codigo_final,
                    metadata={
                        "title": titulo,
                        "language": "AVAP",
                        "type": "code_snippet",
                        "line_count": len(lineas_validas),
                    },
                )
            )

    return documentos_finales
def ensure_documents(items):
    """Coerce a heterogeneous list into langchain `Document`s before indexing.

    Accepts Documents (passed through), dicts from the snippet parsers,
    plain strings, chunker objects exposing `.text`/`.token_count`, and
    falls back to `str()` for anything else.
    """
    out: list[Document] = []
    for x in items:
        if isinstance(x, Document):
            out.append(x)
            continue
        if isinstance(x, dict):
            # Content may live under several keys depending on which parser produced it.
            content = x.get("content") or x.get("codigo") or x.get("page_content") or x.get("text") or str(x)

            # Prefer an explicit id; otherwise mint a fresh one.
            id_ = x.get("_id") or x.get("id") or str(uuid.uuid4())

            # Merge the nested "metadata" dict with loose root-level fields.
            # `or {}` also guards against an explicit `"metadata": None`,
            # which would otherwise crash on `.copy()`.
            metadata = (x.get("metadata") or {}).copy()
            if "titulo" in x:
                metadata["title"] = x["titulo"]
            if "tipo_bloque" in x:
                metadata["tipo_bloque"] = x["tipo_bloque"]

            # If there is no metadata at all, record a default source.
            if not metadata:
                metadata = {"source": "parsed_code"}

            out.append(Document(id=id_, page_content=content, metadata=metadata))
            continue
        if isinstance(x, str):
            out.append(Document(id=str(uuid.uuid4()), page_content=x, metadata={"source": "chunked_code"}))
            continue
        # Chunker objects expose `.text` (and sometimes `.token_count`).
        if hasattr(x, "text") or hasattr(x, "token_count"):
            txt = getattr(x, "text", str(x))
            out.append(Document(
                id=str(uuid.uuid4()),
                page_content=txt,
                metadata={"source": "chunker", "token_count": getattr(x, "token_count", None)},
            ))
            continue
        # Fallback: stringify unknown objects but record their original type.
        out.append(Document(id=str(uuid.uuid4()), page_content=str(x), metadata={"source": "unknown", "orig_type": type(x).__name__}))
    return out


# NOTE(review): this parser uses Lark's default Earley algorithm, while
# load_bnf_rules_for_rag builds a LALR parser from the same grammar — the
# two can accept different inputs; confirm which one is intended.
parser = Lark(grammar)

# Validated code snippets as langchain Documents, ready for indexing.
code_chunks = procesar_snippet_a_documentos(code=code, parser=parser)
# Elasticsearch client with generous timeouts/retries for local indexing runs.
es = Elasticsearch(
    ELASTICSEARCH_URL,
    request_timeout=120,
    max_retries=5,
    retry_on_timeout=True,
)

# WARNING: destructive — drops the code index so the notebook re-indexes
# from scratch on every run.
if es.indices.exists(index=ELASTICSEARCH_CODE_INDEX):
    es.indices.delete(index=ELASTICSEARCH_CODE_INDEX)

# Sanity check: list all indices currently present in the cluster.
for index in es.indices.get(index="*"):
    print(index)

# Embedding client backed by the local Ollama server.
embeddings = OllamaEmbeddings(base_url=OLLAMA_LOCAL_URL, model=OLLAMA_EMB_MODEL_NAME)
embeddings

# Index the validated code snippets into Elasticsearch (cosine distance).
db = ElasticsearchStore.from_documents(
    code_chunks,
    embeddings,
    client=es,
    index_name=ELASTICSEARCH_CODE_INDEX,
    distance_strategy="COSINE",
)
# Spot-check the freshly built index: fetch up to 10 raw documents.
response = es.search(
    index=ELASTICSEARCH_CODE_INDEX,
    body={
        "query": {"match_all": {}},
        "size": 10 
    }
)

for hit in response["hits"]["hits"]:
    print("ID:", hit["_id"])
    print("Source:", hit["_source"])
    print("-" * 40)

# Similarity retrieval over the code index (top 5 hits).
base_retriever = db.as_retriever(
    search_type="similarity",
    search_kwargs={"k": 5}
    ) 

docs = base_retriever.invoke("What reserved words does AVAP have?")
docs

# NOTE(review): this rebuilds embeddings against OLLAMA_URL while the
# indexing cell above used OLLAMA_LOCAL_URL — confirm both point at the
# same model/server, otherwise query vectors won't match the index.
embeddings = OllamaEmbeddings(base_url=OLLAMA_URL, model=OLLAMA_EMB_MODEL_NAME)

# Query the *docs* index; query_field/vector_query_field must match that
# index's mapping ("text"/"vector") — presumably set by the docs-ingestion
# notebook; verify against its mapping.
vector_store = ElasticsearchStore(
    client=es,
    index_name=ELASTICSEARCH_DOCS_INDEX,
    embedding=embeddings,
    query_field="text",
    vector_query_field="vector",
)

results = vector_store.similarity_search_with_score(
    query="What data types does AVAP have?",
    k=50
)

results