# --- Imports ------------------------------------------------------------------
import os
import re
import uuid
from dataclasses import dataclass
from typing import Iterable, List, Dict, Any, Callable, Protocol

import torch
import torch.nn.functional as F
from loguru import logger
from transformers import AutoTokenizer, AutoModel, AutoConfig
from elasticsearch import Elasticsearch
from elasticsearch.helpers import bulk
import nltk
from nltk.tokenize import sent_tokenize

# sent_tokenize needs the "punkt" sentence model; download is a no-op when cached.
nltk.download("punkt", quiet=True)

# --- Environment configuration ------------------------------------------------
# Read each required variable exactly once and fail fast with an actionable
# message; previously HF_EMBEDDING_MODEL_NAME was read twice and a missing value
# surfaced only as a cryptic error inside AutoConfig.from_pretrained(None).
embedding_model_name = os.getenv("HF_EMBEDDING_MODEL_NAME")
if not embedding_model_name:
    raise EnvironmentError("HF_EMBEDDING_MODEL_NAME environment variable is not set")

es_index = os.getenv("ELASTICSEARCH_INDEX")
if not es_index:
    raise EnvironmentError("ELASTICSEARCH_INDEX environment variable is not set")

# Fetch the model config once so the ES mapping can use the embedding width.
config = AutoConfig.from_pretrained(embedding_model_name)
embedding_dim = config.hidden_size

# ### Domain model

@dataclass(frozen=True)
class Chunk:
    """Immutable unit of indexable text produced from one source document."""
    doc_id: str              # stable id shared by all chunks of one document
    chunk_id: int            # position of this chunk within the document
    text: str                # the chunk's text content
    source: str              # provenance label (e.g. "local_demo")
    metadata: Dict[str, Any] # caller-supplied metadata, includes "doc_id"

# ### Utilities

def clean_text(text: str) -> str:
    """Normalize whitespace: replace non-breaking spaces, collapse runs of
    whitespace to single spaces, and strip leading/trailing space."""
    text = text.replace("\u00a0", " ")
    text = re.sub(r"\s+", " ", text).strip()
    return text

class ChunkingStrategy(Protocol):
    """Callable contract for chunkers: text in, list of chunk strings out."""
    def __call__(self, text: str, **kwargs) -> List[str]:
        ...
def fixed_size_token_chunking(
    text: str,
    embedding_model_name: str = embedding_model_name,
    chunk_size: int = 1200,
    overlap: int = 200,
) -> List[str]:
    """Split ``text`` into fixed-size token windows with overlap.

    The text is tokenized with the embedding model's tokenizer, sliced into
    windows of ``chunk_size`` tokens, and each window is decoded back to a
    string. Consecutive windows share ``overlap`` tokens so content near a
    boundary appears in both neighbouring chunks.

    Args:
        text: Raw document text.
        embedding_model_name: HF model whose tokenizer defines token
            boundaries (defaults to the module-level model name).
        chunk_size: Window length in tokens; must be greater than ``overlap``.
        overlap: Number of tokens shared between consecutive windows; >= 0.

    Returns:
        Decoded chunk strings; empty list for empty/whitespace input.

    Raises:
        ValueError: If ``overlap`` is negative or ``chunk_size <= overlap``.
    """
    # Validate both parameters up front: a negative overlap would silently
    # SKIP tokens (start would jump past `end`), and chunk_size <= overlap
    # would prevent the window from ever advancing.
    if overlap < 0:
        raise ValueError("overlap must be non-negative")
    if chunk_size <= overlap:
        raise ValueError("chunk_size must be greater than overlap")

    tokenizer = AutoTokenizer.from_pretrained(embedding_model_name, use_fast=True)
    token_ids = tokenizer.encode(text, add_special_tokens=False)

    chunks: List[str] = []
    start = 0
    n = len(token_ids)

    while start < n:
        end = min(start + chunk_size, n)
        chunk_ids = token_ids[start:end]
        # NOTE: decode() may not reproduce the source text byte-for-byte
        # (whitespace/subword normalization); acceptable for retrieval chunks.
        chunks.append(tokenizer.decode(chunk_ids, skip_special_tokens=True))

        if end == n:
            break

        # Step back by `overlap` so adjacent windows share context.
        start = end - overlap

    return chunks
