diff --git a/Docker/requirements.txt b/Docker/requirements.txt index cbbd2bf..78c9ce1 100644 --- a/Docker/requirements.txt +++ b/Docker/requirements.txt @@ -26,6 +26,10 @@ certifi==2026.1.4 # requests charset-normalizer==3.4.4 # via requests +chonkie==1.5.6 + # via assistance-engine +chonkie-core==0.9.2 + # via chonkie click==8.3.1 # via nltk colorama==0.4.6 ; sys_platform == 'win32' @@ -81,12 +85,16 @@ idna==3.11 # httpx # requests # yarl +jinja2==3.1.6 + # via model2vec jmespath==1.1.0 # via # boto3 # botocore joblib==1.5.3 - # via nltk + # via + # model2vec + # nltk jsonpatch==1.33 # via langchain-core jsonpointer==3.0.0 @@ -137,8 +145,16 @@ langsmith==0.7.6 # langchain-core loguru==0.7.3 # via assistance-engine +markdown-it-py==4.0.0 + # via rich +markupsafe==3.0.3 + # via jinja2 marshmallow==3.26.2 # via dataclasses-json +mdurl==0.1.2 + # via markdown-it-py +model2vec==0.7.0 + # via chonkie multidict==6.7.1 # via # aiohttp @@ -150,9 +166,12 @@ nltk==3.9.3 numpy==2.4.2 # via # assistance-engine + # chonkie + # chonkie-core # elasticsearch # langchain-aws # langchain-community + # model2vec # pandas ollama==0.6.1 # via langchain-ollama @@ -192,6 +211,8 @@ pydantic-core==2.41.5 # via pydantic pydantic-settings==2.13.1 # via langchain-community +pygments==2.19.2 + # via rich python-dateutil==2.9.0.post0 # via # botocore @@ -220,10 +241,16 @@ requests==2.32.5 # requests-toolbelt requests-toolbelt==1.0.0 # via langsmith +rich==14.3.3 + # via model2vec s3transfer==0.16.0 # via boto3 +safetensors==0.7.0 + # via model2vec setuptools==82.0.0 - # via grpcio-tools + # via + # grpcio-tools + # model2vec simsimd==6.5.13 # via elasticsearch six==1.17.0 @@ -234,14 +261,20 @@ sqlalchemy==2.0.46 # langchain-community tenacity==9.1.4 # via + # chonkie # langchain-community # langchain-core tokenizers==0.22.2 - # via langchain-huggingface + # via + # chonkie + # langchain-huggingface + # model2vec tqdm==4.67.3 # via # assistance-engine + # chonkie # huggingface-hub + # model2vec # nltk typing-extensions==4.15.0 # via diff --git a/README.md b/README.md index 35f6ab1..2f4d351 100644 --- a/README.md +++ b/README.md @@ -47,26 +47,26 @@ graph TD ├── changelog # Version tracking and release history ├── pyproject.toml # Python project configuration ├── Docker/ +│ ├── protos/ +│ │ └── brunix.proto # Protocol Buffers: The source of truth for the API +│ ├── src/ +│ │ ├── graph.py # Workflow graph orchestration +│ │ ├── prompts.py # Centralized prompt definitions +│ │ ├── server.py # gRPC Server & RAG Orchestration +│ │ ├── state.py # Shared state management +│ │ └── utils/ # Utility modules │ ├── Dockerfile # Container definition for the Engine │ ├── docker-compose.yaml # Local orchestration for dev environment -│ ├── requirements.txt # Python dependencies for Docker -│ ├── protos/ -│ │ └── brunix.proto # Protocol Buffers: The source of truth for the API -│ └── src/ -│ ├── graph.py # Workflow graph orchestration -│ ├── prompts.py # Centralized prompt definitions -│ ├── server.py # gRPC Server & RAG Orchestration -│ ├── state.py # Shared state management -│ └── utils/ # Utility modules -├── ingestion/ -│ └── docs/ # AVAP documentation chunks +│ ├── .dockerignore # Files to ignore by Docker +│ └── requirements.txt # Python dependencies for Docker +├── docs/ # AVAP documentation ├── kubernetes/ │ └── kubeconfig.yaml # Kubernetes cluster configuration -├── scripts/ -│ └── pipelines/ -│ └── flows/ # Data processing flows -└── src/ - └── server.py # Core Logic: gRPC Server & RAG Orchestration +└── scripts/ + └── 
pipelines/
+        ├── flows/                # Data processing flows
+        ├── tasks/                # Function modules used by the flows
+        └── input/                # Inputs used by the flows
 ```
 
 ---
@@ -136,6 +136,7 @@ OLLAMA_URL=http://host.docker.internal:11434
 OLLAMA_LOCAL_URL=http://localhost:11434
 OLLAMA_MODEL_NAME=qwen2.5:1.5b
 OLLAMA_EMB_MODEL_NAME=qwen3-0.6B-emb:latest
+HF_EMB_MODEL_NAME=Qwen/Qwen3-Embedding-0.6B
 ```
 
 | Variable | Required | Description | Example |
@@ -151,6 +152,8 @@ OLLAMA_EMB_MODEL_NAME=qwen3-0.6B-emb:latest
 | `OLLAMA_LOCAL_URL` | Yes | Ollama endpoint used for text generation/embeddings in local | `http://localhost:11434` |
 | `OLLAMA_MODEL_NAME` | Yes | Ollama model name for generation | `qwen2.5:1.5b` |
 | `OLLAMA_EMB_MODEL_NAME` | Yes | Ollama embeddings model name | `qwen3-0.6B-emb:latest` |
+| `HF_TOKEN` | Yes | Hugging Face access token | `hf_...` |
+| `HF_EMB_MODEL_NAME` | Yes | Hugging Face embeddings model name | `Qwen/Qwen3-Embedding-0.6B` |
 
 > Never commit real secret values. Use placeholder values when sharing configuration examples.
 
diff --git a/changelog b/changelog
index f060d26..334699a 100644
--- a/changelog
+++ b/changelog
@@ -4,16 +4,34 @@ All notable changes to the **Brunix Assistance Engine** will be documented in th
 
 ---
 
+## [1.4.0] - 2026-03-11
+
+### Added
+- IMPLEMENTED:
+  - `scripts/pipelines/flows/translate_mbpp.py`: pipeline to generate a synthetic dataset from the MBPP dataset.
+  - `scripts/pipelines/input/prompts.py`: module containing prompts for pipelines.
+  - `scripts/pipelines/tasks/chunk.py`: module containing chunk-management functions.
+  - `synthethic_datasets`: folder containing generated synthetic datasets.
+  - `src/config.py`: environment variables configuration module.
+
+### Changed
+- REFACTORED: `scripts/pipelines/flows/elasticsearch_ingestion.py` now ingests documents from `docs` instead of pre-chunked files.
+- RENAMED: `docs/AVAP Language: Core Commands & Functional Specification` to `docs/avap_language_github_docs`.
+- REMOVED: `Makefile` file.
+- REMOVED: `scripts/start-tunnels.sh` script.
+- REMOVED: `ingestion` folder.
+- DEPENDENCIES: `requirements.txt` updated with new libraries required by the new modules.
+
 ## [1.3.0] - 2026-03-05
 
 ### Added
 - IMPLEMENTED:
-  - `src/utils/emb_factory`: factory modules created for embedding model generation.
-  - `src/utils/llm_factory`: factory modules created for LLM generation.
-  - `src/graph.py`: workflow graph orchestration module added.
-  - `src/prompts.py`: centralized prompt definitions added.
-  - `src/state.py`: shared state management module added.
-  - `pipelines/flows/elasticsearch_ingestion.py`: pipeline to populate the elasticsearch vector database.
+  - `Docker/src/utils/emb_factory`: factory modules created for embedding model generation.
+  - `Docker/src/utils/llm_factory`: factory modules created for LLM generation.
+  - `Docker/src/graph.py`: workflow graph orchestration module added.
+  - `Docker/src/prompts.py`: centralized prompt definitions added.
+  - `Docker/src/state.py`: shared state management module added.
+  - `scripts/pipelines/flows/elasticsearch_ingestion.py`: pipeline to populate the Elasticsearch vector database.
   - `ingestion/docs`: folder containing all chunked AVAP documents.
 
### Changed diff --git a/scratches/acano/elasticsearch_ingestion.py b/scratches/acano/elasticsearch_ingestion.py new file mode 100644 index 0000000..a961302 --- /dev/null +++ b/scratches/acano/elasticsearch_ingestion.py @@ -0,0 +1,129 @@ +import re +import hashlib +from typing import Any +from enum import Enum +import typer +import logging +import os +from pathlib import Path + +from loguru import logger +from elasticsearch import Elasticsearch +from langchain_core.documents import Document +from langchain_elasticsearch import ElasticsearchStore +from langchain_community.embeddings import HuggingFaceEmbeddings +from langchain_experimental.text_splitter import SemanticChunker + +from src.utils.emb_factory import create_embedding_model +from scripts.pipelines.tasks.chunk import scrape_avap_docs + +app = typer.Typer() + +ELASTICSEARCH_LOCAL_URL = os.getenv("ELASTICSEARCH_LOCAL_URL") +OLLAMA_LOCAL_URL = os.getenv("OLLAMA_LOCAL_URL") +ELASTICSEARCH_INDEX = os.getenv("ELASTICSEARCH_INDEX") +OLLAMA_URL = os.getenv("OLLAMA_URL") +OLLAMA_EMB_MODEL_NAME = os.getenv("OLLAMA_EMB_MODEL_NAME") +AVAP_WEB_DOCS_URL = os.getenv("AVAP_WEB_DOCS_URL") + +class DistanceStrategy(str, Enum): + euclidean = "EUCLIDEAN_DISTANCE" + max_inner_product = "MAX_INNER_PRODUCT" + dot_product = "DOT_PRODUCT" + jaccard = "JACCARD" + cosine = "COSINE" + +def clean_text(text: str) -> str: + text = text.replace("\u00a0", " ") + text = re.sub(r"\s+", " ", text).strip() + return text + +def build_documents_from_folder( + folder_path: str, +) -> list[Document]: + + folder = Path(folder_path) + + if not folder.exists() or not folder.is_dir(): + raise ValueError(f"Invalid folder path: {folder_path}") + + all_documents: list[Document] = [] + + for file_path in folder.glob("*.txt"): + doc_text = file_path.read_text(encoding="utf-8") + + if not doc_text.strip(): + continue + + metadata: dict[str, Any] = { + "source": file_path.name, + } + + doc_text = clean_text(doc_text) + document = Document( + id=hashlib.md5(file_path.name.encode()).hexdigest(), + page_content=doc_text, + metadata={**metadata} + ) + + all_documents.append(document) + + return all_documents + + +@app.command() +def elasticsearch_ingestion( + docs_folder_path: str = "ingestion/docs", + es_request_timeout: int = 120, + es_max_retries: int = 5, + es_retry_on_timeout: bool = True, + distance_strategy: DistanceStrategy = DistanceStrategy.cosine, +): + logger.info("Starting Elasticsearch ingestion pipeline...") + logger.info(f"Using docs folder path: {docs_folder_path}") + documents = build_documents_from_folder(folder_path=docs_folder_path) + + logger.info("Connecting to Elasticsearch...") + try: + es = Elasticsearch( + ELASTICSEARCH_LOCAL_URL, + request_timeout=es_request_timeout, + max_retries=es_max_retries, + retry_on_timeout=es_retry_on_timeout, + ) + except: + logger.exception("Failed to connect to Elasticsearch.") + raise + + logger.info("Instantiating embeddings model...") + try: + embeddings = create_embedding_model( + provider="ollama", + model=OLLAMA_EMB_MODEL_NAME, + base_url=OLLAMA_LOCAL_URL, + ) + except: + logger.exception("Failed to instantiate embeddings model.") + raise + + logger.info(f"Uploading documents to index {ELASTICSEARCH_INDEX}...") + ElasticsearchStore.from_documents( + documents, + embeddings, + client=es, + index_name=ELASTICSEARCH_INDEX, + distance_strategy=distance_strategy.value, + ) + logger.info(f"Finished uploading documents to index {ELASTICSEARCH_INDEX}.") + + +if __name__ == "__main__": + logging.basicConfig( + level=logging.INFO, + 
format="%(asctime)s | %(levelname)s | %(name)s | %(message)s", + ) + try: + app() + except Exception as exc: + logger.exception(exc) + raise diff --git a/scripts/pipelines/flows/test_chunker.ipynb b/scratches/acano/test_chunker.ipynb similarity index 70% rename from scripts/pipelines/flows/test_chunker.ipynb rename to scratches/acano/test_chunker.ipynb index addcf1e..ca7fb4a 100644 --- a/scripts/pipelines/flows/test_chunker.ipynb +++ b/scratches/acano/test_chunker.ipynb @@ -2,7 +2,7 @@ "cells": [ { "cell_type": "code", - "execution_count": null, + "execution_count": 1, "id": "f8ea7a75", "metadata": {}, "outputs": [], @@ -24,7 +24,7 @@ "from chonkie import SemanticChunker\n", "\n", "from src.utils.emb_factory import create_embedding_model\n", - "from scripts.pipelines.tasks.chunk import read_concat_files, get_chunk_docs, chunks_to_document\n", + "from scripts.pipelines.tasks.chunk import read_files, get_chunk_docs, convert_chunks_to_document\n", "from src.config import PROJ_ROOT\n", "\n", "ELASTICSEARCH_LOCAL_URL = os.getenv(\"ELASTICSEARCH_LOCAL_URL\")\n", @@ -37,7 +37,7 @@ }, { "cell_type": "code", - "execution_count": null, + "execution_count": 2, "id": "a8b8de3f", "metadata": {}, "outputs": [ @@ -45,8 +45,8 @@ "name": "stderr", "output_type": "stream", "text": [ - "\u001b[32m2026-03-10 13:58:32.657\u001b[0m | \u001b[1mINFO \u001b[0m | \u001b[36m__main__\u001b[0m:\u001b[36m\u001b[0m:\u001b[36m3\u001b[0m - \u001b[1mStarting Elasticsearch ingestion pipeline...\u001b[0m\n", - "\u001b[32m2026-03-10 13:58:32.658\u001b[0m | \u001b[1mINFO \u001b[0m | \u001b[36m__main__\u001b[0m:\u001b[36m\u001b[0m:\u001b[36m4\u001b[0m - \u001b[1mReading and concatenating files from folder: docs/developer.avapframework.com\u001b[0m\n" + "\u001b[32m2026-03-10 15:15:53.994\u001b[0m | \u001b[1mINFO \u001b[0m | \u001b[36m__main__\u001b[0m:\u001b[36m\u001b[0m:\u001b[36m3\u001b[0m - \u001b[1mStarting Elasticsearch ingestion pipeline...\u001b[0m\n", + "\u001b[32m2026-03-10 15:15:53.996\u001b[0m | \u001b[1mINFO \u001b[0m | \u001b[36m__main__\u001b[0m:\u001b[36m\u001b[0m:\u001b[36m4\u001b[0m - \u001b[1mReading and concatenating files from folder: docs/developer.avapframework.com\u001b[0m\n" ] }, { @@ -55,7 +55,7 @@ "text": [ "1\n", "1\n", - "14\n", + "0\n", "24\n" ] } @@ -65,8 +65,8 @@ "\n", "logger.info(\"Starting Elasticsearch ingestion pipeline...\")\n", "logger.info(f\"Reading and concatenating files from folder: {docs_folder_path}/developer.avapframework.com\")\n", - "avap_github_docs = read_concat_files(PROJ_ROOT / f\"{docs_folder_path}/avap_language_github_docs\", \"AVAP\", concatenate=False)\n", - "avap_web_docs_intro = read_concat_files(PROJ_ROOT / f\"{docs_folder_path}/developer.avapframework.com\", \"intro\", concatenate=True)\n", + "avap_github_docs = read_files(PROJ_ROOT / f\"{docs_folder_path}/avap_language_github_docs\", \"AVAP\", concatenate=False)\n", + "avap_web_docs_intro = read_files(PROJ_ROOT / f\"{docs_folder_path}/developer.avapframework.com\", \"intro\", concatenate=True)\n", "\n", "# Check chapters in developer.avapframework.com folder and read and concatenate files for each chapter\n", "chapters = sorted({\n", @@ -77,14 +77,14 @@ "avap_web_docs_chapters = [\n", " item\n", " for chapter in chapters\n", - " for item in read_concat_files(\n", + " for item in read_files(\n", " f\"{docs_folder_path}/developer.avapframework.com\",\n", " f\"{chapter}_\",\n", " concatenate=True\n", " )\n", "]\n", - "avap_web_docs_appendices = read_concat_files(PROJ_ROOT / 
f\"{docs_folder_path}/developer.avapframework.com\", \"appendices_\", concatenate=False)\n", - "avap_examples_docs = read_concat_files(PROJ_ROOT / f\"{docs_folder_path}/samples\", concatenate=False)\n", + "avap_web_docs_appendices = read_files(PROJ_ROOT / f\"{docs_folder_path}/developer.avapframework.com\", \"appendices_\", concatenate=False)\n", + "avap_examples_docs = read_files(PROJ_ROOT / f\"{docs_folder_path}/samples\", concatenate=False)\n", "\n", "print(len(avap_github_docs))\n", "print(len(avap_web_docs_intro))\n", @@ -94,7 +94,7 @@ }, { "cell_type": "code", - "execution_count": 12, + "execution_count": 3, "id": "36abc025", "metadata": {}, "outputs": [ @@ -167,7 +167,7 @@ " 'title': 'validacion_in_pertenece_a_lista.avap'}]" ] }, - "execution_count": 12, + "execution_count": 3, "metadata": {}, "output_type": "execute_result" } @@ -194,7 +194,7 @@ }, { "cell_type": "code", - "execution_count": 6, + "execution_count": 4, "id": "27e5774d", "metadata": {}, "outputs": [ @@ -202,7 +202,7 @@ "name": "stderr", "output_type": "stream", "text": [ - "\u001b[32m2026-03-10 13:58:34.531\u001b[0m | \u001b[1mINFO \u001b[0m | \u001b[36m__main__\u001b[0m:\u001b[36m\u001b[0m:\u001b[36m1\u001b[0m - \u001b[1mChunking documents...\u001b[0m\n" + "\u001b[32m2026-03-10 15:15:54.053\u001b[0m | \u001b[1mINFO \u001b[0m | \u001b[36m__main__\u001b[0m:\u001b[36m\u001b[0m:\u001b[36m1\u001b[0m - \u001b[1mChunking documents...\u001b[0m\n" ] } ], @@ -218,7 +218,7 @@ }, { "cell_type": "code", - "execution_count": 7, + "execution_count": 5, "id": "a5ce984e", "metadata": {}, "outputs": [ @@ -226,39 +226,25 @@ "name": "stderr", "output_type": "stream", "text": [ - "\u001b[32m2026-03-10 13:58:51.740\u001b[0m | \u001b[1mINFO \u001b[0m | \u001b[36m__main__\u001b[0m:\u001b[36m\u001b[0m:\u001b[36m1\u001b[0m - \u001b[1mChunking AVAP GitHub docs...\u001b[0m\n", - "\u001b[32m2026-03-10 14:01:00.535\u001b[0m | \u001b[1mINFO \u001b[0m | \u001b[36mscripts.pipelines.tasks.chunk\u001b[0m:\u001b[36mchunk_docs\u001b[0m:\u001b[36m181\u001b[0m - \u001b[1mFinished chunking AVAP.md\u001b[0m\n", - "\u001b[32m2026-03-10 14:01:00.536\u001b[0m | \u001b[1mINFO \u001b[0m | \u001b[36m__main__\u001b[0m:\u001b[36m\u001b[0m:\u001b[36m4\u001b[0m - \u001b[1mChunking AVAP web docs chapters...\u001b[0m\n", - "\u001b[32m2026-03-10 14:01:09.128\u001b[0m | \u001b[1mINFO \u001b[0m | \u001b[36mscripts.pipelines.tasks.chunk\u001b[0m:\u001b[36mchunk_docs\u001b[0m:\u001b[36m181\u001b[0m - \u001b[1mFinished chunking chapter1_\u001b[0m\n", - "\u001b[32m2026-03-10 14:01:12.763\u001b[0m | \u001b[1mINFO \u001b[0m | \u001b[36mscripts.pipelines.tasks.chunk\u001b[0m:\u001b[36mchunk_docs\u001b[0m:\u001b[36m181\u001b[0m - \u001b[1mFinished chunking chapter2_\u001b[0m\n", - "\u001b[32m2026-03-10 14:01:42.995\u001b[0m | \u001b[1mINFO \u001b[0m | \u001b[36mscripts.pipelines.tasks.chunk\u001b[0m:\u001b[36mchunk_docs\u001b[0m:\u001b[36m181\u001b[0m - \u001b[1mFinished chunking chapter3_\u001b[0m\n", - "\u001b[32m2026-03-10 14:01:48.772\u001b[0m | \u001b[1mINFO \u001b[0m | \u001b[36mscripts.pipelines.tasks.chunk\u001b[0m:\u001b[36mchunk_docs\u001b[0m:\u001b[36m181\u001b[0m - \u001b[1mFinished chunking chapter4_\u001b[0m\n", - "\u001b[32m2026-03-10 14:01:48.772\u001b[0m | \u001b[1mINFO \u001b[0m | \u001b[36mscripts.pipelines.tasks.chunk\u001b[0m:\u001b[36mchunk_docs\u001b[0m:\u001b[36m181\u001b[0m - \u001b[1mFinished chunking chapter5_\u001b[0m\n", - "\u001b[32m2026-03-10 14:01:48.773\u001b[0m | \u001b[1mINFO \u001b[0m | 
\u001b[36mscripts.pipelines.tasks.chunk\u001b[0m:\u001b[36mchunk_docs\u001b[0m:\u001b[36m181\u001b[0m - \u001b[1mFinished chunking chapter6_\u001b[0m\n", - "\u001b[32m2026-03-10 14:02:06.408\u001b[0m | \u001b[1mINFO \u001b[0m | \u001b[36mscripts.pipelines.tasks.chunk\u001b[0m:\u001b[36mchunk_docs\u001b[0m:\u001b[36m181\u001b[0m - \u001b[1mFinished chunking chapter7_\u001b[0m\n", - "\u001b[32m2026-03-10 14:02:21.501\u001b[0m | \u001b[1mINFO \u001b[0m | \u001b[36mscripts.pipelines.tasks.chunk\u001b[0m:\u001b[36mchunk_docs\u001b[0m:\u001b[36m181\u001b[0m - \u001b[1mFinished chunking chapter8_\u001b[0m\n", - "\u001b[32m2026-03-10 14:07:27.158\u001b[0m | \u001b[1mINFO \u001b[0m | \u001b[36mscripts.pipelines.tasks.chunk\u001b[0m:\u001b[36mchunk_docs\u001b[0m:\u001b[36m181\u001b[0m - \u001b[1mFinished chunking chapter9_\u001b[0m\n", - "\u001b[32m2026-03-10 14:07:48.389\u001b[0m | \u001b[1mINFO \u001b[0m | \u001b[36mscripts.pipelines.tasks.chunk\u001b[0m:\u001b[36mchunk_docs\u001b[0m:\u001b[36m181\u001b[0m - \u001b[1mFinished chunking chapter10_\u001b[0m\n", - "\u001b[32m2026-03-10 14:08:10.823\u001b[0m | \u001b[1mINFO \u001b[0m | \u001b[36mscripts.pipelines.tasks.chunk\u001b[0m:\u001b[36mchunk_docs\u001b[0m:\u001b[36m181\u001b[0m - \u001b[1mFinished chunking chapter11_\u001b[0m\n", - "\u001b[32m2026-03-10 14:08:27.335\u001b[0m | \u001b[1mINFO \u001b[0m | \u001b[36mscripts.pipelines.tasks.chunk\u001b[0m:\u001b[36mchunk_docs\u001b[0m:\u001b[36m181\u001b[0m - \u001b[1mFinished chunking chapter12_\u001b[0m\n", - "\u001b[32m2026-03-10 14:08:55.010\u001b[0m | \u001b[1mINFO \u001b[0m | \u001b[36mscripts.pipelines.tasks.chunk\u001b[0m:\u001b[36mchunk_docs\u001b[0m:\u001b[36m181\u001b[0m - \u001b[1mFinished chunking chapter13_\u001b[0m\n", - "\u001b[32m2026-03-10 14:09:10.211\u001b[0m | \u001b[1mINFO \u001b[0m | \u001b[36mscripts.pipelines.tasks.chunk\u001b[0m:\u001b[36mchunk_docs\u001b[0m:\u001b[36m181\u001b[0m - \u001b[1mFinished chunking chapter14_\u001b[0m\n", - "\u001b[32m2026-03-10 14:09:10.211\u001b[0m | \u001b[1mINFO \u001b[0m | \u001b[36m__main__\u001b[0m:\u001b[36m\u001b[0m:\u001b[36m7\u001b[0m - \u001b[1mCreating Langchain Document to index...\u001b[0m\n" + "\u001b[32m2026-03-10 15:16:04.305\u001b[0m | \u001b[1mINFO \u001b[0m | \u001b[36m__main__\u001b[0m:\u001b[36m\u001b[0m:\u001b[36m1\u001b[0m - \u001b[1mChunking AVAP GitHub docs...\u001b[0m\n", + "\u001b[32m2026-03-10 15:20:42.896\u001b[0m | \u001b[1mINFO \u001b[0m | \u001b[36mscripts.pipelines.tasks.chunk\u001b[0m:\u001b[36mget_chunk_docs\u001b[0m:\u001b[36m102\u001b[0m - \u001b[1mFinished chunking AVAP.md\u001b[0m\n", + "\u001b[32m2026-03-10 15:20:42.897\u001b[0m | \u001b[1mINFO \u001b[0m | \u001b[36m__main__\u001b[0m:\u001b[36m\u001b[0m:\u001b[36m4\u001b[0m - \u001b[1mChunking AVAP web docs chapters...\u001b[0m\n", + "\u001b[32m2026-03-10 15:20:42.897\u001b[0m | \u001b[1mINFO \u001b[0m | \u001b[36m__main__\u001b[0m:\u001b[36m\u001b[0m:\u001b[36m7\u001b[0m - \u001b[1mCreating Langchain Document to index...\u001b[0m\n" ] } ], "source": [ "logger.info(\"Chunking AVAP GitHub docs...\")\n", - "avap_github_docs_chunks = chunk_docs(avap_github_docs, chunker)\n", + "avap_github_docs_chunks = get_chunk_docs(avap_github_docs, chunker)\n", "\n", "logger.info(\"Chunking AVAP web docs chapters...\")\n", - "avap_web_docs_chapters_chunks = chunk_docs(avap_web_docs_chapters, chunker)\n", + "# avap_web_docs_chapters_chunks = get_chunk_docs(avap_web_docs_chapters, chunker)\n", "\n", "logger.info(\"Creating Langchain Document to index...\")\n", - 
"avap_github_langchain_docs = chunks_to_document(avap_github_docs_chunks)\n", - "avap_web_chapters_langchain_docs = chunks_to_document(avap_web_docs_chapters_chunks)\n", - "avap_web_intro_langchain_docs = chunks_to_document(avap_web_docs_intro)\n", - "avap_web_appendices_langchain_docs = chunks_to_document(avap_web_docs_appendices)" + "avap_github_langchain_docs = convert_chunks_to_document(avap_github_docs_chunks)\n", + "# avap_web_chapters_langchain_docs = convert_chunks_to_document(avap_web_docs_chapters_chunks)\n", + "avap_web_intro_langchain_docs = convert_chunks_to_document(avap_web_docs_intro)\n", + "avap_web_appendices_langchain_docs = convert_chunks_to_document(avap_web_docs_appendices)" ] }, { diff --git a/scripts/pipelines/flows/elasticsearch_ingestion.py b/scripts/pipelines/flows/elasticsearch_ingestion.py index a961302..b27b61a 100644 --- a/scripts/pipelines/flows/elasticsearch_ingestion.py +++ b/scripts/pipelines/flows/elasticsearch_ingestion.py @@ -1,6 +1,3 @@ -import re -import hashlib -from typing import Any from enum import Enum import typer import logging @@ -9,13 +6,15 @@ from pathlib import Path from loguru import logger from elasticsearch import Elasticsearch -from langchain_core.documents import Document from langchain_elasticsearch import ElasticsearchStore -from langchain_community.embeddings import HuggingFaceEmbeddings -from langchain_experimental.text_splitter import SemanticChunker +from chonkie import SemanticChunker from src.utils.emb_factory import create_embedding_model -from scripts.pipelines.tasks.chunk import scrape_avap_docs +from scripts.pipelines.tasks.chunk import ( + read_files, + get_chunk_docs, + convert_chunks_to_document +) app = typer.Typer() @@ -25,6 +24,7 @@ ELASTICSEARCH_INDEX = os.getenv("ELASTICSEARCH_INDEX") OLLAMA_URL = os.getenv("OLLAMA_URL") OLLAMA_EMB_MODEL_NAME = os.getenv("OLLAMA_EMB_MODEL_NAME") AVAP_WEB_DOCS_URL = os.getenv("AVAP_WEB_DOCS_URL") +HF_EMB_MODEL_NAME = os.getenv("HF_EMB_MODEL_NAME") class DistanceStrategy(str, Enum): euclidean = "EUCLIDEAN_DISTANCE" @@ -33,55 +33,65 @@ class DistanceStrategy(str, Enum): jaccard = "JACCARD" cosine = "COSINE" -def clean_text(text: str) -> str: - text = text.replace("\u00a0", " ") - text = re.sub(r"\s+", " ", text).strip() - return text - -def build_documents_from_folder( - folder_path: str, -) -> list[Document]: - - folder = Path(folder_path) - - if not folder.exists() or not folder.is_dir(): - raise ValueError(f"Invalid folder path: {folder_path}") - - all_documents: list[Document] = [] - - for file_path in folder.glob("*.txt"): - doc_text = file_path.read_text(encoding="utf-8") - - if not doc_text.strip(): - continue - - metadata: dict[str, Any] = { - "source": file_path.name, - } - - doc_text = clean_text(doc_text) - document = Document( - id=hashlib.md5(file_path.name.encode()).hexdigest(), - page_content=doc_text, - metadata={**metadata} - ) - - all_documents.append(document) - - return all_documents - @app.command() def elasticsearch_ingestion( - docs_folder_path: str = "ingestion/docs", + docs_folder_path: str = "docs", es_request_timeout: int = 120, es_max_retries: int = 5, es_retry_on_timeout: bool = True, distance_strategy: DistanceStrategy = DistanceStrategy.cosine, -): + chunk_size: int = 2048, + chunk_threshold: float = 0.5, + chunk_similarity_window: int = 3, + chunk_skip_window: int = 1, +): logger.info("Starting Elasticsearch ingestion pipeline...") - logger.info(f"Using docs folder path: {docs_folder_path}") - documents = 
build_documents_from_folder(folder_path=docs_folder_path) + logger.info(f"Reading and concatenating files from folder: {docs_folder_path}/developer.avapframework.com") + avap_github_docs = read_files(f"{docs_folder_path}/avap_language_github_docs", concatenate=False) + avap_web_docs_intro = read_files(f"{docs_folder_path}/developer.avapframework.com", "intro", concatenate=True) + + # Check chapters in developer.avapframework.com folder and read and concatenate files for each chapter + chapters = sorted({ + p.name.split("_")[0] + for p in Path(f"{docs_folder_path}/developer.avapframework.com").glob("chapter*.md") + }) + + avap_web_docs_chapters = [ + item + for chapter in chapters + for item in read_files( + f"{docs_folder_path}/developer.avapframework.com", + f"{chapter}_", + concatenate=True + ) + ] + + avap_web_docs_appendices = read_files(f"{docs_folder_path}/developer.avapframework.com", "appendices_", concatenate=False) + avap_samples_docs = read_files(f"{docs_folder_path}/samples", concatenate=False) + + logger.info("Instantiating semantic chunker...") + chunker = SemanticChunker( + embedding_model=HF_EMB_MODEL_NAME, + chunk_size=chunk_size, + threshold=chunk_threshold, + similarity_window=chunk_similarity_window, + skip_window=chunk_skip_window + ) + + logger.info("Chunking AVAP GitHub docs...") + avap_github_docs_chunks = get_chunk_docs(avap_github_docs, chunker) + + logger.info("Chunking AVAP web docs chapters...") + avap_web_docs_chapters_chunks = get_chunk_docs(avap_web_docs_chapters, chunker) + + logger.info("Creating Langchain Document to index...") + avap_github_langchain_docs = convert_chunks_to_document(avap_github_docs_chunks) + avap_web_chapters_langchain_docs = convert_chunks_to_document(avap_web_docs_chapters_chunks) + avap_web_intro_langchain_docs = convert_chunks_to_document(avap_web_docs_intro) + avap_web_appendices_langchain_docs = convert_chunks_to_document(avap_web_docs_appendices) + avap_samples_langchain_docs = convert_chunks_to_document(avap_samples_docs) + avap_documents = avap_github_langchain_docs + avap_web_chapters_langchain_docs + avap_web_intro_langchain_docs + avap_web_appendices_langchain_docs + avap_samples_langchain_docs logger.info("Connecting to Elasticsearch...") try: @@ -105,10 +115,14 @@ def elasticsearch_ingestion( except: logger.exception("Failed to instantiate embeddings model.") raise + + logger.info(f"Checking if index {ELASTICSEARCH_INDEX} exists and deleting if it does...") + if es.indices.exists(index=ELASTICSEARCH_INDEX): + es.indices.delete(index=ELASTICSEARCH_INDEX) logger.info(f"Uploading documents to index {ELASTICSEARCH_INDEX}...") ElasticsearchStore.from_documents( - documents, + avap_documents, embeddings, client=es, index_name=ELASTICSEARCH_INDEX, diff --git a/scripts/pipelines/flows/elasticsearch_ingestion_from_docs.py b/scripts/pipelines/flows/elasticsearch_ingestion_from_docs.py deleted file mode 100644 index b27b61a..0000000 --- a/scripts/pipelines/flows/elasticsearch_ingestion_from_docs.py +++ /dev/null @@ -1,143 +0,0 @@ -from enum import Enum -import typer -import logging -import os -from pathlib import Path - -from loguru import logger -from elasticsearch import Elasticsearch -from langchain_elasticsearch import ElasticsearchStore -from chonkie import SemanticChunker - -from src.utils.emb_factory import create_embedding_model -from scripts.pipelines.tasks.chunk import ( - read_files, - get_chunk_docs, - convert_chunks_to_document -) - -app = typer.Typer() - -ELASTICSEARCH_LOCAL_URL = os.getenv("ELASTICSEARCH_LOCAL_URL") 
-OLLAMA_LOCAL_URL = os.getenv("OLLAMA_LOCAL_URL") -ELASTICSEARCH_INDEX = os.getenv("ELASTICSEARCH_INDEX") -OLLAMA_URL = os.getenv("OLLAMA_URL") -OLLAMA_EMB_MODEL_NAME = os.getenv("OLLAMA_EMB_MODEL_NAME") -AVAP_WEB_DOCS_URL = os.getenv("AVAP_WEB_DOCS_URL") -HF_EMB_MODEL_NAME = os.getenv("HF_EMB_MODEL_NAME") - -class DistanceStrategy(str, Enum): - euclidean = "EUCLIDEAN_DISTANCE" - max_inner_product = "MAX_INNER_PRODUCT" - dot_product = "DOT_PRODUCT" - jaccard = "JACCARD" - cosine = "COSINE" - - -@app.command() -def elasticsearch_ingestion( - docs_folder_path: str = "docs", - es_request_timeout: int = 120, - es_max_retries: int = 5, - es_retry_on_timeout: bool = True, - distance_strategy: DistanceStrategy = DistanceStrategy.cosine, - chunk_size: int = 2048, - chunk_threshold: float = 0.5, - chunk_similarity_window: int = 3, - chunk_skip_window: int = 1, -): - logger.info("Starting Elasticsearch ingestion pipeline...") - logger.info(f"Reading and concatenating files from folder: {docs_folder_path}/developer.avapframework.com") - avap_github_docs = read_files(f"{docs_folder_path}/avap_language_github_docs", concatenate=False) - avap_web_docs_intro = read_files(f"{docs_folder_path}/developer.avapframework.com", "intro", concatenate=True) - - # Check chapters in developer.avapframework.com folder and read and concatenate files for each chapter - chapters = sorted({ - p.name.split("_")[0] - for p in Path(f"{docs_folder_path}/developer.avapframework.com").glob("chapter*.md") - }) - - avap_web_docs_chapters = [ - item - for chapter in chapters - for item in read_files( - f"{docs_folder_path}/developer.avapframework.com", - f"{chapter}_", - concatenate=True - ) - ] - - avap_web_docs_appendices = read_files(f"{docs_folder_path}/developer.avapframework.com", "appendices_", concatenate=False) - avap_samples_docs = read_files(f"{docs_folder_path}/samples", concatenate=False) - - logger.info("Instantiating semantic chunker...") - chunker = SemanticChunker( - embedding_model=HF_EMB_MODEL_NAME, - chunk_size=chunk_size, - threshold=chunk_threshold, - similarity_window=chunk_similarity_window, - skip_window=chunk_skip_window - ) - - logger.info("Chunking AVAP GitHub docs...") - avap_github_docs_chunks = get_chunk_docs(avap_github_docs, chunker) - - logger.info("Chunking AVAP web docs chapters...") - avap_web_docs_chapters_chunks = get_chunk_docs(avap_web_docs_chapters, chunker) - - logger.info("Creating Langchain Document to index...") - avap_github_langchain_docs = convert_chunks_to_document(avap_github_docs_chunks) - avap_web_chapters_langchain_docs = convert_chunks_to_document(avap_web_docs_chapters_chunks) - avap_web_intro_langchain_docs = convert_chunks_to_document(avap_web_docs_intro) - avap_web_appendices_langchain_docs = convert_chunks_to_document(avap_web_docs_appendices) - avap_samples_langchain_docs = convert_chunks_to_document(avap_samples_docs) - avap_documents = avap_github_langchain_docs + avap_web_chapters_langchain_docs + avap_web_intro_langchain_docs + avap_web_appendices_langchain_docs + avap_samples_langchain_docs - - logger.info("Connecting to Elasticsearch...") - try: - es = Elasticsearch( - ELASTICSEARCH_LOCAL_URL, - request_timeout=es_request_timeout, - max_retries=es_max_retries, - retry_on_timeout=es_retry_on_timeout, - ) - except: - logger.exception("Failed to connect to Elasticsearch.") - raise - - logger.info("Instantiating embeddings model...") - try: - embeddings = create_embedding_model( - provider="ollama", - model=OLLAMA_EMB_MODEL_NAME, - base_url=OLLAMA_LOCAL_URL, - ) - 
except: - logger.exception("Failed to instantiate embeddings model.") - raise - - logger.info(f"Checking if index {ELASTICSEARCH_INDEX} exists and deleting if it does...") - if es.indices.exists(index=ELASTICSEARCH_INDEX): - es.indices.delete(index=ELASTICSEARCH_INDEX) - - logger.info(f"Uploading documents to index {ELASTICSEARCH_INDEX}...") - ElasticsearchStore.from_documents( - avap_documents, - embeddings, - client=es, - index_name=ELASTICSEARCH_INDEX, - distance_strategy=distance_strategy.value, - ) - logger.info(f"Finished uploading documents to index {ELASTICSEARCH_INDEX}.") - - -if __name__ == "__main__": - logging.basicConfig( - level=logging.INFO, - format="%(asctime)s | %(levelname)s | %(name)s | %(message)s", - ) - try: - app() - except Exception as exc: - logger.exception(exc) - raise
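
A minimal sketch of the semantic-chunking path wired in above, for readers who want to see the moving parts together. It assumes chonkie's `SemanticChunker.chunk(text)` returns chunk objects exposing a `.text` attribute; the constructor arguments mirror the defaults in `elasticsearch_ingestion`, and `chunk_to_documents` below is an illustrative stand-in for the real `get_chunk_docs`/`convert_chunks_to_document` helpers in `scripts/pipelines/tasks/chunk.py`, whose bodies are not shown in this patch.

```python
import hashlib

from chonkie import SemanticChunker
from langchain_core.documents import Document

# Mirrors the chunker configuration used by the ingestion flow; the
# embedding model name comes from HF_EMB_MODEL_NAME in the environment.
chunker = SemanticChunker(
    embedding_model="Qwen/Qwen3-Embedding-0.6B",
    chunk_size=2048,
    threshold=0.5,
    similarity_window=3,
    skip_window=1,
)

def chunk_to_documents(text: str, source: str) -> list[Document]:
    """Illustrative stand-in: split one raw doc into LangChain Documents."""
    documents = []
    for i, chunk in enumerate(chunker.chunk(text)):
        documents.append(
            Document(
                # Deterministic id so re-ingesting the same source replaces
                # rather than duplicates entries, as in the original script.
                id=hashlib.md5(f"{source}:{i}".encode()).hexdigest(),
                page_content=chunk.text,
                metadata={"source": source},
            )
        )
    return documents
```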
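Because the refreshed flow deletes `ELASTICSEARCH_INDEX` before uploading, every run rebuilds the index from scratch: re-runs are repeatable but destructive. Since `@app.command()` leaves the decorated function directly callable, the pipeline can also be driven programmatically; a hypothetical driver, run from the repo root so the `src` and `scripts` imports resolve:

```python
# Hypothetical driver for the refreshed ingestion flow. Assumes the
# environment variables read at module import time (ELASTICSEARCH_LOCAL_URL,
# ELASTICSEARCH_INDEX, OLLAMA_LOCAL_URL, OLLAMA_EMB_MODEL_NAME,
# HF_EMB_MODEL_NAME) are already exported.
from scripts.pipelines.flows.elasticsearch_ingestion import (
    DistanceStrategy,
    elasticsearch_ingestion,
)

elasticsearch_ingestion(
    docs_folder_path="docs",
    distance_strategy=DistanceStrategy.cosine,
    chunk_size=2048,
    chunk_threshold=0.5,
    chunk_similarity_window=3,
    chunk_skip_window=1,
)
```

The same run is available from the CLI via the single-command Typer app, e.g. `python scripts/pipelines/flows/elasticsearch_ingestion.py --chunk-size 2048`, with each keyword argument exposed as a `--kebab-case` option.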