From aa80f60fdc001fddabe018352eaf28daaea0ab3e Mon Sep 17 00:00:00 2001 From: acano Date: Thu, 12 Mar 2026 09:51:00 +0100 Subject: [PATCH] refactor: update Elasticsearch ingestion pipeline and document processing logic --- .../flows/elasticsearch_ingestion.py | 15 ++-- scripts/pipelines/tasks/chunk.py | 79 ++++++++++++------- 2 files changed, 56 insertions(+), 38 deletions(-) diff --git a/scripts/pipelines/flows/elasticsearch_ingestion.py b/scripts/pipelines/flows/elasticsearch_ingestion.py index f9c510c..80a49d3 100644 --- a/scripts/pipelines/flows/elasticsearch_ingestion.py +++ b/scripts/pipelines/flows/elasticsearch_ingestion.py @@ -2,31 +2,28 @@ import typer import logging from loguru import logger -from chonkie import FileFetcher -from src.config import settings -from scripts.pipelines.tasks.chunk import process_documents, ingest_documents +from scripts.pipelines.tasks.chunk import fetch_documents, process_documents, ingest_documents app = typer.Typer() @app.command() def elasticsearch_ingestion( - docs_folder_path: str = "docs/LRM", - docs_extension: str = ".md", + docs_folder_path: str = "docs/samples", + docs_extension: list[str] = [".md", ".avap"], es_index: str = "avap-docs-test-v3", es_request_timeout: int = 120, es_max_retries: int = 5, es_retry_on_timeout: bool = True, - delete_es_index: bool = False + delete_es_index: bool = True ): logger.info("Starting Elasticsearch ingestion pipeline...") logger.info(f"Fetching files from {docs_folder_path}...") - fetcher = FileFetcher() - docs_path = fetcher.fetch(dir=f"{settings.proj_root}/{docs_folder_path}") + docs_path = fetch_documents(docs_folder_path, docs_extension) logger.info("Processing docs...") - chunked_docs = process_documents(docs_path, docs_extension) + chunked_docs = process_documents(docs_path) logger.info(f"Ingesting chunks in Elasticsearch index: {es_index}...") ingest_documents(chunked_docs, es_index, es_request_timeout, es_max_retries, diff --git a/scripts/pipelines/tasks/chunk.py 
b/scripts/pipelines/tasks/chunk.py index 72c39ea..452e9a6 100644 --- a/scripts/pipelines/tasks/chunk.py +++ b/scripts/pipelines/tasks/chunk.py @@ -5,9 +5,11 @@ from pathlib import Path from chonkie import ( Chunk, ElasticHandshake, + FileFetcher, MarkdownChef, TextChef, TokenChunker, + MarkdownDocument ) from elasticsearch import Elasticsearch from loguru import logger @@ -27,16 +29,16 @@ def _get_text(element) -> str: ) -def _merge_markdown_document(doc): +def _merge_markdown_document(processed_doc: MarkdownDocument) -> MarkdownDocument: elements = [] - for chunk in doc.chunks: + for chunk in processed_doc.chunks: elements.append(("chunk", chunk.start_index, chunk.end_index, chunk)) - for code in doc.code: + for code in processed_doc.code: elements.append(("code", code.start_index, code.end_index, code)) - for table in doc.tables: + for table in processed_doc.tables: elements.append(("table", table.start_index, table.end_index, table)) elements.sort(key=lambda item: (item[1], item[2])) @@ -87,41 +89,60 @@ def _merge_markdown_document(doc): flush() - new_doc = deepcopy(doc) - new_doc.chunks = merged_chunks - new_doc.code = doc.code - new_doc.tables = doc.tables + fused_processed_doc = deepcopy(processed_doc) + fused_processed_doc.chunks = merged_chunks + fused_processed_doc.code = processed_doc.code + fused_processed_doc.tables = processed_doc.tables - return new_doc + return fused_processed_doc -def process_documents(docs_path: list[Path], docs_extension: str) -> list[Chunk]: +def fetch_documents(docs_folder_path: str, docs_extension: list[str]) -> list[Path]: + """ + Fetch files from a folder that match the specified extensions. 
+ + Args: + docs_folder_path (str): Path to the folder containing documents + docs_extension (list[str]): List of file extensions to filter by (e.g., [".md", ".avap"]) + + Returns: + List of Paths to the fetched documents + """ + fetcher = FileFetcher() + docs_path = fetcher.fetch(dir=f"{settings.proj_root}/{docs_folder_path}", ext=docs_extension) + return docs_path + + +def process_documents(docs_path: list[Path]) -> list[Chunk]: + """ + Process documents by applying appropriate chefs and chunking strategies based on file type. + + Args: + docs_path (list[Path]): List of Paths to the documents to be processed + + Returns: + List of Chunk objects ready for ingestion + """ processed_docs = [] custom_tokenizer = AutoTokenizer.from_pretrained(settings.hf_emb_model_name) + chef_md = MarkdownChef(tokenizer=custom_tokenizer) + chef_txt = TextChef() + chunker = TokenChunker(tokenizer=custom_tokenizer) - if docs_extension == ".md": - chef = MarkdownChef(tokenizer=custom_tokenizer) - for doc in docs_path: - processed_doc = chef.process(doc) - processed_docs.append((processed_doc, doc.name)) - - for processed_doc, filename in processed_docs: + for doc_path in docs_path: + doc_extension = doc_path.suffix.lower() + + if doc_extension == ".md": + processed_doc = chef_md.process(doc_path) fused_doc = _merge_markdown_document(processed_doc) - chunked_docs.extend(fused_doc.chunks) + processed_docs.extend(fused_doc.chunks) - elif docs_extension == ".avap": - chef = TextChef() - chunker = TokenChunker(tokenizer=custom_tokenizer) - for doc in docs_path: - processed_doc = chef.process(doc) - processed_docs.append((processed_doc, doc.name)) - - for processed_doc, filename in processed_docs: + elif doc_extension == ".avap": + processed_doc = chef_txt.process(doc_path) chunked_doc = chunker.chunk(processed_doc.content) - chunked_docs.extend(chunked_doc) + processed_docs.extend(chunked_doc) - return chunked_docs + return processed_docs def ingest_documents(