# assistance-engine/scripts/pipelines/flows/elasticsearch_ingestion.py
#
# 130 lines
# 3.7 KiB
# Python

import re
import hashlib
from typing import Any
from enum import Enum
import typer
import logging
import os
from pathlib import Path
from loguru import logger
from elasticsearch import Elasticsearch
from langchain_core.documents import Document
from langchain_elasticsearch import ElasticsearchStore
from langchain_community.embeddings import HuggingFaceEmbeddings
from langchain_experimental.text_splitter import SemanticChunker
from src.utils.emb_factory import create_embedding_model
from scripts.pipelines.tasks.chunk import scrape_avap_docs
# Typer application object; CLI commands are registered on it with
# the @app.command() decorator.
app = typer.Typer()

# Service endpoints, index name and model name, read once from the process
# environment when the module is imported. A variable that is not set
# resolves to None.
ELASTICSEARCH_LOCAL_URL = os.environ.get("ELASTICSEARCH_LOCAL_URL")
OLLAMA_LOCAL_URL = os.environ.get("OLLAMA_LOCAL_URL")
ELASTICSEARCH_INDEX = os.environ.get("ELASTICSEARCH_INDEX")
OLLAMA_URL = os.environ.get("OLLAMA_URL")
OLLAMA_EMB_MODEL_NAME = os.environ.get("OLLAMA_EMB_MODEL_NAME")
AVAP_WEB_DOCS_URL = os.environ.get("AVAP_WEB_DOCS_URL")
class DistanceStrategy(str, Enum):
    """Vector distance strategies accepted by the ingestion command.

    Every member is also a ``str``. The ingestion command forwards the
    member's ``value`` as the ``distance_strategy`` argument when it builds
    the Elasticsearch vector store.
    """

    euclidean = "EUCLIDEAN_DISTANCE"
    max_inner_product = "MAX_INNER_PRODUCT"
    dot_product = "DOT_PRODUCT"
    jaccard = "JACCARD"
    cosine = "COSINE"  # default strategy of the ingestion command
def clean_text(text: str) -> str:
    """Return *text* with its whitespace normalised.

    Non-breaking spaces are turned into regular spaces, every run of
    whitespace is collapsed into a single space, and leading/trailing
    whitespace is removed.
    """
    without_nbsp = text.replace("\u00a0", " ")
    collapsed = re.sub(r"\s+", " ", without_nbsp)
    return collapsed.strip()
def build_documents_from_folder(
    folder_path: str,
) -> list[Document]:
    """Build one ``Document`` per non-empty ``*.txt`` file in a folder.

    Only entries directly inside ``folder_path`` are considered (the glob
    pattern is not recursive). Each file is read as UTF-8, normalised with
    ``clean_text`` and wrapped in a ``Document`` whose ``id`` is the MD5 hex
    digest of the file name and whose metadata stores that name under
    ``"source"``.

    Args:
        folder_path: Path of the directory that holds the ``.txt`` files.

    Returns:
        The documents, ordered by file path so that repeated runs over the
        same folder yield the same sequence. Files containing only
        whitespace are skipped.

    Raises:
        ValueError: If ``folder_path`` does not exist or is not a directory.
    """
    folder = Path(folder_path)
    # is_dir() is also False for a missing path, so one check covers both.
    if not folder.is_dir():
        raise ValueError(f"Invalid folder path: {folder_path}")

    all_documents: list[Document] = []
    # glob() yields entries in filesystem order; sort for reproducible output.
    for file_path in sorted(folder.glob("*.txt")):
        # A directory can match "*.txt" as well; reading it would raise.
        if not file_path.is_file():
            continue

        doc_text = file_path.read_text(encoding="utf-8")
        if not doc_text.strip():
            continue

        all_documents.append(
            Document(
                # Stable id derived from the file name; MD5 serves purely as
                # an identifier here, not as a security measure.
                id=hashlib.md5(file_path.name.encode()).hexdigest(),
                page_content=clean_text(doc_text),
                metadata={"source": file_path.name},
            )
        )
    return all_documents
@app.command()
def elasticsearch_ingestion(
    docs_folder_path: str = "ingestion/docs",
    es_request_timeout: int = 120,
    es_max_retries: int = 5,
    es_retry_on_timeout: bool = True,
    distance_strategy: DistanceStrategy = DistanceStrategy.cosine,
):
    """Embed the text files of a folder and upload them to Elasticsearch.

    Args:
        docs_folder_path: Folder whose ``*.txt`` files are ingested.
        es_request_timeout: Forwarded to the Elasticsearch client as
            ``request_timeout``.
        es_max_retries: Forwarded to the Elasticsearch client as
            ``max_retries``.
        es_retry_on_timeout: Forwarded to the Elasticsearch client as
            ``retry_on_timeout``.
        distance_strategy: Distance strategy whose value is passed to the
            vector store.

    Raises:
        ValueError: If ``docs_folder_path`` is not an existing directory.
        Exception: Any error raised while creating the client, creating the
            embeddings model or uploading is logged (for the first two) and
            re-raised unchanged.
    """
    logger.info("Starting Elasticsearch ingestion pipeline...")
    logger.info(f"Using docs folder path: {docs_folder_path}")
    documents = build_documents_from_folder(folder_path=docs_folder_path)
    if not documents:
        # Informational only: the upload step below still runs as before.
        logger.warning(f"No documents found in folder: {docs_folder_path}")

    logger.info("Connecting to Elasticsearch...")
    try:
        es = Elasticsearch(
            ELASTICSEARCH_LOCAL_URL,
            request_timeout=es_request_timeout,
            max_retries=es_max_retries,
            retry_on_timeout=es_retry_on_timeout,
        )
    # Exception (not a bare except) so that KeyboardInterrupt/SystemExit are
    # not reported as connection failures.
    except Exception:
        logger.exception("Failed to connect to Elasticsearch.")
        raise

    try:
        logger.info("Instantiating embeddings model...")
        try:
            embeddings = create_embedding_model(
                provider="ollama",
                model=OLLAMA_EMB_MODEL_NAME,
                base_url=OLLAMA_LOCAL_URL,
            )
        except Exception:
            logger.exception("Failed to instantiate embeddings model.")
            raise

        logger.info(f"Uploading documents to index {ELASTICSEARCH_INDEX}...")
        ElasticsearchStore.from_documents(
            documents,
            embeddings,
            client=es,
            index_name=ELASTICSEARCH_INDEX,
            distance_strategy=distance_strategy.value,
        )
        logger.info(
            f"Finished uploading documents to index {ELASTICSEARCH_INDEX}."
        )
    finally:
        # Release the client's connections whether or not the upload succeeded.
        es.close()
if __name__ == "__main__":
    # Configure standard-library logging at INFO level with a timestamped,
    # pipe-separated record layout.
    logging.basicConfig(
        format="%(asctime)s | %(levelname)s | %(name)s | %(message)s",
        level=logging.INFO,
    )

    # Run the Typer CLI. Any failure is logged with its traceback and then
    # re-raised so the error still propagates to the caller.
    try:
        app()
    except Exception as error:
        logger.exception(error)
        raise