Step 1: Data Indexing Pipeline
In this lesson, you will build the complete data indexing pipeline: document loading, text processing, embedding generation with sentence-transformers, Elasticsearch index mapping, and bulk indexing. By the end, you will have thousands of documents indexed with both text fields and dense vectors, ready for search.
The Indexing Flow
Every document goes through four stages before it is searchable:
Raw Document (JSON/CSV/HTML)
|
v
[1. Load & Parse] --> Extract title, body, metadata
|
v
[2. Clean & Normalize] --> Strip HTML, normalize whitespace, lowercase
|
v
[3. Generate Embedding] --> sentence-transformers encodes text to 384-dim vector
|
v
[4. Bulk Index] --> Elasticsearch stores text + vector + metadata
Elasticsearch Index Mapping
The mapping is the most important part. It defines how Elasticsearch stores and indexes each field. We need both text fields (for BM25) and a dense_vector field (for kNN search).
# app/elasticsearch/client.py
"""Elasticsearch client wrapper with index management."""
from elasticsearch import Elasticsearch, helpers
from app.config import get_settings
import logging
logger = logging.getLogger(__name__)
settings = get_settings()
class SearchClient:
    """Wrapper around the Elasticsearch client.

    Owns a single index that combines analyzed text fields (for BM25
    keyword search) with a dense_vector field (for kNN semantic search).
    """

    def __init__(self):
        # Host and credentials come from application settings. The 30s
        # request timeout leaves headroom for slow bulk requests.
        self.es = Elasticsearch(
            settings.elasticsearch_host,
            basic_auth=(
                settings.elasticsearch_username,
                settings.elasticsearch_password
            ),
            request_timeout=30
        )
        self.index_name = settings.elasticsearch_index

    def create_index(self):
        """Create the search index with text + dense_vector mappings.

        Idempotent: returns without error if the index already exists.
        """
        if self.es.indices.exists(index=self.index_name):
            logger.info(f"Index '{self.index_name}' already exists, skipping creation")
            return
        mapping = {
            "settings": {
                # Single shard, no replicas: fine for a dev/tutorial setup,
                # not for production.
                "number_of_shards": 1,
                "number_of_replicas": 0,
                "analysis": {
                    "analyzer": {
                        # Main analyzer for BM25 fields: stopword removal +
                        # snowball stemming ("running" matches "run").
                        "custom_analyzer": {
                            "type": "custom",
                            "tokenizer": "standard",
                            "filter": [
                                "lowercase",
                                "stop",
                                "snowball",
                                "trim"
                            ]
                        },
                        # Index-time analyzer for prefix (autocomplete) matching.
                        "autocomplete_analyzer": {
                            "type": "custom",
                            "tokenizer": "standard",
                            "filter": [
                                "lowercase",
                                "edge_ngram_filter"
                            ]
                        }
                    },
                    "filter": {
                        # Emits prefixes of 2..15 characters per token.
                        "edge_ngram_filter": {
                            "type": "edge_ngram",
                            "min_gram": 2,
                            "max_gram": 15
                        }
                    }
                }
            },
            "mappings": {
                "properties": {
                    "title": {
                        "type": "text",
                        "analyzer": "custom_analyzer",
                        "fields": {
                            # Exact-match / aggregation sub-field.
                            "keyword": {"type": "keyword"},
                            # Edge-ngrams at index time, plain standard
                            # analyzer at search time (queries must not be
                            # n-grammed themselves).
                            "autocomplete": {
                                "type": "text",
                                "analyzer": "autocomplete_analyzer",
                                "search_analyzer": "standard"
                            }
                        }
                    },
                    "body": {
                        "type": "text",
                        "analyzer": "custom_analyzer"
                    },
                    "category": {
                        "type": "keyword"
                    },
                    "tags": {
                        "type": "keyword"
                    },
                    # Stored for display only; not searchable.
                    "url": {
                        "type": "keyword",
                        "index": False
                    },
                    "created_at": {
                        "type": "date"
                    },
                    # kNN-searchable vector; dims must match the embedding
                    # model's output size (e.g. 384 for all-MiniLM-L6-v2).
                    "embedding": {
                        "type": "dense_vector",
                        "dims": settings.embedding_dimension,
                        "index": True,
                        "similarity": "cosine"
                    }
                }
            }
        }
        self.es.indices.create(index=self.index_name, body=mapping)
        logger.info(f"Created index '{self.index_name}' with text + dense_vector mappings")

    def bulk_index(self, documents: list[dict]) -> int:
        """Bulk index documents into Elasticsearch.

        Args:
            documents: List of dicts with title, body, category, tags, url,
                created_at, and embedding fields. An optional 'id' field is
                used as the Elasticsearch document _id.

        Returns:
            Number of successfully indexed documents.
        """
        actions = []
        for doc in documents:
            action = {"_index": self.index_name, "_source": doc}
            # Only set _id when the document actually has one. Emitting
            # "_id": null into the bulk action metadata is invalid, whereas
            # omitting the key lets Elasticsearch auto-generate an id.
            doc_id = doc.get("id")
            if doc_id is not None:
                action["_id"] = doc_id
            actions.append(action)
        # stats_only=True makes helpers.bulk return (success_count, error_count);
        # raise_on_error=False turns per-document failures into counts
        # instead of exceptions so one bad document doesn't abort the batch.
        success, errors = helpers.bulk(
            self.es,
            actions,
            raise_on_error=False,
            stats_only=True
        )
        logger.info(f"Bulk indexed {success} documents, {errors} errors")
        return success

    def delete_index(self):
        """Delete the search index (use with caution). No-op if absent."""
        if self.es.indices.exists(index=self.index_name):
            self.es.indices.delete(index=self.index_name)
            logger.info(f"Deleted index '{self.index_name}'")
