1+ # Copyright 2025 The Feast Authors
2+ #
3+ # Licensed under the Apache License, Version 2.0 (the "License");
4+ # you may not use this file except in compliance with the License.
5+ # You may obtain a copy of the License at
6+ #
7+ # https://www.apache.org/licenses/LICENSE-2.0
8+ #
9+ # Unless required by applicable law or agreed to in writing, software
10+ # distributed under the License is distributed on an "AS IS" BASIS,
11+ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+ # See the License for the specific language governing permissions and
13+ # limitations under the License.
14+
115import asyncio
216import os
317import sys
418import threading
519import time
620import traceback
21+ from collections import defaultdict
722from contextlib import asynccontextmanager
823from datetime import datetime
924from importlib import resources as importlib_resources
10- from typing import Any , Dict , List , Optional , Union
25+ from types import SimpleNamespace
26+ from typing import Any , DefaultDict , Dict , List , NamedTuple , Optional , Union
1127
1228import pandas as pd
1329import psutil
@@ -195,6 +211,43 @@ def get_app(
195211 registry_proto = None
196212 shutting_down = False
197213 active_timer : Optional [threading .Timer ] = None
214+ # --- Offline write batching config and batcher ---
215+ fs_cfg = getattr (store .config , "feature_server" , None )
216+ batching_cfg = None
217+ if fs_cfg is not None :
218+ enabled = getattr (fs_cfg , "offline_push_batching_enabled" , False )
219+ batch_size = getattr (fs_cfg , "offline_push_batching_batch_size" , None )
220+ batch_interval_seconds = getattr (
221+ fs_cfg , "offline_push_batching_batch_interval_seconds" , None
222+ )
223+
224+ if enabled is True :
225+ size_ok = isinstance (batch_size , int ) and not isinstance (
226+ batch_size , bool
227+ )
228+ interval_ok = isinstance (batch_interval_seconds , int ) and not isinstance (
229+ batch_interval_seconds , bool
230+ )
231+ if size_ok and interval_ok :
232+ batching_cfg = SimpleNamespace (
233+ enabled = True ,
234+ batch_size = batch_size ,
235+ batch_interval_seconds = batch_interval_seconds ,
236+ )
237+ else :
238+ logger .warning (
239+ "Offline write batching enabled but missing or invalid numeric values; "
240+ "disabling batching (batch_size=%r, batch_interval_seconds=%r)" ,
241+ batch_size ,
242+ batch_interval_seconds ,
243+ )
244+
245+ offline_batcher : Optional [OfflineWriteBatcher ] = None
246+ if batching_cfg is not None and batching_cfg .enabled is True :
247+ offline_batcher = OfflineWriteBatcher (store = store , cfg = batching_cfg )
248+ logger .debug ("Offline write batching is ENABLED" )
249+ else :
250+ logger .debug ("Offline write batching is DISABLED" )
198251
199252 def stop_refresh ():
200253 nonlocal shutting_down
@@ -219,9 +272,13 @@ def async_refresh():
    async def lifespan(app: FastAPI):
        # Startup: initialize the feature store and kick off the periodic
        # registry refresh timer before serving requests.
        await store.initialize()
        async_refresh()
        try:
            yield
        finally:
            # Shutdown: always runs, even if the app body raised. Stop the
            # refresh timer first, then flush any buffered offline writes
            # (when batching is enabled), and finally close the store.
            stop_refresh()
            if offline_batcher is not None:
                offline_batcher.shutdown()
            await store.close()
225282
226283 app = FastAPI (lifespan = lifespan )
227284
@@ -326,22 +383,48 @@ async def push(request: PushFeaturesRequest) -> None:
326383 for feature_view in fvs_with_push_sources :
327384 assert_permissions (resource = feature_view , actions = actions )
328385
329- push_params = dict (
330- push_source_name = request .push_source_name ,
331- df = df ,
332- allow_registry_cache = request .allow_registry_cache ,
333- to = to ,
334- transform_on_write = request .transform_on_write ,
335- )
        async def _push_with_to(push_to: PushMode) -> None:
            """
            Perform a single push operation to the given destination.

            NOTE:
            - Feast providers do not currently support async offline writes.
            - Therefore:
                * ONLINE and ONLINE_AND_OFFLINE may be pushed asynchronously,
                  depending on provider.async_supported.online.write.
                * OFFLINE is always synchronous, but is executed via
                  run_in_threadpool when called from HTTP handlers so the
                  event loop is not blocked.
            - The OfflineWriteBatcher handles offline writes directly in its
              own background thread, but the offline store writes themselves
              are currently synchronous only.
            """
            push_params = dict(
                push_source_name=request.push_source_name,
                df=df,
                allow_registry_cache=request.allow_registry_cache,
                to=push_to,
                transform_on_write=request.transform_on_write,
            )

            # Async currently only applies to online store writes (ONLINE /
            # ONLINE_AND_OFFLINE paths) as there's no async offline store write.
            if push_to in (PushMode.ONLINE, PushMode.ONLINE_AND_OFFLINE) and (
                store._get_provider().async_supported.online.write
            ):
                await store.push_async(**push_params)
            else:
                # Synchronous push would block the event loop; run it in the
                # threadpool instead.
                await run_in_threadpool(lambda: store.push(**push_params))
412+
413+ needs_online = to in (PushMode .ONLINE , PushMode .ONLINE_AND_OFFLINE )
414+ needs_offline = to in (PushMode .OFFLINE , PushMode .ONLINE_AND_OFFLINE )
415+
416+ if offline_batcher is None or not needs_offline :
417+ await _push_with_to (to )
343418 else :
344- store .push (** push_params )
419+ if needs_online :
420+ await _push_with_to (PushMode .ONLINE )
421+
422+ offline_batcher .enqueue (
423+ push_source_name = request .push_source_name ,
424+ df = df ,
425+ allow_registry_cache = request .allow_registry_cache ,
426+ transform_on_write = request .transform_on_write ,
427+ )
345428
346429 async def _get_feast_object (
347430 feature_view_name : str , allow_registry_cache : bool
@@ -683,3 +766,170 @@ def start_server(
683766 )
684767 else :
685768 uvicorn .run (app , host = host , port = port , access_log = (not no_access_log ))
769+
770+
771+ class _OfflineBatchKey (NamedTuple ):
772+ push_source_name : str
773+ allow_registry_cache : bool
774+ transform_on_write : bool
775+
776+
class OfflineWriteBatcher:
    """
    In-process offline write batcher for /push requests.

    - Buffers DataFrames per (push_source_name, allow_registry_cache,
      transform_on_write).
    - Flushes a buffer when either:
        * total rows buffered for its key >= cfg.batch_size, or
        * time since that key's last flush >= cfg.batch_interval_seconds.
    - ALL offline store writes happen on the dedicated background thread.
      ``enqueue`` only appends under a short-lived lock and, on reaching the
      size threshold, wakes the background thread; it never performs I/O
      itself and never holds the lock across a store write. (Previously the
      size-triggered write ran inline in the caller's thread while holding
      the lock, blocking concurrent requests — and the HTTP event loop —
      for the duration of the write.)
    """

    def __init__(self, store: "feast.FeatureStore", cfg: Any):
        """
        Args:
            store: Feature store used for the synchronous offline pushes.
            cfg: Object exposing ``batch_size`` and ``batch_interval_seconds``
                (int attributes, validated by the caller).
        """
        self._store = store
        self._cfg = cfg

        # Buffered frames and running row counts, keyed by batch key. The
        # row counts avoid an O(len(buffer)) rescan on every enqueue.
        self._buffers: DefaultDict[_OfflineBatchKey, List[pd.DataFrame]] = defaultdict(
            list
        )
        self._row_counts: DefaultDict[_OfflineBatchKey, int] = defaultdict(int)
        # Last flush timestamp per key; defaults to "now" on first access so
        # the interval is measured from when a key is first seen.
        self._last_flush: DefaultDict[_OfflineBatchKey, float] = defaultdict(time.time)

        self._lock = threading.Lock()
        self._stop_event = threading.Event()
        # Set by enqueue() when a buffer crosses the size threshold so the
        # background thread flushes promptly instead of waiting an interval.
        self._wake = threading.Event()

        # Start background flusher thread (daemon: never blocks interpreter exit).
        self._thread = threading.Thread(
            target=self._run, name="offline_write_batcher", daemon=True
        )
        self._thread.start()

        logger.debug(
            "OfflineWriteBatcher initialized: batch_size=%s, batch_interval_seconds=%s",
            getattr(cfg, "batch_size", None),
            getattr(cfg, "batch_interval_seconds", None),
        )

    # ---------- Public API ----------

    def enqueue(
        self,
        push_source_name: str,
        df: pd.DataFrame,
        allow_registry_cache: bool,
        transform_on_write: bool,
    ) -> None:
        """
        Enqueue a dataframe for offline write, grouped by push source + flags.

        Cheap and non-blocking: appends under a short-lived lock only. Heavy
        offline-store I/O always happens on the background thread.
        """
        key = _OfflineBatchKey(
            push_source_name=push_source_name,
            allow_registry_cache=allow_registry_cache,
            transform_on_write=transform_on_write,
        )

        with self._lock:
            self._buffers[key].append(df)
            self._row_counts[key] += len(df)
            size_reached = self._row_counts[key] >= self._cfg.batch_size
            total_rows = self._row_counts[key]

        if size_reached:
            logger.debug(
                "OfflineWriteBatcher size threshold reached for %s: %s rows",
                key,
                total_rows,
            )
            # Hand the actual write off to the background thread.
            self._wake.set()

    def flush_all(self) -> None:
        """
        Flush all buffers synchronously. Intended for graceful shutdown.
        """
        for key, frames in self._drain_due(force=True):
            self._write_batch(key, frames)

    def shutdown(self, timeout: float = 5.0) -> None:
        """
        Stop the background thread and perform a best-effort final flush.

        Args:
            timeout: Seconds to wait for the background thread to exit.
        """
        logger.debug("Shutting down OfflineWriteBatcher")
        self._stop_event.set()
        self._wake.set()  # unblock the background loop immediately
        try:
            self._thread.join(timeout=timeout)
        except Exception:
            logger.exception("Error joining OfflineWriteBatcher thread")

        # Best-effort final flush
        try:
            self.flush_all()
        except Exception:
            logger.exception("Error during final OfflineWriteBatcher flush")

    # ---------- Internal helpers ----------

    def _run(self) -> None:
        """
        Background loop: flush batches that are due by size or age.

        Wakes up either every ``batch_interval_seconds`` or immediately when
        enqueue() signals that a buffer crossed the size threshold.
        """
        interval = max(1, int(getattr(self._cfg, "batch_interval_seconds", 30)))
        logger.debug(
            "OfflineWriteBatcher background loop started with check interval=%s",
            interval,
        )

        while not self._stop_event.is_set():
            self._wake.wait(timeout=interval)
            self._wake.clear()
            if self._stop_event.is_set():
                break  # shutdown() performs the final flush itself
            try:
                for key, frames in self._drain_due(force=False):
                    self._write_batch(key, frames)
            except Exception:
                logger.exception("Error in OfflineWriteBatcher background loop")

        logger.debug("OfflineWriteBatcher background loop exiting")

    def _drain_due(self, force: bool) -> List[Any]:
        """
        Atomically remove and return the buffers that should be flushed.

        Returns a list of ``(key, frames)`` pairs, where ``frames`` is the
        list of buffered DataFrames for that key. When *force* is False only
        keys that hit the size threshold or whose last flush is older than
        the configured interval are drained. Concatenation and writing happen
        in the caller, OUTSIDE the lock.
        """
        now = time.time()
        drained: List[Any] = []
        with self._lock:
            for key, dfs in list(self._buffers.items()):
                if not dfs:
                    continue
                due = (
                    force
                    or self._row_counts[key] >= self._cfg.batch_size
                    or now - self._last_flush[key]
                    >= self._cfg.batch_interval_seconds
                )
                if not due:
                    continue
                drained.append((key, dfs))
                self._buffers[key] = []
                self._row_counts[key] = 0
                self._last_flush[key] = now
        return drained

    def _write_batch(self, key: _OfflineBatchKey, frames: List[pd.DataFrame]) -> None:
        """
        Concatenate *frames* and push them to the offline store.

        Must be called WITHOUT holding ``self._lock``: offline store writes
        are currently synchronous only and may take a long time.
        """
        batch_df = pd.concat(frames, ignore_index=True)

        logger.debug(
            "Flushing offline batch for push_source=%s with %s rows",
            key.push_source_name,
            len(batch_df),
        )

        try:
            self._store.push(
                push_source_name=key.push_source_name,
                df=batch_df,
                allow_registry_cache=key.allow_registry_cache,
                to=PushMode.OFFLINE,
                transform_on_write=key.transform_on_write,
            )
        except Exception:
            # Best-effort: a failed flush is logged and the batch is dropped,
            # matching the previous behavior.
            logger.exception("Error flushing offline batch for %s", key)
0 commit comments