refactor: Simplify Elasticsearch ingestion by removing chunk management module and integrating document building directly

This commit is contained in:
acano 2026-03-05 16:23:27 +01:00
parent 31206e8fce
commit d951868200
4 changed files with 65 additions and 62 deletions

View File

@ -64,8 +64,7 @@ graph TD
│ └── kubeconfig.yaml # Kubernetes cluster configuration │ └── kubeconfig.yaml # Kubernetes cluster configuration
├── scripts/ ├── scripts/
│ └── pipelines/ │ └── pipelines/
│ ├── flows/ # Data processing flows │ └── flows/ # Data processing flows
│ └── tasks/ # Pipeline task definitions
└── src/ └── src/
├── __init__.py ├── __init__.py
├── config.py # Application configuration ├── config.py # Application configuration

View File

@ -14,7 +14,6 @@ All notable changes to the **Brunix Assistance Engine** will be documented in th
- `src/prompts.py`: centralized prompt definitions added. - `src/prompts.py`: centralized prompt definitions added.
- `src/state.py`: shared state management module added. - `src/state.py`: shared state management module added.
- `pipelines/flows/elasticsearch_ingestion.py`: pipeline to populate the elasticsearch vector database. - `pipelines/flows/elasticsearch_ingestion.py`: pipeline to populate the elasticsearch vector database.
- `pipelines/tasks/chunks.py`: module with functions related to chunk management.
- `ingestion/docs`: folder containing all chunked AVAP documents. - `ingestion/docs`: folder containing all chunked AVAP documents.
### Changed ### Changed

View File

@ -1,33 +1,82 @@
import re
import hashlib import hashlib
from typing import Any
from enum import Enum
import typer
import logging import logging
import os import os
from pathlib import Path from pathlib import Path
from elasticsearch import Elasticsearch from elasticsearch import Elasticsearch
from langchain_core.documents import Document
from langchain_elasticsearch import ElasticsearchStore from langchain_elasticsearch import ElasticsearchStore
from langchain_ollama import OllamaEmbeddings from src.utils.emb_factory import create_embedding_model
from scripts.pipelines.tasks.chunks import build_chunks_from_folder
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
app = typer.Typer()
ELASTICSEARCH_LOCAL_URL = os.getenv("ELASTICSEARCH_LOCAL_URL") ELASTICSEARCH_LOCAL_URL = os.getenv("ELASTICSEARCH_LOCAL_URL")
OLLAMA_LOCAL_URL = os.getenv("OLLAMA_LOCAL_URL") OLLAMA_LOCAL_URL = os.getenv("OLLAMA_LOCAL_URL")
ELASTICSEARCH_INDEX = os.getenv("ELASTICSEARCH_INDEX") ELASTICSEARCH_INDEX = os.getenv("ELASTICSEARCH_INDEX")
OLLAMA_URL = os.getenv("OLLAMA_URL") OLLAMA_URL = os.getenv("OLLAMA_URL")
OLLAMA_EMB_MODEL_NAME = os.getenv("OLLAMA_EMB_MODEL_NAME") OLLAMA_EMB_MODEL_NAME = os.getenv("OLLAMA_EMB_MODEL_NAME")
PROJ_ROOT = Path(__file__).resolve().parents[3]
class DistanceStrategy(str, Enum):
    """Vector similarity strategy accepted on the CLI.

    The string values mirror the constant names that
    ``ElasticsearchStore`` expects, so ``member.value`` can be forwarded
    directly as the ``distance_strategy`` argument.  Subclassing ``str``
    lets typer parse the choice straight from the command line.
    """

    euclidean = "EUCLIDEAN_DISTANCE"
    max_inner_product = "MAX_INNER_PRODUCT"
    dot_product = "DOT_PRODUCT"
    jaccard = "JACCARD"
    cosine = "COSINE"
def clean_text(text: str) -> str:
    """Normalize whitespace in *text*.

    Non-breaking spaces (U+00A0) become ordinary spaces, every run of
    whitespace collapses to a single space, and leading/trailing
    whitespace is removed.
    """
    without_nbsp = text.replace("\u00a0", " ")
    # str.split() with no argument splits on any whitespace run and
    # drops leading/trailing whitespace, so join-with-space is equivalent
    # to collapsing with a regex and stripping.
    return " ".join(without_nbsp.split())
def build_documents_from_folder(
    folder_path: str,
) -> list[Document]:
    """Load every non-empty ``*.txt`` file in *folder_path* as a Document.

    Each file becomes one ``Document`` whose id is the MD5 hex digest of
    the file name (stable across runs, so re-ingestion overwrites rather
    than duplicates) and whose metadata records the source file name.
    File contents are whitespace-normalized via ``clean_text``.

    Args:
        folder_path: Path to a directory containing ``.txt`` files.

    Returns:
        One Document per non-empty file, in sorted file-name order so
        ingestion is deterministic (``Path.glob`` order is
        filesystem-dependent).

    Raises:
        ValueError: If *folder_path* does not exist or is not a directory.
    """
    folder = Path(folder_path)
    if not folder.exists() or not folder.is_dir():
        raise ValueError(f"Invalid folder path: {folder_path}")
    all_documents: list[Document] = []
    # sorted() makes the upload order reproducible across platforms.
    for file_path in sorted(folder.glob("*.txt")):
        doc_text = file_path.read_text(encoding="utf-8")
        if not doc_text.strip():
            # Skip empty / whitespace-only files instead of indexing blanks.
            continue
        all_documents.append(
            Document(
                # MD5 is used as a content-addressing id, not for security.
                id=hashlib.md5(file_path.name.encode()).hexdigest(),
                page_content=clean_text(doc_text),
                metadata={"source": file_path.name},
            )
        )
    return all_documents
