# --- Imports & environment configuration -------------------------------------
import os
import re
import uuid
from dataclasses import dataclass
from typing import Iterable, List, Dict, Any, Callable, Protocol

import torch
import torch.nn.functional as F
from loguru import logger
from transformers import AutoTokenizer, AutoModel, AutoConfig
from elasticsearch import Elasticsearch
from elasticsearch.helpers import bulk
import nltk
from nltk.tokenize import sent_tokenize

# sent_tokenize needs the Punkt models; recent NLTK releases ship them as
# "punkt_tab" while older ones use "punkt" — fetch both quietly (no-op if cached).
nltk.download("punkt", quiet=True)
nltk.download("punkt_tab", quiet=True)

# Read the model name once and fail fast with a clear message; previously a
# missing EMBEDDING_MODEL_NAME surfaced as a cryptic from_pretrained error.
embedding_model_name = os.getenv("EMBEDDING_MODEL_NAME")
if not embedding_model_name:
    raise RuntimeError("EMBEDDING_MODEL_NAME environment variable is not set")

# Embedding dimensionality comes from the model config so the Elasticsearch
# dense_vector mapping always matches the embedding model.
config = AutoConfig.from_pretrained(embedding_model_name)
embedding_dim = config.hidden_size

es_index = os.getenv("ELASTICSEARCH_INDEX")


@dataclass(frozen=True)
class Chunk:
    """One indexable fragment of a source document."""
    doc_id: str               # stable id shared by all chunks of one document
    chunk_id: int             # position of the chunk within its document
    text: str                 # the chunk's text content
    source: str               # provenance label (e.g. "local_demo")
    metadata: Dict[str, Any]  # free-form extra fields; always carries doc_id


def clean_text(text: str) -> str:
    """Normalize whitespace: NBSP -> space, collapse runs, strip ends."""
    text = text.replace("\u00a0", " ")
    return re.sub(r"\s+", " ", text).strip()


class ChunkingStrategy(Protocol):
    """Callable that splits raw text into a list of chunk strings."""
    def __call__(self, text: str, **kwargs) -> List[str]:
        ...
# Module-level caches: the originals called AutoTokenizer/AutoModel
# .from_pretrained on every invocation, re-loading weights each time.
_TOKENIZER_CACHE: Dict[str, Any] = {}
_MODEL_CACHE: Dict[str, Any] = {}


def _get_tokenizer(model_name: str):
    """Return a cached fast tokenizer for `model_name`."""
    if model_name not in _TOKENIZER_CACHE:
        _TOKENIZER_CACHE[model_name] = AutoTokenizer.from_pretrained(model_name, use_fast=True)
    return _TOKENIZER_CACHE[model_name]


def _get_model(model_name: str, device: str):
    """Return a cached eval-mode encoder placed on `device`."""
    key = f"{model_name}@{device}"
    if key not in _MODEL_CACHE:
        model = AutoModel.from_pretrained(model_name).to(device)
        model.eval()
        _MODEL_CACHE[key] = model
    return _MODEL_CACHE[key]


def _embed_texts(
    texts: List[str],
    model_name: str,
    device: str,
    batch_size: int = 64,
) -> torch.Tensor:
    """Mean-pooled, L2-normalized sentence embeddings, computed in batches.

    Batching bounds memory for long documents; the attention-masked mean is
    unaffected by per-batch padding length, so results match a single batch.
    """
    tokenizer = _get_tokenizer(model_name)
    model = _get_model(model_name, device)
    parts: List[torch.Tensor] = []
    with torch.no_grad():
        for i in range(0, len(texts), batch_size):
            enc = tokenizer(
                texts[i:i + batch_size],
                padding=True,
                truncation=True,
                return_tensors="pt",
            ).to(device)
            out = model(**enc)
            mask = enc["attention_mask"].unsqueeze(-1)
            # Masked mean over tokens; clamp avoids 0-division on empty masks.
            vecs = (out.last_hidden_state * mask).sum(1) / mask.sum(1).clamp(min=1e-9)
            parts.append(F.normalize(vecs, p=2, dim=1).cpu())
    return torch.cat(parts, dim=0)


