75 lines
2.3 KiB
Python
75 lines
2.3 KiB
Python
import hashlib
|
|
import logging
|
|
import os
|
|
from pathlib import Path
|
|
|
|
from elasticsearch import Elasticsearch
|
|
from langchain_elasticsearch import ElasticsearchStore
|
|
from langchain_ollama import OllamaEmbeddings
|
|
from scripts.pipelines.tasks.chunks import build_chunks_from_folder
|
|
|
|
logger = logging.getLogger(__name__)

# Runtime configuration pulled from the process environment when the module is
# imported. Any variable that is not set comes through as None.
ELASTICSEARCH_LOCAL_URL = os.environ.get("ELASTICSEARCH_LOCAL_URL")
OLLAMA_LOCAL_URL = os.environ.get("OLLAMA_LOCAL_URL")
ELASTICSEARCH_INDEX = os.environ.get("ELASTICSEARCH_INDEX")
# NOTE(review): OLLAMA_URL is not referenced in the visible part of this module
# (the embeddings client is built from OLLAMA_LOCAL_URL); confirm it is still needed.
OLLAMA_URL = os.environ.get("OLLAMA_URL")
OLLAMA_EMB_MODEL_NAME = os.environ.get("OLLAMA_EMB_MODEL_NAME")
|
|
# Base directory that docs_folder_path is resolved against: three directories
# above the one containing this file (presumably the repository root — confirm
# if this module is moved). Path.parents[3] raises IndexError at import time
# when the file sits fewer levels deep than that.
PROJ_ROOT = Path(__file__).resolve().parents[3]
|
|
|
|
|
|
def elasticsearch_ingestion(
    docs_folder_path: str = "ingestion",
    es_request_timeout: int = 120,
    es_max_retries: int = 5,
    es_retry_on_timeout: bool = True,
    distance_strategy: str = "COSINE",
) -> None:
    """Chunk the documents in a project folder and index them into Elasticsearch.

    Chunks are built from ``PROJ_ROOT / docs_folder_path`` with
    ``build_chunks_from_folder``. They are embedded with an ``OllamaEmbeddings``
    model and written to the index named by ``ELASTICSEARCH_INDEX`` through
    ``ElasticsearchStore.from_documents``.

    Args:
        docs_folder_path: Folder holding the source documents, relative to
            ``PROJ_ROOT``.
        es_request_timeout: ``request_timeout`` passed to the Elasticsearch client.
        es_max_retries: ``max_retries`` passed to the Elasticsearch client.
        es_retry_on_timeout: ``retry_on_timeout`` passed to the Elasticsearch client.
        distance_strategy: Vector similarity strategy passed to
            ``ElasticsearchStore``.

    Raises:
        ValueError: If ``ELASTICSEARCH_LOCAL_URL``, ``ELASTICSEARCH_INDEX`` or
            ``OLLAMA_EMB_MODEL_NAME`` is unset or empty.
        Exception: Errors from client construction, embeddings construction
            and the upload are logged (where caught here) and re-raised.
    """
    # Fail fast, before the potentially slow chunking step, when a setting the
    # pipeline cannot run without is missing. OLLAMA_LOCAL_URL is deliberately
    # not required: it is passed through as ``base_url`` and a None value is
    # left for OllamaEmbeddings to interpret.
    missing = [
        name
        for name, value in (
            ("ELASTICSEARCH_LOCAL_URL", ELASTICSEARCH_LOCAL_URL),
            ("ELASTICSEARCH_INDEX", ELASTICSEARCH_INDEX),
            ("OLLAMA_EMB_MODEL_NAME", OLLAMA_EMB_MODEL_NAME),
        )
        if not value
    ]
    if missing:
        raise ValueError(
            "Missing required environment variable(s): " + ", ".join(missing)
        )

    logger.info("Starting Elasticsearch ingestion pipeline...")
    docs_path = PROJ_ROOT / docs_folder_path
    logger.info("Using docs folder path: %s", docs_path)
    chunks = build_chunks_from_folder(folder_path=docs_path)

    # Nothing to embed or upload: skip creating clients and touching the index.
    if not chunks:
        logger.warning("No chunks were built from %s; nothing to upload.", docs_path)
        return

    logger.info("Connecting to Elasticsearch...")
    # NOTE(review): the client constructor is not expected to contact the
    # cluster itself; network errors would most likely surface during the
    # upload below rather than here — confirm against the client version in use.
    try:
        es = Elasticsearch(
            ELASTICSEARCH_LOCAL_URL,
            request_timeout=es_request_timeout,
            max_retries=es_max_retries,
            retry_on_timeout=es_retry_on_timeout,
        )
    except Exception:
        logger.exception("Failed to connect to Elasticsearch.")
        raise

    # The client is only used inside this function, so release its
    # connections however the rest of the pipeline ends.
    try:
        logger.info("Instantiating embeddings model...")
        try:
            embeddings = OllamaEmbeddings(
                base_url=OLLAMA_LOCAL_URL, model=OLLAMA_EMB_MODEL_NAME
            )
        except Exception:
            logger.exception("Failed to instantiate embeddings model.")
            raise

        logger.info("Uploading documents to index %s...", ELASTICSEARCH_INDEX)
        ElasticsearchStore.from_documents(
            chunks,
            embeddings,
            client=es,
            index_name=ELASTICSEARCH_INDEX,
            distance_strategy=distance_strategy,
        )
    finally:
        es.close()

    logger.info("Finished uploading documents to index %s.", ELASTICSEARCH_INDEX)
    logger.info("Total documents uploaded: %d.", len(chunks))
|
|
|
|
|
|
if __name__ == "__main__":
    # Script entry point: set up root logging at INFO, run the pipeline, and
    # log any failure with its traceback before letting it propagate.
    log_format = "%(asctime)s | %(levelname)s | %(name)s | %(message)s"
    logging.basicConfig(format=log_format, level=logging.INFO)
    try:
        elasticsearch_ingestion()
    except Exception as err:
        logger.exception(err)
        raise
|