2929import six .moves .urllib .parse as urlparse
3030
3131from .auth import JWT , Auth , AccessRequest
32- from .error import UnavailableError , ClientError , RequestError , ServerError
32+ from .error import BatchError , UnavailableError , ClientError , RequestError , ServerError
3333from .version import __version__ as umapi_version
3434
3535
@@ -285,13 +285,18 @@ def execute_multiple(self, actions, immediate=True):
285285
286286 NOTE: This is where we throttle the number of commands per action. So the number
287287 of actions we were given may not be the same as the number we queue or send to the server.
288+
289+ NOTE: If the server gives us a response we don't understand, we note that and continue
290+ processing as usual. Then, at the end of the batch, we throw in order to warn the client
291+ that we had a problem understanding the server.
288292
289293 :param actions: the list of Action objects to be executed
290294 :param immediate: whether to immediately send them to the server
291295 :return: tuple: the number of actions in the queue, that got sent, and that executed successfully.
292296 """
293297 # throttling part 1: split up each action into smaller actions, as needed
294298 split_actions = []
299+ exceptions = []
295300 for a in actions :
296301 if len (a .commands ) == 0 :
297302 if self .logger : self .logger .warning ("Sending action with no commands: %s" , a .frame )
@@ -303,29 +308,24 @@ def execute_multiple(self, actions, immediate=True):
303308 split_actions .append (a )
304309 actions = self .action_queue + split_actions
305310 # throttling part 2: execute the action list in batches, as needed
306- sent = completed = last_batch_sent = last_batch_completed = 0
307- try :
308- while len (actions ) >= self .throttle_actions :
309- batch , actions = actions [0 :self .throttle_actions ], actions [self .throttle_actions :]
310- if self .logger : self .logger .debug ("Executing %d actions (%d remaining)." , len (batch ), len (actions ))
311- sent += len (batch )
312- completed += self ._execute_batch (batch )
313- finally :
314- self .action_queue = actions
315- self .local_status ["actions-queued" ] = len (actions )
316- self .local_status ["actions-sent" ] += sent
317- self .local_status ["actions-completed" ] += completed
318- # there may be actions left over
319- if actions and immediate :
311+ sent = completed = 0
312+ batch_size = self .throttle_actions
313+ min_size = 1 if immediate else batch_size
314+ while len (actions ) >= min_size :
315+ batch , actions = actions [0 :batch_size ], actions [batch_size :]
316+ if self .logger : self .logger .debug ("Executing %d actions (%d remaining)." , len (batch ), len (actions ))
317+ sent += len (batch )
320318 try :
321- last_batch_sent = len (actions )
322- last_batch_completed += self ._execute_batch (actions )
323- finally :
324- self .action_queue = []
325- self .local_status ["actions-queued" ] = 0
326- self .local_status ["actions-sent" ] += last_batch_sent
327- self .local_status ["actions-completed" ] += last_batch_completed
328- return len (self .action_queue ), sent + last_batch_sent , completed + last_batch_completed
319+ completed += self ._execute_batch (batch )
320+ except Exception as e :
321+ exceptions .append (e )
322+ self .action_queue = actions
323+ self .local_status ["actions-queued" ] = queued = len (actions )
324+ self .local_status ["actions-sent" ] += sent
325+ self .local_status ["actions-completed" ] += completed
326+ if exceptions :
327+ raise BatchError (exceptions , queued , sent , completed )
328+ return queued , sent , completed
329329
330330 def _execute_batch (self , actions ):
331331 """
0 commit comments