def fixed_size_token_chunking(
    text: str,
    embedding_model_name: str = embedding_model_name,
    chunk_size: int = 1200,
    overlap: int = 200,
) -> List[str]:
    """Split `text` into windows of `chunk_size` tokens with `overlap` shared.

    Tokens come from the embedding model's own tokenizer so chunk sizes line
    up with what the encoder actually sees.

    Raises:
        ValueError: if chunk_size <= overlap (the window would never advance).
    """
    if chunk_size <= overlap:
        raise ValueError("chunk_size must be greater than overlap")

    tokenizer = _get_tokenizer(embedding_model_name)
    token_ids = tokenizer.encode(text, add_special_tokens=False)

    chunks: List[str] = []
    start = 0
    n = len(token_ids)

    while start < n:
        end = min(start + chunk_size, n)
        chunks.append(tokenizer.decode(token_ids[start:end], skip_special_tokens=True))
        if end == n:
            break
        # Step back so consecutive chunks share `overlap` tokens of context.
        start = end - overlap

    return chunks


def semantic_chunking(
    text: str,
    embedding_model_name: str = embedding_model_name,
    similarity_threshold: float = 0.6,
    max_sentences_per_chunk: int = 12,
) -> List[str]:
    """Group consecutive sentences into chunks by embedding similarity.

    A new chunk starts whenever the cosine similarity between adjacent
    sentences drops below `similarity_threshold`, or the current chunk
    reaches `max_sentences_per_chunk` sentences.
    """
    sentences = [s.strip() for s in sent_tokenize(text) if s.strip()]
    if not sentences:
        return []
    logger.info(f"Semantic chunking: {len(sentences)} sentences found")

    device = "cuda" if torch.cuda.is_available() else "cpu"
    vecs = _embed_texts(sentences, embedding_model_name, device)

    chunks: List[List[str]] = [[sentences[0]]]
    for i in range(1, len(sentences)):
        # Vectors are unit-norm, so the dot product is the cosine similarity.
        sim = float((vecs[i - 1] * vecs[i]).sum())
        # DEBUG level: one line per sentence pair floods notebook output at INFO.
        logger.debug(f"Similarity between sentence {i-1} and {i}: {sim:.4f}")
        if sim < similarity_threshold or len(chunks[-1]) >= max_sentences_per_chunk:
            chunks.append([])
        chunks[-1].append(sentences[i])

    return [" ".join(chunk) for chunk in chunks if chunk]


# Strategy registry: maps the public strategy name to its implementation.
CHUNKING_REGISTRY: Dict[str, ChunkingStrategy] = {
    "fixed": fixed_size_token_chunking,
    "semantic": semantic_chunking,
}
def build_chunks(
    doc_text: str,
    source: str,
    metadata: Dict[str, Any],
    chunking_strategy: str = "fixed",
    **chunking_kwargs,
) -> List[Chunk]:
    """Clean `doc_text`, split it with the chosen strategy, wrap parts as Chunks.

    A `doc_id` is taken from metadata when present, otherwise generated, and
    is copied into every chunk's metadata.

    Raises:
        ValueError: if `chunking_strategy` is not in CHUNKING_REGISTRY.
    """
    if chunking_strategy not in CHUNKING_REGISTRY:
        raise ValueError(
            f"Unknown chunking strategy '{chunking_strategy}'. "
            f"Available: {list(CHUNKING_REGISTRY.keys())}"
        )

    doc_id = metadata.get("doc_id") or str(uuid.uuid4())
    cleaned = clean_text(doc_text)
    parts = CHUNKING_REGISTRY[chunking_strategy](cleaned, **chunking_kwargs)

    # Drop empty parts BEFORE enumerating so chunk_ids stay contiguous
    # (enumerating first left gaps whenever a strategy emitted blank parts).
    non_empty = [p for p in parts if p.strip()]
    return [
        Chunk(
            doc_id=doc_id,
            chunk_id=i,
            text=part,
            source=source,
            metadata={**metadata, "doc_id": doc_id},
        )
        for i, part in enumerate(non_empty)
    ]


