feat: implement Elasticsearch ingestion pipeline and embedding factories
This commit is contained in:
parent
8914acbb95
commit
a4267e1b60
|
|
@ -64,8 +64,7 @@ graph TD
|
||||||
│ └── kubeconfig.yaml # Kubernetes cluster configuration
|
│ └── kubeconfig.yaml # Kubernetes cluster configuration
|
||||||
├── scripts/
|
├── scripts/
|
||||||
│ └── pipelines/
|
│ └── pipelines/
|
||||||
│ ├── flows/ # Data processing flows
|
│ └── flows/ # Data processing flows
|
||||||
│ └── tasks/ # Pipeline task definitions
|
|
||||||
└── src/
|
└── src/
|
||||||
├── __init__.py
|
├── __init__.py
|
||||||
└── utils/
|
└── utils/
|
||||||
|
|
@ -164,13 +163,13 @@ Open a terminal and establish the connection to the Devaron Cluster:
|
||||||
|
|
||||||
```bash
|
```bash
|
||||||
# 1. AI Model Tunnel (Ollama)
|
# 1. AI Model Tunnel (Ollama)
|
||||||
kubectl port-forward --address 0.0.0.0 svc/ollama-light-service 11434:11434 -n brunix --kubeconfig ./kubernetes/kubeconfig.yaml &
|
kubectl port-forward --address 0.0.0.0 svc/ollama-light-service 11434:11434 -n brunix --kubeconfig ./kubernetes/ivar.yaml &
|
||||||
|
|
||||||
# 2. Knowledge Base Tunnel (Elasticsearch)
|
# 2. Knowledge Base Tunnel (Elasticsearch)
|
||||||
kubectl port-forward --address 0.0.0.0 svc/brunix-vector-db 9200:9200 -n brunix --kubeconfig ./kubernetes/kubeconfig.yaml &
|
kubectl port-forward --address 0.0.0.0 svc/brunix-vector-db 9200:9200 -n brunix --kubeconfig ./kubernetes/ivar.yaml &
|
||||||
|
|
||||||
# 3. Observability DB Tunnel (PostgreSQL)
|
# 3. Observability DB Tunnel (PostgreSQL)
|
||||||
kubectl port-forward --address 0.0.0.0 svc/brunix-postgres 5432:5432 -n brunix --kubeconfig ./kubernetes/kubeconfig.yaml &
|
kubectl port-forward --address 0.0.0.0 svc/brunix-postgres 5432:5432 -n brunix --kubeconfig ./kubernetes/ivar.yaml &
|
||||||
```
|
```
|
||||||
|
|
||||||
### 5. Launch the Engine
|
### 5. Launch the Engine
|
||||||
|
|
|
||||||
|
|
@ -13,8 +13,7 @@ All notable changes to the **Brunix Assistance Engine** will be documented in th
|
||||||
- `src/graph.py`: workflow graph orchestration module added.
|
- `src/graph.py`: workflow graph orchestration module added.
|
||||||
- `src/prompts.py`: centralized prompt definitions added.
|
- `src/prompts.py`: centralized prompt definitions added.
|
||||||
- `src/state.py`: shared state management module added.
|
- `src/state.py`: shared state management module added.
|
||||||
- `pipelines/flows/elasticsearch_ingestion.py`: pipeline to populate the Elasticsearch vector database.
|
- `pipelines/flows/elasticsearch_ingestion.py`: pipeline to populate the Elasticsearch vector database.
|
||||||
- `pipelines/tasks/chunks.py`: module with functions related to chunk management.
|
|
||||||
- `ingestion/docs`: folder containing all chunked AVAP documents.
|
- `ingestion/docs`: folder containing all chunked AVAP documents.
|
||||||
|
|
||||||
### Changed
|
### Changed
|
||||||
|
|
|
||||||
|
|
@ -0,0 +1,124 @@
|
||||||
|
import re
|
||||||
|
import hashlib
|
||||||
|
from typing import Any
|
||||||
|
from enum import Enum
|
||||||
|
import typer
|
||||||
|
import logging
|
||||||
|
import os
|
||||||
|
from pathlib import Path
|
||||||
|
|
||||||
|
from elasticsearch import Elasticsearch
|
||||||
|
from langchain_core.documents import Document
|
||||||
|
from langchain_elasticsearch import ElasticsearchStore
|
||||||
|
from src.utils.emb_factory import create_embedding_model
|
||||||
|
|
||||||
|
# Module-level logger; its level and format are set up in the __main__ guard.
logger = logging.getLogger(__name__)
# Typer application on which the ingestion command is registered.
app = typer.Typer()

