Step 1: Document Ingestion Pipeline
The ingestion pipeline is the foundation of every RAG system. In this step, you will build a complete pipeline that loads PDF and HTML documents, splits them into semantically meaningful chunks, extracts metadata, and prepares everything for embedding in the next step.
What We Are Building
By the end of this lesson, you will have three Python modules:
- app/ingestion/loader.py — Loads PDF and HTML files into raw text with page/section metadata
- app/ingestion/chunker.py — Splits documents into overlapping chunks using recursive character splitting
- app/ingestion/pipeline.py — Orchestrates the full ingestion flow: load → chunk → return structured documents
Document Loader Module
We support two document types: PDF files (using pypdf) and HTML files (using BeautifulSoup). Each loader extracts the text and attaches metadata about the source, page number, and file type.
# app/ingestion/loader.py
"""Document loaders for PDF and HTML files."""
import os
import logging
from pathlib import Path
from dataclasses import dataclass, field
from typing import Optional
from pypdf import PdfReader
from bs4 import BeautifulSoup
logger = logging.getLogger(__name__)
@dataclass
class RawDocument:
    """A loaded document with its text and metadata.

    Produced by the loaders in this module: one instance per PDF page,
    or one per HTML file.
    """
    # Extracted plain text of the page/file.
    text: str
    # Provenance fields such as "source", "source_path", "file_type",
    # and (for PDFs) "page_number"/"total_pages".
    metadata: dict = field(default_factory=dict)
def load_pdf(file_path: str) -> list[RawDocument]:
    """Load a PDF file and return one RawDocument per page.

    Pages with no extractable text are skipped with a warning.

    Args:
        file_path: Path to the PDF file.

    Returns:
        List of RawDocument objects, one per page.

    Raises:
        FileNotFoundError: If the file does not exist.
    """
    file_path = Path(file_path)
    if not file_path.exists():
        raise FileNotFoundError(f"PDF not found: {file_path}")

    reader = PdfReader(str(file_path))
    total_pages = len(reader.pages)

    documents: list[RawDocument] = []
    for page_num, page in enumerate(reader.pages, start=1):
        raw_text = page.extract_text()
        stripped = raw_text.strip() if raw_text else ""
        if not stripped:
            # Scanned/image-only pages often extract to nothing.
            logger.warning(f"Empty page {page_num} in {file_path.name}")
            continue
        meta = {
            "source": file_path.name,
            "source_path": str(file_path),
            "file_type": "pdf",
            "page_number": page_num,
            "total_pages": total_pages,
        }
        documents.append(RawDocument(text=stripped, metadata=meta))

    logger.info(f"Loaded {len(documents)} pages from {file_path.name}")
    return documents
def load_html(file_path: str) -> list[RawDocument]:
    """Load an HTML file and return a single RawDocument.

    Strips HTML tags, scripts, and styles to extract clean text.

    Args:
        file_path: Path to the HTML file.

    Returns:
        List containing one RawDocument (empty if the page has no text).

    Raises:
        FileNotFoundError: If the file does not exist.
    """
    file_path = Path(file_path)
    if not file_path.exists():
        raise FileNotFoundError(f"HTML file not found: {file_path}")

    html_content = file_path.read_text(encoding="utf-8", errors="ignore")
    soup = BeautifulSoup(html_content, "lxml")

    # Drop boilerplate elements that carry no useful content.
    for element in soup(["script", "style", "nav", "footer", "header"]):
        element.decompose()

    text = soup.get_text(separator="\n", strip=True)

    # Prefer the <title> tag; fall back to the file name.
    title = (
        title_tag.get_text(strip=True)
        if (title_tag := soup.find("title"))
        else file_path.stem
    )

    if not text.strip():
        logger.warning(f"No text content in {file_path.name}")
        return []

    metadata = {
        "source": file_path.name,
        "source_path": str(file_path),
        "file_type": "html",
        "title": title,
    }
    return [RawDocument(text=text, metadata=metadata)]