The all-MiniLM-L6-v2 model produces normalized (unit-length) embeddings, so cosine similarity and dot product give identical rankings. We use cosine because it is the standard choice and makes scores intuitive (0 to 1 range).

Document Loader
The loader reads documents from various sources. We support JSON, CSV, and HTML out of the box.
# app/indexing/loader.py
"""Document loaders for JSON, CSV, and HTML files."""
import json
import csv
from pathlib import Path
from bs4 import BeautifulSoup
import logging
logger = logging.getLogger(__name__)
def load_json(file_path: str) -> list[dict]:
    """Load documents from a JSON file.

    Expected format: a list of objects, each with at minimum 'title' and
    'body' fields. A single top-level object is wrapped in a one-element
    list so callers always receive a list.
    """
    with open(file_path, "r", encoding="utf-8") as handle:
        parsed = json.load(handle)
    documents = [parsed] if isinstance(parsed, dict) else parsed
    logging.getLogger(__name__).info(
        f"Loaded {len(documents)} documents from {file_path}"
    )
    return documents
def load_csv(file_path: str) -> list[dict]:
    """Load documents from a CSV file.

    Expected columns: title, body, category (optional), tags (optional,
    comma-separated), url (optional).

    Returns:
        One dict per CSV row, with missing optional columns filled with
        defaults ('general' category, empty tags/url).
    """
    documents = []
    # newline="" is required by the csv module so that embedded newlines
    # inside quoted fields are parsed correctly.
    with open(file_path, "r", encoding="utf-8", newline="") as f:
        reader = csv.DictReader(f)
        for row in reader:
            raw_tags = row.get("tags")
            documents.append({
                "title": row.get("title", ""),
                "body": row.get("body", ""),
                "category": row.get("category", "general"),
                # Comma-separated tags column -> list; empty/absent -> [].
                "tags": raw_tags.split(",") if raw_tags else [],
                "url": row.get("url", "")
            })
    logging.getLogger(__name__).info(
        f"Loaded {len(documents)} documents from {file_path}"
    )
    return documents
