Intermediate

Step 1: Document Ingestion Pipeline

The ingestion pipeline is the foundation of every RAG system. In this step, you will build a complete pipeline that loads PDF and HTML documents, splits them into semantically meaningful chunks, extracts metadata, and prepares everything for embedding in the next step.

What We Are Building

By the end of this lesson, you will have three Python modules:

  • app/ingestion/loader.py — Loads PDF and HTML files into raw text with page/section metadata
  • app/ingestion/chunker.py — Splits documents into overlapping chunks using recursive character splitting
  • app/ingestion/pipeline.py — Orchestrates the full ingestion flow: load → chunk → return structured documents

Document Loader Module

We support two document types: PDF files (using pypdf) and HTML files (using BeautifulSoup). Each loader extracts the text and attaches metadata about the source, page number, and file type.

# app/ingestion/loader.py
"""Document loaders for PDF and HTML files."""
import os
import logging
from pathlib import Path
from dataclasses import dataclass, field
from typing import Optional

from pypdf import PdfReader
from bs4 import BeautifulSoup

logger = logging.getLogger(__name__)


@dataclass
class RawDocument:
    """A loaded document with its text and metadata."""
    text: str
    metadata: dict = field(default_factory=dict)


def load_pdf(file_path: str) -> list[RawDocument]:
    """Load a PDF file and return one RawDocument per page.

    Args:
        file_path: Path to the PDF file.

    Returns:
        List of RawDocument objects, one per page.
    """
    file_path = Path(file_path)
    if not file_path.exists():
        raise FileNotFoundError(f"PDF not found: {file_path}")

    reader = PdfReader(str(file_path))
    documents = []

    for page_num, page in enumerate(reader.pages, start=1):
        text = page.extract_text()
        if not text or not text.strip():
            logger.warning(f"Empty page {page_num} in {file_path.name}")
            continue

        documents.append(RawDocument(
            text=text.strip(),
            metadata={
                "source": file_path.name,
                "source_path": str(file_path),
                "file_type": "pdf",
                "page_number": page_num,
                "total_pages": len(reader.pages),
            }
        ))

    logger.info(f"Loaded {len(documents)} pages from {file_path.name}")
    return documents


def load_html(file_path: str) -> list[RawDocument]:
    """Load an HTML file and return a single RawDocument.

    Strips HTML tags, scripts, and styles to extract clean text.

    Args:
        file_path: Path to the HTML file.

    Returns:
        List containing one RawDocument.
    """
    file_path = Path(file_path)
    if not file_path.exists():
        raise FileNotFoundError(f"HTML file not found: {file_path}")

    with open(file_path, "r", encoding="utf-8", errors="ignore") as f:
        html_content = f.read()

    soup = BeautifulSoup(html_content, "lxml")

    # Remove script and style elements
    for element in soup(["script", "style", "nav", "footer", "header"]):
        element.decompose()

    # Extract text
    text = soup.get_text(separator="\n", strip=True)

    # Extract title
    title_tag = soup.find("title")
    title = title_tag.get_text(strip=True) if title_tag else file_path.stem

    if not text.strip():
        logger.warning(f"No text content in {file_path.name}")
        return []

    return [RawDocument(
        text=text,
        metadata={
            "source": file_path.name,
            "source_path": str(file_path),
            "file_type": "html",
            "title": title,
        }
    )]


def load_document(file_path: str) -> list[RawDocument]:
    """Load a document based on its file extension.

    Args:
        file_path: Path to the document.

    Returns:
        List of RawDocument objects.

    Raises:
        ValueError: If the file type is not supported.
    """
    ext = Path(file_path).suffix.lower()

    if ext == ".pdf":
        return load_pdf(file_path)
    elif ext in (".html", ".htm"):
        return load_html(file_path)
    else:
        raise ValueError(f"Unsupported file type: {ext}. Supported: .pdf, .html, .htm")


def load_directory(directory: str) -> list[RawDocument]:
    """Load all supported documents from a directory.

    Args:
        directory: Path to the directory containing documents.

    Returns:
        List of all RawDocument objects from all files.
    """
    directory = Path(directory)
    if not directory.is_dir():
        raise NotADirectoryError(f"Not a directory: {directory}")

    supported_extensions = {".pdf", ".html", ".htm"}
    all_documents = []

    for file_path in sorted(directory.rglob("*")):
        if file_path.suffix.lower() in supported_extensions:
            try:
                docs = load_document(str(file_path))
                all_documents.extend(docs)
            except Exception as e:
                logger.error(f"Failed to load {file_path}: {e}")

    logger.info(f"Loaded {len(all_documents)} documents from {directory}")
    return all_documents

Text Chunking Module

Chunking is the most impactful decision in a RAG pipeline. Too large and your chunks contain irrelevant information. Too small and you lose context. We use recursive character splitting with overlap to balance these concerns.