def semantic_chunking(
    text: str,
    embedding_model_name: str = embedding_model_name,
    similarity_threshold: float = 0.6,
    max_sentences_per_chunk: int = 12,
) -> List[str]:
    """Group consecutive sentences into chunks by embedding similarity.

    A new chunk starts whenever the cosine similarity between a sentence and
    its predecessor drops below ``similarity_threshold``, or when the current
    chunk reaches ``max_sentences_per_chunk`` sentences.

    Args:
        text: Raw document text; split into sentences with NLTK punkt.
        embedding_model_name: HF model used to embed each sentence.
        similarity_threshold: Cosine-similarity cutoff in [-1, 1]; lower
            values yield fewer, larger chunks.
        max_sentences_per_chunk: Hard cap on sentences per chunk.

    Returns:
        List of chunk strings (sentences joined with spaces); empty list for
        empty input.
    """
    sentences = [s.strip() for s in sent_tokenize(text) if s.strip()]
    if not sentences:
        return []
    logger.info(f"Semantic chunking: {len(sentences)} sentences found")
    device = "cuda" if torch.cuda.is_available() else "cpu"

    tokenizer = AutoTokenizer.from_pretrained(embedding_model_name)
    model = AutoModel.from_pretrained(embedding_model_name).to(device)
    model.eval()

    # Mean-pooled, attention-masked, L2-normalized sentence embeddings.
    with torch.no_grad():
        enc = tokenizer(sentences, padding=True, truncation=True, return_tensors="pt").to(device)
        out = model(**enc)
        mask = enc["attention_mask"].unsqueeze(-1)
        vecs = (out.last_hidden_state * mask).sum(1) / mask.sum(1).clamp(min=1e-9)
        vecs = F.normalize(vecs, p=2, dim=1)

    # Vectors are unit-norm, so the dot product IS the cosine similarity.
    # Compute all adjacent-pair similarities in one vectorized op instead of
    # one Python-level dot product per pair.
    adjacent_sims = (vecs[:-1] * vecs[1:]).sum(dim=1)

    chunks: List[List[str]] = [[sentences[0]]]

    for i in range(1, len(sentences)):
        sim = float(adjacent_sims[i - 1])
        # DEBUG, not INFO: this fires once per sentence pair and floods the
        # log at INFO level on any non-trivial document.
        logger.debug(f"Similarity between sentence {i-1} and {i}: {sim:.4f}")
        if sim < similarity_threshold or len(chunks[-1]) >= max_sentences_per_chunk:
            chunks.append([])
        chunks[-1].append(sentences[i])

    return [" ".join(chunk) for chunk in chunks if chunk]


# Registry mapping strategy names (as accepted by build_chunks) to chunkers.
CHUNKING_REGISTRY: Dict[str, ChunkingStrategy] = {
    "fixed": fixed_size_token_chunking,
    "semantic": semantic_chunking,
}
def build_chunks(
    doc_text: str,
    source: str,
    metadata: Dict[str, Any],
    chunking_strategy: str = "fixed",
    **chunking_kwargs,
) -> List[Chunk]:
    """Clean a document, split it with the named strategy, and wrap every
    non-empty piece in a :class:`Chunk`.

    Args:
        doc_text: Raw document text.
        source: Provenance label stored on each chunk.
        metadata: Arbitrary metadata; if it carries a truthy "doc_id" that id
            is reused, otherwise a fresh UUID4 is generated.
        chunking_strategy: Key into CHUNKING_REGISTRY ("fixed" / "semantic").
        **chunking_kwargs: Forwarded verbatim to the chosen chunker.

    Returns:
        Chunks whose chunk_id is the piece's position in the chunker output
        (positions of empty pieces are skipped, leaving gaps).

    Raises:
        ValueError: If ``chunking_strategy`` is not registered.
    """
    try:
        splitter = CHUNKING_REGISTRY[chunking_strategy]
    except KeyError:
        raise ValueError(
            f"Unknown chunking strategy '{chunking_strategy}'. "
            f"Available: {list(CHUNKING_REGISTRY.keys())}"
        ) from None

    document_id = metadata.get("doc_id") or str(uuid.uuid4())

    pieces = splitter(clean_text(doc_text), **chunking_kwargs)

    built: List[Chunk] = []
    for position, piece in enumerate(pieces):
        if not piece.strip():
            continue
        built.append(
            Chunk(
                doc_id=document_id,
                chunk_id=position,
                text=piece,
                source=source,
                # Fresh dict per chunk so callers mutating one chunk's
                # metadata cannot affect its siblings.
                metadata={**metadata, "doc_id": document_id},
            )
        )
    return built