def load_html(file_path: str) -> list[dict]:
    """Load a single document from an HTML file, extracting title and body text.

    Returns:
        A one-element list (to match the other loaders' list interface).
    """
    with open(file_path, "r", encoding="utf-8") as f:
        soup = BeautifulSoup(f.read(), "lxml")
    # <title> may be missing OR present-but-empty (soup.title.string is None
    # in that case); fall back to the file name so the document always has
    # a usable title.
    title = soup.title.string if soup.title and soup.title.string else Path(file_path).stem
    body = soup.get_text(separator=" ", strip=True)
    logger.info(f"Loaded HTML document: {title}")
    return [{"title": title, "body": body, "url": file_path}]
def load_directory(dir_path: str) -> list[dict]:
    """Load all supported documents from a directory (recursively).

    Supported extensions: .json, .csv, .html, .htm. Files are visited in
    sorted order so runs are deterministic; unknown extensions are skipped.
    """
    loaders = {
        ".json": load_json,
        ".csv": load_csv,
        ".html": load_html,
        ".htm": load_html,
    }
    documents = []
    for entry in sorted(Path(dir_path).rglob("*")):
        loader = loaders.get(entry.suffix)
        if loader:
            documents.extend(loader(str(entry)))
    logger.info(f"Loaded {len(documents)} total documents from {dir_path}")
    return documents
Text Processor
Raw document text needs cleaning before indexing. The processor normalizes whitespace, strips HTML fragments, and prepares the text for embedding.
# app/indexing/processor.py
"""Text cleaning and normalization for search indexing."""
import re
from datetime import datetime, timezone
import hashlib
def clean_text(text: str) -> str:
    """Clean and normalize text for indexing.

    - Strip residual HTML tags
    - Decode HTML entities (&amp;, &lt;, &gt;, &quot;, ...)
    - Collapse runs of whitespace into single spaces
    - Strip leading/trailing whitespace

    Returns "" for empty or None input.
    """
    if not text:
        return ""
    # Remove HTML tags first, so "<"/">" produced by entity decoding below
    # cannot be mistaken for tags.
    text = re.sub(r"<[^>]+>", " ", text)
    # Decode HTML entities. html.unescape handles ordering correctly
    # (e.g. "&amp;lt;" becomes "&lt;", not "<"), unlike chained .replace()
    # calls which can double-decode.
    import html
    text = html.unescape(text)
    # Normalize whitespace (spaces, tabs, newlines) to single spaces.
    text = re.sub(r"\s+", " ", text)
    return text.strip()
def prepare_document(doc: dict) -> dict:
    """Prepare a raw document for indexing.

    Cleans the text fields, derives a deterministic ID from the cleaned
    content (so re-indexing the same document overwrites rather than
    duplicates), and stamps a creation time when none was supplied.
    """
    title = clean_text(doc.get("title", ""))
    body = clean_text(doc.get("body", ""))
    # Stable 16-hex-char ID: identical content always maps to the same ID.
    content_hash = hashlib.sha256(f"{title}|{body}".encode()).hexdigest()[:16]
    prepared = {
        "id": doc.get("id", content_hash),
        "title": title,
        "body": body,
        "category": doc.get("category", "general"),
        "tags": doc.get("tags", []),
        "url": doc.get("url", ""),
        "created_at": doc.get("created_at", datetime.now(timezone.utc).isoformat()),
        # 'embedding' is attached later by the pipeline.
    }
    return prepared
def get_embedding_text(doc: dict) -> str:
    """Return the text fed to the embedding model for a document.

    The title appears twice so its terms exert a stronger pull on the
    resulting vector than body terms do.
    """
    title = doc.get("title", "")
    body = doc.get("body", "")
    return ". ".join([title, title, body])
Embedding Generator
The encoder wraps sentence-transformers to generate embeddings. The model runs locally and produces 384-dimensional vectors.
# app/embeddings/encoder.py
"""Sentence-transformer embedding encoder."""
from sentence_transformers import SentenceTransformer
from app.config import get_settings
import numpy as np
import logging
logger = logging.getLogger(__name__)
settings = get_settings()
# Load model once at module level (cached in memory)
_model = None
def get_model() -> SentenceTransformer:
    """Lazy-load the sentence-transformer model.

    The model is loaded once and cached in the module-level ``_model``
    global, so repeated calls are cheap. The model name comes from
    application settings.
    """
    global _model
    if _model is None:
        logger.info(f"Loading embedding model: {settings.embedding_model}")
        _model = SentenceTransformer(settings.embedding_model)
        # Log the output dimension so mismatches with the index mapping's
        # dense_vector dims are easy to spot.
        logger.info(f"Model loaded. Dimension: {_model.get_sentence_embedding_dimension()}")
    return _model