💡
Why 512 tokens with 50 token overlap? This is the sweet spot for most use cases. The text-embedding-3-small model handles up to 8,191 tokens, but shorter chunks produce more focused embeddings. The 50-token overlap ensures that sentences split across chunk boundaries are still captured in at least one chunk.
# app/ingestion/chunker.py
"""Text chunking with metadata preservation."""
import logging
from dataclasses import dataclass, field

from langchain_text_splitters import RecursiveCharacterTextSplitter

from app.config import get_settings
from app.ingestion.loader import RawDocument

logger = logging.getLogger(__name__)
settings = get_settings()


@dataclass
class Chunk:
    """A text chunk ready for embedding."""
    text: str
    chunk_id: str
    metadata: dict = field(default_factory=dict)


def create_splitter(
    chunk_size: int | None = None,
    chunk_overlap: int | None = None
) -> RecursiveCharacterTextSplitter:
    """Create a text splitter with the given parameters.

    Uses recursive character splitting which tries to split on
    paragraph breaks first, then sentences, then words.
    """
    return RecursiveCharacterTextSplitter(
        chunk_size=chunk_size or settings.chunk_size,
        chunk_overlap=chunk_overlap or settings.chunk_overlap,
        length_function=len,
        separators=["\n\n", "\n", ". ", " ", ""],
        is_separator_regex=False,
    )


def chunk_document(
    document: RawDocument,
    doc_index: int = 0,
    splitter: RecursiveCharacterTextSplitter | None = None
) -> list[Chunk]:
    """Split a document into chunks with metadata.

    Args:
        document: The raw document to chunk.
        doc_index: Index of this document in the batch (for unique IDs).
        splitter: Optional custom splitter. Uses default if None.

    Returns:
        List of Chunk objects with text, IDs, and metadata.
    """
    if splitter is None:
        splitter = create_splitter()

    text_chunks = splitter.split_text(document.text)

    chunks = []
    source = document.metadata.get("source", "unknown")

    for i, text in enumerate(text_chunks):
        chunk_id = f"{source}::chunk_{doc_index}_{i}"

        # Merge document metadata with chunk-specific metadata
        chunk_metadata = {
            **document.metadata,
            "chunk_index": i,
            "total_chunks": len(text_chunks),
            "chunk_id": chunk_id,
            "char_count": len(text),
        }

        chunks.append(Chunk(
            text=text,
            chunk_id=chunk_id,
            metadata=chunk_metadata,
        ))

    logger.debug(f"Split '{source}' into {len(chunks)} chunks")
    return chunks


def chunk_documents(documents: list[RawDocument]) -> list[Chunk]:
    """Chunk a list of documents.

    Args:
        documents: List of raw documents to chunk.

    Returns:
        List of all chunks from all documents.
    """
    splitter = create_splitter()
    all_chunks = []

    for doc_index, doc in enumerate(documents):
        chunks = chunk_document(doc, doc_index=doc_index, splitter=splitter)
        all_chunks.extend(chunks)

    logger.info(
        f"Chunked {len(documents)} documents into {len(all_chunks)} chunks "
        f"(avg {len(all_chunks) // max(len(documents), 1)} chunks/doc)"
    )
    return all_chunks

Ingestion Pipeline Orchestrator

The pipeline ties everything together. It loads documents from a directory, chunks them, and returns structured data ready for embedding.

# app/ingestion/pipeline.py
"""Ingestion pipeline - orchestrates loading and chunking."""
import logging
import time
from pathlib import Path
from dataclasses import dataclass

from app.ingestion.loader import load_directory, load_document, RawDocument
from app.ingestion.chunker import chunk_documents, Chunk

logger = logging.getLogger(__name__)


@dataclass
class IngestionResult:
    """Result of an ingestion run."""
    total_files: int
    total_pages: int
    total_chunks: int
    chunks: list[Chunk]
    elapsed_seconds: float
    errors: list[str]


