# assistance-engine/scripts/pipelines/ingestion/avap_ingestor.py
"""
avap_ingest.py v2.0
Uso:
# Ingestar
python avap_ingest.py --chunks ingestion/chunks.jsonl --index avap-knowledge-v1
# Borrar indice y re-ingestar desde cero
python avap_ingest.py --chunks ingestion/chunks.jsonl --index avap-knowledge-v1 --delete
# Reprocesar solo los fallidos (DLQ)
python avap_ingest.py --chunks ingestion/failed_chunks.jsonl --index avap-knowledge-v1
"""
import os
import json
import time
import asyncio
import argparse
import traceback
from pathlib import Path
from datetime import datetime
from typing import AsyncGenerator
from elasticsearch import AsyncElasticsearch
import httpx
from tqdm import tqdm
from elasticsearch import helpers as es_helpers
DEFAULT_CHUNKS_PATH = "ingestion/chunks.jsonl"
DEFAULT_INDEX = "avap-knowledge-v1"
DEFAULT_OLLAMA_URL= "http://localhost:11434"
DEFAULT_OLLAMA_MODEL= "qwen3-0.6B-emb:latest"
DEFAULT_EMBEDDING_DIM= 1024
BATCH_SIZE_EMBED= 8
BATCH_SIZE_ES= 50
QUEUE_MAXSIZE= 5
MAX_RETRIES= 3
RETRY_DELAY= 2.0
OLLAMA_TIMEOUT= 120
def iter_chunks_jsonl(path, batch_size):
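    """Stream a JSONL file lazily, yielding lists of parsed chunks of up to batch_size."""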
batch = []
with open(path, encoding="utf-8") as f:
for line in f:
line = line.strip()
if not line:
continue
try:
chunk = json.loads(line)
batch.append(chunk)
if len(batch) >= batch_size:
yield batch
batch = []
            except json.JSONDecodeError as exc:
                print(f"Skipping malformed JSONL line: {exc}")
if batch:
yield batch
def count_lines(path):
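    """Count non-empty lines (one chunk each) so tqdm can show a total."""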
n = 0
with open(path, encoding="utf-8") as f:
for line in f:
if line.strip():
n += 1
return n
def build_index_mapping(embedding_dim):
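    """Index schema: BM25 text via a custom analyzer (lowercase + stop) plus a
    dense_vector field (int8-quantized HNSW, cosine similarity) for hybrid
    lexical/semantic retrieval."""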
return {
"settings": {
"number_of_shards": 1,
"number_of_replicas": 0,
"analysis": {
"analyzer": {
"avap_analyzer": {
"type": "custom",
"tokenizer": "standard",
"filter": ["lowercase", "stop"]
}
}
}
},
"mappings": {
"properties": {
"chunk_id": {"type": "keyword"},
"content": {
"type": "text",
"analyzer": "avap_analyzer"
},
"embedding": {
"type": "dense_vector",
"dims": embedding_dim,
"index": True,
"similarity": "cosine",
"index_options": {
"type": "int8_hnsw",
"m": 16,
"ef_construction": 100
}
},
"doc_type": {"type": "keyword"},
"block_type": {"type": "keyword"},
"section": {
"type": "text",
"fields": {"keyword": {"type": "keyword"}}
},
"source_file": {"type": "keyword"},
"start_line": {"type": "integer"},
"end_line": {"type": "integer"},
"token_estimate": {"type": "integer"},
"metadata": {
"properties": {
"uses_orm": {"type": "boolean"},
"uses_http": {"type": "boolean"},
"uses_connector": {"type": "boolean"},
"uses_async": {"type": "boolean"},
"uses_crypto": {"type": "boolean"},
"uses_auth": {"type": "boolean"},
"uses_error_handling": {"type": "boolean"},
"uses_loop": {"type": "boolean"},
"uses_json": {"type": "boolean"},
"uses_list": {"type": "boolean"},
"uses_regex": {"type": "boolean"},
"uses_datetime": {"type": "boolean"},
"returns_result": {"type": "boolean"},
"registers_endpoint": {"type": "boolean"},
"has_overlap": {"type": "boolean"},
"complexity": {"type": "integer"},
"full_block_start": {"type": "integer"},
"full_block_end": {"type": "integer"},
}
}
}
}
}
class DeadLetterQueue:
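    """Append-only JSONL sink for chunks that fail embedding or indexing,
    written to failed_chunks_<timestamp>.jsonl so they can be re-ingested later."""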
def __init__(self, base_path = "ingestion"):
ts = datetime.now().strftime("%Y%m%d_%H%M%S")
self.path = Path(base_path) / f"failed_chunks_{ts}.jsonl"
self._handle = None
self.count = 0
def _open(self):
if self._handle is None:
self.path.parent.mkdir(parents=True, exist_ok=True)
self._handle = open(self.path, "w", encoding="utf-8")
def write(self, chunk, reason) -> None:
self._open()
record = {"reason": reason, "chunk": chunk}
self._handle.write(json.dumps(record, ensure_ascii=False) + "\n")
self._handle.flush()
self.count += 1
def close(self):
if self._handle:
self._handle.close()
self._handle = None
def report(self):
if self.count:
print(f"{self.count} Failed: {self.path}")
else:
print(" No failed chunks")
class OllamaAsyncEmbedder:
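    """Thin async client for Ollama's /api/embed endpoint, with retry and
    linear backoff (RETRY_DELAY * attempt)."""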
def __init__(self, base_url, model, timeout = OLLAMA_TIMEOUT):
self.base_url = base_url.rstrip("/")
self.model = model
self._client = httpx.AsyncClient(timeout=timeout)
async def probe_dimension(self):
vecs = await self._embed(["dimension probe"])
return len(vecs[0])
async def _embed(self, texts):
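        # POST /api/embed accepts a list of inputs and returns {"embeddings": [[...], ...]}.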
payload = {"model": self.model, "input": texts}
for attempt in range(1, MAX_RETRIES + 1):
try:
resp = await self._client.post(
f"{self.base_url}/api/embed",
json=payload
)
resp.raise_for_status()
return resp.json()["embeddings"]
except Exception as exc:
if attempt >= MAX_RETRIES:
raise RuntimeError(f"Embeddings fail {MAX_RETRIES}: {exc}") from exc
await asyncio.sleep(RETRY_DELAY * attempt)
return []
async def embed_batch(self, chunks, dlq):
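        """Embed a whole batch in one call; on failure, fall back to embedding
        chunks one by one and route the ones that still fail to the DLQ."""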
texts = [c["content"] for c in chunks]
try:
vectors = await self._embed(texts)
return list(zip(chunks, vectors))
except Exception as exc:
            print(f"Batch embed failed; retrying chunks one by one: {exc}")
results = []
for chunk in chunks:
try:
vecs = await self._embed([chunk["content"]])
results.append((chunk, vecs[0]))
except Exception as single_exc:
dlq.write(chunk, f"Ollama embed failed: {single_exc}")
return results
async def close(self):
await self._client.aclose()
async def producer(chunks_path, embedder, queue, dlq, batch_size, pbar):
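    """Read chunk batches, embed them, and feed the bounded queue; a None
    sentinel tells the consumer the stream is finished."""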
for batch in iter_chunks_jsonl(chunks_path, batch_size):
embedded = await embedder.embed_batch(batch, dlq)
if embedded:
await queue.put(embedded)
pbar.update(len(batch))
await queue.put(None)
async def consumer(queue, es_client, index, dlq, batch_size_es, stats):
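    """Drain the queue, buffering embedded (chunk, vector) pairs and flushing
    them to Elasticsearch via async_bulk every batch_size_es documents."""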
buffer: list[tuple[dict, list[float]]] = []
async def flush_buffer():
if not buffer:
return
actions = [
{
"_index": index,
"_id": chunk["chunk_id"],
"_source": {
"chunk_id": chunk["chunk_id"],
"content": chunk["content"],
"embedding": vector,
"doc_type": chunk.get("doc_type", "unknown"),
"block_type": chunk.get("block_type", ""),
"section": chunk.get("section", ""),
"source_file": chunk.get("source_file", ""),
"start_line": chunk.get("start_line", 0),
"end_line": chunk.get("end_line", 0),
"token_estimate": chunk.get("token_estimate", 0),
"metadata": chunk.get("metadata", {}),
}
}
for chunk, vector in buffer
]
try:
ok, errors = await es_helpers.async_bulk(
es_client, actions,
raise_on_error=False,
stats_only=False
)
stats["ok"] += ok
stats["errors"] += len(errors)
for err in errors:
failed_id = err.get("index", {}).get("_id", "unknown")
reason = str(err.get("index", {}).get("error", "unknown ES error"))
for chunk, _ in buffer:
if chunk["chunk_id"] == failed_id:
dlq.write(chunk, f"ES bulk error: {reason}")
break
except Exception as exc:
for chunk, _ in buffer:
dlq.write(chunk, f"ES bulk exception: {exc}")
stats["errors"] += len(buffer)
buffer.clear()
while True:
item = await queue.get()
if item is None:
await flush_buffer()
break
buffer.extend(item)
if len(buffer) >= batch_size_es:
await flush_buffer()
async def build_es_client():
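    """Connect to the local Elasticsearch node (hardcoded http://127.0.0.1:9200)
    and fail fast if it does not answer an info() ping."""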
url = "http://127.0.0.1:9200"
client = AsyncElasticsearch(
url,
verify_certs=False,
request_timeout=60
)
try:
info = await client.info()
print(f" Elasticsearch {info['version']['number']} en {url}")
except Exception as e:
raise ConnectionError(f"Cant connet {url}. Error: {e}")
return client
async def create_index(client: AsyncElasticsearch, index: str,
embedding_dim: int,
delete_if_exists: bool = False) -> None:
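    """Create the index with the hybrid mapping unless it already exists;
    with --delete, drop and recreate it from scratch."""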
exists = await client.indices.exists(index=index)
if exists and delete_if_exists:
await client.indices.delete(index=index)
exists = False
if not exists:
        mapping = build_index_mapping(embedding_dim)
        # elasticsearch-py 8.x deprecates the raw body= parameter
        await client.indices.create(index=index,
                                    settings=mapping["settings"],
                                    mappings=mapping["mappings"])
print(f" · Index '{index}' created (dim={embedding_dim}, int8_hnsw, cosine).")
else:
print(f" · Inex '{index}' reused.")
"""
async def build_es_client():
url = "http://127.0.0.1:9200"
client = AsyncElasticsearch(
url,
verify_certs=False,
request_timeout=60,
headers={
"Accept": "application/vnd.elasticsearch+json; compatible-with=8",
"Content-Type": "application/json"
}
)
client.options(headers={"Accept": "application/vnd.elasticsearch+json; compatible-with=8"})
try:
await client.info()
except Exception as e:
raise ConnectionError(f"Error de versión/compatibilidad: {e}")
return client
"""
async def run(args):
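    """Orchestrate the full pipeline: validate the embedding dimension against
    the config, create the index, then run producer and consumer concurrently."""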
ollama_url = os.environ.get("OLLAMA_URL", DEFAULT_OLLAMA_URL)
ollama_model = os.environ.get("OLLAMA_MODEL", DEFAULT_OLLAMA_MODEL)
embed_dim = int(os.environ.get("OLLAMA_EMBEDDING_DIM", DEFAULT_EMBEDDING_DIM))
embedder = OllamaAsyncEmbedder(ollama_url, ollama_model)
if args.probe_dim:
dim = await embedder.probe_dimension()
print(f" Model dimensions: {dim}")
await embedder.close()
return
if not Path(args.chunks).exists():
print(f"File Not Found: {args.chunks}")
await embedder.close()
return
total = count_lines(args.chunks)
print(f" Total Chunks: {total}")
print("\nConnecting to VectorDB...")
es_client = await build_es_client()
print(f"\nGenerating index '{args.index}'...")
await create_index(es_client, args.index, embed_dim,
delete_if_exists=args.delete)
print("\n Checking Model dimmensions...")
actual_dim = await embedder.probe_dimension()
if actual_dim != embed_dim:
print(f" Real dimmension ({actual_dim}) != OLLAMA_EMBEDDING_DIM ({embed_dim})")
await embedder.close()
await es_client.close()
return
print(f" Dimmension: {actual_dim}")
dlq = DeadLetterQueue(base_path=str(Path(args.chunks).parent))
stats = {"ok": 0, "errors": 0}
queue = asyncio.Queue(maxsize=QUEUE_MAXSIZE)
print(f"\nAsync pipeline (Ollama <-> Elasticsearch)...\n")
t0 = time.time()
pbar = tqdm(total=total, desc=" Processing", unit="chunks")
await asyncio.gather(
producer(args.chunks, embedder, queue, dlq,
args.batch_embed, pbar),
consumer(queue, es_client, args.index, dlq,
args.batch_es, stats),
)
pbar.close()
elapsed = time.time() - t0
await embedder.close()
await es_client.close()
dlq.close()
print("RESULT")
print("----------------")
print(f"Chunks : {total}")
print(f" -OK : {stats['ok']}")
print(f" -Errors : {stats['errors']}")
print(f" -Index Name: {args.index}")
print()
dlq.report()
print("----------------")
def main():
parser = argparse.ArgumentParser(
description="AVAP Ingestor"
)
parser.add_argument("--chunks", default=DEFAULT_CHUNKS_PATH,
help=f"JSONL Chunk File (default: {DEFAULT_CHUNKS_PATH})")
parser.add_argument("--index", default=DEFAULT_INDEX,
help=f"Index Name (default: {DEFAULT_INDEX})")
parser.add_argument("--delete", action="store_true",
help="Delete index before send")
parser.add_argument("--probe-dim", action="store_true",
help="Check Model dimmension")
parser.add_argument("--batch-embed", type=int, default=BATCH_SIZE_EMBED,
help=f"Chunks by Ollama call(default: {BATCH_SIZE_EMBED})")
parser.add_argument("--batch-es", type=int, default=BATCH_SIZE_ES,
help=f"Docs by bulk ES (default: {BATCH_SIZE_ES})")
args = parser.parse_args()
print("----------------")
print("AVAP INGESTOR")
print("----------------")
if not args.probe_dim:
print(f" Chunks : {args.chunks}")
print(f" INDEX ES : {args.index}")
print(f" Ollama URL : {os.environ.get('OLLAMA_URL', DEFAULT_OLLAMA_URL)}")
print(f" MODEL : {os.environ.get('OLLAMA_MODEL', DEFAULT_OLLAMA_MODEL)}")
print(f" MODEL DIM : {os.environ.get('OLLAMA_EMBEDDING_DIM', DEFAULT_EMBEDDING_DIM)}")
print()
asyncio.run(run(args))
if __name__ == "__main__":
main()