@@ -85,6 +85,7 @@ def __init__(self, configs, args):
8585 self ._storage_config , storage_cred
8686 )
8787 self ._job_tmp_dirs = {}
88+ self ._api_helper_lock = threading .Lock ()
8889
8990 def _get_runtimes_configs (self , configs , runtimes ):
9091 runtimes_configs = {}
@@ -105,11 +106,16 @@ def _cleanup_paths(self):
105106 # ToDo: if stat != 0 then report error to API?
106107
def _setup(self, args):
    """Subscribe to the 'node' and 'retry' API event channels.

    Returns a list of the two subscription ids, in that order, which is
    later handed to _run() (one scheduler thread per id) and to _stop()
    for unsubscribing.
    """
    node_sub_id = self._api.subscribe('node')
    self.log.debug(f"Node channel sub id: {node_sub_id}")
    retry_sub_id = self._api.subscribe('retry')
    self.log.debug(f"Retry channel sub id: {retry_sub_id}")
    return [node_sub_id, retry_sub_id]
def _stop(self, sub_ids):
    """Unsubscribe every valid subscription id, then clean up temp paths.

    Falsy entries (e.g. a failed subscribe that returned None) are
    skipped; cleanup runs regardless of how many ids were valid.
    """
    for sub_id in filter(None, sub_ids):
        self._api_helper.unsubscribe_filters(sub_id)
    self._cleanup_paths()
114120
115121 def backup_cleanup (self ):
@@ -144,11 +150,11 @@ def backup_job(self, filename, nodeid):
144150 except Exception as e :
145151 self .log .error (f"Failed to backup { filename } to { new_filename } : { e } " )
146152
147- def _run_job (self , job_config , runtime , platform , input_node ):
153+ def _run_job (self , job_config , runtime , platform , input_node , retry_counter ):
148154 try :
149155 node = self ._api_helper .create_job_node (job_config ,
150156 input_node ,
151- runtime , platform )
157+ runtime , platform , retry_counter )
152158 except KeyError as e :
153159 self .log .error (' ' .join ([
154160 input_node ['id' ],
@@ -162,6 +168,7 @@ def _run_job(self, job_config, runtime, platform, input_node):
162168
163169 if not node :
164170 return
171+ self .log .debug (f"Job node created: { node ['id' ]} . Parent: { node ['parent' ]} " )
165172 # Most of the time, the artifacts we need originate from the parent
166173 # node. Import those into the current node, working on a copy so the
167174 # original node doesn't get "polluted" with useless artifacts when we
@@ -371,43 +378,59 @@ def _verify_architecture_filter(self, job, node):
371378 return False
372379 return True
373380
def _run(self, sub_ids):
    """Run one scheduler loop per subscription id, in parallel threads.

    Each thread executes _run_scheduler(sub_id); this call blocks until
    all of them terminate.
    """
    workers = []
    for sub_id in sub_ids:
        worker = threading.Thread(target=self._run_scheduler,
                                  args=(sub_id,))
        workers.append(worker)
        worker.start()

    for worker in workers:
        worker.join()
390+
def _run_scheduler(self, sub_id):
    """Event loop for a single subscription channel.

    Polls the API for events on *sub_id*, and for every schedulable
    (job, runtime, platform, rules) combination creates and runs a job
    node via _run_job().  Runs until the thread is interrupted.

    Fixes over the previous revision: the commented-out re-subscribe /
    retry-counter dead code has been removed, along with the unused
    `event = None` pre-initialization and the unreachable trailing
    `return True` (the `while True` loop has no `break`).
    """
    self.log.info("Listening for available checkout events")
    self.log.info("Press Ctrl-C to stop.")

    while True:
        # Refresh the liveness timestamp read by the heartbeat watchdog.
        last_heartbeat['time'] = time.time()
        try:
            event = self._api_helper.receive_event_data(sub_id, block=False)
            if not event:
                # If we received a keep-alive event, just continue
                continue
        except Exception as e:
            # Best-effort: log and keep polling instead of killing the
            # scheduler thread.  NOTE(review): the old re-subscribe
            # logic was removed, so if the subscription is truly dead
            # this loop will spin logging errors — confirm recovery is
            # handled upstream.
            self.log.error(f"Error receiving event: {e}")
            continue

        for job, runtime, platform, rules in self._sched.get_schedule(event):
            input_node = self._api.node.get(event['id'])
            # Propagate optional event filters into the node data so
            # downstream consumers can honour them.
            jobfilter = event.get('jobfilter')
            if jobfilter and isinstance(jobfilter, list):
                input_node['jobfilter'] = jobfilter
            platform_filter = event.get('platform_filter')
            if platform_filter and isinstance(platform_filter, list):
                input_node['platform_filter'] = platform_filter
            # we cannot use rules, as we need to have info about job too
            if job.params.get('frequency', None):
                if not self._verify_frequency(job, input_node, platform):
                    continue
            if not self._verify_architecture_filter(job, input_node):
                continue
            # Serialize the duplicate-node check across scheduler
            # threads so two channels cannot both decide to create the
            # same node; the job itself runs outside the lock.
            with self._api_helper_lock:
                flag = self._api_helper.should_create_node(rules, input_node)
            if flag:
                retry_counter = event.get('retry_counter', 0)
                self._run_job(job, runtime, platform, input_node,
                              retry_counter)
413436
0 commit comments