def load_document(file_path: str) -> list[RawDocument]:
"""Load a document based on its file extension.
Args:
file_path: Path to the document.
Returns:
List of RawDocument objects.
Raises:
ValueError: If the file type is not supported.
"""
ext = Path(file_path).suffix.lower()
if ext == ".pdf":
return load_pdf(file_path)
elif ext in (".html", ".htm"):
return load_html(file_path)
else:
raise ValueError(f"Unsupported file type: {ext}. Supported: .pdf, .html, .htm")
def load_directory(directory: str) -> list[RawDocument]:
    """Load all supported documents from a directory (recursively).

    Files that fail to load are logged and skipped rather than
    aborting the whole run.

    Args:
        directory: Path to the directory containing documents.

    Returns:
        List of all RawDocument objects from all files.

    Raises:
        NotADirectoryError: If the path is not a directory.
    """
    directory = Path(directory)
    if not directory.is_dir():
        raise NotADirectoryError(f"Not a directory: {directory}")

    supported_extensions = {".pdf", ".html", ".htm"}
    # Sort for deterministic ordering across filesystems.
    candidates = [
        p for p in sorted(directory.rglob("*"))
        if p.suffix.lower() in supported_extensions
    ]

    all_documents: list[RawDocument] = []
    for file_path in candidates:
        try:
            all_documents.extend(load_document(str(file_path)))
        except Exception as e:
            # Best-effort: a single corrupt file must not kill the batch.
            logger.error(f"Failed to load {file_path}: {e}")

    logger.info(f"Loaded {len(all_documents)} documents from {directory}")
    return all_documents
Text Chunking Module
Chunking is the most impactful decision in a RAG pipeline. Too large and your chunks contain irrelevant information. Too small and you lose context. We use recursive character splitting with overlap to balance these concerns.
The text-embedding-3-small model handles up to 8,191 tokens, but shorter chunks produce more focused embeddings. The 50-character overlap ensures that sentences split across chunk boundaries are still captured in at least one chunk.

# app/ingestion/chunker.py
"""Text chunking with metadata preservation."""
import logging
from dataclasses import dataclass, field
from langchain_text_splitters import RecursiveCharacterTextSplitter
from app.config import get_settings
from app.ingestion.loader import RawDocument
logger = logging.getLogger(__name__)
settings = get_settings()
@dataclass
class Chunk:
    """A text chunk ready for embedding."""
    # The chunk's text content.
    text: str
    # Unique ID of the form "<source>::chunk_<doc_index>_<chunk_index>".
    chunk_id: str
    # Document metadata merged with chunk-specific fields
    # (chunk_index, total_chunks, chunk_id, char_count).
    metadata: dict = field(default_factory=dict)
def create_splitter(
    chunk_size: int | None = None,
    chunk_overlap: int | None = None,
) -> RecursiveCharacterTextSplitter:
    """Create a text splitter with the given parameters.

    Uses recursive character splitting which tries to split on
    paragraph breaks first, then sentences, then words.

    Args:
        chunk_size: Maximum chunk length in characters. Defaults to
            ``settings.chunk_size`` when None.
        chunk_overlap: Overlap between consecutive chunks in characters.
            Defaults to ``settings.chunk_overlap`` when None.

    Returns:
        A configured RecursiveCharacterTextSplitter.
    """
    # Explicit `is None` checks: the previous `x or settings.x` pattern
    # silently replaced a caller-supplied 0 with the settings default.
    if chunk_size is None:
        chunk_size = settings.chunk_size
    if chunk_overlap is None:
        chunk_overlap = settings.chunk_overlap
    return RecursiveCharacterTextSplitter(
        chunk_size=chunk_size,
        chunk_overlap=chunk_overlap,
        length_function=len,  # measure in characters, not tokens
        separators=["\n\n", "\n", ". ", " ", ""],
        is_separator_regex=False,
    )
def chunk_document(
    document: RawDocument,
    doc_index: int = 0,
    splitter: RecursiveCharacterTextSplitter | None = None,
) -> list[Chunk]:
    """Split a document into chunks with metadata.

    Args:
        document: The raw document to chunk.
        doc_index: Index of this document in the batch (for unique IDs).
        splitter: Optional custom splitter. Uses default if None.

    Returns:
        List of Chunk objects with text, IDs, and metadata.
    """
    active_splitter = splitter if splitter is not None else create_splitter()
    text_chunks = active_splitter.split_text(document.text)
    source = document.metadata.get("source", "unknown")
    total = len(text_chunks)

    chunks: list[Chunk] = []
    for i, text in enumerate(text_chunks):
        chunk_id = f"{source}::chunk_{doc_index}_{i}"
        chunks.append(Chunk(
            text=text,
            chunk_id=chunk_id,
            # Document-level metadata is carried through, plus
            # chunk-specific fields used later for citations.
            metadata={
                **document.metadata,
                "chunk_index": i,
                "total_chunks": total,
                "chunk_id": chunk_id,
                "char_count": len(text),
            },
        ))

    logger.debug(f"Split '{source}' into {len(chunks)} chunks")
    return chunks