def index_chunks(
    es: Elasticsearch,
    index_name: str,
    chunks: List[Chunk],
    embedding_model_name: str = embedding_model_name,
    batch_size: int = 64,
) -> None:
    """Embed chunk texts with the HF model and bulk-index them into ES.

    Embeddings are mean-pooled over non-padding tokens and L2-normalized,
    matching the pooling used by semantic_chunking. Documents get the
    deterministic id "{doc_id}:{chunk_id}", so re-indexing the same chunks
    overwrites rather than duplicates.
    """
    tokenizer = AutoTokenizer.from_pretrained(embedding_model_name)
    model = AutoModel.from_pretrained(embedding_model_name)
    device = "cuda" if torch.cuda.is_available() else "cpu"
    model.to(device)
    model.eval()
    
    def actions() -> Iterable[Dict[str, Any]]:
        # Embed in batches for speed
        for i in range(0, len(chunks), batch_size):
            batch = chunks[i:i + batch_size]
            texts = [c.text for c in batch]
            
            with torch.no_grad():
                enc = tokenizer(texts, padding=True, truncation=True, return_tensors="pt").to(device)
                out = model(**enc)
                # Zero out padding positions before pooling; clamp avoids
                # division by zero for an (unlikely) all-padding row.
                mask = enc["attention_mask"].unsqueeze(-1)
                vecs = (out.last_hidden_state * mask).sum(1) / mask.sum(1).clamp(min=1e-9)
                vecs = F.normalize(vecs, p=2, dim=1)
                vectors = vecs.cpu().tolist()

            for c, v in zip(batch, vectors):
                yield {
                    "_op_type": "index",
                    "_index": index_name,
                    # Deterministic id: same chunk always maps to same ES doc.
                    "_id": f"{c.doc_id}:{c.chunk_id}",
                    "_source": {
                        "doc_id": c.doc_id,
                        "chunk_id": c.chunk_id,
                        "text": c.text,
                        "source": c.source,
                        "metadata": c.metadata,
                        "embedding": v,
                    },
                }

    # Generous per-request timeout: bulk requests carry full embedding payloads.
    bulk(es.options(request_timeout=120), actions())

# ### Test

# Index mapping: dense_vector dims must match the embedding model's hidden
# size (embedding_dim, read from AutoConfig at import time).
mapping = {
    "settings": {
        "index": {
            "number_of_shards": 1
        }
    },
    "mappings": {
        "properties": {
            "doc_id": { "type": "keyword" },
            "chunk_id": { "type": "integer" },
            "text": { "type": "text" },
            "source": { "type": "keyword" },
            "metadata": { "type": "object", "enabled": True },
            "embedding": {
                "type": "dense_vector",
                "dims": embedding_dim,
                "index": True,
                "similarity": "cosine"
            }
        }
    }
}

# Client with retries; ELASTICSEARCH_LOCAL_URL must point at a reachable node.
es = Elasticsearch(
    os.getenv("ELASTICSEARCH_LOCAL_URL"),
    request_timeout=60,
    max_retries=5,
    retry_on_timeout=True,
)

# List existing indices (sanity check that the cluster is reachable).
for index in es.indices.get(index="*"):
    print(index)

# Destructive: drop the target index so the demo starts from a clean slate.
if es.indices.exists(index=es_index):
    es.indices.delete(index=es_index)