# Settings read from the environment once, at import time.
# os.getenv returns None for any variable that is not set.
ELASTICSEARCH_LOCAL_URL = os.getenv("ELASTICSEARCH_LOCAL_URL")
OLLAMA_LOCAL_URL = os.getenv("OLLAMA_LOCAL_URL")
ELASTICSEARCH_INDEX = os.getenv("ELASTICSEARCH_INDEX")
# NOTE(review): OLLAMA_URL is not referenced by the code visible here — confirm it is still needed.
OLLAMA_URL = os.getenv("OLLAMA_URL")
OLLAMA_EMB_MODEL_NAME = os.getenv("OLLAMA_EMB_MODEL_NAME")
|
||||||
|
|
||||||
|
class DistanceStrategy(str, Enum):
    """Distance strategies selectable for the ingestion command.

    Members also subclass ``str``, so each one compares equal to its value.
    The value is what the ingestion command forwards as ``distance_strategy``.
    """

    euclidean = "EUCLIDEAN_DISTANCE"
    max_inner_product = "MAX_INNER_PRODUCT"
    dot_product = "DOT_PRODUCT"
    # NOTE(review): confirm the installed vector-store integration accepts JACCARD.
    jaccard = "JACCARD"
    cosine = "COSINE"
|
||||||
|
|
||||||
|
def clean_text(text: str) -> str:
    """Normalize the whitespace of *text*.

    Non-breaking spaces (U+00A0) become plain spaces, every run of whitespace
    is collapsed into a single space, and leading/trailing whitespace is
    removed.
    """
    without_nbsp = text.replace("\u00a0", " ")
    collapsed = re.sub(r"\s+", " ", without_nbsp)
    return collapsed.strip()
|
||||||
|
|
||||||
|
def build_documents_from_folder(
    folder_path: str,
) -> list[Document]:
    """Load every non-empty ``*.txt`` file of a folder as a ``Document``.

    Each file is read as UTF-8 and normalized with ``clean_text``. The
    resulting ``Document`` uses the MD5 hex digest of the file name as its
    ``id`` and stores the file name under the ``"source"`` metadata key.
    Only the top level of the folder is scanned.

    Args:
        folder_path: Path of the folder holding the ``.txt`` files.

    Returns:
        One ``Document`` per non-empty file, ordered by file path.

    Raises:
        ValueError: If ``folder_path`` is not an existing directory.
    """
    folder = Path(folder_path)

    # is_dir() is already False for a missing path, so one check covers both.
    if not folder.is_dir():
        raise ValueError(f"Invalid folder path: {folder_path}")

    all_documents: list[Document] = []

    # glob() yields entries in arbitrary order; sorting keeps the result
    # reproducible between runs and machines.
    for file_path in sorted(folder.glob("*.txt")):
        # A directory named "something.txt" also matches the pattern and
        # would make read_text() fail.
        if not file_path.is_file():
            continue

        doc_text = file_path.read_text(encoding="utf-8")

        # Skip files that contain nothing but whitespace.
        if not doc_text.strip():
            continue

        metadata: dict[str, Any] = {"source": file_path.name}

        all_documents.append(
            Document(
                # The digest is only a stable identifier derived from the
                # file name, so the same file always gets the same id.
                id=hashlib.md5(file_path.name.encode()).hexdigest(),
                page_content=clean_text(doc_text),
                metadata=metadata,
            )
        )

    return all_documents
