Skip to content

Commit c980db9

Browse files
ntkatholejfw-ppi
authored andcommitted
feat: Implemented Tiling Support for Time-Windowed Aggregations (#5724)
1 parent e0c629f commit c980db9

File tree

19 files changed

+1139
-15
lines changed

19 files changed

+1139
-15
lines changed

docs/getting-started/concepts/README.md

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,10 @@
2828
[stream-feature-view.md](stream-feature-view.md)
2929
{% endcontent-ref %}
3030

31+
{% content-ref url="tiling.md" %}
32+
[tiling.md](tiling.md)
33+
{% endcontent-ref %}
34+
3135
{% content-ref url="feature-retrieval.md" %}
3236
[feature-retrieval.md](feature-retrieval.md)
3337
{% endcontent-ref %}

docs/getting-started/concepts/stream-feature-view.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,4 +13,6 @@
1313

1414
- **Aggregations**: Define time-windowed aggregations (e.g., `sum`, `avg`) over event-timestamped data.
1515

16+
- **⚡ Tiling with Intermediate Representations**: Enable efficient pre-aggregation with correct merging semantics for holistic aggregations like `avg` and `std`. This provides faster queries while maintaining mathematical accuracy. [Learn more about tiling](tiling.md)
17+
1618
- **Feature resolution & execution**: Automatically resolves and executes dependent views during materialization or retrieval.
Lines changed: 285 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,285 @@
1+
# Tiling with Intermediate Representations in Feast
2+
3+
## Overview
4+
5+
**Tiling** is an optimization technique for **streaming time-windowed aggregations** that enables massively efficient feature computation by pre-aggregating data into smaller time intervals (tiles) and storing **Intermediate Representations (IRs)** for correct merging.
6+
7+
**Primary Use Case: Streaming**
8+
9+
Tiling provides **speedup** for streaming scenarios where features are updated frequently (every few minutes) from sources like Kafka, Kinesis, or PushSource.
10+
11+
**Key Benefits (Streaming):**
12+
- **Faster**: Reuse 90%+ of tiles between updates instead of recomputing from scratch
13+
- **Correct results**: IRs ensure mathematically accurate merging for all aggregation types
14+
- **Memory efficient**: Only process new events, reuse previous tiles in memory
15+
- **Real-time capable**: Handle high-throughput streaming with low latency
16+
- **Incremental updates**: Compute 1 new tile instead of rescanning entire window
17+
18+
---
19+
20+
## The Problem: Why Intermediate Representations?
21+
22+
Traditional approaches to time-windowed aggregations either:
23+
1. **Recompute from raw data** every time → Slow, expensive
24+
2. **Store final aggregated values** per tile → Fast but often **incorrect** when merging
25+
26+
### The Merging Problem
27+
28+
You **cannot correctly merge** many common aggregations:
29+
30+
```
31+
WRONG: avg(tile1, tile2) ≠ (avg_tile1 + avg_tile2) / 2
32+
33+
Example:
34+
tile1: [10, 20, 30] → avg = 20
35+
tile2: [100] → avg = 100
36+
37+
Correct merged avg: (10+20+30+100) / 4 = 40
38+
Wrong merged avg: (20 + 100) / 2 = 60
39+
```
40+
41+
**The same problem exists for:**
42+
- Standard deviation (`std`)
43+
- Variance (`var`)
44+
- Median and percentiles
45+
- Any "holistic" aggregation that requires knowledge of all values
46+
47+
---
48+
49+
## The Solution: Intermediate Representations (IRs)
50+
51+
Instead of storing **final aggregated values**, store **intermediate data** that preserves the mathematical properties needed for correct merging.
52+
53+
### Example: Average
54+
55+
**Traditional (Incorrect)**:
56+
```
57+
Tile 1: avg = 20
58+
Tile 2: avg = 20
59+
Merged avg = (20 + 20) / 2 = 20 - WRONG
60+
```
61+
62+
**With IRs (Correct)**:
63+
```
64+
Tile 1: sum = 60, count = 3
65+
Tile 2: sum = 100, count = 1
66+
Merged: sum = 160, count = 4
67+
Merged avg = 160 / 4 = 40 - CORRECT
68+
```
69+
70+
---
71+
72+
## Aggregation Categories
73+
74+
### Algebraic Aggregations
75+
76+
These can be merged by applying the same aggregation function to tiles:
77+
78+
| Aggregation | Stored Value | Merge Strategy | Storage |
79+
|-------------|--------------|----------------|---------|
80+
| `sum` | sum | `sum(tile_sums)` | 1 column |
81+
| `count` | count | `sum(tile_counts)` | 1 column |
82+
| `max` | max | `max(tile_maxes)` | 1 column |
83+
| `min` | min | `min(tile_mins)` | 1 column |
84+
85+
**No IRs needed** - the final value is the IR!
86+
87+
---
88+
89+
### Holistic Aggregations
90+
91+
These require storing multiple intermediate values:
92+
93+
#### Average (`avg`, `mean`)
94+
95+
**Stored IRs**: `sum`, `count`
96+
**Final computation**: `avg = sum / count`
97+
**Merge strategy**: Sum the sums and counts, then divide
98+
99+
**Storage**: 3 columns (final + 2 IRs)
100+
101+
---
102+
103+
#### Standard Deviation (`std`, `stddev`)
104+
105+
**Stored IRs**: `count`, `sum`, `sum_of_squares`
106+
**Final computation**:
107+
```python
108+
variance = (sum_sq - sum²/count) / (count - δ)
109+
std = sqrt(variance)
110+
# δ = 1 for sample, 0 for population
111+
```
112+
113+
**Merge strategy**: Sum all three IRs, then apply formula
114+
115+
**Storage**: 4 columns (final + 3 IRs)
116+
117+
---
118+
119+
#### Variance (`var`, `variance`)
120+
121+
**Stored IRs**: `count`, `sum`, `sum_of_squares`
122+
**Final computation**: Same as std but without `sqrt()`
123+
124+
**Storage**: 4 columns (final + 3 IRs)
125+
126+
---
127+
128+
## How Tiling Works
129+
130+
Tiling is optimized for **streaming scenarios** with frequent updates (e.g., every few minutes).
131+
132+
### 1. Continuous Tile Updates
133+
134+
```
135+
Stream Events → Partition by Hop Intervals → Compute IRs → Store Windowed Aggregations
136+
| | | |
137+
| | | └─> Online Store (Redis, etc.)
138+
| | └─> avg_sum, avg_count, std_sum_sq, etc.
139+
| └─> 5-min hops: [00:00-00:05], [00:05-00:10], ...
140+
└─> customer_id=1: [txn1, txn2, txn3, ...]
141+
142+
Every 5 minutes:
143+
- New events arrive
144+
- Only 1 new tile computed (5 min of data)
145+
- 11 previous tiles reused (in memory during streaming session)
146+
- Final aggregation = merge 12 tiles (1 new + 11 reused)
147+
```
148+
149+
**Why It's Fast:**
150+
- **Without tiling:** Scan entire 1-hour window (1000+ events) every 5 minutes
151+
- **With tiling:** Only process 5 minutes of new events, reuse previous tiles
152+
- **Speedup:** Faster for streaming updates!
153+
154+
---
155+
156+
### 2. Streaming Update Efficiency
157+
158+
| Update | Without Tiling | With Tiling | Tile Reuse |
159+
|--------|---------------|-------------|------------|
160+
| T=00:00 | Compute 1hr | Compute 12 tiles | 0% reuse (initial) |
161+
| T=00:05 | Compute 1hr (1000+ events) | Compute 1 tile + reuse 11 | 92% reuse |
162+
| T=00:10 | Compute 1hr (1000+ events) | Compute 1 tile + reuse 11 | 92% reuse |
163+
| T=00:15 | Compute 1hr (1000+ events) | Compute 1 tile + reuse 11 | 92% reuse |
164+
165+
**Key Benefit:** Tiles stay in memory during the streaming session, enabling massive reuse.
166+
167+
---
168+
169+
## Tiling Algorithm
170+
171+
### Sawtooth Window Tiling
172+
173+
1. **Partition events** into hop-sized intervals (e.g., 5 minutes)
174+
2. **Compute cumulative tail aggregations** for each hop from the start of the materialization window
175+
3. **Subtract tiles** to form windowed aggregations (current_tile - previous_tile)
176+
4. **Store IRs** for correct merging of holistic aggregations
177+
5. **At materialization**, store windowed aggregations in online store
178+
179+
**Benefits**:
180+
- Efficient query-time performance (pre-computed windows)
181+
- Minimal storage overhead (only hop-sized tiles)
182+
- Mathematically correct for all aggregation types
183+
184+
---
185+
186+
## Configuration
187+
188+
### Recommended: StreamFeatureView (Streaming Scenarios)
189+
190+
Tiling provides **maximum benefit for streaming scenarios** with frequent updates:
191+
192+
```python
193+
from feast import StreamFeatureView, Aggregation
194+
from feast.data_source import PushSource, KafkaSource
195+
from datetime import timedelta
196+
197+
# Example with Kafka streaming source
198+
customer_features = StreamFeatureView(
199+
name="customer_transaction_features",
200+
entities=[customer],
201+
source=KafkaSource(
202+
name="transactions_stream",
203+
kafka_bootstrap_servers="localhost:9092",
204+
topic="transactions",
205+
timestamp_field="event_timestamp",
206+
batch_source=file_source, # For historical data
207+
),
208+
aggregations=[
209+
Aggregation(column="amount", function="sum", time_window=timedelta(hours=1)),
210+
Aggregation(column="amount", function="avg", time_window=timedelta(hours=1)),
211+
Aggregation(column="amount", function="std", time_window=timedelta(hours=1)),
212+
],
213+
timestamp_field="event_timestamp",
214+
online=True,
215+
216+
# Tiling configuration
217+
enable_tiling=True, # speedup for streaming!
218+
tiling_hop_size=timedelta(minutes=5), # Update frequency
219+
)
220+
```
221+
222+
**When to Enable:**
223+
- Streaming data sources (Kafka, Kinesis, PushSource)
224+
- Frequent updates (every few minutes)
225+
- Real-time feature serving
226+
- High-throughput event processing
227+
228+
---
229+
230+
### Key Parameters
231+
232+
- `aggregations`: List of time-windowed aggregations to compute
233+
- `timestamp_field`: Column name for timestamps (required when aggregations are specified)
234+
- `enable_tiling`: Enable tiling optimization (default: `False`)
235+
- Set to `True` for **streaming scenarios**
236+
- `tiling_hop_size`: Time interval between tiles (default: 5 minutes)
237+
- Smaller = more granular tiles, potentially higher memory during processing window
238+
- Larger = less granular tiles, potentially lower memory during processing window
239+
240+
### Compute Engine Requirements
241+
242+
- **Spark Compute Engine**: Fully supported for streaming and batch
243+
- **Ray Compute Engine**: Fully supported for streaming and batch
244+
- **Local Compute Engine**: Does NOT support time-windowed aggregations
245+
246+
---
247+
248+
## Architecture
249+
250+
Tiling in Feast uses a **simple, pure pandas architecture** that works with any compute engine:
251+
252+
### How It Works
253+
254+
```
255+
┌─────────────────┐
256+
│ Engine DataFrame│ (Spark/Ray/etc)
257+
└────────┬────────┘
258+
│ .toPandas() / .to_pandas()
259+
260+
┌─────────────────┐
261+
│ Pandas DataFrame│
262+
└────────┬────────┘
263+
│ orchestrator.apply_sawtooth_window_tiling()
264+
265+
┌─────────────────┐
266+
│ Cumulative │ (pandas with _tile_start, _tile_end, IRs)
267+
│ Tiles │
268+
└────────┬────────┘
269+
│ tile_subtraction.convert_cumulative_to_windowed()
270+
271+
┌─────────────────┐
272+
│ Windowed │ (pandas with final aggregations)
273+
│ Aggregations │
274+
└────────┬────────┘
275+
│ spark.createDataFrame() / ray.from_pandas()
276+
277+
┌─────────────────┐
278+
│ Engine DataFrame│
279+
└─────────────────┘
280+
```
281+
282+
283+
## Summary
284+
285+
Tiling with Intermediate Representations provides a powerful optimization for **streaming time-windowed aggregations** in Feast.

protos/feast/core/StreamFeatureView.proto

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@ message StreamFeatureView {
3737
FeatureViewMeta meta = 2;
3838
}
3939

40-
// Next available id: 17
40+
// Next available id: 20
4141
message StreamFeatureViewSpec {
4242
// Name of the feature view. Must be unique. Not updated.
4343
string name = 1;
@@ -92,5 +92,12 @@ message StreamFeatureViewSpec {
9292

9393
// Oneof with {user_defined_function, on_demand_substrait_transformation}
9494
FeatureTransformationV2 feature_transformation = 17;
95+
96+
// Enable tiling for efficient window aggregation
97+
bool enable_tiling = 18;
98+
99+
// Hop size for tiling (e.g., 5 minutes). Determines the granularity of pre-aggregated tiles.
100+
// If not specified, defaults to 5 minutes. Only used when enable_tiling is true.
101+
google.protobuf.Duration tiling_hop_size = 19;
95102
}
96103

sdk/python/feast/__init__.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
from feast.infra.offline_stores.redshift_source import RedshiftSource
1010
from feast.infra.offline_stores.snowflake_source import SnowflakeSource
1111

12+
from .aggregation import Aggregation
1213
from .batch_feature_view import BatchFeatureView
1314
from .data_source import KafkaSource, KinesisSource, PushSource, RequestSource
1415
from .dataframe import DataFrameEngine, FeastDataFrame
@@ -32,6 +33,7 @@
3233
pass
3334

3435
__all__ = [
36+
"Aggregation",
3537
"BatchFeatureView",
3638
"DataFrameEngine",
3739
"Entity",

sdk/python/feast/aggregation.py renamed to sdk/python/feast/aggregation/__init__.py

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,7 @@
1+
"""
2+
Aggregation module for Feast.
3+
"""
4+
15
from datetime import timedelta
26
from typing import Optional
37

@@ -91,3 +95,6 @@ def __eq__(self, other):
9195
return False
9296

9397
return True
98+
99+
100+
__all__ = ["Aggregation"]
Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
"""
2+
Tiling for efficient time-windowed aggregations.
3+
4+
This module provides tiling algorithms and interfaces
5+
that can be implemented by any compute engine (Spark, Ray, etc.).
6+
7+
Architecture:
8+
1. Engine nodes: Convert to pandas (e.g., dataset.to_pandas(), toPandas())
9+
2. orchestrator.py: Generate cumulative tiles
10+
3. tile_subtraction.py: Convert cumulative tiles to windowed aggregations
11+
4. Engine nodes: Convert back to engine format (e.g., from_pandas(), createDataFrame())
12+
"""
13+
14+
from feast.aggregation.tiling.base import IRMetadata, get_ir_metadata_for_aggregation
15+
from feast.aggregation.tiling.orchestrator import apply_sawtooth_window_tiling
16+
from feast.aggregation.tiling.tile_subtraction import (
17+
convert_cumulative_to_windowed,
18+
deduplicate_keep_latest,
19+
)
20+
21+
__all__ = [
22+
"IRMetadata",
23+
"get_ir_metadata_for_aggregation",
24+
"apply_sawtooth_window_tiling",
25+
"convert_cumulative_to_windowed",
26+
"deduplicate_keep_latest",
27+
]

0 commit comments

Comments
 (0)