feat: Enhance Elasticsearch ingestion process with metadata export

- Added output path parameter to elasticsearch_ingestion command for exporting processed documents.
- Implemented ElasticHandshakeWithMetadata class to preserve chunk metadata during ingestion.
- Updated process_documents function to include extra metadata for each chunk.
- Modified ingest_documents function to return Elasticsearch response for each chunk.
- Introduced export_documents function to save processed documents as JSON files.
This commit is contained in:
acano 2026-03-12 12:26:47 +01:00
parent 4a81ec00a2
commit ed25f15542
4 changed files with 93446 additions and 13 deletions

1045
docs/samples/avap.md Normal file

File diff suppressed because it is too large Load Diff

92295
ingestion/chunks.json Normal file

File diff suppressed because one or more lines are too long

View File

@ -3,7 +3,12 @@ import logging
from loguru import logger from loguru import logger
from scripts.pipelines.tasks.chunk import fetch_documents, process_documents, ingest_documents from scripts.pipelines.tasks.chunk import (
fetch_documents,
process_documents,
export_documents,
ingest_documents
)
app = typer.Typer() app = typer.Typer()
@ -11,6 +16,7 @@ app = typer.Typer()
@app.command() @app.command()
def elasticsearch_ingestion( def elasticsearch_ingestion(
docs_folder_path: str = "docs/samples", docs_folder_path: str = "docs/samples",
output_path: str = "ingestion/chunks.json",
docs_extension: list[str] = [".md", ".avap"], docs_extension: list[str] = [".md", ".avap"],
es_index: str = "avap-docs-test-v3", es_index: str = "avap-docs-test-v3",
es_request_timeout: int = 120, es_request_timeout: int = 120,
@ -42,8 +48,11 @@ def elasticsearch_ingestion(
chunked_docs = process_documents(docs_path) chunked_docs = process_documents(docs_path)
logger.info(f"Ingesting chunks in Elasticsearch index: {es_index}...") logger.info(f"Ingesting chunks in Elasticsearch index: {es_index}...")
ingest_documents(chunked_docs, es_index, es_request_timeout, es_max_retries, elasticsearch_docs = ingest_documents(chunked_docs, es_index, es_request_timeout, es_max_retries,
es_retry_on_timeout, delete_es_index) es_retry_on_timeout, delete_es_index)
logger.info(f"Exporting processed documents to {output_path}...")
export_documents(elasticsearch_docs, output_path)
logger.info(f"Finished ingesting in {es_index}.") logger.info(f"Finished ingesting in {es_index}.")

View File

@ -1,6 +1,8 @@
import json
from copy import deepcopy from copy import deepcopy
from dataclasses import replace from dataclasses import replace
from pathlib import Path from pathlib import Path
from typing import Any, Union
from chonkie import ( from chonkie import (
Chunk, Chunk,
@ -97,6 +99,58 @@ def _merge_markdown_document(processed_doc: MarkdownDocument) -> MarkdownDocumen
return fused_processed_doc return fused_processed_doc
class ElasticHandshakeWithMetadata(ElasticHandshake):
    """ElasticHandshake subclass that carries per-chunk extra metadata into Elasticsearch."""

    def _create_bulk_actions(self, chunks: list[dict]) -> list[dict[str, Any]]:
        """Build the bulk-API action dicts, merging any extra metadata into each document.

        Each entry in *chunks* is a dict with a "chunk" (Chunk object) and an
        optional "extra_metadata" mapping that is flattened into the document body.
        """
        texts = [entry["chunk"].text for entry in chunks]
        # Embed all chunk texts in a single batch call for efficiency.
        embeddings = self.embedding_model.embed_batch(texts)
        actions = []
        for position, (entry, embedding) in enumerate(zip(chunks, embeddings)):
            chunk_obj = entry["chunk"]
            document = {
                "text": chunk_obj.text,
                "embedding": embedding,
                "start_index": chunk_obj.start_index,
                "end_index": chunk_obj.end_index,
                "token_count": chunk_obj.token_count,
            }
            extra = entry.get("extra_metadata")
            if extra:
                # Caller-supplied metadata becomes top-level fields of the document.
                document.update(extra)
            actions.append(
                {
                    "_index": self.index_name,
                    "_id": self._generate_id(position, chunk_obj),
                    "_source": document,
                }
            )
        return actions

    def write(self, chunks: Union[Chunk, list[Chunk]]) -> list[dict[str, Any]]:
        """Write the chunks to the Elasticsearch index using the bulk API.

        Returns the list of bulk actions that were sent, so callers can
        inspect or export exactly what was indexed.
        """
        from elasticsearch.helpers import bulk

        if isinstance(chunks, Chunk):
            chunks = [chunks]
        actions = self._create_bulk_actions(chunks)
        # raise_on_error=False lets partial failures through so they can be logged below.
        success, errors = bulk(self.client, actions, raise_on_error=False)
        if errors:
            logger.warning(f"Encountered {len(errors)} errors during bulk indexing.")  # type: ignore
            # Optionally log the first few errors for debugging
            for i, error in enumerate(errors[:5]):  # type: ignore
                logger.error(f"Error {i + 1}: {error}")
        logger.info(f"Chonkie wrote {success} chunks to Elasticsearch index: {self.index_name}")
        return actions
def fetch_documents(docs_folder_path: str, docs_extension: list[str]) -> list[Path]: def fetch_documents(docs_folder_path: str, docs_extension: list[str]) -> list[Path]:
""" """
Fetch files from a folder that match the specified extensions. Fetch files from a folder that match the specified extensions.
@ -113,7 +167,7 @@ def fetch_documents(docs_folder_path: str, docs_extension: list[str]) -> list[Pa
return docs_path return docs_path
def process_documents(docs_path: list[Path]) -> list[Chunk]: def process_documents(docs_path: list[Path]) -> list[dict[str, Chunk | dict[str, Any]]]:
""" """
Process documents by applying appropriate chefs and chunking strategies based on file type. Process documents by applying appropriate chefs and chunking strategies based on file type.
@ -121,7 +175,7 @@ def process_documents(docs_path: list[Path]) -> list[Chunk]:
docs_path (list[Path]): List of Paths to the documents to be processed docs_path (list[Path]): List of Paths to the documents to be processed
Returns: Returns:
List of processed documents ready for ingestion List of dicts with "chunk" (Chunk object) and "metadata" (dict with file info)
""" """
processed_docs = [] processed_docs = []
custom_tokenizer = AutoTokenizer.from_pretrained(settings.hf_emb_model_name) custom_tokenizer = AutoTokenizer.from_pretrained(settings.hf_emb_model_name)
@ -131,33 +185,40 @@ def process_documents(docs_path: list[Path]) -> list[Chunk]:
for doc_path in docs_path: for doc_path in docs_path:
doc_extension = doc_path.suffix.lower() doc_extension = doc_path.suffix.lower()
filename = doc_path.name
if doc_extension == ".md": if doc_extension == ".md":
processed_doc = chef_md.process(doc_path) processed_doc = chef_md.process(doc_path)
fused_doc = _merge_markdown_document(processed_doc) fused_doc = _merge_markdown_document(processed_doc)
processed_docs.extend(fused_doc.chunks) chunked_doc = fused_doc.chunks
elif doc_extension == ".avap": elif doc_extension == ".avap":
processed_doc = chef_txt.process(doc_path) processed_doc = chef_txt.process(doc_path)
chunked_doc = chunker.chunk(processed_doc.content) chunked_doc = chunker.chunk(processed_doc.content)
processed_docs.extend(chunked_doc) else:
continue
for chunk in chunked_doc:
processed_docs.append({
"chunk": chunk,
"extra_metadata": {"file": filename}
})
return processed_docs return processed_docs
def ingest_documents( def ingest_documents(
chunked_docs: list[Chunk], chunked_docs: list[dict[str, Chunk | dict[str, Any]]],
es_index: str, es_index: str,
es_request_timeout: int, es_request_timeout: int,
es_max_retries: int, es_max_retries: int,
es_retry_on_timeout: bool, es_retry_on_timeout: bool,
delete_es_index: bool, delete_es_index: bool,
) -> None: ) -> list[dict[str, Any]]:
""" """
Ingest processed documents into an Elasticsearch index. Ingest processed documents into an Elasticsearch index.
Args: Args:
chunked_docs (list[Chunk]): List of processed document chunks to be ingested chunked_docs (list[dict[str, Any]]): List of dicts with "chunk" and "metadata" keys
es_index (str): Name of the Elasticsearch index to ingest into es_index (str): Name of the Elasticsearch index to ingest into
es_request_timeout (int): Timeout for Elasticsearch requests in seconds es_request_timeout (int): Timeout for Elasticsearch requests in seconds
es_max_retries (int): Maximum number of retries for Elasticsearch requests es_max_retries (int): Maximum number of retries for Elasticsearch requests
@ -165,7 +226,7 @@ def ingest_documents(
delete_es_index (bool): Whether to delete the existing Elasticsearch index before ingestion delete_es_index (bool): Whether to delete the existing Elasticsearch index before ingestion
Returns: Returns:
None List of dicts with Elasticsearch response for each chunk
""" """
logger.info( logger.info(
f"Instantiating Elasticsearch client with URL: {settings.elasticsearch_local_url}..." f"Instantiating Elasticsearch client with URL: {settings.elasticsearch_local_url}..."
@ -181,7 +242,7 @@ def ingest_documents(
logger.info(f"Deleting existing Elasticsearch index: {es_index}...") logger.info(f"Deleting existing Elasticsearch index: {es_index}...")
es.indices.delete(index=es_index) es.indices.delete(index=es_index)
handshake = ElasticHandshake( handshake = ElasticHandshakeWithMetadata(
client=es, client=es,
index_name=es_index, index_name=es_index,
embedding_model=OllamaEmbeddings(model=settings.ollama_emb_model_name), embedding_model=OllamaEmbeddings(model=settings.ollama_emb_model_name),
@ -190,4 +251,27 @@ def ingest_documents(
logger.info( logger.info(
f"Ingesting {len(chunked_docs)} chunks into Elasticsearch index: {es_index}..." f"Ingesting {len(chunked_docs)} chunks into Elasticsearch index: {es_index}..."
) )
handshake.write(chunked_docs) elasticsearch_chunks = handshake.write(chunked_docs)
return elasticsearch_chunks
def export_documents(elasticsearch_chunks: list[dict[str, Any]], output_path: str) -> None:
    """
    Export processed documents to a single JSON file at the specified path.

    Args:
        elasticsearch_chunks (list[dict[str, Any]]): List of dicts with Elasticsearch response for each chunk
        output_path (str): Path, relative to the project root, of the JSON file to write

    Returns:
        None
    """
    resolved_path = settings.proj_root / output_path
    # Ensure the destination directory exists before opening the file.
    resolved_path.parent.mkdir(parents=True, exist_ok=True)
    for chunk in elasticsearch_chunks:
        embedding = chunk["_source"]["embedding"]
        # Embeddings may be numpy arrays, which json cannot serialize directly;
        # skip the conversion if the value is already a plain list.
        if hasattr(embedding, "tolist"):
            chunk["_source"]["embedding"] = embedding.tolist()
    with resolved_path.open("w", encoding="utf-8") as f:
        json.dump(elasticsearch_chunks, f, ensure_ascii=False, indent=4)
    logger.info(f"Exported processed documents to {resolved_path}")