import typer
from loguru import logger

from scripts.pipelines.tasks.chunk import (
    fetch_documents,
    process_documents,
    export_documents,
    ingest_documents,
)

app = typer.Typer()


@app.command()
def elasticsearch_ingestion(
    docs_folder_path: str = "docs/samples",
    output_path: str = "research/code_indexing/chunks/chunks_EBNF_metadata.json",
    # NOTE: list default is never mutated below, so the shared-default pitfall
    # does not bite here; Typer also treats it purely as the CLI default.
    docs_extension: list[str] = [".avap"],
    es_index: str = "avap-code-indexing-ebnf-metadata",
    es_request_timeout: int = 120,
    es_max_retries: int = 5,
    es_retry_on_timeout: bool = True,
    delete_es_index: bool = True,
) -> None:
    """
    Pipeline to ingest documents into an Elasticsearch index.

    Fetches documents from a specified folder, processes them into chunks,
    ingests those chunks into the given Elasticsearch index, and finally
    exports the processed documents to a JSON file.

    Args:
        docs_folder_path (str): Path to the folder containing documents to be
            ingested. Default is "docs/samples".
        output_path (str): Path of the JSON file the processed documents are
            exported to after ingestion. Default is
            "research/code_indexing/chunks/chunks_EBNF_metadata.json".
        docs_extension (list[str]): List of file extensions to filter by
            (e.g., [".md", ".avap"]). Default is [".avap"].
        es_index (str): Name of the Elasticsearch index to ingest documents
            into. Default is "avap-code-indexing-ebnf-metadata".
        es_request_timeout (int): Timeout in seconds for Elasticsearch
            requests. Default is 120 seconds.
        es_max_retries (int): Maximum number of retries for Elasticsearch
            requests in case of failure. Default is 5 retries.
        es_retry_on_timeout (bool): Whether to retry Elasticsearch requests on
            timeout. Default is True.
        delete_es_index (bool): Whether to delete the existing Elasticsearch
            index before ingestion. Default is True.

    Returns:
        None
    """
    logger.info("Starting Elasticsearch ingestion pipeline...")

    logger.info(f"Fetching files from {docs_folder_path}...")
    docs_path = fetch_documents(docs_folder_path, docs_extension)

    logger.info("Processing docs...")
    chunked_docs = process_documents(docs_path)

    logger.info(f"Ingesting chunks in Elasticsearch index: {es_index}...")
    elasticsearch_docs = ingest_documents(
        chunked_docs,
        es_index,
        es_request_timeout,
        es_max_retries,
        es_retry_on_timeout,
        delete_es_index,
    )

    # Export after ingestion so the dump reflects exactly what went into ES.
    logger.info(f"Exporting processed documents to {output_path}...")
    export_documents(elasticsearch_docs, output_path)

    logger.info(f"Finished ingesting in {es_index}.")


if __name__ == "__main__":
    try:
        app()
    except Exception as exc:
        # Top-level boundary: record the full traceback, then re-raise so the
        # process still exits with a non-zero status.
        logger.exception(exc)
        raise