1818from __future__ import absolute_import
1919
2020import logging
21+ import random
2122import uuid
2223from concurrent .futures import ThreadPoolExecutor
2324from math import floor
@@ -55,9 +56,8 @@ class AsyncWrapper(beam.DoFn):
5556 TIMER_SET = ReadModifyWriteStateSpec ('timer_set' , coders .BooleanCoder ())
5657 TO_PROCESS = BagStateSpec (
5758 'to_process' ,
58- coders .TupleCoder ([coders .StrUtf8Coder (), coders .StrUtf8Coder ()]),
59- )
60- _timer_frequency = 20
59+ coders .TupleCoder (
60+ [coders .FastPrimitivesCoder (), coders .FastPrimitivesCoder ()]))
6161 # The below items are one per dofn (not instance) so are maps of UUID to
6262 # value.
6363 _processing_elements = {}
@@ -75,7 +75,8 @@ def __init__(
7575 parallelism = 1 ,
7676 callback_frequency = 5 ,
7777 max_items_to_buffer = None ,
78- max_wait_time = 120 ,
78+ timeout = 1 ,
79+ max_wait_time = 0.5 ,
7980 ):
8081 """Wraps the sync_fn to create an asynchronous version.
8182
@@ -96,14 +97,17 @@ def __init__(
9697 max_items_to_buffer: We should ideally buffer enough to always be busy but
9798 not so much that the worker ooms. By default will be 2x the parallelism
9899 which should be good for most pipelines.
99- max_wait_time: The maximum amount of time an item should wait to be added
100- to the buffer. Used for testing to ensure timeouts are met.
100+ timeout: The maximum amount of time an item should try to be scheduled
101+ locally before it goes in the queue of waiting work.
102+ max_wait_time: The maximum amount of sleep time while attempting to
103+ schedule an item. Used in testing to ensure timeouts are met.
101104 """
102105 self ._sync_fn = sync_fn
103106 self ._uuid = uuid .uuid4 ().hex
104107 self ._parallelism = parallelism
108+ self ._timeout = timeout
105109 self ._max_wait_time = max_wait_time
106- self ._timer_frequency = 20
110+ self ._timer_frequency = callback_frequency
107111 if max_items_to_buffer is None :
108112 self ._max_items_to_buffer = max (parallelism * 2 , 10 )
109113 else :
@@ -112,9 +116,6 @@ def __init__(
112116 AsyncWrapper ._processing_elements [self ._uuid ] = {}
113117 AsyncWrapper ._items_in_buffer [self ._uuid ] = 0
114118 self .max_wait_time = max_wait_time
115- self .timer_frequency_ = callback_frequency
116- self .parallelism_ = parallelism
117- self ._next_time_to_fire = Timestamp .now () + Duration (seconds = 5 )
118119 self ._shared_handle = Shared ()
119120
120121 @staticmethod
@@ -238,9 +239,9 @@ def schedule_item(self, element, ignore_buffer=False, *args, **kwargs):
238239 **kwargs: keyword arguments that the wrapped dofn requires.
239240 """
240241 done = False
241- sleep_time = 1
242+ sleep_time = 0.01
242243 total_sleep = 0
243- while not done :
244+ while not done and total_sleep < self . _timeout :
244245 done = self .schedule_if_room (element , ignore_buffer , * args , ** kwargs )
245246 if not done :
246247 sleep_time = min (self .max_wait_time , sleep_time * 2 )
@@ -256,10 +257,12 @@ def schedule_item(self, element, ignore_buffer=False, *args, **kwargs):
256257 total_sleep += sleep_time
257258 sleep (sleep_time )
258259
def next_time_to_fire(self, key):
  """Returns the time at which the timer for `key` should next fire.

  The result is the first multiple of `self._timer_frequency` (seconds since
  the epoch) that lies strictly after the current time, plus a per-key
  jitter in `[0, self._timer_frequency)`. The jitter is derived only from
  `key`, so repeated calls for the same key within one frequency window
  return the same value, while different keys are spread across the window.

  Args:
    key: The key whose timer is being set. It is converted with `str()`
      before seeding so that any key type is accepted and so that a raw key
      and its string form yield the same jitter. `None` is passed through
      unchanged, which seeds from system entropy (non-deterministic jitter).

  Returns:
    A float number of seconds since the epoch.
  """
  frequency = self._timer_frequency
  # Use a private generator instead of random.seed(key): reseeding the
  # module-level generator would make every other user of `random` in this
  # process deterministic as a side effect of setting a timer.
  seed = None if key is None else str(key)
  jitter = random.Random(seed).random() * frequency
  next_boundary = floor((time() + frequency) / frequency) * frequency
  return next_boundary + jitter
263266
264267 def accepting_items (self ):
265268 with AsyncWrapper ._lock :
@@ -301,7 +304,7 @@ def process(
301304 # Set a timer to fire on the next round increment of timer_frequency_. Note
302305 # we do this so that each message's timer doesn't get overwritten by the
303306 # next.
304- time_to_fire = self .next_time_to_fire ()
307+ time_to_fire = self .next_time_to_fire (element [ 0 ] )
305308 timer .set (time_to_fire )
306309
307310 # Don't output any elements. This will be done in commit_finished_items.
@@ -346,6 +349,7 @@ def commit_finished_items(
346349 # from local state and cancel their futures.
347350 to_remove = []
348351 key = None
352+ to_reschedule = []
349353 if to_process_local :
350354 key = str (to_process_local [0 ][0 ])
351355 else :
@@ -387,9 +391,13 @@ def commit_finished_items(
387391 'item %s found in processing state but not local state,'
388392 ' scheduling now' ,
389393 x )
390- self . schedule_item ( x , ignore_buffer = True )
394+ to_reschedule . append ( x )
391395 items_rescheduled += 1
392396
397+ # Reschedule the items not under a lock
398+ for x in to_reschedule :
399+ self .schedule_item (x , ignore_buffer = False )
400+
393401 # Update processing state to remove elements we've finished
394402 to_process .clear ()
395403 for x in to_process_local :
@@ -408,8 +416,8 @@ def commit_finished_items(
408416 # If there are items not yet finished then set a timer to fire in the
409417 # future.
410418 self ._next_time_to_fire = Timestamp .now () + Duration (seconds = 5 )
411- if items_not_yet_finished > 0 :
412- time_to_fire = self .next_time_to_fire ()
419+ if items_in_processing_state > 0 :
420+ time_to_fire = self .next_time_to_fire (key )
413421 timer .set (time_to_fire )
414422
415423 # Each result is a list. We want to combine them into a single
0 commit comments