Offline Feature Store Design

The offline store is the foundation of your feature infrastructure. It stores historical feature values used to generate training datasets and enables point-in-time correct feature retrieval — the single most important capability for preventing data leakage in ML training pipelines.

Batch Feature Computation Pipeline

Batch features are computed on a schedule (typically daily or hourly) from raw data sources. The pipeline reads from data warehouses or data lakes, applies transformations, and writes feature values with timestamps to the offline store.

Python
# Feast feature definition for batch features
from feast import Entity, FeatureView, Field, FileSource
from feast.types import Float64, Int64, String
from datetime import timedelta

# Define the entity (primary key for feature lookup)
user = Entity(
    name="user_id",
    description="Unique user identifier",
)

# Define the data source (where raw feature data lives)
user_stats_source = FileSource(
    name="user_stats_source",
    path="s3://feature-store/user_stats/",
    timestamp_field="event_timestamp",
    created_timestamp_column="created_timestamp",
)

# Define the feature view (schema + source + TTL)
user_spending_stats = FeatureView(
    name="user_spending_stats",
    entities=[user],
    ttl=timedelta(days=90),  # Features older than 90 days are stale
    schema=[
        Field(name="avg_transaction_amount", dtype=Float64),
        Field(name="transaction_count_30d", dtype=Int64),
        Field(name="total_spend_90d", dtype=Float64),
        Field(name="avg_days_between_purchases", dtype=Float64),
        Field(name="preferred_category", dtype=String),
    ],
    source=user_stats_source,
    online=True,  # Also materialize to online store
)
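A feature view like this is registered against a repo configuration. As a point of reference, a minimal `feature_store.yaml` (this sketch follows the Feast quickstart defaults — a file-based offline store and a local SQLite online store; the project name and paths are illustrative, and production deployments would swap in S3/BigQuery offline and Redis/DynamoDB online backends):

```yaml
project: user_features          # illustrative project name
registry: data/registry.db      # where Feast tracks registered objects
provider: local
offline_store:
  type: file                    # Parquet files as the offline store
online_store:
  type: sqlite                  # swap for Redis/DynamoDB in production
  path: data/online_store.db
```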

Storage Backend Comparison

| Backend | Best For | Query Speed | Cost | Scalability |
| --- | --- | --- | --- | --- |
| Parquet on S3/GCS | Small-medium datasets, simple setups | Moderate (seconds) | Very low | Good |
| Delta Lake | ACID transactions, time-travel queries | Fast (Spark-optimized) | Low | Excellent |
| BigQuery | GCP-native, SQL-friendly teams | Fast (serverless) | Pay-per-query | Excellent |
| Snowflake | Multi-cloud, existing Snowflake users | Fast (warehouse-based) | Moderate | Excellent |
| Redshift | AWS-native, existing Redshift clusters | Fast (cluster-based) | Moderate | Good |

Point-in-Time Correct Joins

This is the most critical concept in offline feature stores. When generating training data, you must retrieve the feature values as they were at the time of each training event — not the latest values. Using future feature values causes data leakage and produces models that perform unrealistically well in evaluation but fail in production.
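The retrieval rule itself is simple to state: for each entity and event timestamp, take the latest feature value recorded at or before that timestamp. Before looking at how Feast exposes this, here is the rule as a minimal pure-Python sketch (function and variable names are illustrative; ISO-format date strings compare correctly as plain strings):

```python
from bisect import bisect_right

def point_in_time_lookup(history, event_ts):
    """Return the latest feature value recorded at or before event_ts.

    history: list of (timestamp, value) tuples, sorted by timestamp.
    Returns None if no value existed yet at event_ts.
    """
    timestamps = [ts for ts, _ in history]
    # bisect_right finds the insertion point AFTER any equal timestamp,
    # so a feature recorded exactly at event_ts is still visible
    idx = bisect_right(timestamps, event_ts)
    if idx == 0:
        return None  # no feature value existed before this event
    return history[idx - 1][1]

# Feature history for one user: avg_transaction_amount over time
history = [
    ("2025-01-10", 45.20),
    ("2025-01-31", 52.10),
]

print(point_in_time_lookup(history, "2025-01-15"))  # 45.2 (Jan 10 value)
print(point_in_time_lookup(history, "2025-02-01"))  # 52.1 (Jan 31 value)
print(point_in_time_lookup(history, "2025-01-01"))  # None (no data yet)
```

The same event queried at two different timestamps can legitimately return two different values — that is the whole point.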

Python
# Point-in-time correct training data generation with Feast
from feast import FeatureStore
import pandas as pd

store = FeatureStore(repo_path="feature_repo/")

# Training events: each row has an entity key + event timestamp
# The timestamp represents WHEN the prediction was made
entity_df = pd.DataFrame({
    "user_id": [1001, 1002, 1001, 1003],
    "event_timestamp": [
        "2025-01-15 10:00:00",  # Get user 1001's features as of Jan 15
        "2025-01-16 14:30:00",  # Get user 1002's features as of Jan 16
        "2025-02-01 09:00:00",  # Get user 1001's features as of Feb 1
        "2025-02-10 16:00:00",  # Get user 1003's features as of Feb 10
    ],
    "label": [1, 0, 1, 0],  # What we're predicting (e.g., churn)
})

# Feast performs point-in-time correct join automatically
# For each row, it finds the latest feature value BEFORE the event_timestamp
training_df = store.get_historical_features(
    entity_df=entity_df,
    features=[
        "user_spending_stats:avg_transaction_amount",
        "user_spending_stats:transaction_count_30d",
        "user_spending_stats:total_spend_90d",
        "user_activity:last_login_days_ago",
        "user_activity:session_count_7d",
    ],
).to_df()

# Result: training_df has features as they were at each event time
# User 1001 appears twice with DIFFERENT feature values
# reflecting their state on Jan 15 vs Feb 1
print(training_df)
#    user_id  event_timestamp  label  avg_transaction_amount  transaction_count_30d  ...
# 0     1001  2025-01-15 10:00     1                   45.20                     12  ...
# 1     1002  2025-01-16 14:30     0                   28.50                      5  ...
# 2     1001  2025-02-01 09:00     1                   52.10                     18  ...
# 3     1003  2025-02-10 16:00     0                   15.75                      2  ...
Data Leakage Warning: If you use a simple SQL JOIN instead of a point-in-time join, you will join each training event with the latest feature values, which include future information. This leaks future data into your training set and produces models that appear to have high accuracy but fail catastrophically in production.
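The difference is easy to demonstrate with a toy example (illustrative data; a naive join always picks the newest row, while the point-in-time rule filters out anything after the event):

```python
# Feature history for one user: (timestamp, transaction_count_30d)
history = [("2025-01-10", 12), ("2025-01-31", 18)]

def latest_value(history):
    # Naive join: always returns the most recent value, regardless of event time
    return history[-1][1]

def as_of(history, event_ts):
    # Point-in-time join: latest value at or before the event timestamp
    valid = [v for ts, v in history if ts <= event_ts]
    return valid[-1] if valid else None

for event_ts in ["2025-01-15", "2025-02-01"]:
    print(event_ts, "naive:", latest_value(history),
          "point-in-time:", as_of(history, event_ts))
# 2025-01-15 naive: 18 point-in-time: 12   <- naive join leaked the Jan 31 value
# 2025-02-01 naive: 18 point-in-time: 18
```

For the January 15 event, the naive join hands the model a count that was not observable until January 31 — exactly the leakage described above.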

Implementing Point-in-Time Joins in SQL

Under the hood, point-in-time joins use an ASOF join pattern. Here is the raw SQL equivalent for understanding what Feast does internally:

SQL
-- Point-in-time correct join in BigQuery/Snowflake
WITH ranked_features AS (
    SELECT
        e.user_id,
        e.event_timestamp,
        e.label,
        f.avg_transaction_amount,
        f.transaction_count_30d,
        f.feature_timestamp,
        -- Rank features: most recent BEFORE the event gets rank 1
        ROW_NUMBER() OVER (
            PARTITION BY e.user_id, e.event_timestamp
            ORDER BY f.feature_timestamp DESC
        ) AS feature_rank
    FROM training_events e
    LEFT JOIN user_spending_features f
        ON e.user_id = f.user_id
        AND f.feature_timestamp <= e.event_timestamp  -- NO future data!
        AND f.feature_timestamp >= e.event_timestamp - INTERVAL 90 DAY  -- TTL
)
SELECT *
FROM ranked_features
WHERE feature_rank = 1;  -- Keep only the most recent valid feature

Batch Feature Computation with Spark

Python
# Production batch feature pipeline using PySpark
from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark = SparkSession.builder \
    .appName("feature_computation") \
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
    .getOrCreate()

# Read raw transaction data
transactions = spark.read.parquet("s3://data-lake/transactions/")

# Compute snapshot features as of the run date: each scheduled run
# recomputes the trailing-window aggregates over the raw transactions
user_features = transactions \
    .groupBy("user_id") \
    .agg(
        F.avg("amount").alias("avg_transaction_amount"),
        F.count(
            F.when(F.col("transaction_date") >= F.date_sub(F.current_date(), 30),
                   F.lit(1))
        ).alias("transaction_count_30d"),
        F.sum(
            F.when(F.col("transaction_date") >= F.date_sub(F.current_date(), 90),
                   F.col("amount")).otherwise(0)
        ).alias("total_spend_90d"),
    ) \
    .withColumn("event_timestamp", F.current_timestamp()) \
    .withColumn("created_timestamp", F.current_timestamp())

# Write to Delta Lake (offline store), partitioned by year/month
# so that time-range queries can prune partitions
user_features \
    .withColumn("year", F.year("event_timestamp")) \
    .withColumn("month", F.month("event_timestamp")) \
    .write \
    .format("delta") \
    .mode("append") \
    .partitionBy("year", "month") \
    .save("s3://feature-store/user_spending_stats/")

# Each run appends new feature values with the current timestamp
# Historical values are preserved for point-in-time correct retrieval
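The aggregation logic in the Spark job can be sanity-checked in plain Python on a handful of rows (the data and the run date are illustrative):

```python
from datetime import date, timedelta

# Illustrative raw transactions for one user: (transaction_date, amount)
today = date(2025, 2, 15)
transactions = [
    (date(2025, 1, 20), 40.0),   # within 30 days of `today`
    (date(2025, 2, 1), 60.0),    # within 30 days
    (date(2024, 12, 1), 100.0),  # within 90 days only
    (date(2024, 10, 1), 25.0),   # older than 90 days
]

# avg over full history; count/sum restricted to trailing windows,
# mirroring the F.when(...) conditions in the Spark aggregation
avg_transaction_amount = sum(a for _, a in transactions) / len(transactions)
transaction_count_30d = sum(
    1 for d, _ in transactions if d >= today - timedelta(days=30)
)
total_spend_90d = sum(
    a for d, a in transactions if d >= today - timedelta(days=90)
)

print(avg_transaction_amount)  # 56.25
print(transaction_count_30d)   # 2
print(total_spend_90d)         # 200.0
```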

Storage Layout Best Practices

Text
# Recommended S3/GCS layout for offline feature store
s3://feature-store/
  user_spending_stats/
    _delta_log/             # Delta Lake transaction log
    year=2025/
      month=01/
        part-00000.parquet  # Partitioned by time for efficient queries
        part-00001.parquet
      month=02/
        part-00000.parquet
  user_activity/
    _delta_log/
    year=2025/
      month=01/
        part-00000.parquet

# Key principles:
# 1. Partition by time (year/month) for efficient range queries
# 2. Use Delta Lake for ACID transactions and time-travel
# 3. Keep feature groups in separate directories
# 4. Use Parquet for columnar compression (10-50x vs CSV)
# 5. Set retention policy to auto-delete data beyond TTL
Performance Tip: For offline stores serving more than 100 million entity-timestamp pairs, partition features by both entity hash (for parallel reads) and timestamp (for range pruning). This reduces training data generation from hours to minutes.
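The dual-partitioning tip above can be sketched as a partition-key function (the bucket count and key layout are illustrative choices, not a fixed standard):

```python
import hashlib

def partition_key(user_id, event_date, num_buckets=16):
    """Map an entity + date to a bucket=<n>/year=<y>/month=<m> partition path.

    Uses a stable hash (md5 of the stringified id) so an entity always
    lands in the same bucket across runs -- unlike Python's built-in
    hash(), which is salted per process.
    event_date is an ISO string like "2025-01-15".
    """
    digest = hashlib.md5(str(user_id).encode()).hexdigest()
    bucket = int(digest, 16) % num_buckets
    return f"bucket={bucket}/year={event_date[:4]}/month={event_date[5:7]}"

print(partition_key(1001, "2025-01-15"))
```

Readers fan out across buckets in parallel, while the year/month components let the query engine prune everything outside the requested time range.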

Ready for Online Feature Serving?

The next lesson covers designing low-latency online stores with Redis, DynamoDB, and materialization pipelines for real-time inference.
