diff --git a/README.md b/README.md
index 5841fbb..92bec97 100644
--- a/README.md
+++ b/README.md
@@ -64,8 +64,7 @@ graph TD
 │   └── kubeconfig.yaml        # Kubernetes cluster configuration
 ├── scripts/
 │   └── pipelines/
-│       ├── flows/             # Data processing flows
-│       └── tasks/             # Pipeline task definitions
+│       └── flows/             # Data processing flows
 └── src/
     ├── __init__.py
     └── utils/
@@ -164,13 +163,13 @@ Open a terminal and establish the connection to the Devaron Cluster:
 
 ```bash
 # 1. AI Model Tunnel (Ollama)
-kubectl port-forward --address 0.0.0.0 svc/ollama-light-service 11434:11434 -n brunix --kubeconfig ./kubernetes/kubeconfig.yaml &
+kubectl port-forward --address 0.0.0.0 svc/ollama-light-service 11434:11434 -n brunix --kubeconfig ./kubernetes/ivar.yaml &
 
 # 2. Knowledge Base Tunnel (Elasticsearch)
-kubectl port-forward --address 0.0.0.0 svc/brunix-vector-db 9200:9200 -n brunix --kubeconfig ./kubernetes/kubeconfig.yaml &
+kubectl port-forward --address 0.0.0.0 svc/brunix-vector-db 9200:9200 -n brunix --kubeconfig ./kubernetes/ivar.yaml &
 
 # 3. Observability DB Tunnel (PostgreSQL)
-kubectl port-forward --address 0.0.0.0 svc/brunix-postgres 5432:5432 -n brunix --kubeconfig ./kubernetes/kubeconfig.yaml &
+kubectl port-forward --address 0.0.0.0 svc/brunix-postgres 5432:5432 -n brunix --kubeconfig ./kubernetes/ivar.yaml &
 ```
 
 ### 5. Launch the Engine
diff --git a/changelog b/changelog
index d2d9e04..f060d26 100644
--- a/changelog
+++ b/changelog
@@ -13,8 +13,7 @@ All notable changes to the **Brunix Assistance Engine** will be documented in th
   - `src/graph.py`: workflow graph orchestration module added.
   - `src/prompts.py`: centralized prompt definitions added.
   - `src/state.py`: shared state management module added.
-  - `pipelines/flows/elasticsearch_ingestion.py`: pipeline to populate the elasticsearch vector database.
-  - `pipelines/tasks/chunks.py`: module with functions related to chunk management.
+  - `pipelines/flows/elasticsearch_ingestion.py`: pipeline to populate the elasticsearch vector database.
   - `ingestion/docs`: folder containing all chunked AVAP documents.
 
 ### Changed
diff --git a/scripts/pipelines/flows/elasticsearch_ingestion.py b/scripts/pipelines/flows/elasticsearch_ingestion.py
new file mode 100644
index 0000000..6227e7c
--- /dev/null
+++ b/scripts/pipelines/flows/elasticsearch_ingestion.py
@@ -0,0 +1,124 @@
+import re
+import hashlib
+from typing import Any
+from enum import Enum
+import typer
+import logging
+import os
+from pathlib import Path
+
+from elasticsearch import Elasticsearch
+from langchain_core.documents import Document
+from langchain_elasticsearch import ElasticsearchStore
+from src.utils.emb_factory import create_embedding_model
+
+logger = logging.getLogger(__name__)
+app = typer.Typer()
+
+ELASTICSEARCH_LOCAL_URL = os.getenv("ELASTICSEARCH_LOCAL_URL")
+OLLAMA_LOCAL_URL = os.getenv("OLLAMA_LOCAL_URL")
+ELASTICSEARCH_INDEX = os.getenv("ELASTICSEARCH_INDEX")
+OLLAMA_URL = os.getenv("OLLAMA_URL")
+OLLAMA_EMB_MODEL_NAME = os.getenv("OLLAMA_EMB_MODEL_NAME")
+
+class DistanceStrategy(str, Enum):
+    euclidean = "EUCLIDEAN_DISTANCE"
+    max_inner_product = "MAX_INNER_PRODUCT"
+    dot_product = "DOT_PRODUCT"
+    jaccard = "JACCARD"
+    cosine = "COSINE"
+
+def clean_text(text: str) -> str:
+    text = text.replace("\u00a0", " ")
+    text = re.sub(r"\s+", " ", text).strip()
+    return text
+
+def build_documents_from_folder(
+    folder_path: str,
+) -> list[Document]:
+
+    folder = Path(folder_path)
+
+    if not folder.exists() or not folder.is_dir():
+        raise ValueError(f"Invalid folder path: {folder_path}")
+
+    all_documents: list[Document] = []
+
+    for file_path in folder.glob("*.txt"):
+        doc_text = file_path.read_text(encoding="utf-8")
+
+        if not doc_text.strip():
+            continue
+
+        metadata: dict[str, Any] = {
+            "source": file_path.name,
+        }
+
+        doc_text = clean_text(doc_text)
+        document = Document(
+            id=hashlib.md5(file_path.name.encode()).hexdigest(),
+            page_content=doc_text,
+            metadata={**metadata}
+        )
+
+        all_documents.append(document)
+
+    return all_documents
+
+
+@app.command()
+def elasticsearch_ingestion(
+    docs_folder_path: str = "ingestion/docs",
+    es_request_timeout: int = 120,
+    es_max_retries: int = 5,
+    es_retry_on_timeout: bool = True,
+    distance_strategy: DistanceStrategy = DistanceStrategy.cosine,
+):
+    logger.info("Starting Elasticsearch ingestion pipeline...")
+    logger.info(f"Using docs folder path: {docs_folder_path}")
+    documents = build_documents_from_folder(folder_path=docs_folder_path)
+
+    logger.info("Connecting to Elasticsearch...")
+    try:
+        es = Elasticsearch(
+            ELASTICSEARCH_LOCAL_URL,
+            request_timeout=es_request_timeout,
+            max_retries=es_max_retries,
+            retry_on_timeout=es_retry_on_timeout,
+        )
+    except Exception:
+        logger.exception("Failed to connect to Elasticsearch.")
+        raise
+
+    logger.info("Instantiating embeddings model...")
+    try:
+        embeddings = create_embedding_model(
+            provider="ollama",
+            model=OLLAMA_EMB_MODEL_NAME,
+            base_url=OLLAMA_LOCAL_URL,
+        )
+    except Exception:
+        logger.exception("Failed to instantiate embeddings model.")
+        raise
+
+    logger.info(f"Uploading documents to index {ELASTICSEARCH_INDEX}...")
+    ElasticsearchStore.from_documents(
+        documents,
+        embeddings,
+        es_connection=es,
+        index_name=ELASTICSEARCH_INDEX,
+        distance_strategy=distance_strategy.value,
+    )
+    logger.info(f"Finished uploading documents to index {ELASTICSEARCH_INDEX}.")
+
+
+if __name__ == "__main__":
+    logging.basicConfig(
+        level=logging.INFO,
+        format="%(asctime)s | %(levelname)s | %(name)s | %(message)s",
+    )
+    try:
+        app()
+    except Exception as exc:
+        logger.exception(exc)
+        raise
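Reviewer note: a minimal sketch of exercising the new ingestion command in-process, assuming the repo root is on `sys.path`. The env values below are placeholders for the tunnelled services, not settings taken from this diff:

```python
# Hypothetical smoke test for the ingestion command; all env values are
# placeholders, not the project's real settings.
import os

# The module reads these at import time, so set them before importing it.
os.environ["ELASTICSEARCH_LOCAL_URL"] = "http://localhost:9200"   # Elasticsearch tunnel
os.environ["OLLAMA_LOCAL_URL"] = "http://localhost:11434"         # Ollama tunnel
os.environ["ELASTICSEARCH_INDEX"] = "brunix-docs"                 # placeholder index name
os.environ["OLLAMA_EMB_MODEL_NAME"] = "nomic-embed-text"          # placeholder model

from scripts.pipelines.flows.elasticsearch_ingestion import (
    DistanceStrategy,
    elasticsearch_ingestion,
)

# @app.command() registers the function but leaves it directly callable,
# so it can be driven without going through the Typer CLI.
elasticsearch_ingestion(
    docs_folder_path="ingestion/docs",
    distance_strategy=DistanceStrategy.cosine,
)
```

From a shell, the equivalent run is `python scripts/pipelines/flows/elasticsearch_ingestion.py --docs-folder-path ingestion/docs` with the same variables exported and the repo root on `PYTHONPATH` (the module imports `src.utils.emb_factory`).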
index 0000000..e69de29
diff --git a/src/utils/__init__.py b/src/utils/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git a/src/utils/emb_factory.py b/src/utils/emb_factory.py
new file mode 100644
index 0000000..d9fb9de
--- /dev/null
+++ b/src/utils/emb_factory.py
@@ -0,0 +1,67 @@
+from abc import ABC, abstractmethod
+from typing import Any, Dict
+
+
+class BaseEmbeddingFactory(ABC):
+    @abstractmethod
+    def create(self, model: str, **kwargs: Any):
+        raise NotImplementedError
+
+
+class OpenAIEmbeddingFactory(BaseEmbeddingFactory):
+    def create(self, model: str, **kwargs: Any):
+        from langchain_openai import OpenAIEmbeddings
+
+        return OpenAIEmbeddings(model=model, **kwargs)
+
+
+class OllamaEmbeddingFactory(BaseEmbeddingFactory):
+    def create(self, model: str, **kwargs: Any):
+        from langchain_ollama import OllamaEmbeddings
+
+        return OllamaEmbeddings(model=model, **kwargs)
+
+
+class BedrockEmbeddingFactory(BaseEmbeddingFactory):
+    def create(self, model: str, **kwargs: Any):
+        from langchain_aws import BedrockEmbeddings
+
+        return BedrockEmbeddings(model_id=model, **kwargs)
+
+
+class HuggingFaceEmbeddingFactory(BaseEmbeddingFactory):
+    def create(self, model: str, **kwargs: Any):
+        from langchain_huggingface import HuggingFaceEmbeddings
+
+        return HuggingFaceEmbeddings(model_name=model, **kwargs)
+
+
+EMBEDDING_FACTORIES: Dict[str, BaseEmbeddingFactory] = {
+    "openai": OpenAIEmbeddingFactory(),
+    "ollama": OllamaEmbeddingFactory(),
+    "bedrock": BedrockEmbeddingFactory(),
+    "huggingface": HuggingFaceEmbeddingFactory(),
+}
+
+
+def create_embedding_model(provider: str, model: str, **kwargs: Any):
+    """
+    Create an embedding model instance for the given provider.
+
+    Args:
+        provider: The provider name (openai, ollama, bedrock, huggingface).
+        model: The model identifier.
+        **kwargs: Additional keyword arguments passed to the model constructor.
+
+    Returns:
+        An embedding model instance.
+    """
+    key = provider.strip().lower()
+
+    if key not in EMBEDDING_FACTORIES:
+        raise ValueError(
+            f"Unsupported embedding provider: {provider}. "
" + f"Available providers: {list(EMBEDDING_FACTORIES.keys())}" + ) + + return EMBEDDING_FACTORIES[key].create(model=model, **kwargs) diff --git a/src/utils/llm_factory.py b/src/utils/llm_factory.py new file mode 100644 index 0000000..8b1c13c --- /dev/null +++ b/src/utils/llm_factory.py @@ -0,0 +1,72 @@ +from abc import ABC, abstractmethod +from typing import Any, Dict + + +class BaseProviderFactory(ABC): + @abstractmethod + def create(self, model: str, **kwargs: Any): + raise NotImplementedError + + +class OpenAIChatFactory(BaseProviderFactory): + def create(self, model: str, **kwargs: Any): + from langchain_openai import ChatOpenAI + + return ChatOpenAI(model=model, **kwargs) + + +class OllamaChatFactory(BaseProviderFactory): + def create(self, model: str, **kwargs: Any): + from langchain_ollama import ChatOllama + + return ChatOllama(model=model, **kwargs) + + +class BedrockChatFactory(BaseProviderFactory): + def create(self, model: str, **kwargs: Any): + from langchain_aws import ChatBedrockConverse + + return ChatBedrockConverse(model=model, **kwargs) + + +class HuggingFaceChatFactory(BaseProviderFactory): + def create(self, model: str, **kwargs: Any): + from langchain_huggingface import ChatHuggingFace, HuggingFacePipeline + + llm = HuggingFacePipeline.from_model_id( + model_id=model, + task="text-generation", + pipeline_kwargs=kwargs, + ) + return ChatHuggingFace(llm=llm) + + +CHAT_FACTORIES: Dict[str, BaseProviderFactory] = { + "openai": OpenAIChatFactory(), + "ollama": OllamaChatFactory(), + "bedrock": BedrockChatFactory(), + "huggingface": HuggingFaceChatFactory(), +} + + +def create_chat_model(provider: str, model: str, **kwargs: Any): + """ + Create a chat model instance for the given provider. + + Args: + provider: The provider name (openai, ollama, bedrock, huggingface). + model: The model identifier. + **kwargs: Additional keyword arguments passed to the model constructor. + + Returns: + A chat model instance. + """ + key = provider.strip().lower() + + if key not in CHAT_FACTORIES: + raise ValueError( + f"Unsupported chat provider: {provider}. " + f"Available providers: {list(CHAT_FACTORIES.keys())}" + ) + + return CHAT_FACTORIES[key].create(model=model, **kwargs)