|
||||||
|
|
||||||
|
|
||||||
|
@app.command()
def elasticsearch_ingestion(
    docs_folder_path: str = "ingestion/docs",
    es_request_timeout: int = 120,
    es_max_retries: int = 5,
    es_retry_on_timeout: bool = True,
    distance_strategy: DistanceStrategy = DistanceStrategy.cosine,
):
    """Embed the documents of a folder and upload them to Elasticsearch.

    Documents are built with ``build_documents_from_folder``, embedded with
    the Ollama model named by ``OLLAMA_EMB_MODEL_NAME`` (served at
    ``OLLAMA_LOCAL_URL``) and stored in the ``ELASTICSEARCH_INDEX`` index of
    the cluster at ``ELASTICSEARCH_LOCAL_URL``.

    Args:
        docs_folder_path: Folder containing the ``.txt`` documents to ingest.
        es_request_timeout: Value passed as ``request_timeout`` to the
            Elasticsearch client.
        es_max_retries: Value passed as ``max_retries`` to the client.
        es_retry_on_timeout: Value passed as ``retry_on_timeout`` to the
            client.
        distance_strategy: Strategy whose value is forwarded to the vector
            store as ``distance_strategy``.

    Raises:
        ValueError: If ``docs_folder_path`` is not an existing directory.
        Exception: Any error raised while creating the client or the
            embedding model is logged and re-raised unchanged.
    """
    logger.info("Starting Elasticsearch ingestion pipeline...")
    logger.info(f"Using docs folder path: {docs_folder_path}")
    documents = build_documents_from_folder(folder_path=docs_folder_path)

    logger.info("Connecting to Elasticsearch...")
    try:
        es = Elasticsearch(
            ELASTICSEARCH_LOCAL_URL,
            request_timeout=es_request_timeout,
            max_retries=es_max_retries,
            retry_on_timeout=es_retry_on_timeout,
        )
    # "except Exception" rather than a bare "except" so that Ctrl-C and
    # interpreter shutdown are not reported as connection failures.
    except Exception:
        logger.exception("Failed to connect to Elasticsearch.")
        raise

    # From here on the client exists, so release it however the run ends.
    try:
        logger.info("Instantiating embeddings model...")
        try:
            embeddings = create_embedding_model(
                provider="ollama",
                model=OLLAMA_EMB_MODEL_NAME,
                base_url=OLLAMA_LOCAL_URL,
            )
        except Exception:
            logger.exception("Failed to instantiate embeddings model.")
            raise

        logger.info(f"Uploading documents to index {ELASTICSEARCH_INDEX}...")
        ElasticsearchStore.from_documents(
            documents,
            embeddings,
            client=es,
            index_name=ELASTICSEARCH_INDEX,
            distance_strategy=distance_strategy.value,
        )
        logger.info(f"Finished uploading documents to index {ELASTICSEARCH_INDEX}.")
    finally:
        es.close()
|
||||||
|
|
||||||
|
|
||||||
|
# Script entry point: configure logging, then hand control to the Typer app.
if __name__ == "__main__":
    logging.basicConfig(
        level=logging.INFO,
        format="%(asctime)s | %(levelname)s | %(name)s | %(message)s",
    )
    try:
        app()
    except Exception:
        # Log a fixed message: logger.exception already appends the traceback,
        # which carries the exception type and text.
        logger.exception("Elasticsearch ingestion pipeline failed.")
        raise
|
||||||
|
|
@ -0,0 +1,67 @@
|
||||||
|
from abc import ABC, abstractmethod
|
||||||
|
from typing import Any, Dict
|
||||||
|
|
||||||
|
|
||||||
|
class BaseEmbeddingFactory(ABC):
    """Interface implemented by the provider-specific embedding factories."""

    @abstractmethod
    def create(self, model: str, **kwargs: Any):
        """Build an embedding model.

        Args:
            model: The model identifier.
            **kwargs: Extra keyword arguments for the concrete implementation.

        Raises:
            NotImplementedError: Always, when this base implementation runs.
        """
        raise NotImplementedError
|
||||||
|
|
||||||
|
|
||||||
|
class OpenAIEmbeddingFactory(BaseEmbeddingFactory):
    """Factory producing ``OpenAIEmbeddings`` instances."""

    def create(self, model: str, **kwargs: Any):
        """Return ``OpenAIEmbeddings`` for ``model``.

        Extra keyword arguments are forwarded unchanged to the constructor.
        """
        # Imported inside the method, so langchain_openai is only loaded when
        # this factory is actually used.
        from langchain_openai import OpenAIEmbeddings

        embedder = OpenAIEmbeddings(model=model, **kwargs)
        return embedder
