"""Elasticsearch ingestion pipeline for AVAP documentation.

Reads AVAP docs from disk, chunks them semantically, embeds them with an
Ollama-served model, and (re)indexes everything into Elasticsearch.
Run as a Typer CLI: ``python <this_file> [OPTIONS]``.
"""

from enum import Enum
import logging
import os
from pathlib import Path

import typer
from chonkie import SemanticChunker
from elasticsearch import Elasticsearch
from langchain_elasticsearch import ElasticsearchStore
from loguru import logger

from scripts.pipelines.tasks.chunk import (
    read_files,
    get_chunk_docs,
    convert_chunks_to_document,
)
from src.utils.emb_factory import create_embedding_model

app = typer.Typer()

# Connection/configuration values pulled from the environment. Any of these
# may be None if the variable is unset — the pipeline fails when the value is
# first used (e.g. connecting to Elasticsearch or naming the index).
ELASTICSEARCH_LOCAL_URL = os.getenv("ELASTICSEARCH_LOCAL_URL")
OLLAMA_LOCAL_URL = os.getenv("OLLAMA_LOCAL_URL")
ELASTICSEARCH_INDEX = os.getenv("ELASTICSEARCH_INDEX")
OLLAMA_URL = os.getenv("OLLAMA_URL")
OLLAMA_EMB_MODEL_NAME = os.getenv("OLLAMA_EMB_MODEL_NAME")
AVAP_WEB_DOCS_URL = os.getenv("AVAP_WEB_DOCS_URL")
HF_EMB_MODEL_NAME = os.getenv("HF_EMB_MODEL_NAME")


class DistanceStrategy(str, Enum):
    """Vector distance strategies accepted by langchain's ElasticsearchStore.

    The *values* are the strings ElasticsearchStore expects; inheriting from
    ``str`` lets Typer parse the option directly from the command line.
    """

    euclidean = "EUCLIDEAN_DISTANCE"
    max_inner_product = "MAX_INNER_PRODUCT"
    dot_product = "DOT_PRODUCT"
    jaccard = "JACCARD"
    cosine = "COSINE"


