"""Ingest plain-text documents from a folder into an Elasticsearch vector index."""

import hashlib
import logging
import os
import re
from enum import Enum
from pathlib import Path
from typing import Any

import typer
from elasticsearch import Elasticsearch
from langchain_core.documents import Document
from langchain_elasticsearch import ElasticsearchStore

from src.utils.emb_factory import create_embedding_model

logger = logging.getLogger(__name__)
app = typer.Typer()

# Settings are read from the environment once, at import time.
# Each constant is None when the corresponding variable is unset.
ELASTICSEARCH_LOCAL_URL = os.getenv("ELASTICSEARCH_LOCAL_URL")
OLLAMA_LOCAL_URL = os.getenv("OLLAMA_LOCAL_URL")
ELASTICSEARCH_INDEX = os.getenv("ELASTICSEARCH_INDEX")
OLLAMA_URL = os.getenv("OLLAMA_URL")
OLLAMA_EMB_MODEL_NAME = os.getenv("OLLAMA_EMB_MODEL_NAME")


class DistanceStrategy(str, Enum):
    """Distance strategies selectable from the command line.

    The member *value* is the string handed to
    ``ElasticsearchStore.from_documents`` as ``distance_strategy``.
    """

    euclidean = "EUCLIDEAN_DISTANCE"
    max_inner_product = "MAX_INNER_PRODUCT"
    dot_product = "DOT_PRODUCT"
    jaccard = "JACCARD"
    cosine = "COSINE"


def clean_text(text: str) -> str:
    """Normalize whitespace in *text*.

    Non-breaking spaces become regular spaces, every run of whitespace is
    collapsed to a single space, and leading/trailing whitespace is removed.
    """
    text = text.replace("\u00a0", " ")
    text = re.sub(r"\s+", " ", text).strip()
    return text


def build_documents_from_folder(
    folder_path: str,
) -> list[Document]:
    """Build one ``Document`` per non-blank ``*.txt`` file found in a folder.

    Only files directly inside *folder_path* are considered (the glob is not
    recursive). Files are read as UTF-8 and their text is passed through
    ``clean_text``. Files whose content is blank are skipped.

    Args:
        folder_path: Path of the folder to scan.

    Returns:
        The documents, ordered by file path. Each document carries the file
        name under the ``"source"`` metadata key and an ID derived from the
        file name.

    Raises:
        ValueError: If *folder_path* does not exist or is not a directory.
    """
    folder = Path(folder_path)

    if not folder.exists() or not folder.is_dir():
        raise ValueError(f"Invalid folder path: {folder_path}")

    all_documents: list[Document] = []

    # glob() yields entries in an unspecified order; sorting keeps the
    # ingestion order reproducible across runs and platforms.
    for file_path in sorted(folder.glob("*.txt")):
        doc_text = file_path.read_text(encoding="utf-8")

        if not doc_text.strip():
            continue

        metadata: dict[str, Any] = {
            "source": file_path.name,
        }

        doc_text = clean_text(doc_text)

        document = Document(
            # MD5 serves only as a stable identifier here, not for security:
            # the same file name always produces the same document ID.
            id=hashlib.md5(file_path.name.encode()).hexdigest(),
            page_content=doc_text,
            metadata=metadata,
        )

        all_documents.append(document)

    return all_documents


@app.command()
def elasticsearch_ingestion(
    docs_folder_path: str = "ingestion/docs",
    es_request_timeout: int = 120,
    es_max_retries: int = 5,
    es_retry_on_timeout: bool = True,
    distance_strategy: DistanceStrategy = DistanceStrategy.cosine,
):
    """Load text documents from a folder and upload them to Elasticsearch.

    Documents are built with ``build_documents_from_folder``, embedded with
    the model returned by ``create_embedding_model`` (provider ``"ollama"``),
    and uploaded through ``ElasticsearchStore.from_documents`` into the index
    named by the ``ELASTICSEARCH_INDEX`` environment variable.

    Args:
        docs_folder_path: Folder containing the ``*.txt`` files to ingest.
        es_request_timeout: ``request_timeout`` passed to the Elasticsearch client.
        es_max_retries: ``max_retries`` passed to the Elasticsearch client.
        es_retry_on_timeout: ``retry_on_timeout`` passed to the Elasticsearch client.
        distance_strategy: Distance strategy forwarded to the vector store.

    Raises:
        ValueError: If *docs_folder_path* is not an existing directory.
    """
    logger.info("Starting Elasticsearch ingestion pipeline...")
    logger.info(f"Using docs folder path: {docs_folder_path}")
    documents = build_documents_from_folder(folder_path=docs_folder_path)

    if not documents:
        # Nothing to upload: skip creating the client and the embedding model.
        logger.warning(
            f"No non-empty .txt documents found in {docs_folder_path}; nothing to ingest."
        )
        return

    logger.info("Connecting to Elasticsearch...")
    try:
        es = Elasticsearch(
            ELASTICSEARCH_LOCAL_URL,
            request_timeout=es_request_timeout,
            max_retries=es_max_retries,
            retry_on_timeout=es_retry_on_timeout,
        )
    except Exception:
        # Exception (not a bare except) so KeyboardInterrupt/SystemExit are
        # not reported as connection failures.
        logger.exception("Failed to connect to Elasticsearch.")
        raise

    logger.info("Instantiating embeddings model...")
    try:
        embeddings = create_embedding_model(
            provider="ollama",
            model=OLLAMA_EMB_MODEL_NAME,
            base_url=OLLAMA_LOCAL_URL,
        )
    except Exception:
        logger.exception("Failed to instantiate embeddings model.")
        raise

    logger.info(f"Uploading documents to index {ELASTICSEARCH_INDEX}...")

    ElasticsearchStore.from_documents(
        documents,
        embeddings,
        client=es,
        index_name=ELASTICSEARCH_INDEX,
        distance_strategy=distance_strategy.value,
    )
    logger.info(f"Finished uploading documents to index {ELASTICSEARCH_INDEX}.")


if __name__ == "__main__":
    logging.basicConfig(
        level=logging.INFO,
        format="%(asctime)s | %(levelname)s | %(name)s | %(message)s",
    )
    try:
        app()
    except Exception as exc:
        # Log the failure with its traceback, then let it propagate.
        logger.exception(exc)
        raise