|
||||||
|
|
||||||
|
|
||||||
|
class OllamaEmbeddingFactory(BaseEmbeddingFactory):
    """Factory producing ``OllamaEmbeddings`` instances."""

    def create(self, model: str, **kwargs: Any):
        """Return ``OllamaEmbeddings`` for ``model``.

        Extra keyword arguments are forwarded unchanged to the constructor.
        """
        # Imported inside the method, so langchain_ollama is only loaded when
        # this factory is actually used.
        from langchain_ollama import OllamaEmbeddings

        embedder = OllamaEmbeddings(model=model, **kwargs)
        return embedder
|
||||||
|
|
||||||
|
|
||||||
|
class BedrockEmbeddingFactory(BaseEmbeddingFactory):
    """Factory producing ``BedrockEmbeddings`` instances."""

    def create(self, model: str, **kwargs: Any):
        """Return ``BedrockEmbeddings`` for ``model``.

        ``model`` is passed as the constructor's ``model_id`` argument; extra
        keyword arguments are forwarded unchanged.
        """
        # Imported inside the method, so langchain_aws is only loaded when
        # this factory is actually used.
        from langchain_aws import BedrockEmbeddings

        embedder = BedrockEmbeddings(model_id=model, **kwargs)
        return embedder
|
||||||
|
|
||||||
|
|
||||||
|
class HuggingFaceEmbeddingFactory(BaseEmbeddingFactory):
    """Factory producing ``HuggingFaceEmbeddings`` instances."""

    def create(self, model: str, **kwargs: Any):
        """Return ``HuggingFaceEmbeddings`` for ``model``.

        ``model`` is passed as the constructor's ``model_name`` argument;
        extra keyword arguments are forwarded unchanged.
        """
        # Imported inside the method, so langchain_huggingface is only loaded
        # when this factory is actually used.
        from langchain_huggingface import HuggingFaceEmbeddings

        embedder = HuggingFaceEmbeddings(model_name=model, **kwargs)
        return embedder
|
||||||
|
|
||||||
|
|
||||||
|
# Registry mapping a provider name to the factory instance that builds its
# embedding model. Keys are lowercase; create_embedding_model strips and
# lowercases the requested provider before looking it up here.
EMBEDDING_FACTORIES: Dict[str, BaseEmbeddingFactory] = {
    "openai": OpenAIEmbeddingFactory(),
    "ollama": OllamaEmbeddingFactory(),
    "bedrock": BedrockEmbeddingFactory(),
    "huggingface": HuggingFaceEmbeddingFactory(),
}
|
||||||
|
|
||||||
|
|
||||||
|
def create_embedding_model(provider: str, model: str, **kwargs: Any):
    """
    Build an embedding model through the factory registered for a provider.

    The provider name is matched case-insensitively and surrounding
    whitespace is ignored.

    Args:
        provider: The provider name (openai, ollama, bedrock, huggingface).
        model: The model identifier.
        **kwargs: Additional keyword arguments handed to the factory.

    Returns:
        Whatever the selected factory's ``create`` method returns.

    Raises:
        ValueError: If no factory is registered for ``provider``.
    """
    normalized = provider.strip().lower()

    # Success path first; anything that falls through is an unknown provider.
    if normalized in EMBEDDING_FACTORIES:
        return EMBEDDING_FACTORIES[normalized].create(model=model, **kwargs)

    raise ValueError(
        f"Unsupported embedding provider: {provider}. "
        f"Available providers: {list(EMBEDDING_FACTORIES.keys())}"
    )
|
||||||
|
|
@ -0,0 +1,72 @@
|
||||||
|
from abc import ABC, abstractmethod
|
||||||
|
from typing import Any, Dict
|
||||||
|
|
||||||
|
|
||||||
|
class BaseProviderFactory(ABC):
    """Interface implemented by the provider-specific chat model factories."""

    @abstractmethod
    def create(self, model: str, **kwargs: Any):
        """Build a chat model.

        Args:
            model: The model identifier.
            **kwargs: Extra keyword arguments for the concrete implementation.

        Raises:
            NotImplementedError: Always, when this base implementation runs.
        """
        raise NotImplementedError