@app.command()
def elasticsearch_ingestion( def elasticsearch_ingestion(
docs_folder_path: str = "ingestion", docs_folder_path: str = "ingestion/docs",
es_request_timeout: int = 120, es_request_timeout: int = 120,
es_max_retries: int = 5, es_max_retries: int = 5,
es_retry_on_timeout: bool = True, es_retry_on_timeout: bool = True,
distance_strategy: str = "COSINE", distance_strategy: DistanceStrategy = DistanceStrategy.cosine,
): ):
logger.info("Starting Elasticsearch ingestion pipeline...") logger.info("Starting Elasticsearch ingestion pipeline...")
logger.info(f"Using docs folder path: {PROJ_ROOT / docs_folder_path}") logger.info(f"Using docs folder path: {docs_folder_path}")
chunks = build_chunks_from_folder(folder_path=PROJ_ROOT / docs_folder_path) documents = build_documents_from_folder(folder_path=docs_folder_path)
logger.info("Connecting to Elasticsearch...") logger.info("Connecting to Elasticsearch...")
try: try:
@ -43,8 +92,10 @@ def elasticsearch_ingestion(
logger.info("Instantiating embeddings model...") logger.info("Instantiating embeddings model...")
try: try:
embeddings = OllamaEmbeddings( embeddings = create_embedding_model(
base_url=OLLAMA_LOCAL_URL, model=OLLAMA_EMB_MODEL_NAME provider="ollama",
model=OLLAMA_EMB_MODEL_NAME,
base_url=OLLAMA_LOCAL_URL,
) )
except: except:
logger.exception("Failed to instantiate embeddings model.") logger.exception("Failed to instantiate embeddings model.")
@ -52,14 +103,13 @@ def elasticsearch_ingestion(
logger.info(f"Uploading documents to index {ELASTICSEARCH_INDEX}...") logger.info(f"Uploading documents to index {ELASTICSEARCH_INDEX}...")
ElasticsearchStore.from_documents( ElasticsearchStore.from_documents(
chunks, documents,
embeddings, embeddings,
client=es, client=es,
index_name=ELASTICSEARCH_INDEX, index_name=ELASTICSEARCH_INDEX,
distance_strategy=distance_strategy, distance_strategy=distance_strategy.value,
) )
logger.info(f"Finished uploading documents to index {ELASTICSEARCH_INDEX}.") logger.info(f"Finished uploading documents to index {ELASTICSEARCH_INDEX}.")
logger.info(f"Total documents uploaded: {len(chunks)}.")
if __name__ == "__main__": if __name__ == "__main__":
@ -68,7 +118,7 @@ if __name__ == "__main__":
format="%(asctime)s | %(levelname)s | %(name)s | %(message)s", format="%(asctime)s | %(levelname)s | %(name)s | %(message)s",
) )
try: try:
elasticsearch_ingestion() app()
except Exception as exc: except Exception as exc:
logger.exception(exc) logger.exception(exc)
raise raise

View File

@ -1,45 +0,0 @@
import re
import hashlib
from pathlib import Path
from typing import Any
from langchain_core.documents import Document
def clean_text(text: str) -> str:
    """Collapse whitespace runs to single spaces and trim the ends.

    Non-breaking spaces (U+00A0) are treated as ordinary spaces.
    """
    normalized = text.replace("\u00a0", " ")
    collapsed = re.sub(r"\s+", " ", normalized)
    return collapsed.strip()
def build_chunks_from_folder(
    folder_path: str,
) -> list[Document]:
    """Read every ``*.txt`` file under *folder_path* into a Document chunk.

    Whitespace-only files are skipped.  Each chunk's id is the MD5 hex
    digest of its file name and its metadata carries the source file name.

    Raises:
        ValueError: If *folder_path* is missing or not a directory.
    """
    source_dir = Path(folder_path)
    if not (source_dir.exists() and source_dir.is_dir()):
        raise ValueError(f"Invalid folder path: {folder_path}")

    chunks: list[Document] = []
    for txt_file in source_dir.glob("*.txt"):
        raw_text = txt_file.read_text(encoding="utf-8")
        if not raw_text.strip():
            continue
        file_meta: dict[str, Any] = {"source": txt_file.name}
        chunks.append(
            Document(
                id=hashlib.md5(txt_file.name.encode()).hexdigest(),
                page_content=clean_text(raw_text),
                metadata=file_meta,
            )
        )
    return chunks