def encode_texts(texts: list[str], batch_size: int = 64) -> list[list[float]]:
    """Encode a list of texts into dense vectors.

    Args:
        texts: Strings to encode.
        batch_size: How many texts are encoded per forward pass.

    Returns:
        One embedding (a list of floats) per input text.
    """
    # Show a progress bar only for larger jobs. Vectors are unit-normalized
    # so cosine similarity reduces to a dot product.
    show_progress = len(texts) > 100
    vectors = get_model().encode(
        texts,
        batch_size=batch_size,
        show_progress_bar=show_progress,
        normalize_embeddings=True
    )
    return vectors.tolist()
def encode_query(query: str) -> list[float]:
    """Encode a single search query into a dense vector.

    Kept separate from encode_texts because queries may need different
    handling in the future (e.g. query prefixes for asymmetric models).
    """
    vector = get_model().encode(query, normalize_embeddings=True)
    return vector.tolist()
Indexing Pipeline
The pipeline ties everything together: load documents, clean them, generate embeddings, and bulk index into Elasticsearch.
# app/indexing/pipeline.py
"""Complete indexing pipeline: load -> process -> embed -> index."""
from app.indexing.loader import load_directory, load_json
from app.indexing.processor import prepare_document, get_embedding_text
from app.embeddings.encoder import encode_texts
from app.elasticsearch.client import SearchClient
import logging
import time
logger = logging.getLogger(__name__)
def run_indexing_pipeline(source_path: str, batch_size: int = 100) -> dict:
    """Run the full indexing pipeline: load -> clean -> embed -> bulk index.

    Args:
        source_path: Path to a JSON file or a directory of documents.
        batch_size: Number of documents processed and indexed per batch.

    Returns:
        Dict with indexing statistics (counts, timing, throughput).
    """
    start_time = time.time()

    # 1. Make sure the target index exists (no-op if it already does).
    client = SearchClient()
    client.create_index()

    # 2. Load raw documents from a single JSON file or a whole directory.
    loader = load_json if source_path.endswith(".json") else load_directory
    raw_docs = loader(source_path)
    logger.info(f"Loaded {len(raw_docs)} raw documents")

    # 3. Process and index in batches to bound memory use and give
    #    progress feedback.
    total_indexed = 0
    total_errors = 0
    batch_num = 0
    for start in range(0, len(raw_docs), batch_size):
        batch_num += 1
        batch = raw_docs[start:start + batch_size]
        # Clean and normalize every document in the batch.
        prepared = [prepare_document(doc) for doc in batch]
        # Encode the (title-weighted) embedding text for the whole batch.
        embeddings = encode_texts([get_embedding_text(doc) for doc in prepared])
        # Attach each vector to its document.
        for doc, vector in zip(prepared, embeddings):
            doc["embedding"] = vector
        success = client.bulk_index(prepared)
        total_indexed += success
        total_errors += len(batch) - success
        logger.info(f"Batch {batch_num}: indexed {success}/{len(batch)}")

    elapsed = time.time() - start_time
    stats = {
        "total_documents": len(raw_docs),
        "indexed": total_indexed,
        "errors": total_errors,
        "elapsed_seconds": round(elapsed, 2),
        "docs_per_second": round(total_indexed / elapsed, 1) if elapsed > 0 else 0
    }
    logger.info(f"Indexing complete: {stats}")
    return stats
