assistance-engine/Docker/src/server.py

96 lines
2.9 KiB
Python

import logging
import os
from concurrent import futures
from dotenv import load_dotenv
load_dotenv()
import brunix_pb2
import brunix_pb2_grpc
import grpc
from grpc_reflection.v1alpha import reflection
from langchain_elasticsearch import ElasticsearchStore
from utils.llm_factory import create_chat_model
from utils.emb_factory import create_embedding_model
from graph import build_graph
# Configure root logging at INFO level for the whole process.
logging.basicConfig(level=logging.INFO)
# Named logger used by the servicer methods and by serve() below.
logger = logging.getLogger("brunix-engine")
class BrunixEngine(brunix_pb2_grpc.AssistanceEngineServicer):
    """gRPC servicer that answers ``AskAgent`` requests through the agent graph.

    Construction wires together an Ollama chat model, an Ollama embedding
    model, an Elasticsearch vector store and the graph returned by
    ``build_graph``. All connection settings come from environment variables.
    """

    def __init__(self):
        """Build the LLM, embeddings, vector store and graph from the environment.

        Reads ``OLLAMA_MODEL_NAME``, ``OLLAMA_EMB_MODEL_NAME``, ``OLLAMA_URL``,
        ``ELASTICSEARCH_URL`` and ``ELASTICSEARCH_INDEX``. Unset variables are
        forwarded as ``None`` to the respective factories.
        """
        self.llm = create_chat_model(
            provider="ollama",
            model=os.getenv("OLLAMA_MODEL_NAME"),
            base_url=os.getenv("OLLAMA_URL"),
            temperature=0,
            validate_model_on_init=True,
        )
        self.embeddings = create_embedding_model(
            provider="ollama",
            model=os.getenv("OLLAMA_EMB_MODEL_NAME"),
            base_url=os.getenv("OLLAMA_URL"),
        )
        self.vector_store = ElasticsearchStore(
            es_url=os.getenv("ELASTICSEARCH_URL"),
            index_name=os.getenv("ELASTICSEARCH_INDEX"),
            embedding=self.embeddings,
            query_field="text",
            vector_query_field="embedding",
        )
        self.graph = build_graph(
            llm=self.llm,
            vector_store=self.vector_store,
        )
        logger.info("Brunix Engine initializing.")

    @staticmethod
    def _last_message_text(final_state):
        """Return the content of the last message in *final_state*, or ``""``.

        Message objects are read through their ``content`` attribute and
        dict-shaped messages through their ``"content"`` key. Anything else
        falls back to ``str(message)``.
        """
        messages = final_state.get("messages", [])
        if not messages:
            return ""
        last_msg = messages[-1]
        if not last_msg:
            return ""
        if isinstance(last_msg, dict):
            # The request itself is sent as a dict, so a dict may come back;
            # without this branch the whole dict repr would be sent as text.
            return last_msg.get("content", str(last_msg))
        return getattr(last_msg, "content", str(last_msg))

    def AskAgent(self, request, context):
        """Run the graph on ``request.query`` and stream the answer.

        Yields an ``AgentResponse`` carrying the answer text followed by an
        empty terminator frame. If the graph call or response construction
        fails, a single error ``AgentResponse`` is yielded instead; the error
        is reported in-band and the gRPC status is left untouched.

        Args:
            request: incoming message; ``session_id`` and ``query`` are read.
            context: gRPC servicer context (unused).
        """
        # NOTE(review): session_id is only logged, never passed to the graph —
        # confirm whether per-session state is expected.
        logger.info(f"request {request.session_id}): {request.query[:50]}.")
        try:
            final_state = self.graph.invoke(
                {"messages": [{"role": "user", "content": request.query}]}
            )
            # Build both frames before yielding so that any failure is caught
            # here and an error frame can never follow an already-sent answer.
            # NOTE(review): both frames are flagged is_final=True, as in the
            # original code — confirm which one clients treat as terminal.
            answer = brunix_pb2.AgentResponse(
                text=self._last_message_text(final_state),
                avap_code="AVAP-2026",
                is_final=True,
            )
            terminator = brunix_pb2.AgentResponse(
                text="", avap_code="", is_final=True
            )
        except Exception as e:
            # Service boundary: log with traceback and report in-band.
            logger.exception("Error in AskAgent: %s", e)
            yield brunix_pb2.AgentResponse(
                text=f"[Error Motor]: {str(e)}",
                is_final=True,
            )
            return
        yield answer
        yield terminator
def serve(port=50051, max_workers=10):
    """Start the Brunix gRPC server and block until it terminates.

    Registers a ``BrunixEngine`` servicer, enables server reflection for it
    and listens without TLS on ``[::]:<port>``.

    Args:
        port: TCP port to listen on. Defaults to 50051, the value that was
            previously hard-coded.
        max_workers: size of the thread pool that executes RPC handlers.
            Defaults to 10, the value that was previously hard-coded.

    Raises:
        RuntimeError: if the listening port could not be bound.
    """
    server = grpc.server(futures.ThreadPoolExecutor(max_workers=max_workers))
    brunix_pb2_grpc.add_AssistanceEngineServicer_to_server(BrunixEngine(), server)
    service_names = (
        brunix_pb2.DESCRIPTOR.services_by_name["AssistanceEngine"].full_name,
        reflection.SERVICE_NAME,
    )
    reflection.enable_server_reflection(service_names, server)
    # Some grpcio releases signal a failed bind by returning 0 instead of
    # raising; fail loudly rather than "serve" on no port at all.
    bound_port = server.add_insecure_port(f"[::]:{port}")
    if bound_port == 0:
        raise RuntimeError(f"Could not bind gRPC server to port {port}")
    logger.info(f"Brunix Engine on port {bound_port}")
    server.start()
    server.wait_for_termination()
# Launch the server only when this module is executed as a script.
if __name__ == "__main__":
    serve()