assistance-engine/scripts/pipelines/flows/elasticsearch_ingestion.py

75 lines
2.3 KiB
Python

import hashlib
import logging
import os
from pathlib import Path
from elasticsearch import Elasticsearch
from langchain_elasticsearch import ElasticsearchStore
from langchain_ollama import OllamaEmbeddings
from scripts.pipelines.tasks.chunks import build_chunks_from_folder
# Module-wide logger. When this file runs as a script, the __main__ guard
# configures handlers and level via logging.basicConfig.
logger = logging.getLogger(__name__)

# Runtime configuration read once from the process environment at import time.
# Each constant is None when its variable is not set.
ELASTICSEARCH_LOCAL_URL = os.environ.get("ELASTICSEARCH_LOCAL_URL")
OLLAMA_LOCAL_URL = os.environ.get("OLLAMA_LOCAL_URL")
ELASTICSEARCH_INDEX = os.environ.get("ELASTICSEARCH_INDEX")
# NOTE(review): OLLAMA_URL is not referenced by the code visible in this file;
# it may be imported by other modules - confirm before removing.
OLLAMA_URL = os.environ.get("OLLAMA_URL")
OLLAMA_EMB_MODEL_NAME = os.environ.get("OLLAMA_EMB_MODEL_NAME")
# Project root: the directory that contains the `scripts` package. This file
# sits in <root>/scripts/pipelines/flows/, so the root is the fourth ancestor
# (parents[0] is flows/, parents[3] is <root>). Relative paths such as the
# ingestion docs folder are resolved against it.
PROJ_ROOT = Path(__file__).resolve().parents[3]
def elasticsearch_ingestion(
    docs_folder_path: str = "ingestion",
    es_request_timeout: int = 120,
    es_max_retries: int = 5,
    es_retry_on_timeout: bool = True,
    distance_strategy: str = "COSINE",
) -> None:
    """Chunk a docs folder, embed it with Ollama and index it in Elasticsearch.

    Connection settings come from the module-level environment constants
    ``ELASTICSEARCH_LOCAL_URL``, ``ELASTICSEARCH_INDEX``, ``OLLAMA_LOCAL_URL``
    and ``OLLAMA_EMB_MODEL_NAME``.

    Args:
        docs_folder_path: Folder to ingest, resolved relative to ``PROJ_ROOT``.
        es_request_timeout: ``request_timeout`` for the Elasticsearch client.
        es_max_retries: ``max_retries`` for the Elasticsearch client.
        es_retry_on_timeout: ``retry_on_timeout`` for the Elasticsearch client.
        distance_strategy: Similarity metric forwarded to ``ElasticsearchStore``.

    Raises:
        ValueError: If ``ELASTICSEARCH_LOCAL_URL``, ``ELASTICSEARCH_INDEX`` or
            ``OLLAMA_EMB_MODEL_NAME`` is unset or empty.
        Exception: Errors from chunking, model/client construction or the
            upload are propagated unchanged (construction errors are logged
            first).
    """
    logger.info("Starting Elasticsearch ingestion pipeline...")

    # Validate configuration before the chunking step, so a missing variable
    # is reported immediately instead of after every document is processed.
    required_settings = {
        "ELASTICSEARCH_LOCAL_URL": ELASTICSEARCH_LOCAL_URL,
        "ELASTICSEARCH_INDEX": ELASTICSEARCH_INDEX,
        "OLLAMA_EMB_MODEL_NAME": OLLAMA_EMB_MODEL_NAME,
    }
    missing = [name for name, value in required_settings.items() if not value]
    if missing:
        raise ValueError(
            "Missing required environment variable(s): " + ", ".join(missing)
        )

    docs_folder = PROJ_ROOT / docs_folder_path
    logger.info("Using docs folder path: %s", docs_folder)
    chunks = build_chunks_from_folder(folder_path=docs_folder)
    if not chunks:
        logger.warning(
            "No chunks were built from %s; nothing to upload.", docs_folder
        )
        return

    # Build the embeddings model before the client, so that the client's
    # lifetime can be bounded by the try/finally around the upload.
    logger.info("Instantiating embeddings model...")
    try:
        embeddings = OllamaEmbeddings(
            base_url=OLLAMA_LOCAL_URL, model=OLLAMA_EMB_MODEL_NAME
        )
    except Exception:
        logger.exception("Failed to instantiate embeddings model.")
        raise

    logger.info("Connecting to Elasticsearch...")
    try:
        es = Elasticsearch(
            ELASTICSEARCH_LOCAL_URL,
            request_timeout=es_request_timeout,
            max_retries=es_max_retries,
            retry_on_timeout=es_retry_on_timeout,
        )
    except Exception:
        # NOTE: the client may connect lazily, in which case network errors
        # surface from the upload below rather than here.
        logger.exception("Failed to connect to Elasticsearch.")
        raise

    try:
        logger.info("Uploading documents to index %s...", ELASTICSEARCH_INDEX)
        # langchain_elasticsearch reads a pre-built client from the
        # ``es_connection`` keyword. A keyword named ``client`` is not used to
        # connect, which would bypass the timeouts/retries configured above.
        ElasticsearchStore.from_documents(
            chunks,
            embeddings,
            es_connection=es,
            index_name=ELASTICSEARCH_INDEX,
            distance_strategy=distance_strategy,
        )
    finally:
        # Release the client's connections whether or not the upload succeeded.
        es.close()

    logger.info("Finished uploading documents to index %s.", ELASTICSEARCH_INDEX)
    logger.info("Total documents uploaded: %d.", len(chunks))
if __name__ == "__main__":
    # Script entry point: log to stderr at INFO level with timestamps.
    logging.basicConfig(
        level=logging.INFO,
        format="%(asctime)s | %(levelname)s | %(name)s | %(message)s",
    )
    try:
        elasticsearch_ingestion()
    except Exception:
        # logger.exception already records the full traceback. Exit with
        # status 1 (the same status an uncaught exception produces) instead
        # of re-raising, so the interpreter does not print the traceback a
        # second time.
        logger.exception("Elasticsearch ingestion failed.")
        raise SystemExit(1)