# ### Ingestion in elasticsearch

def index_chunks(
    es: Elasticsearch,
    index_name: str,
    chunks: List[Chunk],
    embedding_model_name: str = embedding_model_name,
    batch_size: int = 64,
) -> None:
    """Embed `chunks` and bulk-index them into `index_name`.

    Embeddings are attention-masked mean-pooled hidden states, L2-normalized
    (cosine-ready). The ES document _id is "{doc_id}:{chunk_id}" so
    re-ingesting the same document overwrites its chunks rather than
    duplicating them.
    """
    device = "cuda" if torch.cuda.is_available() else "cpu"
    tokenizer = AutoTokenizer.from_pretrained(embedding_model_name)
    model = AutoModel.from_pretrained(embedding_model_name).to(device)
    model.eval()

    def actions() -> Iterable[Dict[str, Any]]:
        # Embed in batches for speed.
        for i in range(0, len(chunks), batch_size):
            batch = chunks[i:i + batch_size]
            texts = [c.text for c in batch]

            with torch.no_grad():
                enc = tokenizer(texts, padding=True, truncation=True, return_tensors="pt").to(device)
                out = model(**enc)
                mask = enc["attention_mask"].unsqueeze(-1)
                vecs = (out.last_hidden_state * mask).sum(1) / mask.sum(1).clamp(min=1e-9)
                vecs = F.normalize(vecs, p=2, dim=1)
            vectors = vecs.cpu().tolist()

            for c, v in zip(batch, vectors):
                yield {
                    "_op_type": "index",
                    "_index": index_name,
                    "_id": f"{c.doc_id}:{c.chunk_id}",
                    "_source": {
                        "doc_id": c.doc_id,
                        "chunk_id": c.chunk_id,
                        "text": c.text,
                        "source": c.source,
                        "metadata": c.metadata,
                        "embedding": v,
                    },
                }

    # Surface the indexed-document count instead of silently discarding it.
    success, _ = bulk(es.options(request_timeout=120), actions())
    logger.info(f"Indexed {success} chunks into '{index_name}'")


# ### Test

# Index schema: keyword ids for exact matching, analyzed text for BM25,
# dense_vector sized from the embedding model for cosine kNN.
mapping = {
    "settings": {
        "index": {
            "number_of_shards": 1
        }
    },
    "mappings": {
        "properties": {
            "doc_id": {"type": "keyword"},
            "chunk_id": {"type": "integer"},
            "text": {"type": "text"},
            "source": {"type": "keyword"},
            "metadata": {"type": "object", "enabled": True},
            "embedding": {
                "type": "dense_vector",
                "dims": embedding_dim,
                "index": True,
                "similarity": "cosine"
            }
        }
    }
}

es = Elasticsearch(
    os.getenv("ELASTICSEARCH_LOCAL_URL"),
)

for index in es.indices.get(index="*"):
    print(index)

if es.indices.exists(index=es_index):
    es.indices.delete(index=es_index)

# ES 8.x python client: pass settings/mappings as keyword arguments —
# the catch-all `body=` parameter is deprecated.
es.indices.create(
    index=es_index,
    settings=mapping["settings"],
    mappings=mapping["mappings"],
)
print("Index created:", es_index)