def run_ingestion(
    source_path: str,
    verbose: bool = False
) -> IngestionResult:
    """Run the full ingestion pipeline on a file or directory.

    Args:
        source_path: Path to a file or directory to ingest.
        verbose: If True, log detailed progress.

    Returns:
        IngestionResult with all chunks and statistics.
    """
    start_time = time.time()
    errors = []

    path = Path(source_path)
    logger.info(f"Starting ingestion from: {path}")

    # Step 1: Load documents
    try:
        if path.is_dir():
            raw_docs = load_directory(str(path))
        elif path.is_file():
            raw_docs = load_document(str(path))
        else:
            raise FileNotFoundError(f"Path not found: {path}")
    except Exception as e:
        logger.error(f"Failed to load documents: {e}")
        return IngestionResult(
            total_files=0, total_pages=0, total_chunks=0,
            chunks=[], elapsed_seconds=time.time() - start_time,
            errors=[str(e)]
        )

    if not raw_docs:
        logger.warning("No documents loaded - check the source path")
        return IngestionResult(
            total_files=0, total_pages=0, total_chunks=0,
            chunks=[], elapsed_seconds=time.time() - start_time,
            errors=["No documents found"]
        )

    if verbose:
        for doc in raw_docs:
            logger.info(
                f"  Loaded: {doc.metadata.get('source', '?')} "
                f"({len(doc.text)} chars)"
            )

    # Step 2: Chunk documents
    chunks = chunk_documents(raw_docs)

    elapsed = time.time() - start_time

    # Count unique files
    unique_files = len(set(
        doc.metadata.get("source_path", "") for doc in raw_docs
    ))

    result = IngestionResult(
        total_files=unique_files,
        total_pages=len(raw_docs),
        total_chunks=len(chunks),
        chunks=chunks,
        elapsed_seconds=round(elapsed, 2),
        errors=errors,
    )

    logger.info(
        f"Ingestion complete: {result.total_files} files, "
        f"{result.total_pages} pages, {result.total_chunks} chunks "
        f"in {result.elapsed_seconds}s"
    )

    return result

Add the Ingestion API Endpoint

Now add an endpoint to app/main.py so you can trigger ingestion via the API:

# Add these imports to app/main.py
from fastapi import UploadFile, File, HTTPException
import tempfile
import shutil
from app.ingestion.pipeline import run_ingestion


@app.post("/api/ingest")
async def ingest_documents(directory: str = "data/sample"):
    """Ingest all documents from a directory.

    Args:
        directory: Path to directory containing PDF/HTML files.

    Returns:
        Ingestion statistics.
    """
    try:
        result = run_ingestion(directory, verbose=True)
        return {
            "status": "success",
            "total_files": result.total_files,
            "total_pages": result.total_pages,
            "total_chunks": result.total_chunks,
            "elapsed_seconds": result.elapsed_seconds,
            "errors": result.errors,
        }
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))


@app.post("/api/ingest/upload")
async def ingest_uploaded_file(file: UploadFile = File(...)):
    """Ingest a single uploaded file.

    Accepts PDF or HTML files up to 50MB.
    """
    allowed_types = {".pdf", ".html", ".htm"}
    ext = Path(file.filename).suffix.lower()

    if ext not in allowed_types:
        raise HTTPException(
            status_code=400,
            detail=f"Unsupported file type: {ext}. Allowed: {allowed_types}"
        )

    # Save to temp file and process
    with tempfile.NamedTemporaryFile(delete=False, suffix=ext) as tmp:
        shutil.copyfileobj(file.file, tmp)
        tmp_path = tmp.name

    try:
        result = run_ingestion(tmp_path, verbose=True)
        return {
            "status": "success",
            "filename": file.filename,
            "total_chunks": result.total_chunks,
            "elapsed_seconds": result.elapsed_seconds,
        }
    finally:
        os.unlink(tmp_path)

Test the Ingestion Pipeline

Create a sample HTML file and run the pipeline:

# Create a sample document
cat > data/sample/example.html << 'EOF'
<html>
<head><title>RAG Chatbot Documentation</title></head>
<body>
<h1>Getting Started</h1>
<p>This is a sample document for testing the RAG chatbot ingestion
pipeline. The chatbot uses retrieval-augmented generation to answer
questions based on your documents.</p>

<h2>Features</h2>
<p>The RAG chatbot supports PDF and HTML document ingestion,
semantic chunking, vector search with Qdrant, and streaming
responses powered by OpenAI GPT-4o-mini.</p>

<h2>Architecture</h2>
<p>The system consists of four main components: an ingestion
pipeline, a vector store, a retrieval engine, and a generation
layer. Documents flow through the ingestion pipeline where they
are chunked and embedded before being stored in Qdrant.</p>
</body>
</html>
EOF

# Test ingestion via API
curl -X POST "http://localhost:8000/api/ingest?directory=data/sample"

# Expected response:
# {
#   "status": "success",
#   "total_files": 1,
#   "total_pages": 1,
#   "total_chunks": 2,
#   "elapsed_seconds": 0.01,
#   "errors": []
# }

Key Takeaways

  • The loader module handles PDF (page-by-page) and HTML (stripped of tags) with proper metadata extraction.
  • Recursive character splitting with 512-character chunks and 50-character overlap balances precision and context.
  • Every chunk carries metadata (source file, page number, chunk index) that will be used for citations later.
  • The pipeline orchestrator provides timing and error reporting for monitoring ingestion health.

What Is Next

In the next lesson, you will take these chunks and generate embeddings using OpenAI's embedding API, then store them in Qdrant with full metadata payloads. That will make the documents searchable by semantic meaning.