Skip to content

Commit 494912d

Browse files
committed
feat: added batching to feature server /push to offline store (#5683)
Signed-off-by: Jacob Weinhold <[email protected]>
1 parent 6f4145d commit 494912d

File tree

1 file changed

+21
-14
lines changed

1 file changed

+21
-14
lines changed

sdk/python/feast/feature_server.py

Lines changed: 21 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -215,28 +215,35 @@ def get_app(
215215
fs_cfg = getattr(store.config, "feature_server", None)
216216
batching_cfg = None
217217
if fs_cfg is not None:
218-
enabled = getattr(fs_cfg, "offline_push_batching_enabled", None)
218+
enabled = getattr(fs_cfg, "offline_push_batching_enabled", False)
219219
batch_size = getattr(fs_cfg, "offline_push_batching_batch_size", None)
220220
batch_interval_seconds = getattr(
221221
fs_cfg, "offline_push_batching_batch_interval_seconds", None
222222
)
223223

224-
if (
225-
enabled is not None
226-
or batch_size is not None
227-
or batch_interval_seconds is not None
228-
):
229-
batching_cfg = SimpleNamespace(
230-
enabled=bool(enabled) if enabled is not None else False,
231-
batch_size=batch_size if batch_size is not None else 100,
232-
batch_interval_seconds=batch_interval_seconds
233-
if batch_interval_seconds is not None
234-
else 1,
224+
if enabled is True:
225+
size_ok = isinstance(batch_size, int) and not isinstance(
226+
batch_size, bool
227+
)
228+
interval_ok = isinstance(batch_interval_seconds, int) and not isinstance(
229+
batch_interval_seconds, bool
235230
)
231+
if size_ok and interval_ok:
232+
batching_cfg = SimpleNamespace(
233+
enabled=True,
234+
batch_size=batch_size,
235+
batch_interval_seconds=batch_interval_seconds,
236+
)
237+
else:
238+
logger.warning(
239+
"Offline write batching enabled but missing or invalid numeric values; "
240+
"disabling batching (batch_size=%r, batch_interval_seconds=%r)",
241+
batch_size,
242+
batch_interval_seconds,
243+
)
236244

237245
offline_batcher: Optional[OfflineWriteBatcher] = None
238-
enabled = getattr(batching_cfg, "enabled", False)
239-
if batching_cfg is not None and isinstance(enabled, bool) and enabled is True:
246+
if batching_cfg is not None and batching_cfg.enabled is True:
240247
offline_batcher = OfflineWriteBatcher(store=store, cfg=batching_cfg)
241248
logger.debug("Offline write batching is ENABLED")
242249
else:

0 commit comments

Comments
 (0)