import hashlib
import logging
import os
from pathlib import Path

from elasticsearch import Elasticsearch
from langchain_elasticsearch import ElasticsearchStore
from langchain_ollama import OllamaEmbeddings

from scripts.pipelines.tasks.chunks import build_chunks_from_folder

logger = logging.getLogger(__name__)

# Runtime configuration, read once from the environment at import time.
ELASTICSEARCH_LOCAL_URL = os.getenv("ELASTICSEARCH_LOCAL_URL")
OLLAMA_LOCAL_URL = os.getenv("OLLAMA_LOCAL_URL")
ELASTICSEARCH_INDEX = os.getenv("ELASTICSEARCH_INDEX")
# Not referenced in this module; presumably consumed elsewhere — verify before removing.
OLLAMA_URL = os.getenv("OLLAMA_URL")
OLLAMA_EMB_MODEL_NAME = os.getenv("OLLAMA_EMB_MODEL_NAME")

# Project root: three directory levels above this file's parent directory.
PROJ_ROOT = Path(__file__).resolve().parents[3]


def _require_settings() -> None:
    """Fail fast with a clear message when a required environment variable is unset.

    OLLAMA_LOCAL_URL is deliberately not required: it is passed through as
    ``base_url`` and left to the embeddings client to interpret when absent.

    Raises:
        ValueError: naming every required variable that is unset or empty.
    """
    required = {
        "ELASTICSEARCH_LOCAL_URL": ELASTICSEARCH_LOCAL_URL,
        "ELASTICSEARCH_INDEX": ELASTICSEARCH_INDEX,
        "OLLAMA_EMB_MODEL_NAME": OLLAMA_EMB_MODEL_NAME,
    }
    missing = [name for name, value in required.items() if not value]
    if missing:
        raise ValueError(
            "Missing required environment variable(s): " + ", ".join(missing)
        )


def _connect_elasticsearch(
    request_timeout: int,
    max_retries: int,
    retry_on_timeout: bool,
) -> Elasticsearch:
    """Create an Elasticsearch client for ELASTICSEARCH_LOCAL_URL and verify it is reachable.

    The caller owns the returned client and is responsible for closing it.
    """
    logger.info("Connecting to Elasticsearch...")
    try:
        es = Elasticsearch(
            ELASTICSEARCH_LOCAL_URL,
            request_timeout=request_timeout,
            max_retries=max_retries,
            retry_on_timeout=retry_on_timeout,
        )
    except Exception:
        logger.exception("Failed to connect to Elasticsearch.")
        raise

    try:
        # The constructor only configures the transport; it does not contact the
        # server. One cheap request surfaces an unreachable cluster or bad
        # credentials here instead of halfway through the upload.
        es.info()
    except Exception:
        logger.exception("Failed to connect to Elasticsearch.")
        es.close()
        raise
    return es


def _build_embeddings() -> OllamaEmbeddings:
    """Instantiate the Ollama embeddings model configured via the environment."""
    logger.info("Instantiating embeddings model...")
    try:
        return OllamaEmbeddings(base_url=OLLAMA_LOCAL_URL, model=OLLAMA_EMB_MODEL_NAME)
    except Exception:
        logger.exception("Failed to instantiate embeddings model.")
        raise


def elasticsearch_ingestion(
    docs_folder_path: str = "ingestion",
    es_request_timeout: int = 120,
    es_max_retries: int = 5,
    es_retry_on_timeout: bool = True,
    distance_strategy: str = "COSINE",
) -> None:
    """Chunk the documents in a project folder, embed them, and index them in Elasticsearch.

    Args:
        docs_folder_path: Folder to ingest, relative to the project root (PROJ_ROOT).
        es_request_timeout: Per-request timeout passed to the Elasticsearch client.
        es_max_retries: Maximum retries passed to the Elasticsearch client.
        es_retry_on_timeout: Whether the Elasticsearch client retries on timeouts.
        distance_strategy: Vector similarity passed to ``ElasticsearchStore``
            (e.g. ``"COSINE"``).

    Raises:
        ValueError: if a required environment variable is unset.
        Exception: any error from chunking, connecting, embedding, or uploading
            is logged (where applicable) and re-raised.
    """
    logger.info("Starting Elasticsearch ingestion pipeline...")
    # Validate configuration before the (potentially slow) chunking step.
    _require_settings()

    docs_folder = PROJ_ROOT / docs_folder_path
    logger.info("Using docs folder path: %s", docs_folder)
    # Presumably a list of LangChain Documents — verify against build_chunks_from_folder.
    chunks = build_chunks_from_folder(folder_path=docs_folder)
    if not chunks:
        logger.warning("No chunks were built from %s; nothing to upload.", docs_folder)
        return

    es = _connect_elasticsearch(es_request_timeout, es_max_retries, es_retry_on_timeout)
    try:
        embeddings = _build_embeddings()

        logger.info("Uploading documents to index %s...", ELASTICSEARCH_INDEX)
        ElasticsearchStore.from_documents(
            chunks,
            embeddings,
            client=es,
            index_name=ELASTICSEARCH_INDEX,
            distance_strategy=distance_strategy,
        )
    finally:
        # Release the client's connection pool even if the upload fails.
        es.close()

    logger.info("Finished uploading documents to index %s.", ELASTICSEARCH_INDEX)
    logger.info("Total documents uploaded: %d.", len(chunks))


if __name__ == "__main__":
    logging.basicConfig(
        level=logging.INFO,
        format="%(asctime)s | %(levelname)s | %(name)s | %(message)s",
    )
    try:
        elasticsearch_ingestion()
    except Exception:
        # logger.exception attaches the traceback; re-raise so the process exits non-zero.
        logger.exception("Elasticsearch ingestion pipeline failed.")
        raise