Advanced

API & Webhook Integration

Build production REST API endpoints with authentication, rate limiting, webhook notifications for moderation decisions, and integration guides for common platforms.

Complete Moderation API

# app/api/routes.py
import time
import logging
from fastapi import APIRouter, HTTPException, UploadFile, File, Depends
from pydantic import BaseModel
from app.moderation.text import TextModerator
from app.moderation.image import ImageModerator
from app.policy.engine import PolicyEngine
from app.review.queue import review_queue

# Module-level logger named after this module.
logger = logging.getLogger(__name__)
# Every route registered on this router is served under the /api/v1 prefix.
router = APIRouter(prefix="/api/v1")
# The moderators and the policy engine are constructed once, at import time,
# and shared by all request handlers in this module.
text_mod = TextModerator()
image_mod = ImageModerator()
policy = PolicyEngine()


# Request body for POST /moderate/text.
class TextRequest(BaseModel):
    # Raw text to moderate; passed as-is to the text moderator.
    content: str
    # Caller-supplied identifier; echoed back in the response and attached to
    # any review-queue item. Defaults to the empty string.
    content_id: str = ""
    # Free-form caller metadata. Not read by the endpoint visible in this file.
    # NOTE(review): pydantic is expected to copy this mutable default per
    # instance, so `{}` should not be shared between requests — confirm for
    # the pinned pydantic version.
    metadata: dict = {}


# Response body shared by the text and image moderation endpoints.
class ModerationResponse(BaseModel):
    # Identifier supplied by the caller, echoed back unchanged.
    content_id: str
    # Value of the policy decision's action; the endpoints treat "review" as
    # "queue for human review".
    action: str
    # Severity score reported by the policy decision.
    severity_score: float
    # Filled from the policy decision's `triggered_rules`.
    categories: list[str]
    # Reasons reported by the policy decision.
    reasons: list[str]
    # Id of the review-queue item; None unless the content was queued.
    review_id: str | None = None
    # Time spent handling the request, in whole milliseconds.
    processing_time_ms: int


@router.post("/moderate/text", response_model=ModerationResponse)
async def moderate_text(req: TextRequest):
    """Moderate a piece of text and return the policy decision.

    The content is scored by the text moderator, the result is evaluated by
    the policy engine, and content whose decision action is ``"review"`` is
    added to the review queue.

    Args:
        req: The text to moderate together with the caller's ``content_id``.

    Returns:
        A ``ModerationResponse`` carrying the decision; ``review_id`` is set
        only when the content was queued for review.
    """
    # perf_counter() is monotonic, so the reported latency cannot go negative
    # or jump when the system clock is adjusted (time.time() can).
    started = time.perf_counter()

    # NOTE(review): moderate() is called synchronously inside an async
    # handler; if it is slow it will block the event loop — confirm.
    result = text_mod.moderate(req.content)

    # Apply policy
    decision = policy.evaluate(
        result.scores, result.pii_found, result.custom_matches
    )
    action = decision.action.value

    # Queue for review if needed
    review_id = None
    if action == "review":
        item = review_queue.add(
            content_id=req.content_id,
            content_type="text",
            # Only the first 500 characters are stored as the preview.
            preview=req.content[:500],
            severity=decision.severity_score,
            categories=decision.triggered_rules,
            reasons=decision.reasons,
        )
        review_id = item.id

    elapsed_ms = int((time.perf_counter() - started) * 1000)

    return ModerationResponse(
        content_id=req.content_id,
        action=action,
        severity_score=decision.severity_score,
        categories=decision.triggered_rules,
        reasons=decision.reasons,
        review_id=review_id,
        processing_time_ms=elapsed_ms,
    )


