# Tiling with Intermediate Representations in Feast

## Overview

**Tiling** is an optimization technique for **streaming time-windowed aggregations** that enables efficient feature computation by pre-aggregating data into smaller time intervals (tiles) and storing **Intermediate Representations (IRs)** so that tiles can be merged correctly.

**Primary Use Case: Streaming**

Tiling provides a significant **speedup** for streaming scenarios where features are updated frequently (every few minutes) from sources like Kafka, Kinesis, or PushSource.

**Key Benefits (Streaming):**
- **Faster**: Reuse 90%+ of tiles between updates instead of recomputing from scratch
- **Correct results**: IRs ensure mathematically accurate merging for all aggregation types
- **Memory efficient**: Only process new events, reuse previous tiles in memory
- **Real-time capable**: Handle high-throughput streaming with low latency
- **Incremental updates**: Compute 1 new tile instead of rescanning the entire window
---

## The Problem: Why Intermediate Representations?

Traditional approaches to time-windowed aggregations either:
1. **Recompute from raw data** every time → Slow, expensive
2. **Store final aggregated values** per tile → Fast but often **incorrect** when merging

### The Merging Problem

You **cannot correctly merge** many common aggregations:

```
WRONG: avg(tile1, tile2) ≠ (avg_tile1 + avg_tile2) / 2

Example:
  tile1: [10, 20, 30] → avg = 20
  tile2: [100]        → avg = 100

  Correct merged avg: (10+20+30+100) / 4 = 40
  Wrong merged avg:   (20 + 100) / 2 = 60
```

**The same problem exists for:**
- Standard deviation (`std`)
- Variance (`var`)
- Median and percentiles
- Any "holistic" aggregation that requires knowledge of all values
---

## The Solution: Intermediate Representations (IRs)

Instead of storing **final aggregated values**, store **intermediate data** that preserves the mathematical properties needed for correct merging.

### Example: Average

**Traditional (Incorrect)**:
```
Tile 1: avg = 20
Tile 2: avg = 100
Merged avg = (20 + 100) / 2 = 60 - WRONG
```

**With IRs (Correct)**:
```
Tile 1: sum = 60,  count = 3
Tile 2: sum = 100, count = 1
Merged: sum = 160, count = 4
Merged avg = 160 / 4 = 40 - CORRECT
```

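As a quick illustration of the IR approach, here is a minimal sketch in plain Python (not Feast code) that merges the `(sum, count)` pairs from the example above:

```python
# Each tile stores its IRs as a (sum, count) pair instead of a final avg.

def merge_avg(tiles):
    """Merge (sum, count) IR pairs and compute the exact average."""
    total = sum(s for s, _ in tiles)
    count = sum(c for _, c in tiles)
    return total / count

tile1 = (60, 3)   # values [10, 20, 30]
tile2 = (100, 1)  # value  [100]

print(merge_avg([tile1, tile2]))  # 40.0 - the correct merged average
```

Because the IRs are just sums, merging any number of tiles stays exact and order-independent.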
---

## Aggregation Categories

### Algebraic Aggregations

These can be merged by applying the same aggregation function to tiles:

| Aggregation | Stored Value | Merge Strategy | Storage |
|-------------|--------------|----------------|---------|
| `sum` | sum | `sum(tile_sums)` | 1 column |
| `count` | count | `sum(tile_counts)` | 1 column |
| `max` | max | `max(tile_maxes)` | 1 column |
| `min` | min | `min(tile_mins)` | 1 column |

**No IRs needed** - the final value is the IR!

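The merge strategies in the table can be sketched in a few lines of plain Python (the tile values are illustrative, not Feast code):

```python
# Algebraic aggregations merge by re-applying the same function to the
# per-tile values -- the final value itself acts as the IR.

tile_sums   = [60, 100]   # per-tile sums
tile_counts = [3, 1]      # per-tile counts
tile_maxes  = [30, 100]   # per-tile maxima
tile_mins   = [10, 100]   # per-tile minima

merged = {
    "sum":   sum(tile_sums),     # 160
    "count": sum(tile_counts),   # 4
    "max":   max(tile_maxes),    # 100
    "min":   min(tile_mins),     # 10
}
print(merged)
```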
---

### Holistic Aggregations

These require storing multiple intermediate values:

#### Average (`avg`, `mean`)

**Stored IRs**: `sum`, `count`
**Final computation**: `avg = sum / count`
**Merge strategy**: Sum the sums and counts, then divide

**Storage**: 3 columns (final + 2 IRs)

---

#### Standard Deviation (`std`, `stddev`)

**Stored IRs**: `count`, `sum`, `sum_of_squares`
**Final computation**:
```python
variance = (sum_sq - sum**2 / count) / (count - delta)
std = sqrt(variance)
# delta = 1 for sample std, 0 for population std
```

**Merge strategy**: Sum all three IRs, then apply the formula

**Storage**: 4 columns (final + 3 IRs)

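A minimal sketch of this formula in plain Python, assuming each tile stores a `(count, sum, sum_of_squares)` tuple (the tile values below reuse the `[10, 20, 30]` and `[100]` example):

```python
import math

def merge_std(tiles, delta=1):
    """Merge (count, sum, sum_of_squares) IRs, then compute std.

    delta=1 gives sample std, delta=0 gives population std.
    """
    n  = sum(t[0] for t in tiles)
    s  = sum(t[1] for t in tiles)
    sq = sum(t[2] for t in tiles)
    variance = (sq - s * s / n) / (n - delta)
    return math.sqrt(variance)

# Tile IRs for [10, 20, 30] and [100]:
tiles = [(3, 60, 1400), (1, 100, 10000)]
print(merge_std(tiles))  # ~40.82, the exact sample std of [10, 20, 30, 100]
```

The merged result matches computing the std over all four raw values directly, which is exactly what storing final per-tile std values cannot guarantee.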
---

#### Variance (`var`, `variance`)

**Stored IRs**: `count`, `sum`, `sum_of_squares`
**Final computation**: Same as std but without `sqrt()`

**Storage**: 4 columns (final + 3 IRs)

---

## How Tiling Works

Tiling is optimized for **streaming scenarios** with frequent updates (e.g., every few minutes).

### 1. Continuous Tile Updates

```
Stream Events → Partition by Hop Intervals → Compute IRs → Store Windowed Aggregations
     |                   |                        |                 |
     |                   |                        |                 └─> Online Store (Redis, etc.)
     |                   |                        └─> avg_sum, avg_count, std_sum_sq, etc.
     |                   └─> 5-min hops: [00:00-00:05], [00:05-00:10], ...
     └─> customer_id=1: [txn1, txn2, txn3, ...]

Every 5 minutes:
- New events arrive
- Only 1 new tile computed (5 min of data)
- 11 previous tiles reused (in memory during streaming session)
- Final aggregation = merge 12 tiles (1 new + 11 reused)
```

**Why It's Fast:**
- **Without tiling:** Scan the entire 1-hour window (1000+ events) every 5 minutes
- **With tiling:** Only process 5 minutes of new events, reuse previous tiles
- **Speedup:** Roughly window size / hop size (12× here), since only the newest hop is recomputed

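The update loop described above can be sketched with a fixed-length deque of `(sum, count)` IR tiles. This is illustrative plain Python under the 1-hour/5-minute assumption, not Feast's streaming implementation:

```python
from collections import deque

WINDOW_TILES = 12  # 1-hour window / 5-minute hops
tiles = deque(maxlen=WINDOW_TILES)  # the oldest tile is evicted automatically

def on_hop(new_events):
    """Every 5 minutes: compute ONE new tile, reuse the rest."""
    tiles.append((sum(new_events), len(new_events)))  # new (sum, count) tile
    total = sum(s for s, _ in tiles)                  # merge all IR tiles
    count = sum(c for _, c in tiles)
    return total / count if count else None           # windowed avg

on_hop([10, 20, 30])       # first hop
print(on_hop([100]))       # 40.0 - windowed avg over both hops
```

Each update touches only the newest hop's events; the other tiles are merged as-is, which is where the tile-reuse speedup comes from.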
---

### 2. Streaming Update Efficiency

| Update | Without Tiling | With Tiling | Tile Reuse |
|--------|---------------|-------------|------------|
| T=00:00 | Compute 1hr | Compute 12 tiles | 0% reuse (initial) |
| T=00:05 | Compute 1hr (1000+ events) | Compute 1 tile + reuse 11 | 92% reuse |
| T=00:10 | Compute 1hr (1000+ events) | Compute 1 tile + reuse 11 | 92% reuse |
| T=00:15 | Compute 1hr (1000+ events) | Compute 1 tile + reuse 11 | 92% reuse |

**Key Benefit:** Tiles stay in memory during the streaming session, enabling massive reuse.

---

## Tiling Algorithm

### Sawtooth Window Tiling

1. **Partition events** into hop-sized intervals (e.g., 5 minutes)
2. **Compute cumulative tail aggregations** for each hop from the start of the materialization window
3. **Subtract tiles** to form windowed aggregations (current_tile - previous_tile)
4. **Store IRs** for correct merging of holistic aggregations
5. **At materialization**, store windowed aggregations in the online store

**Benefits**:
- Efficient query-time performance (pre-computed windows)
- Minimal storage overhead (only hop-sized tiles)
- Mathematically correct for all aggregation types

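Steps 2-3 above (cumulative tails, then tile subtraction) can be sketched in plain Python with `(sum, count)` IRs; the variable names are illustrative, not Feast internals:

```python
hop_events = [[10, 20], [30], [100]]   # events per 5-minute hop

# Step 2: cumulative tail aggregations from the window start
cumulative = []
s = c = 0
for events in hop_events:
    s += sum(events)
    c += len(events)
    cumulative.append((s, c))          # [(30, 2), (60, 3), (160, 4)]

# Step 3: a window is the current cumulative tile minus the cumulative
# tile just before the window start (here, a 2-hop window ending at hop 2)
end_s, end_c = cumulative[2]
start_s, start_c = cumulative[0]
window_sum, window_count = end_s - start_s, end_c - start_c
print(window_sum / window_count)       # 65.0 - avg over hops 1-2 ([30, 100])
```

Because the IRs are additive, one subtraction recovers any hop-aligned window without rescanning its events.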
---

## Configuration

### Recommended: StreamFeatureView (Streaming Scenarios)

Tiling provides **maximum benefit for streaming scenarios** with frequent updates:

```python
from feast import StreamFeatureView, Aggregation
from feast.data_source import PushSource, KafkaSource
from datetime import timedelta

# Example with Kafka streaming source
customer_features = StreamFeatureView(
    name="customer_transaction_features",
    entities=[customer],
    source=KafkaSource(
        name="transactions_stream",
        kafka_bootstrap_servers="localhost:9092",
        topic="transactions",
        timestamp_field="event_timestamp",
        batch_source=file_source,  # For historical data
    ),
    aggregations=[
        Aggregation(column="amount", function="sum", time_window=timedelta(hours=1)),
        Aggregation(column="amount", function="avg", time_window=timedelta(hours=1)),
        Aggregation(column="amount", function="std", time_window=timedelta(hours=1)),
    ],
    timestamp_field="event_timestamp",
    online=True,

    # Tiling configuration
    enable_tiling=True,                    # speedup for streaming!
    tiling_hop_size=timedelta(minutes=5),  # Update frequency
)
```

**When to Enable:**
- Streaming data sources (Kafka, Kinesis, PushSource)
- Frequent updates (every few minutes)
- Real-time feature serving
- High-throughput event processing

---

### Key Parameters

- `aggregations`: List of time-windowed aggregations to compute
- `timestamp_field`: Column name for timestamps (required when aggregations are specified)
- `enable_tiling`: Enable tiling optimization (default: `False`)
  - Set to `True` for **streaming scenarios**
- `tiling_hop_size`: Time interval between tiles (default: 5 minutes)
  - Smaller = more granular tiles, potentially higher memory during the processing window
  - Larger = less granular tiles, potentially lower memory during the processing window

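The hop-size tradeoff can be quantified: the number of tiles per window is `time_window / tiling_hop_size`, and the fraction of tiles reused per update is `(n - 1) / n`. A quick check:

```python
from datetime import timedelta

# Toy arithmetic for the hop-size tradeoff (not Feast code).
window = timedelta(hours=1)
for hop in (timedelta(minutes=1), timedelta(minutes=5), timedelta(minutes=15)):
    n_tiles = int(window / hop)        # tiles held per window
    reuse = (n_tiles - 1) / n_tiles    # fraction reused each update
    print(hop, n_tiles, f"{reuse:.0%}")
```

With 5-minute hops over a 1-hour window this gives 12 tiles and ~92% reuse, matching the efficiency table earlier in this document.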
### Compute Engine Requirements

- **Spark Compute Engine**: Fully supported for streaming and batch
- **Ray Compute Engine**: Fully supported for streaming and batch
- **Local Compute Engine**: Does NOT support time-windowed aggregations

---

## Architecture

Tiling in Feast uses a **simple, pure pandas architecture** that works with any compute engine:

### How It Works

```
┌─────────────────┐
│ Engine DataFrame│  (Spark/Ray/etc)
└────────┬────────┘
         │ .toPandas() / .to_pandas()
         ▼
┌─────────────────┐
│ Pandas DataFrame│
└────────┬────────┘
         │ orchestrator.apply_sawtooth_window_tiling()
         ▼
┌─────────────────┐
│ Cumulative      │  (pandas with _tile_start, _tile_end, IRs)
│ Tiles           │
└────────┬────────┘
         │ tile_subtraction.convert_cumulative_to_windowed()
         ▼
┌─────────────────┐
│ Windowed        │  (pandas with final aggregations)
│ Aggregations    │
└────────┬────────┘
         │ spark.createDataFrame() / ray.from_pandas()
         ▼
┌─────────────────┐
│ Engine DataFrame│
└─────────────────┘
```

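A hedged pandas sketch of the tiling stage in the flow above. The function name mirrors the diagram, but the signature, column names, and grouping keys here are assumptions for illustration, not Feast's actual internals:

```python
import pandas as pd

def apply_sawtooth_window_tiling(df: pd.DataFrame, hop: str) -> pd.DataFrame:
    """Assign each event to a hop-sized tile and accumulate (sum, count) IRs."""
    df = df.copy()
    df["_tile_start"] = df["event_timestamp"].dt.floor(hop)
    tiles = (
        df.groupby(["customer_id", "_tile_start"])["amount"]
          .agg(avg_sum="sum", avg_count="count")
          .reset_index()
    )
    # Cumulative tails per entity, from the start of the window
    tiles[["avg_sum", "avg_count"]] = (
        tiles.groupby("customer_id")[["avg_sum", "avg_count"]].cumsum()
    )
    return tiles

events = pd.DataFrame({
    "customer_id": [1, 1, 1, 1],
    "event_timestamp": pd.to_datetime(
        ["2024-01-01 00:01", "2024-01-01 00:03",
         "2024-01-01 00:04", "2024-01-01 00:07"]),
    "amount": [10.0, 20.0, 30.0, 100.0],
})
tiles = apply_sawtooth_window_tiling(events, "5min")
# The last cumulative tile holds the merged IRs for the whole span:
print(tiles["avg_sum"].iloc[-1] / tiles["avg_count"].iloc[-1])  # 40.0
```

The windowed-subtraction step would then difference consecutive cumulative tiles, as in the sawtooth algorithm section, before converting back to the engine's DataFrame type.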
## Summary

Tiling with Intermediate Representations provides a powerful optimization for **streaming time-windowed aggregations** in Feast.