import hashlib
import logging
import os
import re
from enum import Enum
from pathlib import Path
from typing import Any

import typer
from elasticsearch import Elasticsearch
from langchain_core.documents import Document
from langchain_elasticsearch import ElasticsearchStore
from loguru import logger

from src.utils.emb_factory import create_embedding_model

app = typer.Typer()

ELASTICSEARCH_LOCAL_URL = os.getenv("ELASTICSEARCH_LOCAL_URL")
OLLAMA_LOCAL_URL = os.getenv("OLLAMA_LOCAL_URL")
ELASTICSEARCH_INDEX = os.getenv("ELASTICSEARCH_INDEX")
OLLAMA_EMB_MODEL_NAME = os.getenv("OLLAMA_EMB_MODEL_NAME")


class DistanceStrategy(str, Enum):
    """Distance metrics accepted by ElasticsearchStore."""

    euclidean = "EUCLIDEAN_DISTANCE"
    max_inner_product = "MAX_INNER_PRODUCT"
    dot_product = "DOT_PRODUCT"
    jaccard = "JACCARD"
    cosine = "COSINE"


def clean_text(text: str) -> str:
    """Normalize non-breaking spaces and collapse runs of whitespace."""
    text = text.replace("\u00a0", " ")
    text = re.sub(r"\s+", " ", text).strip()
    return text


def build_documents_from_folder(folder_path: str) -> list[Document]:
    """Load every non-empty .txt file in `folder_path` as a LangChain Document."""
    folder = Path(folder_path)
    if not folder.exists() or not folder.is_dir():
        raise ValueError(f"Invalid folder path: {folder_path}")

    all_documents: list[Document] = []
    for file_path in folder.glob("*.txt"):
        doc_text = file_path.read_text(encoding="utf-8")
        if not doc_text.strip():
            continue
        metadata: dict[str, Any] = {"source": file_path.name}
        document = Document(
            # Deterministic ID derived from the file name, so re-running the
            # pipeline overwrites existing documents instead of duplicating them.
            id=hashlib.md5(file_path.name.encode()).hexdigest(),
            page_content=clean_text(doc_text),
            metadata=metadata,
        )
        all_documents.append(document)
    return all_documents


@app.command()
def elasticsearch_ingestion(
    docs_folder_path: str = "ingestion/docs",
    es_request_timeout: int = 120,
    es_max_retries: int = 5,
    es_retry_on_timeout: bool = True,
    distance_strategy: DistanceStrategy = DistanceStrategy.cosine,
):
    """Embed the documents in `docs_folder_path` and index them in Elasticsearch."""
    logger.info("Starting Elasticsearch ingestion pipeline...")
    logger.info(f"Using docs folder path: {docs_folder_path}")

    # Fail fast with a clear message instead of passing None URLs downstream.
    if not all(
        [ELASTICSEARCH_LOCAL_URL, ELASTICSEARCH_INDEX, OLLAMA_LOCAL_URL, OLLAMA_EMB_MODEL_NAME]
    ):
        raise RuntimeError(
            "ELASTICSEARCH_LOCAL_URL, ELASTICSEARCH_INDEX, OLLAMA_LOCAL_URL and "
            "OLLAMA_EMB_MODEL_NAME must all be set in the environment."
        )

    documents = build_documents_from_folder(folder_path=docs_folder_path)

    logger.info("Connecting to Elasticsearch...")
    try:
        es = Elasticsearch(
            ELASTICSEARCH_LOCAL_URL,
            request_timeout=es_request_timeout,
            max_retries=es_max_retries,
            retry_on_timeout=es_retry_on_timeout,
        )
        # The client is lazy: the constructor alone does not raise on a bad URL,
        # so ping the cluster to fail fast here rather than mid-indexing.
        es.info()
    except Exception:
        logger.exception("Failed to connect to Elasticsearch.")
        raise

    logger.info("Instantiating embeddings model...")
    try:
        embeddings = create_embedding_model(
            provider="ollama",
            model=OLLAMA_EMB_MODEL_NAME,
            base_url=OLLAMA_LOCAL_URL,
        )
    except Exception:
        logger.exception("Failed to instantiate embeddings model.")
        raise

    logger.info(f"Uploading documents to index {ELASTICSEARCH_INDEX}...")
    ElasticsearchStore.from_documents(
        documents,
        embeddings,
        es_connection=es,  # ElasticsearchStore expects `es_connection`, not `client`
        index_name=ELASTICSEARCH_INDEX,
        distance_strategy=distance_strategy.value,
    )
    logger.info(f"Finished uploading documents to index {ELASTICSEARCH_INDEX}.")


if __name__ == "__main__":
    # basicConfig only affects the stdlib loggers used by the elasticsearch and
    # langchain libraries; this script's own messages go through loguru.
    logging.basicConfig(
        level=logging.INFO,
        format="%(asctime)s | %(levelname)s | %(name)s | %(message)s",
    )
    try:
        app()
    except Exception:
        logger.exception("Ingestion pipeline failed.")
        raise
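# Usage sketch. Assumptions not stated in the script: the file path below is a
# placeholder, and the URL, index, and model values are illustrative examples.
# Because the Typer app registers a single command, it runs without a
# subcommand name:
#
#   export ELASTICSEARCH_LOCAL_URL="http://localhost:9200"
#   export OLLAMA_LOCAL_URL="http://localhost:11434"
#   export ELASTICSEARCH_INDEX="avap-docs"
#   export OLLAMA_EMB_MODEL_NAME="nomic-embed-text"
#   python path/to/this_script.py --docs-folder-path ingestion/docs --distance-strategy COSINE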