@app.command()
def elasticsearch_ingestion(
    docs_folder_path: str = "docs",
    es_request_timeout: int = 120,
    es_max_retries: int = 5,
    es_retry_on_timeout: bool = True,
    distance_strategy: DistanceStrategy = DistanceStrategy.cosine,
    chunk_size: int = 2048,
    chunk_threshold: float = 0.5,
    chunk_similarity_window: int = 3,
    chunk_skip_window: int = 1,
) -> None:
    """Read, chunk, embed and index the AVAP documentation corpus.

    Args:
        docs_folder_path: Root folder holding the documentation subfolders
            (``avap_language_github_docs``, ``developer.avapframework.com``,
            ``samples``).
        es_request_timeout: Per-request timeout (seconds) for Elasticsearch.
        es_max_retries: Maximum retries for failed Elasticsearch requests.
        es_retry_on_timeout: Whether Elasticsearch retries on timeout.
        distance_strategy: Vector distance metric used by the index.
        chunk_size: Maximum chunk size for the semantic chunker.
        chunk_threshold: Similarity threshold for chunk boundaries.
        chunk_similarity_window: Window of sentences compared for similarity.
        chunk_skip_window: Sentences skipped between similarity comparisons.

    Raises:
        Exception: Re-raised after logging if the Elasticsearch client or the
            embeddings model cannot be created.

    Note:
        The target index (``ELASTICSEARCH_INDEX``) is DELETED and rebuilt on
        every run — this is a full re-ingestion, not an incremental update.
    """
    logger.info("Starting Elasticsearch ingestion pipeline...")
    logger.info(
        f"Reading and concatenating files from folder: "
        f"{docs_folder_path}/developer.avapframework.com"
    )
    avap_github_docs = read_files(
        f"{docs_folder_path}/avap_language_github_docs", concatenate=False
    )
    avap_web_docs_intro = read_files(
        f"{docs_folder_path}/developer.avapframework.com", "intro", concatenate=True
    )

    # Discover chapters in the developer.avapframework.com folder: filenames
    # look like "chapterN_*.md", so the prefix before the first "_" is the
    # chapter id. Read and concatenate the files of each chapter in order.
    chapters = sorted({
        p.name.split("_")[0]
        for p in Path(
            f"{docs_folder_path}/developer.avapframework.com"
        ).glob("chapter*.md")
    })
    avap_web_docs_chapters = [
        item
        for chapter in chapters
        for item in read_files(
            f"{docs_folder_path}/developer.avapframework.com",
            f"{chapter}_",
            concatenate=True,
        )
    ]
    avap_web_docs_appendices = read_files(
        f"{docs_folder_path}/developer.avapframework.com",
        "appendices_",
        concatenate=False,
    )
    avap_samples_docs = read_files(f"{docs_folder_path}/samples", concatenate=False)

    logger.info("Instantiating semantic chunker...")
    chunker = SemanticChunker(
        embedding_model=HF_EMB_MODEL_NAME,
        chunk_size=chunk_size,
        threshold=chunk_threshold,
        similarity_window=chunk_similarity_window,
        skip_window=chunk_skip_window,
    )

    # Only the GitHub docs and the web chapters are semantically chunked;
    # intro, appendices and samples are converted as-is (they are indexed
    # per-file rather than per-chunk).
    logger.info("Chunking AVAP GitHub docs...")
    avap_github_docs_chunks = get_chunk_docs(avap_github_docs, chunker)
    logger.info("Chunking AVAP web docs chapters...")
    avap_web_docs_chapters_chunks = get_chunk_docs(avap_web_docs_chapters, chunker)

    logger.info("Creating Langchain Document to index...")
    avap_github_langchain_docs = convert_chunks_to_document(avap_github_docs_chunks)
    avap_web_chapters_langchain_docs = convert_chunks_to_document(
        avap_web_docs_chapters_chunks
    )
    avap_web_intro_langchain_docs = convert_chunks_to_document(avap_web_docs_intro)
    avap_web_appendices_langchain_docs = convert_chunks_to_document(
        avap_web_docs_appendices
    )
    avap_samples_langchain_docs = convert_chunks_to_document(avap_samples_docs)
    avap_documents = (
        avap_github_langchain_docs
        + avap_web_chapters_langchain_docs
        + avap_web_intro_langchain_docs
        + avap_web_appendices_langchain_docs
        + avap_samples_langchain_docs
    )

    logger.info("Connecting to Elasticsearch...")
    try:
        es = Elasticsearch(
            ELASTICSEARCH_LOCAL_URL,
            request_timeout=es_request_timeout,
            max_retries=es_max_retries,
            retry_on_timeout=es_retry_on_timeout,
        )
    except Exception:  # was a bare except: don't swallow KeyboardInterrupt/SystemExit
        logger.exception("Failed to connect to Elasticsearch.")
        raise

    logger.info("Instantiating embeddings model...")
    try:
        embeddings = create_embedding_model(
            provider="ollama",
            model=OLLAMA_EMB_MODEL_NAME,
            base_url=OLLAMA_LOCAL_URL,
        )
    except Exception:  # was a bare except: don't swallow KeyboardInterrupt/SystemExit
        logger.exception("Failed to instantiate embeddings model.")
        raise

    # Full rebuild: drop any pre-existing index before re-indexing.
    logger.info(
        f"Checking if index {ELASTICSEARCH_INDEX} exists and deleting if it does..."
    )
    if es.indices.exists(index=ELASTICSEARCH_INDEX):
        es.indices.delete(index=ELASTICSEARCH_INDEX)

    logger.info(f"Uploading documents to index {ELASTICSEARCH_INDEX}...")
    ElasticsearchStore.from_documents(
        avap_documents,
        embeddings,
        client=es,
        index_name=ELASTICSEARCH_INDEX,
        distance_strategy=distance_strategy.value,
    )
    logger.info(f"Finished uploading documents to index {ELASTICSEARCH_INDEX}.")


if __name__ == "__main__":
    # Stdlib logging config covers third-party libraries that log via
    # `logging`; the pipeline's own messages go through loguru.
    logging.basicConfig(
        level=logging.INFO,
        format="%(asctime)s | %(levelname)s | %(name)s | %(message)s",
    )
    try:
        app()
    except Exception as exc:
        logger.exception(exc)
        raise