# Demo document (Spanish): several unrelated topical sections, useful for
# eyeballing how the chunking strategies split topic boundaries.
doc_text = """El impacto de la computación distribuida en la investigación científica
La computación distribuida ha transformado profundamente la manera en que se realiza investigación científica a gran escala. En lugar de depender de un único superordenador centralizado, hoy es posible coordinar miles de máquinas interconectadas que comparten tareas complejas. Este enfoque permite procesar volúmenes masivos de datos en tiempos significativamente menores.
Uno de los casos más conocidos es el análisis genómico. La secuenciación del ADN genera cantidades enormes de información que deben procesarse y compararse. Gracias a sistemas distribuidos, los investigadores pueden analizar mutaciones, identificar patrones genéticos y acelerar el desarrollo de tratamientos personalizados.
Además del ámbito médico, la física de partículas también se beneficia enormemente. Experimentos como los realizados en el CERN producen petabytes de datos que deben distribuirse entre centros de investigación de todo el mundo. Sin este modelo colaborativo y distribuido, muchos descubrimientos serían simplemente inviables.
Limitaciones energéticas y sostenibilidad
Sin embargo, la expansión de infraestructuras computacionales trae consigo desafíos importantes. Uno de los más relevantes es el consumo energético. Los centros de datos modernos requieren cantidades masivas de electricidad tanto para operar como para refrigerar los equipos.
Este consumo ha impulsado investigaciones en eficiencia energética, arquitecturas más sostenibles y el uso de energías renovables. Algunas empresas tecnológicas ya están instalando centros de datos en regiones frías para reducir costes de refrigeración, mientras que otras exploran soluciones basadas en inteligencia artificial para optimizar el uso energético.
La sostenibilidad se ha convertido en un criterio estratégico, no solo económico sino también reputacional. Las organizaciones que no gestionan adecuadamente su huella de carbono pueden enfrentar críticas públicas y regulatorias.
Modelos de lenguaje y aprendizaje profundo
En paralelo, el desarrollo de modelos de lenguaje de gran escala ha redefinido la inteligencia artificial contemporánea. Estos modelos, entrenados con billones de parámetros, pueden generar texto coherente, traducir idiomas y resolver problemas complejos.
El entrenamiento de estos sistemas requiere infraestructuras distribuidas extremadamente potentes. El paralelismo de datos y el paralelismo de modelo permiten dividir la carga computacional entre múltiples GPUs o nodos especializados.
Sin embargo, la fase de inferencia presenta retos distintos. Aunque es menos intensiva que el entrenamiento, la inferencia a gran escala —como en servicios públicos de IA— requiere optimización constante para reducir latencia y consumo de recursos.
Técnicas de chunking y recuperación semántica
En sistemas basados en recuperación aumentada (RAG), el modo en que se fragmenta la información influye directamente en la calidad de las respuestas generadas. El chunking basado únicamente en longitud puede cortar ideas a la mitad, afectando la coherencia del contexto recuperado.
Por otro lado, el semantic chunking intenta agrupar fragmentos de texto que comparten significado. Este enfoque utiliza embeddings para medir similitud y decidir dónde dividir el contenido.
Un umbral de similitud demasiado bajo puede generar fragmentos excesivamente grandes y heterogéneos. En cambio, un umbral demasiado alto puede producir fragmentos pequeños y perder contexto relevante.
La calibración adecuada depende del dominio del texto, la longitud media de los párrafos y el modelo de embeddings utilizado.
Agricultura vertical y urbanismo del futuro
La agricultura vertical propone cultivar alimentos en estructuras urbanas de múltiples niveles. Esta técnica busca reducir la dependencia del transporte y optimizar el uso del espacio en ciudades densamente pobladas.
Mediante sistemas hidropónicos y control automatizado de nutrientes, las plantas pueden crecer sin suelo tradicional. Sensores distribuidos monitorizan humedad, temperatura y niveles de nutrientes en tiempo real.
Además, la integración con energías renovables permite que estas instalaciones funcionen de manera más sostenible. En algunos casos, los edificios agrícolas se diseñan para integrarse arquitectónicamente en el entorno urbano.
Aunque todavía enfrenta desafíos económicos, la agricultura vertical representa una posible solución para la seguridad alimentaria en megaciudades."""

# Demo: split the sample document into ~150-token chunks with 25-token overlap.
chunks = build_chunks(
    doc_text=doc_text,
    source="local_demo",
    metadata={"title": "Demo", "doc_id": "demo-001"},
    chunking_strategy="fixed",
    chunk_size=150,
    overlap=25,
)