def chunk_documents(documents: list[RawDocument]) -> list[Chunk]:
    """Chunk a list of documents.

    Args:
        documents: List of raw documents to chunk.

    Returns:
        List of all chunks from all documents.
    """
    # Build one splitter and share it across the whole batch.
    splitter = create_splitter()

    all_chunks: list[Chunk] = []
    for doc_index, doc in enumerate(documents):
        all_chunks.extend(
            chunk_document(doc, doc_index=doc_index, splitter=splitter)
        )

    logger.info(
        f"Chunked {len(documents)} documents into {len(all_chunks)} chunks "
        f"(avg {len(all_chunks) // max(len(documents), 1)} chunks/doc)"
    )
    return all_chunks
Ingestion Pipeline Orchestrator
The pipeline ties everything together. It loads documents from a directory, chunks them, and returns structured data ready for embedding.
# app/ingestion/pipeline.py
"""Ingestion pipeline - orchestrates loading and chunking."""
import logging
import time
from pathlib import Path
from dataclasses import dataclass
from app.ingestion.loader import load_directory, load_document, RawDocument
from app.ingestion.chunker import chunk_documents, Chunk
logger = logging.getLogger(__name__)
@dataclass
class IngestionResult:
    """Result of an ingestion run."""
    # Number of distinct source files ingested.
    total_files: int
    # Number of RawDocuments loaded (PDFs yield one per page).
    total_pages: int
    # Total number of chunks produced.
    total_chunks: int
    # The chunks themselves, ready for embedding.
    chunks: list[Chunk]
    # Wall-clock duration of the run, rounded to 2 decimal places.
    elapsed_seconds: float
    # Error messages collected during the run (empty on success).
    errors: list[str]
def run_ingestion(
    source_path: str,
    verbose: bool = False,
) -> IngestionResult:
    """Run the full ingestion pipeline on a file or directory.

    Args:
        source_path: Path to a file or directory to ingest.
        verbose: If True, log detailed progress.

    Returns:
        IngestionResult with all chunks and statistics.
    """
    start_time = time.time()
    errors: list[str] = []
    path = Path(source_path)
    logger.info(f"Starting ingestion from: {path}")

    def empty_result(messages: list[str]) -> IngestionResult:
        # Shared shape for the "nothing ingested" early returns.
        return IngestionResult(
            total_files=0, total_pages=0, total_chunks=0,
            chunks=[], elapsed_seconds=time.time() - start_time,
            errors=messages,
        )

    # Step 1: Load documents
    try:
        if path.is_dir():
            raw_docs = load_directory(str(path))
        elif path.is_file():
            raw_docs = load_document(str(path))
        else:
            raise FileNotFoundError(f"Path not found: {path}")
    except Exception as e:
        logger.error(f"Failed to load documents: {e}")
        return empty_result([str(e)])

    if not raw_docs:
        logger.warning("No documents loaded - check the source path")
        return empty_result(["No documents found"])

    if verbose:
        for doc in raw_docs:
            logger.info(
                f"  Loaded: {doc.metadata.get('source', '?')} "
                f"({len(doc.text)} chars)"
            )

    # Step 2: Chunk documents
    chunks = chunk_documents(raw_docs)
    elapsed = time.time() - start_time

    # Each RawDocument is one page/section; count distinct source files.
    unique_files = len({
        doc.metadata.get("source_path", "") for doc in raw_docs
    })

    result = IngestionResult(
        total_files=unique_files,
        total_pages=len(raw_docs),
        total_chunks=len(chunks),
        chunks=chunks,
        elapsed_seconds=round(elapsed, 2),
        errors=errors,
    )
    logger.info(
        f"Ingestion complete: {result.total_files} files, "
        f"{result.total_pages} pages, {result.total_chunks} chunks "
        f"in {result.elapsed_seconds}s"
    )
    return result
Add the Ingestion API Endpoint
Now add an endpoint to app/main.py so you can trigger ingestion via the API:
# Add these imports to app/main.py
import os
import shutil
import tempfile
from pathlib import Path

from fastapi import UploadFile, File, HTTPException

