assistance-engine/research/code_indexing/elasticsearch_ingestion.py

64 lines
2.5 KiB
Python

import typer
from loguru import logger
from scripts.pipelines.tasks.chunk import (
fetch_documents,
process_documents,
export_documents,
ingest_documents
)
# Typer application object; CLI commands are registered on it with @app.command().
app = typer.Typer()
@app.command()
def elasticsearch_ingestion(
    docs_folder_path: str = "docs/samples",
    output_path: str = "research/code_indexing/chunks/chunks_EBNF_metadata.json",
    # NOTE: the list default is kept as-is because it declares the CLI default
    # for this option; this function itself never mutates it.
    docs_extension: list[str] = [".avap"],
    es_index: str = "avap-code-indexing-ebnf-metadata",
    es_request_timeout: int = 120,
    es_max_retries: int = 5,
    es_retry_on_timeout: bool = True,
    delete_es_index: bool = True
) -> None:
    """
    Pipeline to ingest documents into an Elasticsearch index.

    The pipeline runs four steps in order: fetch the documents from a folder
    (fetch_documents), process them into chunks (process_documents), ingest
    the chunks into the Elasticsearch index (ingest_documents) and export the
    documents returned by the ingestion step to a file (export_documents).

    Args:
        docs_folder_path (str): Path to the folder containing documents to be ingested. Default is "docs/samples".
        output_path (str): Path the documents returned by the ingestion step are exported to. Default is "research/code_indexing/chunks/chunks_EBNF_metadata.json".
        docs_extension (list[str]): List of file extensions to filter by (e.g., [".md", ".avap"]). Default is [".avap"].
        es_index (str): Name of the Elasticsearch index to ingest documents into. Default is "avap-code-indexing-ebnf-metadata".
        es_request_timeout (int): Timeout in seconds for Elasticsearch requests. Default is 120 seconds.
        es_max_retries (int): Maximum number of retries for Elasticsearch requests in case of failure. Default is 5 retries.
        es_retry_on_timeout (bool): Whether to retry Elasticsearch requests on timeout. Default is True.
        delete_es_index (bool): Whether to delete the existing Elasticsearch index before ingestion. Default is True.

    Returns:
        None
    """
    logger.info("Starting Elasticsearch ingestion pipeline...")

    logger.info(f"Fetching files from {docs_folder_path}...")
    docs_path = fetch_documents(docs_folder_path, docs_extension)

    logger.info("Processing docs...")
    chunked_docs = process_documents(docs_path)

    logger.info(f"Ingesting chunks in Elasticsearch index: {es_index}...")
    # Arguments are passed positionally, in the order the original call used.
    elasticsearch_docs = ingest_documents(
        chunked_docs,
        es_index,
        es_request_timeout,
        es_max_retries,
        es_retry_on_timeout,
        delete_es_index,
    )

    # The export writes what the ingestion step returned, so it must run after it.
    logger.info(f"Exporting processed documents to {output_path}...")
    export_documents(elasticsearch_docs, output_path)

    logger.info(f"Finished ingesting in {es_index}.")
if __name__ == "__main__":
    # Script entry point: run the Typer application and make sure any failure
    # is recorded through the logger before it propagates.
    try:
        app()
    except Exception as error:
        # Log the failure, then re-raise it unchanged.
        logger.exception(error)
        raise