From d9518682005b8a830e5bdfae1ef77190e8bd62d4 Mon Sep 17 00:00:00 2001
From: acano
Date: Thu, 5 Mar 2026 16:23:27 +0100
Subject: [PATCH] refactor: Simplify Elasticsearch ingestion by removing chunk
 management module and integrating document building directly

---
 README.md                                |  3 +-
 changelog                                |  3 +-
 .../flows/elasticsearch_ingestion.py     | 76 +++++++++++++++----
 scripts/pipelines/tasks/chunks.py        | 45 -----------
 4 files changed, 65 insertions(+), 62 deletions(-)
 delete mode 100644 scripts/pipelines/tasks/chunks.py

diff --git a/README.md b/README.md
index c49c0b5..4f7edb2 100644
--- a/README.md
+++ b/README.md
@@ -64,8 +64,7 @@ graph TD
 │   └── kubeconfig.yaml        # Kubernetes cluster configuration
 ├── scripts/
 │   └── pipelines/
-│       ├── flows/             # Data processing flows
-│       └── tasks/             # Pipeline task definitions
+│       └── flows/             # Data processing flows
 └── src/
     ├── __init__.py
     ├── config.py              # Application configuration
diff --git a/changelog b/changelog
index d2d9e04..f060d26 100644
--- a/changelog
+++ b/changelog
@@ -13,8 +13,7 @@ All notable changes to the **Brunix Assistance Engine** will be documented in th
 - `src/graph.py`: workflow graph orchestration module added.
 - `src/prompts.py`: centralized prompt definitions added.
 - `src/state.py`: shared state management module added.
-- `pipelines/flows/elasticsearch_ingestion.py`: pipeline to populate the elasticsearch vector database.
-- `pipelines/tasks/chunks.py`: module with functions related to chunk management.
+- `pipelines/flows/elasticsearch_ingestion.py`: pipeline to populate the elasticsearch vector database.
 - `ingestion/docs`: folder containing all chunked AVAP documents.
 
 ### Changed
diff --git a/scripts/pipelines/flows/elasticsearch_ingestion.py b/scripts/pipelines/flows/elasticsearch_ingestion.py
index 6ccfda5..6227e7c 100644
--- a/scripts/pipelines/flows/elasticsearch_ingestion.py
+++ b/scripts/pipelines/flows/elasticsearch_ingestion.py
@@ -1,33 +1,82 @@
+import re
 import hashlib
+from typing import Any
+from enum import Enum
+import typer
 import logging
 import os
 from pathlib import Path
 
 from elasticsearch import Elasticsearch
+from langchain_core.documents import Document
 from langchain_elasticsearch import ElasticsearchStore
-from langchain_ollama import OllamaEmbeddings
 
-from scripts.pipelines.tasks.chunks import build_chunks_from_folder
+from src.utils.emb_factory import create_embedding_model
 
 logger = logging.getLogger(__name__)
+app = typer.Typer()
 
 ELASTICSEARCH_LOCAL_URL = os.getenv("ELASTICSEARCH_LOCAL_URL")
 OLLAMA_LOCAL_URL = os.getenv("OLLAMA_LOCAL_URL")
 ELASTICSEARCH_INDEX = os.getenv("ELASTICSEARCH_INDEX")
 OLLAMA_URL = os.getenv("OLLAMA_URL")
 OLLAMA_EMB_MODEL_NAME = os.getenv("OLLAMA_EMB_MODEL_NAME")
-PROJ_ROOT = Path(__file__).resolve().parents[3]
+
+class DistanceStrategy(str, Enum):
+    euclidean = "EUCLIDEAN_DISTANCE"
+    max_inner_product = "MAX_INNER_PRODUCT"
+    dot_product = "DOT_PRODUCT"
+    jaccard = "JACCARD"
+    cosine = "COSINE"
+
+def clean_text(text: str) -> str:
+    text = text.replace("\u00a0", " ")
+    text = re.sub(r"\s+", " ", text).strip()
+    return text
+
+def build_documents_from_folder(
+    folder_path: str,
+) -> list[Document]:
+
+    folder = Path(folder_path)
+
+    if not folder.exists() or not folder.is_dir():
+        raise ValueError(f"Invalid folder path: {folder_path}")
+
+    all_documents: list[Document] = []
+
+    for file_path in folder.glob("*.txt"):
+        doc_text = file_path.read_text(encoding="utf-8")
+
+        if not doc_text.strip():
+            continue
+
+        metadata: dict[str, Any] = {
+            "source": file_path.name,
+        }
+
+        doc_text = clean_text(doc_text)
+        document = Document(
+            id=hashlib.md5(file_path.name.encode()).hexdigest(),
+            page_content=doc_text,
+            metadata={**metadata}
+        )
+
+        all_documents.append(document)
+
+    return all_documents
 
+@app.command()
 def elasticsearch_ingestion(
-    docs_folder_path: str = "ingestion",
+    docs_folder_path: str = "ingestion/docs",
     es_request_timeout: int = 120,
     es_max_retries: int = 5,
     es_retry_on_timeout: bool = True,
-    distance_strategy: str = "COSINE",
+    distance_strategy: DistanceStrategy = DistanceStrategy.cosine,
 ):
     logger.info("Starting Elasticsearch ingestion pipeline...")
-    logger.info(f"Using docs folder path: {PROJ_ROOT / docs_folder_path}")
-    chunks = build_chunks_from_folder(folder_path=PROJ_ROOT / docs_folder_path)
+    logger.info(f"Using docs folder path: {docs_folder_path}")
+    documents = build_documents_from_folder(folder_path=docs_folder_path)
 
     logger.info("Connecting to Elasticsearch...")
     try:
@@ -43,8 +92,10 @@ def elasticsearch_ingestion(
 
     logger.info("Instantiating embeddings model...")
     try:
-        embeddings = OllamaEmbeddings(
-            base_url=OLLAMA_LOCAL_URL, model=OLLAMA_EMB_MODEL_NAME
+        embeddings = create_embedding_model(
+            provider="ollama",
+            model=OLLAMA_EMB_MODEL_NAME,
+            base_url=OLLAMA_LOCAL_URL,
         )
     except:
         logger.exception("Failed to instantiate embeddings model.")
@@ -52,14 +103,13 @@ def elasticsearch_ingestion(
 
     logger.info(f"Uploading documents to index {ELASTICSEARCH_INDEX}...")
     ElasticsearchStore.from_documents(
-        chunks,
+        documents,
         embeddings,
         client=es,
         index_name=ELASTICSEARCH_INDEX,
-        distance_strategy=distance_strategy,
+        distance_strategy=distance_strategy.value,
     )
     logger.info(f"Finished uploading documents to index {ELASTICSEARCH_INDEX}.")
-    logger.info(f"Total documents uploaded: {len(chunks)}.")
 
 
 if __name__ == "__main__":
@@ -68,7 +118,7 @@ if __name__ == "__main__":
         format="%(asctime)s | %(levelname)s | %(name)s | %(message)s",
     )
     try:
-        elasticsearch_ingestion()
+        app()
     except Exception as exc:
         logger.exception(exc)
         raise
diff --git a/scripts/pipelines/tasks/chunks.py b/scripts/pipelines/tasks/chunks.py
deleted file mode 100644
index b709c55..0000000
--- a/scripts/pipelines/tasks/chunks.py
+++ /dev/null
@@ -1,45 +0,0 @@
-import re
-import hashlib
-from pathlib import Path
-from typing import Any
-
-from langchain_core.documents import Document
-
-
-def clean_text(text: str) -> str:
-    text = text.replace("\u00a0", " ")
-    text = re.sub(r"\s+", " ", text).strip()
-    return text
-
-
-def build_chunks_from_folder(
-    folder_path: str,
-) -> list[Document]:
-
-    folder = Path(folder_path)
-
-    if not folder.exists() or not folder.is_dir():
-        raise ValueError(f"Invalid folder path: {folder_path}")
-
-    all_chunks: list[Document] = []
-
-    for file_path in folder.glob("*.txt"):
-        doc_text = file_path.read_text(encoding="utf-8")
-
-        if not doc_text.strip():
-            continue
-
-        metadata: dict[str, Any] = {
-            "source": file_path.name,
-        }
-
-        doc_text = clean_text(doc_text)
-        chunk = Document(
-            id=hashlib.md5(file_path.name.encode()).hexdigest(),
-            page_content=doc_text,
-            metadata={**metadata,}
-        )
-
-        all_chunks.append(chunk)
-
-    return all_chunks
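
Usage sketch: the Typer command keeps plain Python defaults, so after this
patch it can be driven either from the CLI (Typer derives the flags from the
parameters, e.g. --docs-folder-path ingestion/docs --distance-strategy COSINE)
or imported directly. The snippet below is a hypothetical smoke test, assuming
it runs from the repository root with ELASTICSEARCH_LOCAL_URL,
ELASTICSEARCH_INDEX, OLLAMA_LOCAL_URL and OLLAMA_EMB_MODEL_NAME exported;
the import path mirrors the file location in the repo.

    from scripts.pipelines.flows.elasticsearch_ingestion import (
        DistanceStrategy,
        build_documents_from_folder,
        elasticsearch_ingestion,
    )

    # Document building can be exercised without a live Elasticsearch:
    docs = build_documents_from_folder("ingestion/docs")
    print(f"{len(docs)} documents ready for indexing")

    # The @app.command() function is still a plain callable:
    elasticsearch_ingestion(
        docs_folder_path="ingestion/docs",
        distance_strategy=DistanceStrategy.cosine,
    )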