from app.ingestion.pipeline import run_ingestion
@app.post("/api/ingest")
async def ingest_documents(directory: str = "data/sample"):
    """Ingest all documents from a directory.

    Args:
        directory: Path to directory containing PDF/HTML files.

    Returns:
        Ingestion statistics; status is "error" when the pipeline
        reported errors (e.g. no documents found).

    Raises:
        HTTPException: 500 if the pipeline fails unexpectedly.
    """
    # SECURITY NOTE: `directory` comes straight from the client and is
    # handed to the filesystem unchecked — in production, restrict it to
    # a whitelisted data root to prevent arbitrary path reads.
    try:
        result = run_ingestion(directory, verbose=True)
        # Don't report success when the pipeline returned errors
        # (e.g. "No documents found").
        return {
            "status": "error" if result.errors else "success",
            "total_files": result.total_files,
            "total_pages": result.total_pages,
            "total_chunks": result.total_chunks,
            "elapsed_seconds": result.elapsed_seconds,
            "errors": result.errors,
        }
    except Exception as e:
        # Chain the cause so the original traceback is preserved in logs.
        raise HTTPException(status_code=500, detail=str(e)) from e
@app.post("/api/ingest/upload")
async def ingest_uploaded_file(file: UploadFile = File(...)):
    """Ingest a single uploaded file.

    Accepts PDF or HTML files up to 50MB.

    Raises:
        HTTPException: 400 for a missing filename or unsupported type,
            413 if the upload exceeds 50MB.
    """
    allowed_types = {".pdf", ".html", ".htm"}
    # UploadFile.filename is optional — guard before Path() parses it.
    if not file.filename:
        raise HTTPException(status_code=400, detail="Missing filename")
    ext = Path(file.filename).suffix.lower()
    if ext not in allowed_types:
        raise HTTPException(
            status_code=400,
            detail=f"Unsupported file type: {ext}. Allowed: {allowed_types}"
        )

    # Save to temp file and process
    with tempfile.NamedTemporaryFile(delete=False, suffix=ext) as tmp:
        shutil.copyfileobj(file.file, tmp)
        tmp_path = tmp.name

    try:
        # Enforce the documented 50MB cap (checked after spooling to disk
        # so the whole upload is never held in memory).
        if os.path.getsize(tmp_path) > 50 * 1024 * 1024:
            raise HTTPException(status_code=413, detail="File exceeds 50MB limit")
        result = run_ingestion(tmp_path, verbose=True)
        return {
            "status": "success",
            "filename": file.filename,
            "total_chunks": result.total_chunks,
            "elapsed_seconds": result.elapsed_seconds,
        }
    finally:
        # Always remove the temp file, even when ingestion fails.
        os.unlink(tmp_path)
Test the Ingestion Pipeline
Create a sample HTML file and run the pipeline:
# Create a sample document
cat > data/sample/example.html << 'EOF'
<html>
<head><title>RAG Chatbot Documentation</title></head>
<body>
<h1>Getting Started</h1>
<p>This is a sample document for testing the RAG chatbot ingestion
pipeline. The chatbot uses retrieval-augmented generation to answer
questions based on your documents.</p>
<h2>Features</h2>
<p>The RAG chatbot supports PDF and HTML document ingestion,
semantic chunking, vector search with Qdrant, and streaming
responses powered by OpenAI GPT-4o-mini.</p>
<h2>Architecture</h2>
<p>The system consists of four main components: an ingestion
pipeline, a vector store, a retrieval engine, and a generation
layer. Documents flow through the ingestion pipeline where they
are chunked and embedded before being stored in Qdrant.</p>
</body>
</html>
EOF
# Test ingestion via API
curl -X POST "http://localhost:8000/api/ingest?directory=data/sample"
# Expected response:
# {
# "status": "success",
# "total_files": 1,
# "total_pages": 1,
# "total_chunks": 2,
# "elapsed_seconds": 0.01,
# "errors": []
# }
Key Takeaways
- The loader module handles PDF (page-by-page) and HTML (stripped of tags) with proper metadata extraction.
- Recursive character splitting with 512-character chunks and 50-character overlap balances precision and context.
- Every chunk carries metadata (source file, page number, chunk index) that will be used for citations later.
- The pipeline orchestrator provides timing and error reporting for monitoring ingestion health.
What Is Next
In the next lesson, you will take these chunks and generate embeddings using OpenAI's embedding API, then store them in Qdrant with full metadata payloads. That will make the documents searchable by semantic meaning.
Lilly Tech Systems