"""CLI pipeline that chunks the AVAP docs, embeds them, and indexes them
into Elasticsearch.

Run as a typer app:  python -m <this_module> [OPTIONS]
All connection endpoints and model names are read from environment variables.
"""

from enum import Enum
import logging
import os

import typer
from loguru import logger
from elasticsearch import Elasticsearch
from langchain_elasticsearch import ElasticsearchStore
from chonkie import SemanticChunker, MarkdownChef
from transformers import AutoTokenizer

from src.utils.emb_factory import create_embedding_model
from scripts.pipelines.tasks.chunk import (
    read_files,
    get_chunk_docs,
    convert_chunks_to_document,
)

app = typer.Typer()

# Connection and model configuration, all sourced from the environment.
# NOTE(review): none of these are validated — a missing variable surfaces
# later as a confusing None-related error. Consider failing fast here.
ELASTICSEARCH_LOCAL_URL = os.getenv("ELASTICSEARCH_LOCAL_URL")
OLLAMA_LOCAL_URL = os.getenv("OLLAMA_LOCAL_URL")
OLLAMA_URL = os.getenv("OLLAMA_URL")
OLLAMA_EMB_MODEL_NAME = os.getenv("OLLAMA_EMB_MODEL_NAME")
AVAP_WEB_DOCS_URL = os.getenv("AVAP_WEB_DOCS_URL")
HF_EMB_MODEL_NAME = os.getenv("HF_EMB_MODEL_NAME")


class DistanceStrategy(str, Enum):
    """Vector distance strategies accepted by ElasticsearchStore.

    Values mirror the strategy names expected by
    ``langchain_elasticsearch.ElasticsearchStore`` (``distance_strategy``).
    """

    euclidean = "EUCLIDEAN_DISTANCE"
    max_inner_product = "MAX_INNER_PRODUCT"
    dot_product = "DOT_PRODUCT"
    jaccard = "JACCARD"
    cosine = "COSINE"


@app.command()
def elasticsearch_ingestion(
    docs_folder_path: str = "docs",
    es_index: str = "avap-docs-test-v2",
    es_request_timeout: int = 120,
    es_max_retries: int = 5,
    es_retry_on_timeout: bool = True,
    distance_strategy: DistanceStrategy = DistanceStrategy.cosine,
    chunk_size: int = 2048,
    chunk_threshold: float = 0.5,
    chunk_similarity_window: int = 3,
    chunk_skip_window: int = 1,
):
    """Chunk the AVAP docs, embed them, and (re)index them into Elasticsearch.

    Reads Markdown docs from ``{docs_folder_path}/LRM`` and
    ``{docs_folder_path}/samples``, semantically chunks the language docs,
    converts everything to LangChain ``Document`` objects, then deletes and
    recreates ``es_index`` with the embedded documents.

    Args:
        docs_folder_path: Root folder containing the ``LRM`` and ``samples``
            subfolders.
        es_index: Target index name; dropped and rebuilt if it already exists.
        es_request_timeout: Per-request timeout (seconds) for the ES client.
        es_max_retries: Max retry attempts for the ES client.
        es_retry_on_timeout: Whether the ES client retries timed-out requests.
        distance_strategy: Vector distance strategy used by the index.
        chunk_size: Max tokens per semantic chunk.
        chunk_threshold: Similarity threshold for splitting chunks.
        chunk_similarity_window: Window of sentences compared for similarity.
        chunk_skip_window: Sentences skipped between comparisons.

    Raises:
        Exception: Re-raised (after logging) if the Elasticsearch connection
            or the embeddings model instantiation fails.
    """
    logger.info("Starting Elasticsearch ingestion pipeline...")

    logger.info(
        f"Reading files from folder: {docs_folder_path}/LRM and "
        f"{docs_folder_path}/samples..."
    )
    avap_code_docs = read_files(f"{docs_folder_path}/samples")
    avap_language_docs = read_files(f"{docs_folder_path}/LRM")

    logger.info("Instantiating semantic chunker and chef...")
    custom_tokenizer = AutoTokenizer.from_pretrained(HF_EMB_MODEL_NAME)
    chef = MarkdownChef(tokenizer=custom_tokenizer)
    chunker = SemanticChunker(
        embedding_model=HF_EMB_MODEL_NAME,
        chunk_size=chunk_size,
        threshold=chunk_threshold,
        similarity_window=chunk_similarity_window,
        skip_window=chunk_skip_window,
    )

    logger.info("Processing Markdown docs with chef...")
    # NOTE(review): the processed result was previously bound to an unused
    # variable and never indexed — the call is kept in case processing has
    # side effects, but confirm whether its output should feed the pipeline.
    chef.process(f"{docs_folder_path}/LRM/avap.md")

    logger.info("Chunking AVAP Language docs...")
    avap_language_docs_chunks = get_chunk_docs(avap_language_docs, chunker)

    logger.info("Creating Langchain Document to index...")
    # Code samples are converted as-is (no semantic chunking), matching the
    # original behavior — presumably they are already small enough; verify.
    avap_language_langchain_docs = convert_chunks_to_document(avap_language_docs_chunks)
    avap_code_langchain_docs = convert_chunks_to_document(avap_code_docs)
    avap_documents = avap_language_langchain_docs + avap_code_langchain_docs

    logger.info("Connecting to Elasticsearch...")
    try:
        es = Elasticsearch(
            ELASTICSEARCH_LOCAL_URL,
            request_timeout=es_request_timeout,
            max_retries=es_max_retries,
            retry_on_timeout=es_retry_on_timeout,
        )
    except Exception:
        # Narrowed from a bare `except:` so KeyboardInterrupt/SystemExit
        # are no longer swallowed; still logged and re-raised.
        logger.exception("Failed to connect to Elasticsearch.")
        raise

    logger.info("Instantiating embeddings model...")
    try:
        embeddings = create_embedding_model(
            provider="ollama",
            model=OLLAMA_EMB_MODEL_NAME,
            base_url=OLLAMA_LOCAL_URL,
        )
    except Exception:
        logger.exception("Failed to instantiate embeddings model.")
        raise

    # Destructive: the existing index (and all its documents) is dropped
    # before re-ingestion.
    logger.info(f"Checking if index {es_index} exists and deleting if it does...")
    if es.indices.exists(index=es_index):
        es.indices.delete(index=es_index)

    logger.info(f"Uploading documents to index {es_index}...")
    ElasticsearchStore.from_documents(
        avap_documents,
        embeddings,
        client=es,
        index_name=es_index,
        distance_strategy=distance_strategy.value,
    )
    logger.info(f"Finished uploading documents to index {es_index}.")


if __name__ == "__main__":
    # Route stdlib logging (used by third-party libs) to a consistent format;
    # the app's own messages go through loguru.
    logging.basicConfig(
        level=logging.INFO,
        format="%(asctime)s | %(levelname)s | %(name)s | %(message)s",
    )
    try:
        app()
    except Exception as exc:
        logger.exception(exc)
        raise