@router.post("/moderate/image", response_model=ModerationResponse)
async def moderate_image(file: UploadFile = File(...), content_id: str = ""):
    """Moderate an uploaded image and return the policy decision.

    The upload is written to a temporary file, that path is handed to the
    image moderator, and the resulting categories are evaluated by the policy
    engine. Content whose decision action is ``"review"`` is added to the
    review queue. The temporary file is always removed before returning.

    Args:
        file: The uploaded image.
        content_id: Caller-supplied identifier echoed back in the response.

    Returns:
        A ``ModerationResponse`` carrying the decision; ``review_id`` is set
        only when the content was queued for review.
    """
    import os
    import tempfile

    # perf_counter() is monotonic, so the reported latency cannot go negative
    # or jump when the system clock is adjusted (time.time() can).
    started = time.perf_counter()

    # delete=False because the file is closed before its path is passed to
    # the moderator; cleanup is done explicitly in the finally below.
    # NOTE(review): the suffix is always ".png" regardless of the upload's
    # actual type — confirm the moderator does not rely on the extension.
    tmp = tempfile.NamedTemporaryFile(delete=False, suffix=".png")
    tmp_path = tmp.name

    try:
        # The write happens inside the try so that a failed read/write does
        # not leave the temporary file behind on disk.
        with tmp:
            tmp.write(await file.read())

        result = image_mod.moderate(tmp_path)
        decision = policy.evaluate(result.categories)
        action = decision.action.value

        review_id = None
        if action == "review":
            item = review_queue.add(
                content_id=content_id,
                content_type="image",
                # Preview shows the filename and the first 200 chars of OCR text.
                preview=f"[Image: {file.filename}] OCR: {result.ocr_text[:200]}",
                severity=decision.severity_score,
                categories=decision.triggered_rules,
                reasons=decision.reasons,
            )
            review_id = item.id

        elapsed_ms = int((time.perf_counter() - started) * 1000)
        return ModerationResponse(
            content_id=content_id,
            action=action,
            severity_score=decision.severity_score,
            categories=decision.triggered_rules,
            reasons=decision.reasons,
            review_id=review_id,
            processing_time_ms=elapsed_ms,
        )
    finally:
        os.unlink(tmp_path)

Webhook Notifications

# app/api/webhooks.py
import httpx
import logging
from app.config import get_settings

# Module-level logger named after this module.
logger = logging.getLogger(__name__)
# Settings are loaded once, at import time; send_webhook reads
# `settings.webhook_url` from this object on every call.
settings = get_settings()


async def send_webhook(event: str, data: dict) -> None:
    """Best-effort POST of a moderation event to the configured webhook URL.

    Does nothing when ``settings.webhook_url`` is empty or unset. Delivery
    problems are logged and never raised, so a failing webhook receiver
    cannot break the caller.

    Args:
        event: Event name placed under the ``"event"`` key of the payload.
        data: Event details placed under the ``"data"`` key of the payload.
    """
    if not settings.webhook_url:
        return

    payload = {"event": event, "data": data}
    try:
        async with httpx.AsyncClient() as client:
            response = await client.post(
                settings.webhook_url,
                json=payload,
                timeout=10.0,
            )
    except Exception as e:
        # Deliberately broad: this is a best-effort boundary. The traceback is
        # included because some transport errors have an empty message.
        logger.error("Webhook failed: %s", e, exc_info=True)
    else:
        status = response.status_code
        if status >= 400:
            # The request was delivered but the receiver refused it; do not
            # report that as a successful send.
            logger.warning("Webhook rejected: %s -> %s", event, status)
        else:
            logger.info("Webhook sent: %s -> %s", event, status)


# Usage in moderation endpoints:
# await send_webhook("content.rejected", {
#     "content_id": content_id,
#     "reason": decision.reasons,
# })

Rate Limiting

# Simple in-memory rate limiter
from collections import defaultdict
import time

class RateLimiter:
    """Sliding-window, in-memory request limiter keyed by client id.

    Each client may make at most ``requests_per_minute`` accepted requests in
    any trailing 60-second window. State lives in this process only.

    Note: a client's old timestamps are pruned only when that same client
    calls ``check`` again, so entries for clients that stop calling stay in
    ``requests``.
    """

    def __init__(self, requests_per_minute: int = 60):
        """Create a limiter allowing ``requests_per_minute`` per client."""
        self.rpm = requests_per_minute
        # client_id -> time.monotonic() timestamps of accepted requests.
        self.requests: dict[str, list[float]] = defaultdict(list)

    def check(self, client_id: str) -> bool:
        """Record a request for ``client_id`` if it is within the limit.

        Args:
            client_id: Key identifying the caller.

        Returns:
            True if the request is allowed (and has been recorded), False if
            the client already has ``rpm`` accepted requests in the last 60
            seconds. Rejected requests are not recorded.
        """
        # A monotonic clock keeps the window correct when the system clock is
        # adjusted; wall-clock time can jump and wrongly expire or retain
        # timestamps.
        now = time.monotonic()
        cutoff = now - 60

        # Drop timestamps that have fallen out of the window.
        recent = [t for t in self.requests[client_id] if t > cutoff]
        self.requests[client_id] = recent

        if len(recent) >= self.rpm:
            return False

        recent.append(now)
        return True
💡
Integration patterns: Common integrations include (1) a pre-publish hook in your CMS, (2) a Slack bot that moderates channel messages, (3) an API gateway filter for user-generated content, and (4) an email filter for screening support tickets.

Key Takeaways

  • REST API with typed request/response models provides clean integration for any platform.
  • Webhook notifications alert downstream systems of moderation decisions in real time.
  • Rate limiting prevents abuse and controls API costs in production.
  • Processing time tracking helps monitor and optimize moderation latency.