Sample Data
Create a sample dataset to test the pipeline. Save this as data/sample/articles.json:
[
{
"title": "Introduction to Machine Learning",
"body": "Machine learning is a subset of artificial intelligence that enables systems to learn from data. Supervised learning uses labeled examples to train models for classification and regression tasks. Common algorithms include linear regression, decision trees, random forests, and neural networks.",
"category": "machine-learning",
"tags": ["ml", "ai", "beginner"],
"url": "/articles/intro-ml"
},
{
"title": "Understanding Vector Databases",
"body": "Vector databases store high-dimensional embeddings and enable fast similarity search. They use approximate nearest neighbor algorithms like HNSW and IVF to find similar vectors efficiently. Popular options include Qdrant, Pinecone, Weaviate, and Milvus.",
"category": "databases",
"tags": ["vectors", "embeddings", "search"],
"url": "/articles/vector-databases"
},
{
"title": "Building REST APIs with FastAPI",
"body": "FastAPI is a modern Python web framework for building APIs. It supports async/await natively, generates OpenAPI documentation automatically, and uses Pydantic for request validation. It is one of the fastest Python frameworks available.",
"category": "web-development",
"tags": ["python", "api", "fastapi"],
"url": "/articles/fastapi-guide"
},
{
"title": "Natural Language Processing Fundamentals",
"body": "NLP enables computers to understand and generate human language. Key techniques include tokenization, stemming, lemmatization, named entity recognition, and sentiment analysis. Modern NLP relies heavily on transformer models like BERT and GPT.",
"category": "nlp",
"tags": ["nlp", "transformers", "text"],
"url": "/articles/nlp-fundamentals"
},
{
"title": "Docker for Python Developers",
"body": "Docker containers package your application with all its dependencies into a reproducible unit. Use multi-stage builds to keep images small. Docker Compose orchestrates multiple services like databases and APIs. Always use .dockerignore to exclude unnecessary files.",
"category": "devops",
"tags": ["docker", "python", "deployment"],
"url": "/articles/docker-python"
}
]
Add the Index API Route
Add the indexing endpoint to app/main.py:
# Add to app/main.py
from fastapi import BackgroundTasks
from app.indexing.pipeline import run_indexing_pipeline
@app.post("/api/index")
async def index_documents(
    source_path: str = "data/sample/articles.json",
    background_tasks: BackgroundTasks = None
):
    """Index documents from a file or directory.

    This runs synchronously for small datasets. For large datasets,
    pass background=true to run in the background.
    """
    # NOTE(review): background_tasks is accepted but never used -- the
    # "background=true" option mentioned above is not implemented here;
    # the pipeline always runs synchronously inside the request handler.
    stats = run_indexing_pipeline(source_path)
    return {"status": "complete", "stats": stats}
Test the Pipeline
# Make sure Elasticsearch is running
docker-compose up -d elasticsearch
# Run the indexing pipeline
curl -X POST "http://localhost:8000/api/index?source_path=data/sample/articles.json"
# Expected response:
# {
# "status": "complete",
# "stats": {
# "total_documents": 5,
# "indexed": 5,
# "errors": 0,
# "elapsed_seconds": 1.23,
# "docs_per_second": 4.1
# }
# }
# Verify documents are in Elasticsearch
curl -u elastic:changeme "http://localhost:9200/ai_search_docs/_count"
# Expected: {"count":5,...}
# Inspect one document and check that the "embedding" field is a 384-element array:
curl -u elastic:changeme "http://localhost:9200/ai_search_docs/_search?size=1&pretty"

Key Takeaways
- The Elasticsearch mapping uses both `text` fields (for BM25 keyword search) and `dense_vector` fields (for kNN semantic search).
- Custom analyzers with `snowball` stemming improve keyword matching — "running" matches "run."
- The `autocomplete_analyzer` with `edge_ngram` enables prefix-based autocomplete suggestions.
- Embeddings are generated locally with `sentence-transformers` at roughly 100 documents per second on a modern CPU.
- The pipeline processes documents in batches for memory efficiency and progress tracking.
What Is Next
In the next lesson, you will build keyword search with BM25 — Elasticsearch queries with multi-field matching, boosting, and relevance tuning.
Lilly Tech Systems