Offline Feature Store Design
The offline store is the foundation of your feature infrastructure. It stores historical feature values used to generate training datasets and enables point-in-time correct feature retrieval — the single most important capability for preventing data leakage in ML training pipelines.
Batch Feature Computation Pipeline
Batch features are computed on a schedule (typically daily or hourly) from raw data sources. The pipeline reads from data warehouses or data lakes, applies transformations, and writes feature values with timestamps to the offline store.
```python
# Feast feature definitions for batch features
from datetime import timedelta

from feast import Entity, FeatureView, Field, FileSource
from feast.types import Float64, Int64, String

# Define the entity (primary key for feature lookup)
user = Entity(
    name="user_id",
    description="Unique user identifier",
)

# Define the data source (where raw feature data lives)
user_stats_source = FileSource(
    name="user_stats_source",
    path="s3://feature-store/user_stats/",
    timestamp_field="event_timestamp",
    created_timestamp_column="created_timestamp",
)

# Define the feature view (schema + source + TTL)
user_spending_stats = FeatureView(
    name="user_spending_stats",
    entities=[user],
    ttl=timedelta(days=90),  # Features older than 90 days are considered stale
    schema=[
        Field(name="avg_transaction_amount", dtype=Float64),
        Field(name="transaction_count_30d", dtype=Int64),
        Field(name="total_spend_90d", dtype=Float64),
        Field(name="avg_days_between_purchases", dtype=Float64),
        Field(name="preferred_category", dtype=String),
    ],
    source=user_stats_source,
    online=True,  # Also materialize to the online store
)
```
Storage Backend Comparison
| Backend | Best For | Query Speed | Cost | Scalability |
|---|---|---|---|---|
| Parquet on S3/GCS | Small-medium datasets, simple setups | Moderate (seconds) | Very low | Good |
| Delta Lake | ACID transactions, time-travel queries | Fast (Spark-optimized) | Low | Excellent |
| BigQuery | GCP-native, SQL-friendly teams | Fast (serverless) | Pay-per-query | Excellent |
| Snowflake | Multi-cloud, existing Snowflake users | Fast (warehouse-based) | Moderate | Excellent |
| Redshift | AWS-native, existing Redshift clusters | Fast (cluster-based) | Moderate | Good |
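Whichever backend you choose is declared in Feast's `feature_store.yaml` rather than in code. A minimal sketch for the Parquet-on-S3 option (Feast's `file` offline store); the project name and bucket paths are illustrative placeholders:

```yaml
project: user_features                    # illustrative project name
registry: s3://feature-store/registry.db  # feature metadata registry
provider: local
offline_store:
  type: file          # Parquet files on local disk or object storage
online_store:
  type: sqlite        # typically swapped for Redis/DynamoDB in production
```

Switching to BigQuery, Snowflake, or Redshift is largely a matter of changing the `offline_store.type` and its connection settings; the feature definitions themselves stay the same.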
Point-in-Time Correct Joins
This is the most critical concept in offline feature stores. When generating training data, you must retrieve the feature values as they were at the time of each training event — not the latest values. Using future feature values causes data leakage and produces models that perform unrealistically well in evaluation but fail in production.
```python
# Point-in-time correct training-data generation with Feast
import pandas as pd

from feast import FeatureStore

store = FeatureStore(repo_path="feature_repo/")

# Training events: each row has an entity key + event timestamp.
# The timestamp represents WHEN the prediction was made.
entity_df = pd.DataFrame({
    "user_id": [1001, 1002, 1001, 1003],
    "event_timestamp": pd.to_datetime([
        "2025-01-15 10:00:00",  # user 1001's features as of Jan 15
        "2025-01-16 14:30:00",  # user 1002's features as of Jan 16
        "2025-02-01 09:00:00",  # user 1001's features as of Feb 1
        "2025-02-10 16:00:00",  # user 1003's features as of Feb 10
    ]),
    "label": [1, 0, 1, 0],  # What we're predicting (e.g., churn)
})

# Feast performs the point-in-time correct join automatically:
# for each row, it finds the latest feature value BEFORE the event_timestamp.
training_df = store.get_historical_features(
    entity_df=entity_df,
    features=[
        "user_spending_stats:avg_transaction_amount",
        "user_spending_stats:transaction_count_30d",
        "user_spending_stats:total_spend_90d",
        "user_activity:last_login_days_ago",
        "user_activity:session_count_7d",
    ],
).to_df()

# Result: training_df has features as they were at each event time.
# User 1001 appears twice with DIFFERENT feature values,
# reflecting their state on Jan 15 vs Feb 1.
print(training_df)
#    user_id   event_timestamp  label  avg_transaction_amount  transaction_count_30d ...
# 0     1001  2025-01-15 10:00      1                   45.20                     12 ...
# 1     1002  2025-01-16 14:30      0                   28.50                      5 ...
# 2     1001  2025-02-01 09:00      1                   52.10                     18 ...
# 3     1003  2025-02-10 16:00      0                   15.75                      2 ...
```
Implementing Point-in-Time Joins in SQL
Under the hood, point-in-time joins use an ASOF join pattern. Here is the raw SQL equivalent, which shows what Feast does internally:
```sql
-- Point-in-time correct join in BigQuery/Snowflake
WITH ranked_features AS (
    SELECT
        e.user_id,
        e.event_timestamp,
        e.label,
        f.avg_transaction_amount,
        f.transaction_count_30d,
        f.feature_timestamp,
        -- Rank features: the most recent value BEFORE the event gets rank 1
        ROW_NUMBER() OVER (
            PARTITION BY e.user_id, e.event_timestamp
            ORDER BY f.feature_timestamp DESC
        ) AS feature_rank
    FROM training_events e
    LEFT JOIN user_spending_features f
        ON e.user_id = f.user_id
        AND f.feature_timestamp <= e.event_timestamp  -- NO future data!
        AND f.feature_timestamp >= e.event_timestamp - INTERVAL 90 DAY  -- TTL
)
SELECT *
FROM ranked_features
WHERE feature_rank = 1;  -- Keep only the most recent valid feature value
```
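The same ASOF semantics can be reproduced on small data with pandas `merge_asof`, which makes a convenient sanity check for a point-in-time join. A sketch with toy data (column names chosen to match the example above):

```python
import pandas as pd

# Feature snapshots: user 1001's stats were recomputed on Jan 10 and Jan 28
features = pd.DataFrame({
    "user_id": [1001, 1001, 1002],
    "feature_timestamp": pd.to_datetime(
        ["2025-01-10", "2025-01-28", "2025-01-12"]
    ),
    "avg_transaction_amount": [45.20, 52.10, 28.50],
})

# Training events with prediction-time timestamps
events = pd.DataFrame({
    "user_id": [1001, 1002, 1001],
    "event_timestamp": pd.to_datetime(
        ["2025-01-15 10:00", "2025-01-16 14:30", "2025-02-01 09:00"]
    ),
})

# merge_asof keeps, per event, the latest feature row whose
# feature_timestamp <= event_timestamp (backward search = no future data)
training = pd.merge_asof(
    events.sort_values("event_timestamp"),
    features.sort_values("feature_timestamp"),
    left_on="event_timestamp",
    right_on="feature_timestamp",
    by="user_id",
    direction="backward",
    tolerance=pd.Timedelta(days=90),  # mimics the 90-day TTL
)

print(training[["user_id", "event_timestamp", "avg_transaction_amount"]])
# User 1001 gets 45.20 for the Jan 15 event and 52.10 for the Feb 1 event
```

Note that `merge_asof` requires both frames to be sorted on the join timestamps, which is why the `sort_values` calls are there.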
Batch Feature Computation with Spark
```python
# Production batch feature pipeline using PySpark
from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark = (
    SparkSession.builder
    .appName("feature_computation")
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
    .config("spark.sql.catalog.spark_catalog",
            "org.apache.spark.sql.delta.catalog.DeltaCatalog")
    .getOrCreate()
)

# Read raw transaction data
transactions = spark.read.parquet("s3://data-lake/transactions/")

# Compute rolling features via conditional aggregation: each metric only
# counts transactions inside its lookback window relative to today.
user_features = (
    transactions
    .groupBy("user_id")
    .agg(
        F.avg("amount").alias("avg_transaction_amount"),
        # count() ignores NULLs, so the F.when without otherwise()
        # counts only rows inside the 30-day window
        F.count(
            F.when(F.col("transaction_date") >= F.date_sub(F.current_date(), 30),
                   F.lit(1))
        ).alias("transaction_count_30d"),
        F.sum(
            F.when(F.col("transaction_date") >= F.date_sub(F.current_date(), 90),
                   F.col("amount")).otherwise(0)
        ).alias("total_spend_90d"),
    )
    .withColumn("event_timestamp", F.current_timestamp())
    .withColumn("created_timestamp", F.current_timestamp())
)

# Write to Delta Lake (offline store). Partition by the run date rather than
# the raw timestamp so each daily run lands in a single partition.
(
    user_features
    .withColumn("event_date", F.to_date("event_timestamp"))
    .write
    .format("delta")
    .mode("append")
    .partitionBy("event_date")
    .save("s3://feature-store/user_spending_stats/")
)

# Each run appends new feature values with the current timestamp.
# Historical values are preserved for point-in-time correct retrieval.
```
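The windowed aggregation logic is easy to get subtly wrong, so it is worth verifying on a small sample with plain pandas before running it at scale. A sketch with toy data, where `as_of` stands in for Spark's `current_date()`:

```python
import pandas as pd

as_of = pd.Timestamp("2025-03-01")  # stands in for current_date()

tx = pd.DataFrame({
    "user_id": [1, 1, 1],
    "transaction_date": pd.to_datetime(
        ["2024-11-15", "2025-01-20", "2025-02-25"]
    ),
    "amount": [100.0, 40.0, 60.0],
})

# Window membership flags, mirroring the F.when(...) conditions in Spark
in_30d = tx["transaction_date"] >= as_of - pd.Timedelta(days=30)
in_90d = tx["transaction_date"] >= as_of - pd.Timedelta(days=90)

features = (
    tx.assign(
        amount_90d=tx["amount"].where(in_90d, 0.0),  # zero outside the window
        is_30d=in_30d.astype(int),
    )
    .groupby("user_id")
    .agg(
        avg_transaction_amount=("amount", "mean"),       # lifetime average
        transaction_count_30d=("is_30d", "sum"),
        total_spend_90d=("amount_90d", "sum"),
    )
)

print(features)
# avg = (100 + 40 + 60) / 3; only the Feb 25 row is inside 30 days;
# the Jan 20 and Feb 25 rows (40 + 60) are inside 90 days
```

Note one subtlety the toy data makes visible: `avg_transaction_amount` averages over the user's full history, while the other two metrics are windowed, matching the Spark code above.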
Storage Layout Best Practices
```text
# Recommended S3/GCS layout for the offline feature store
s3://feature-store/
    user_spending_stats/
        _delta_log/              # Delta Lake transaction log
        year=2025/
            month=01/
                part-00000.parquet   # Partitioned by time for efficient queries
                part-00001.parquet
            month=02/
                part-00000.parquet
    user_activity/
        _delta_log/
        year=2025/
            month=01/
                part-00000.parquet

# Key principles:
# 1. Partition by time (year/month) for efficient range queries
# 2. Use Delta Lake for ACID transactions and time travel
# 3. Keep feature groups in separate directories
# 4. Use Parquet for columnar compression (often 10-50x smaller than CSV)
# 5. Set a retention policy to auto-delete data beyond the TTL
```
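Principle 5 can be enforced with a small pruning job that runs alongside the feature pipeline. A minimal sketch that flags `year=/month=` partitions whose entire month falls outside the TTL; the 90-day TTL and partition names are illustrative, and with Delta Lake you would typically pair this with a `VACUUM` to reclaim the deleted files:

```python
from datetime import date, timedelta

def expired_partitions(partitions, today, ttl_days=90):
    """Return partition keys whose whole month ended more than ttl_days ago."""
    expired = []
    for part in partitions:  # e.g. "year=2024/month=11"
        year = int(part.split("year=")[1].split("/")[0])
        month = int(part.split("month=")[1])
        # Last day of that month: first day of the next month minus one day
        if month == 12:
            month_end = date(year, 12, 31)
        else:
            month_end = date(year, month + 1, 1) - timedelta(days=1)
        if (today - month_end).days > ttl_days:
            expired.append(part)
    return expired

parts = ["year=2024/month=10", "year=2024/month=12", "year=2025/month=02"]
print(expired_partitions(parts, today=date(2025, 3, 1)))
# → ['year=2024/month=10']  (Oct 2024 ended 121 days before the run date)
```

Keeping the check at month granularity is deliberately conservative: a partition is only dropped once every row in it is guaranteed to be past the TTL.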
Ready for Online Feature Serving?
The next lesson covers designing low-latency online stores with Redis, DynamoDB, and materialization pipelines for real-time inference.