# NOTE(review): `body=` is deprecated in the 8.x client; consider
# es.indices.create(index=es_index, settings=..., mappings=...) — confirm
# client version before changing.
es.indices.create(index=es_index, body=mapping)
print("Index created:", es_index)
# Demo fixture: a multi-topic document (distributed computing, energy,
# LLMs, RAG chunking, vertical farming) used to exercise the chunking and
# indexing pipeline. Topic shifts make it a useful semantic-chunking probe.
doc_text = """The Impact of Distributed Computing on Scientific Research

Distributed computing has profoundly transformed the way large-scale scientific research is conducted. Instead of relying on a single centralized supercomputer, it is now possible to coordinate thousands of interconnected machines that share complex tasks. This approach makes it possible to process massive volumes of data in significantly shorter periods of time.

One of the best-known examples is genomic analysis. DNA sequencing generates enormous amounts of information that must be processed and compared. Thanks to distributed systems, researchers can analyze mutations, identify genetic patterns, and accelerate the development of personalized treatments.

Beyond the medical field, particle physics also benefits enormously. Experiments such as those conducted at CERN produce petabytes of data that must be distributed among research centers around the world. Without this collaborative and distributed model, many discoveries would simply be unfeasible.

Energy Limitations and Sustainability

However, the expansion of computational infrastructure brings significant challenges. One of the most relevant is energy consumption. Modern data centers require massive amounts of electricity both to operate and to cool their equipment.

This demand has driven research into energy efficiency, more sustainable architectures, and the use of renewable energy sources. Some technology companies are already installing data centers in colder regions to reduce cooling costs, while others are exploring AI-based solutions to optimize energy usage.

Sustainability has become a strategic criterion, not only economically but also reputationally. Organizations that fail to properly manage their carbon footprint may face public and regulatory criticism.

Language Models and Deep Learning

At the same time, the development of large-scale language models has redefined contemporary artificial intelligence. These models, trained with billions or even trillions of parameters, can generate coherent text, translate languages, and solve complex problems.

Training these systems requires extremely powerful distributed infrastructures. Data parallelism and model parallelism make it possible to divide the computational workload across multiple GPUs or specialized nodes.

However, the inference phase presents different challenges. Although it is less intensive than training, large-scale inference — such as in public AI services — requires constant optimization to reduce latency and resource consumption.

Chunking Techniques and Semantic Retrieval

In retrieval-augmented generation (RAG) systems, the way information is segmented directly influences the quality of the generated responses. Length-based chunking alone can split ideas in half, affecting the coherence of the retrieved context.

Semantic chunking, on the other hand, attempts to group text fragments that share meaning. This approach uses embeddings to measure similarity and determine where to divide the content.

A similarity threshold that is too low may generate excessively large and heterogeneous chunks. Conversely, a threshold that is too high may produce small fragments and lose relevant context.

Proper calibration depends on the text domain, the average paragraph length, and the embedding model used.

Vertical Farming and the Urbanism of the Future

Vertical farming proposes growing food in multi-level urban structures. This technique aims to reduce transportation dependency and optimize space usage in densely populated cities.

Through hydroponic systems and automated nutrient control, plants can grow without traditional soil. Distributed sensors monitor humidity, temperature, and nutrient levels in real time.

Moreover, integration with renewable energy allows these facilities to operate more sustainably. In some cases, agricultural buildings are architecturally designed to integrate seamlessly into the urban environment.

Although it still faces economic challenges, vertical farming represents a potential solution for food security in megacities."""
# Build chunks from the demo document using the fixed-size token strategy.
chunks = build_chunks(
    doc_text=doc_text,
    source="local_demo",
    metadata={"title": "Demo", "doc_id": "demo-001"},
    chunking_strategy="fixed",
    chunk_size=150,
    overlap=25,
)

# Embed and bulk-index the demo chunks (downloads the model on first run).
index_chunks(es, es_index, chunks)


def retrieve_similar_chunks(
    es: Elasticsearch,
    index_name: str,
    query: str,
    top_k: int = 5,
    embedding_model_name: str = embedding_model_name,
) -> List[Dict[str, Any]]:
    """Embed ``query`` and return the ``top_k`` most similar chunks from ES.

    Uses the same mean-pooled, L2-normalized embedding as index_chunks (the
    query and documents must share an embedding space), then ranks with a
    script_score of cosineSimilarity + 1.0 (shifted so scores are >= 0, as
    Elasticsearch requires non-negative scores).

    Args:
        es: Connected Elasticsearch client.
        index_name: Index holding the "embedding" dense_vector field.
        query: Natural-language query text.
        top_k: Number of hits to return.
        embedding_model_name: HF model used to embed the query.

    Returns:
        The ``_source`` dicts of the top hits, best first.
    """
    # Embed the query
    tokenizer = AutoTokenizer.from_pretrained(embedding_model_name)
    model = AutoModel.from_pretrained(embedding_model_name)
    device = "cuda" if torch.cuda.is_available() else "cpu"
    model.to(device)
    model.eval()
    with torch.no_grad():
        enc = tokenizer([query], padding=True, truncation=True, return_tensors="pt").to(device)
        out = model(**enc)
        mask = enc["attention_mask"].unsqueeze(-1)
        vec = (out.last_hidden_state * mask).sum(1) / mask.sum(1).clamp(min=1e-9)
        vec = F.normalize(vec, p=2, dim=1)
        query_vector = vec[0].cpu().tolist()

    # Search in Elasticsearch using dense vector. The `body=` parameter is
    # deprecated in the 8.x client (which this notebook already uses via
    # es.options(...)), so pass size/query as explicit keyword arguments.
    res = es.search(
        index=index_name,
        size=top_k,
        query={
            "script_score": {
                "query": {"match_all": {}},
                "script": {
                    "source": "cosineSimilarity(params.query_vector, 'embedding') + 1.0",
                    "params": {"query_vector": query_vector},
                },
            }
        },
    )
    return [hit["_source"] for hit in res["hits"]["hits"]]

# Example usage:
results = retrieve_similar_chunks(es, es_index, "What are the challenges of distributed computing?", top_k=3)
for r in results:
    print(r["text"])