refactor: update Elasticsearch ingestion pipeline and document processing logic

This commit is contained in:
acano 2026-03-12 09:51:00 +01:00
parent de21bcb5fb
commit aa80f60fdc
2 changed files with 56 additions and 38 deletions

View File

@@ -2,31 +2,28 @@ import typer
import logging
from loguru import logger
from chonkie import FileFetcher
from src.config import settings
from scripts.pipelines.tasks.chunk import process_documents, ingest_documents
from scripts.pipelines.tasks.chunk import fetch_documents, process_documents, ingest_documents
app = typer.Typer()
@app.command()
def elasticsearch_ingestion(
docs_folder_path: str = "docs/LRM",
docs_extension: str = ".md",
docs_folder_path: str = "docs/samples",
docs_extension: list[str] = [".md", ".avap"],
es_index: str = "avap-docs-test-v3",
es_request_timeout: int = 120,
es_max_retries: int = 5,
es_retry_on_timeout: bool = True,
delete_es_index: bool = False
delete_es_index: bool = True
):
logger.info("Starting Elasticsearch ingestion pipeline...")
logger.info(f"Fetching files from {docs_folder_path}...")
fetcher = FileFetcher()
docs_path = fetcher.fetch(dir=f"{settings.proj_root}/{docs_folder_path}")
docs_path = fetch_documents(docs_folder_path, docs_extension)
logger.info("Processing docs...")
chunked_docs = process_documents(docs_path, docs_extension)
chunked_docs = process_documents(docs_path)
logger.info(f"Ingesting chunks in Elasticsearch index: {es_index}...")
ingest_documents(chunked_docs, es_index, es_request_timeout, es_max_retries,

View File

@@ -5,9 +5,11 @@ from pathlib import Path
from chonkie import (
Chunk,
ElasticHandshake,
FileFetcher,
MarkdownChef,
TextChef,
TokenChunker,
MarkdownDocument
)
from elasticsearch import Elasticsearch
from loguru import logger
@@ -27,16 +29,16 @@ def _get_text(element) -> str:
)
def _merge_markdown_document(doc):
def _merge_markdown_document(processed_doc: MarkdownDocument) -> MarkdownDocument:
elements = []
for chunk in doc.chunks:
for chunk in processed_doc.chunks:
elements.append(("chunk", chunk.start_index, chunk.end_index, chunk))
for code in doc.code:
for code in processed_doc.code:
elements.append(("code", code.start_index, code.end_index, code))
for table in doc.tables:
for table in processed_doc.tables:
elements.append(("table", table.start_index, table.end_index, table))
elements.sort(key=lambda item: (item[1], item[2]))
@@ -87,41 +89,60 @@ def _merge_markdown_document(doc):
flush()
new_doc = deepcopy(doc)
new_doc.chunks = merged_chunks
new_doc.code = doc.code
new_doc.tables = doc.tables
fused_processed_doc = deepcopy(processed_doc)
fused_processed_doc.chunks = merged_chunks
fused_processed_doc.code = processed_doc.code
fused_processed_doc.tables = processed_doc.tables
return new_doc
return fused_processed_doc
def process_documents(docs_path: list[Path], docs_extension: str) -> list[Chunk]:
def fetch_documents(docs_folder_path: str, docs_extension: list[str]) -> list[Path]:
    """
    Collect document paths under a project folder, filtered by extension.

    Args:
        docs_folder_path (str): Folder, relative to the project root, that holds the documents.
        docs_extension (list[str]): File extensions to keep, e.g. [".md", ".avap"].

    Returns:
        list[Path]: Paths of the documents that matched one of the extensions.
    """
    source_dir = f"{settings.proj_root}/{docs_folder_path}"
    return FileFetcher().fetch(dir=source_dir, ext=docs_extension)
def process_documents(docs_path: list[Path]) -> list[Chunk]:
    """
    Process documents by applying the appropriate chef and chunking strategy per file type.

    Markdown (.md) files go through MarkdownChef and a merge step that keeps
    chunks together with their interleaved code blocks and tables; AVAP
    (.avap) files go through TextChef followed by token-based chunking.

    Args:
        docs_path (list[Path]): Paths of the documents to process.

    Returns:
        list[Chunk]: Chunks from all processed documents, ready for ingestion.
    """
    chunks: list[Chunk] = []
    # Build the tokenizer and processors once, outside the per-document loop —
    # AutoTokenizer.from_pretrained is expensive (downloads/loads model assets).
    custom_tokenizer = AutoTokenizer.from_pretrained(settings.hf_emb_model_name)
    chef_md = MarkdownChef(tokenizer=custom_tokenizer)
    chef_txt = TextChef()
    chunker = TokenChunker(tokenizer=custom_tokenizer)
    for doc_path in docs_path:
        doc_extension = doc_path.suffix.lower()
        if doc_extension == ".md":
            processed_doc = chef_md.process(doc_path)
            # Fuse chunks with adjacent code blocks/tables so markdown
            # structure is not split across chunk boundaries.
            fused_doc = _merge_markdown_document(processed_doc)
            chunks.extend(fused_doc.chunks)
        elif doc_extension == ".avap":
            processed_doc = chef_txt.process(doc_path)
            chunks.extend(chunker.chunk(processed_doc.content))
        else:
            # Surface unexpected files instead of silently dropping them;
            # the run still continues for the supported documents.
            logger.warning(f"Skipping {doc_path}: unsupported extension '{doc_extension}'")
    return chunks
def ingest_documents(