feat: Enhance Elasticsearch ingestion with metadata export
- Added `export_documents` function to save processed documents to JSON.
- Extended `ElasticHandshake` to include chunk metadata during ingestion.
- Updated `process_documents` to include extra metadata for each chunk.
- Modified `ingest_documents` to return Elasticsearch responses for further processing.
- Adjusted `elasticsearch_ingestion` command to accept an output path for the exported JSON.
This commit is contained in:
parent
f183beb088
commit
654ac88da7
File diff suppressed because one or more lines are too long
|
|
@ -3,7 +3,12 @@ import logging
|
||||||
|
|
||||||
from loguru import logger
|
from loguru import logger
|
||||||
|
|
||||||
from scripts.pipelines.tasks.chunk import fetch_documents, process_documents, ingest_documents
|
from scripts.pipelines.tasks.chunk import (
|
||||||
|
fetch_documents,
|
||||||
|
process_documents,
|
||||||
|
export_documents,
|
||||||
|
ingest_documents
|
||||||
|
)
|
||||||
|
|
||||||
app = typer.Typer()
|
app = typer.Typer()
|
||||||
|
|
||||||
|
|
@ -11,6 +16,7 @@ app = typer.Typer()
|
||||||
@app.command()
|
@app.command()
|
||||||
def elasticsearch_ingestion(
|
def elasticsearch_ingestion(
|
||||||
docs_folder_path: str = "docs/samples",
|
docs_folder_path: str = "docs/samples",
|
||||||
|
output_path: str = "ingestion/chunks.json",
|
||||||
docs_extension: list[str] = [".md", ".avap"],
|
docs_extension: list[str] = [".md", ".avap"],
|
||||||
es_index: str = "avap-docs-test-v3",
|
es_index: str = "avap-docs-test-v3",
|
||||||
es_request_timeout: int = 120,
|
es_request_timeout: int = 120,
|
||||||
|
|
@ -42,8 +48,11 @@ def elasticsearch_ingestion(
|
||||||
chunked_docs = process_documents(docs_path)
|
chunked_docs = process_documents(docs_path)
|
||||||
|
|
||||||
logger.info(f"Ingesting chunks in Elasticsearch index: {es_index}...")
|
logger.info(f"Ingesting chunks in Elasticsearch index: {es_index}...")
|
||||||
ingest_documents(chunked_docs, es_index, es_request_timeout, es_max_retries,
|
elasticsearch_docs = ingest_documents(chunked_docs, es_index, es_request_timeout, es_max_retries,
|
||||||
es_retry_on_timeout, delete_es_index)
|
es_retry_on_timeout, delete_es_index)
|
||||||
|
|
||||||
|
logger.info(f"Exporting processed documents to {output_path}...")
|
||||||
|
export_documents(elasticsearch_docs, output_path)
|
||||||
|
|
||||||
logger.info(f"Finished ingesting in {es_index}.")
|
logger.info(f"Finished ingesting in {es_index}.")
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -1,6 +1,8 @@
|
||||||
|
import json
|
||||||
from copy import deepcopy
|
from copy import deepcopy
|
||||||
from dataclasses import replace
|
from dataclasses import replace
|
||||||
from pathlib import Path
|
from pathlib import Path
|
||||||
|
from typing import Any, Union
|
||||||
|
|
||||||
from chonkie import (
|
from chonkie import (
|
||||||
Chunk,
|
Chunk,
|
||||||
|
|
@ -97,6 +99,58 @@ def _merge_markdown_document(processed_doc: MarkdownDocument) -> MarkdownDocumen
|
||||||
return fused_processed_doc
|
return fused_processed_doc
|
||||||
|
|
||||||
|
|
||||||
|
class ElasticHandshakeWithMetadata(ElasticHandshake):
    """Extended ElasticHandshake that preserves chunk metadata in Elasticsearch.

    Expects items of the form ``{"chunk": Chunk, "extra_metadata": dict}`` and
    merges ``extra_metadata`` into each indexed document's ``_source``. A bare
    ``Chunk`` (no metadata) is also accepted and normalized internally.
    """

    def _create_bulk_actions(self, chunks: list[dict[str, Any]]) -> list[dict[str, Any]]:
        """Generate bulk actions including metadata.

        Args:
            chunks: Items with a "chunk" key (Chunk) and an optional
                "extra_metadata" key (dict merged into the document source).

        Returns:
            One bulk-index action per chunk, ready for elasticsearch.helpers.bulk.
        """
        # Normalize bare Chunk objects so both input forms are supported; the
        # previous version indexed chunk["chunk"] and crashed on a bare Chunk.
        items = [c if isinstance(c, dict) else {"chunk": c} for c in chunks]

        # Embed all chunk texts in one batch call (single model round-trip).
        embeddings = self.embedding_model.embed_batch([item["chunk"].text for item in items])

        actions: list[dict[str, Any]] = []
        for i, item in enumerate(items):
            chunk = item["chunk"]
            source: dict[str, Any] = {
                "text": chunk.text,
                "embedding": embeddings[i],
                "start_index": chunk.start_index,
                "end_index": chunk.end_index,
                "token_count": chunk.token_count,
            }

            # Merge metadata after the base fields; explicit metadata keys win.
            if item.get("extra_metadata"):
                source.update(item["extra_metadata"])

            actions.append({
                "_index": self.index_name,
                "_id": self._generate_id(i, chunk),
                "_source": source,
            })

        return actions

    def write(self, chunks: Union[Chunk, dict[str, Any], list[Any]]) -> list[dict[str, Any]]:
        """Write the chunks to the Elasticsearch index using the bulk API.

        Args:
            chunks: A single Chunk, a single chunk dict, or a list of either
                (see class docstring for the dict shape).

        Returns:
            The bulk actions that were submitted (including their "_source").
        """
        # Normalize a single item to a list so the bulk path is uniform.
        if isinstance(chunks, (Chunk, dict)):
            chunks = [chunks]

        actions = self._create_bulk_actions(chunks)

        # Imported lazily: only needed when actually writing to Elasticsearch.
        from elasticsearch.helpers import bulk

        # raise_on_error=False makes bulk() return (success_count, error_list)
        # instead of raising on the first failed action.
        success, errors = bulk(self.client, actions, raise_on_error=False)

        if errors:
            logger.warning(f"Encountered {len(errors)} errors during bulk indexing.")  # type: ignore
            # Log only the first few errors for debugging; avoids flooding logs.
            for i, error in enumerate(errors[:5]):  # type: ignore
                logger.error(f"Error {i + 1}: {error}")

        logger.info(f"Chonkie wrote {success} chunks to Elasticsearch index: {self.index_name}")

        return actions
def fetch_documents(docs_folder_path: str, docs_extension: list[str]) -> list[Path]:
|
def fetch_documents(docs_folder_path: str, docs_extension: list[str]) -> list[Path]:
|
||||||
"""
|
"""
|
||||||
Fetch files from a folder that match the specified extensions.
|
Fetch files from a folder that match the specified extensions.
|
||||||
|
|
@ -113,7 +167,7 @@ def fetch_documents(docs_folder_path: str, docs_extension: list[str]) -> list[Pa
|
||||||
return docs_path
|
return docs_path
|
||||||
|
|
||||||
|
|
||||||
def process_documents(docs_path: list[Path]) -> list[Chunk]:
|
def process_documents(docs_path: list[Path]) -> list[dict[str, Chunk | dict[str, Any]]]:
|
||||||
"""
|
"""
|
||||||
Process documents by applying appropriate chefs and chunking strategies based on file type.
|
Process documents by applying appropriate chefs and chunking strategies based on file type.
|
||||||
|
|
||||||
|
|
@ -121,7 +175,7 @@ def process_documents(docs_path: list[Path]) -> list[Chunk]:
|
||||||
docs_path (list[Path]): List of Paths to the documents to be processed
|
docs_path (list[Path]): List of Paths to the documents to be processed
|
||||||
|
|
||||||
Returns:
|
Returns:
|
||||||
List of processed documents ready for ingestion
|
List of dicts with "chunk" (Chunk object) and "metadata" (dict with file info)
|
||||||
"""
|
"""
|
||||||
processed_docs = []
|
processed_docs = []
|
||||||
custom_tokenizer = AutoTokenizer.from_pretrained(settings.hf_emb_model_name)
|
custom_tokenizer = AutoTokenizer.from_pretrained(settings.hf_emb_model_name)
|
||||||
|
|
@ -131,33 +185,40 @@ def process_documents(docs_path: list[Path]) -> list[Chunk]:
|
||||||
|
|
||||||
for doc_path in docs_path:
|
for doc_path in docs_path:
|
||||||
doc_extension = doc_path.suffix.lower()
|
doc_extension = doc_path.suffix.lower()
|
||||||
|
filename = doc_path.name
|
||||||
|
|
||||||
if doc_extension == ".md":
|
if doc_extension == ".md":
|
||||||
processed_doc = chef_md.process(doc_path)
|
processed_doc = chef_md.process(doc_path)
|
||||||
fused_doc = _merge_markdown_document(processed_doc)
|
fused_doc = _merge_markdown_document(processed_doc)
|
||||||
processed_docs.extend(fused_doc.chunks)
|
chunked_doc = fused_doc.chunks
|
||||||
|
|
||||||
elif doc_extension == ".avap":
|
elif doc_extension == ".avap":
|
||||||
processed_doc = chef_txt.process(doc_path)
|
processed_doc = chef_txt.process(doc_path)
|
||||||
chunked_doc = chunker.chunk(processed_doc.content)
|
chunked_doc = chunker.chunk(processed_doc.content)
|
||||||
processed_docs.extend(chunked_doc)
|
else:
|
||||||
|
continue
|
||||||
|
|
||||||
|
for chunk in chunked_doc:
|
||||||
|
processed_docs.append({
|
||||||
|
"chunk": chunk,
|
||||||
|
"extra_metadata": {"file": filename}
|
||||||
|
})
|
||||||
|
|
||||||
return processed_docs
|
return processed_docs
|
||||||
|
|
||||||
|
|
||||||
def ingest_documents(
|
def ingest_documents(
|
||||||
chunked_docs: list[Chunk],
|
chunked_docs: list[dict[str, Chunk | dict[str, Any]]],
|
||||||
es_index: str,
|
es_index: str,
|
||||||
es_request_timeout: int,
|
es_request_timeout: int,
|
||||||
es_max_retries: int,
|
es_max_retries: int,
|
||||||
es_retry_on_timeout: bool,
|
es_retry_on_timeout: bool,
|
||||||
delete_es_index: bool,
|
delete_es_index: bool,
|
||||||
) -> None:
|
) -> list[dict[str, Any]]:
|
||||||
"""
|
"""
|
||||||
Ingest processed documents into an Elasticsearch index.
|
Ingest processed documents into an Elasticsearch index.
|
||||||
|
|
||||||
Args:
|
Args:
|
||||||
chunked_docs (list[Chunk]): List of processed document chunks to be ingested
|
chunked_docs (list[dict[str, Any]]): List of dicts with "chunk" and "metadata" keys
|
||||||
es_index (str): Name of the Elasticsearch index to ingest into
|
es_index (str): Name of the Elasticsearch index to ingest into
|
||||||
es_request_timeout (int): Timeout for Elasticsearch requests in seconds
|
es_request_timeout (int): Timeout for Elasticsearch requests in seconds
|
||||||
es_max_retries (int): Maximum number of retries for Elasticsearch requests
|
es_max_retries (int): Maximum number of retries for Elasticsearch requests
|
||||||
|
|
@ -165,7 +226,7 @@ def ingest_documents(
|
||||||
delete_es_index (bool): Whether to delete the existing Elasticsearch index before ingestion
|
delete_es_index (bool): Whether to delete the existing Elasticsearch index before ingestion
|
||||||
|
|
||||||
Returns:
|
Returns:
|
||||||
None
|
List of dicts with Elasticsearch response for each chunk
|
||||||
"""
|
"""
|
||||||
logger.info(
|
logger.info(
|
||||||
f"Instantiating Elasticsearch client with URL: {settings.elasticsearch_local_url}..."
|
f"Instantiating Elasticsearch client with URL: {settings.elasticsearch_local_url}..."
|
||||||
|
|
@ -181,7 +242,7 @@ def ingest_documents(
|
||||||
logger.info(f"Deleting existing Elasticsearch index: {es_index}...")
|
logger.info(f"Deleting existing Elasticsearch index: {es_index}...")
|
||||||
es.indices.delete(index=es_index)
|
es.indices.delete(index=es_index)
|
||||||
|
|
||||||
handshake = ElasticHandshake(
|
handshake = ElasticHandshakeWithMetadata(
|
||||||
client=es,
|
client=es,
|
||||||
index_name=es_index,
|
index_name=es_index,
|
||||||
embedding_model=OllamaEmbeddings(model=settings.ollama_emb_model_name),
|
embedding_model=OllamaEmbeddings(model=settings.ollama_emb_model_name),
|
||||||
|
|
@ -190,4 +251,27 @@ def ingest_documents(
|
||||||
logger.info(
|
logger.info(
|
||||||
f"Ingesting {len(chunked_docs)} chunks into Elasticsearch index: {es_index}..."
|
f"Ingesting {len(chunked_docs)} chunks into Elasticsearch index: {es_index}..."
|
||||||
)
|
)
|
||||||
handshake.write(chunked_docs)
|
elasticsearch_chunks = handshake.write(chunked_docs)
|
||||||
|
|
||||||
|
return elasticsearch_chunks
|
||||||
|
|
||||||
|
|
||||||
|
def export_documents(elasticsearch_chunks: list[dict[str, Any]], output_path: str) -> None:
    """
    Export processed documents to a JSON file at the specified output path.

    Args:
        elasticsearch_chunks (list[dict[str, Any]]): List of dicts with the
            Elasticsearch bulk action for each chunk; each contains a
            "_source" payload holding an "embedding".
        output_path (str): Path, relative to the project root, of the JSON
            file to write.

    Returns:
        None
    """
    # Keep the parameter as-is; use a separate name for the resolved Path so
    # the str parameter is not silently rebound to a different type.
    target = settings.proj_root / output_path

    # Make sure the destination folder exists; open("w") alone would raise
    # FileNotFoundError for a missing directory (e.g. "ingestion/").
    target.parent.mkdir(parents=True, exist_ok=True)

    for chunk in elasticsearch_chunks:
        embedding = chunk["_source"]["embedding"]
        # Embedding vectors are not JSON serializable when they are arrays;
        # only convert when needed so already-plain lists (e.g. a repeated
        # export of the same data) pass through untouched.
        if hasattr(embedding, "tolist"):
            chunk["_source"]["embedding"] = embedding.tolist()

    with target.open("w", encoding="utf-8") as f:
        json.dump(elasticsearch_chunks, f, ensure_ascii=False, indent=4)

    logger.info(f"Exported processed documents to {target}")
|
||||||
Loading…
Reference in New Issue