|
||||||
|
|
||||||
|
|
||||||
|
class OpenAIChatFactory(BaseProviderFactory):
    """Factory producing ``ChatOpenAI`` instances."""

    def create(self, model: str, **kwargs: Any):
        """Return ``ChatOpenAI`` for ``model``.

        Extra keyword arguments are forwarded unchanged to the constructor.
        """
        # Imported inside the method, so langchain_openai is only loaded when
        # this factory is actually used.
        from langchain_openai import ChatOpenAI

        chat_model = ChatOpenAI(model=model, **kwargs)
        return chat_model
|
||||||
|
|
||||||
|
|
||||||
|
class OllamaChatFactory(BaseProviderFactory):
    """Factory producing ``ChatOllama`` instances."""

    def create(self, model: str, **kwargs: Any):
        """Return ``ChatOllama`` for ``model``.

        Extra keyword arguments are forwarded unchanged to the constructor.
        """
        # Imported inside the method, so langchain_ollama is only loaded when
        # this factory is actually used.
        from langchain_ollama import ChatOllama

        chat_model = ChatOllama(model=model, **kwargs)
        return chat_model
|
||||||
|
|
||||||
|
|
||||||
|
class BedrockChatFactory(BaseProviderFactory):
    """Factory producing ``ChatBedrockConverse`` instances."""

    def create(self, model: str, **kwargs: Any):
        """Return ``ChatBedrockConverse`` for ``model``.

        Extra keyword arguments are forwarded unchanged to the constructor.
        """
        # Imported inside the method, so langchain_aws is only loaded when
        # this factory is actually used.
        from langchain_aws import ChatBedrockConverse

        chat_model = ChatBedrockConverse(model=model, **kwargs)
        return chat_model
|
||||||
|
|
||||||
|
|
||||||
|
class HuggingFaceChatFactory(BaseProviderFactory):
    """Factory producing ``ChatHuggingFace`` models backed by a pipeline."""

    def create(self, model: str, **kwargs: Any):
        """Return ``ChatHuggingFace`` wrapping a text-generation pipeline.

        The pipeline is created with ``HuggingFacePipeline.from_model_id``.
        Extra keyword arguments are passed to it as ``pipeline_kwargs``; they
        are not given to the ``ChatHuggingFace`` constructor.
        """
        # Imported inside the method, so langchain_huggingface is only loaded
        # when this factory is actually used.
        from langchain_huggingface import ChatHuggingFace, HuggingFacePipeline

        pipeline_llm = HuggingFacePipeline.from_model_id(
            model_id=model,
            task="text-generation",
            pipeline_kwargs=kwargs,
        )
        chat_model = ChatHuggingFace(llm=pipeline_llm)
        return chat_model
|
||||||
|
|
||||||
|
|
||||||
|
# Registry mapping a provider name to the factory instance that builds its
# chat model. Keys are lowercase; create_chat_model strips and lowercases the
# requested provider before looking it up here.
CHAT_FACTORIES: Dict[str, BaseProviderFactory] = {
    "openai": OpenAIChatFactory(),
    "ollama": OllamaChatFactory(),
    "bedrock": BedrockChatFactory(),
    "huggingface": HuggingFaceChatFactory(),
}
|
||||||
|
|
||||||
|
|
||||||
|
def create_chat_model(provider: str, model: str, **kwargs: Any):
    """
    Build a chat model through the factory registered for a provider.

    The provider name is matched case-insensitively and surrounding
    whitespace is ignored.

    Args:
        provider: The provider name (openai, ollama, bedrock, huggingface).
        model: The model identifier.
        **kwargs: Additional keyword arguments handed to the factory.

    Returns:
        Whatever the selected factory's ``create`` method returns.

    Raises:
        ValueError: If no factory is registered for ``provider``.
    """
    normalized = provider.strip().lower()

    # Success path first; anything that falls through is an unknown provider.
    if normalized in CHAT_FACTORIES:
        return CHAT_FACTORIES[normalized].create(model=model, **kwargs)

    raise ValueError(
        f"Unsupported chat provider: {provider}. "
        f"Available providers: {list(CHAT_FACTORIES.keys())}"
    )
|
||||||
Loading…
Reference in New Issue