@@ -105,11 +105,16 @@ def _cleanup_paths(self):
105105 # ToDo: if stat != 0 then report error to API?
106106
def _setup(self, args):
    """Subscribe to the 'node' and 'retry' event channels.

    Args:
        args: Service arguments; not used by this method.

    Returns:
        A two-element list ``[node_sub_id, retry_sub_id]`` holding the
        subscription ids in that order.

    Raises:
        Exception: Whatever ``self._api.subscribe`` raises.  Subscriptions
            that were already opened are released first, because the
            caller never receives their ids and so cannot release them.
    """
    sub_ids = []
    try:
        for channel, label in (('node', 'Node'), ('retry', 'Retry')):
            sub_id = self._api.subscribe(channel)
            self.log.debug(f"{label} channel sub id: {sub_id}")
            sub_ids.append(sub_id)
    except Exception:
        # Roll back the partial setup so no subscription is leaked.
        for sub_id in sub_ids:
            if not sub_id:
                continue
            try:
                self._api_helper.unsubscribe_filters(sub_id)
            except Exception as e:
                self.log.error(f"Failed to unsubscribe {sub_id}: {e}")
        raise
    return sub_ids
113+
def _stop(self, sub_ids):
    """Release event subscriptions and clean up temporary paths.

    Args:
        sub_ids: A list/tuple of subscription ids, a single subscription
            id, or ``None`` (for example when setup failed before any id
            was returned).  Falsy entries are skipped.

    A failure to release one subscription is logged and does not prevent
    the remaining ones from being released.  ``self._cleanup_paths()`` is
    always called, even if releasing subscriptions fails.
    """
    if sub_ids is None:
        sub_ids = []
    elif not isinstance(sub_ids, (list, tuple, set, frozenset)):
        # Backward compatibility: accept a single subscription id.
        sub_ids = [sub_ids]

    try:
        for sub_id in sub_ids:
            if not sub_id:
                continue
            try:
                self._api_helper.unsubscribe_filters(sub_id)
            except Exception as e:
                self.log.error(f"Failed to unsubscribe {sub_id}: {e}")
    finally:
        self._cleanup_paths()
114119
115120 def backup_cleanup (self ):
@@ -144,11 +149,11 @@ def backup_job(self, filename, nodeid):
144149 except Exception as e :
145150 self .log .error (f"Failed to backup { filename } to { new_filename } : { e } " )
146151
147- def _run_job (self , job_config , runtime , platform , input_node ):
152+ def _run_job (self , job_config , runtime , platform , input_node , retry_counter ):
148153 try :
149154 node = self ._api_helper .create_job_node (job_config ,
150155 input_node ,
151- runtime , platform )
156+ runtime , platform , retry_counter )
152157 except KeyError as e :
153158 self .log .error (' ' .join ([
154159 input_node ['id' ],
@@ -162,6 +167,7 @@ def _run_job(self, job_config, runtime, platform, input_node):
162167
163168 if not node :
164169 return
170+ self .log .debug (f"Job node created: { node ['id' ]} . Parent: { node ['parent' ]} " )
165171 # Most of the time, the artifacts we need originate from the parent
166172 # node. Import those into the current node, working on a copy so the
167173 # original node doesn't get "polluted" with useless artifacts when we
@@ -371,43 +377,57 @@ def _verify_architecture_filter(self, job, node):
371377 return False
372378 return True
373379
374- def _run (self , sub_id ):
def _run(self, sub_ids):
    """Run one scheduler loop per subscription, each in its own thread.

    Args:
        sub_ids: Iterable of subscription ids; ``self._run_scheduler`` is
            started once for each of them.

    Returns:
        ``True`` only if at least one scheduler was started and every
        scheduler returned a truthy value; ``False`` otherwise (including
        when a scheduler thread terminated with an exception).
    """
    sub_ids = list(sub_ids or [])
    # One slot per worker; stays False if the worker dies before returning.
    results = [False] * len(sub_ids)

    def _worker(index, sub_id):
        results[index] = self._run_scheduler(sub_id)

    threads = []
    for index, sub_id in enumerate(sub_ids):
        # Daemon threads: the scheduler loops never end on their own, so
        # non-daemon workers would keep the process alive after Ctrl-C
        # interrupts the join() below in the main thread.
        thread = threading.Thread(target=_worker, args=(index, sub_id),
                                  daemon=True)
        threads.append(thread)
        thread.start()

    for thread in threads:
        thread.join()

    return bool(results) and all(results)
389+
def _run_scheduler(self, sub_id):
    """Consume events from one subscription and create matching job nodes.

    For every received event, each ``(job, runtime, platform, rules)``
    entry yielded by ``self._sched.get_schedule(event)`` is checked
    against the frequency, architecture and rule filters; entries that
    pass are handed to ``self._run_job``.

    Args:
        sub_id: Subscription id to receive events from.

    This loop has no exit condition: it runs until the thread or process
    is terminated, or until an exception raised while processing an event
    propagates out of it.
    """
    # Pause between failed receive attempts so a persistent API failure
    # does not turn this loop into a busy spin that floods the log.
    error_delay = 10

    self.log.info("Listening for available checkout events")
    self.log.info("Press Ctrl-C to stop.")

    while True:
        # Refresh the module-level heartbeat timestamp on every iteration.
        last_heartbeat['time'] = time.time()
        try:
            event = self._api_helper.receive_event_data(sub_id, block=False)
        except Exception as e:
            self.log.error(
                f"Error receiving event: {e}, "
                f"retrying in {error_delay} seconds"
            )
            time.sleep(error_delay)
            continue
        if not event:
            # If we received a keep-alive event, just continue
            continue

        # These values depend only on the event, so read them once.
        jobfilter = event.get('jobfilter')
        platform_filter = event.get('platform_filter')
        retry_counter = event.get('retry_counter', 0)

        for job, runtime, platform, rules in self._sched.get_schedule(event):
            # Fetched per schedule entry because the node is modified
            # below before being passed on.
            input_node = self._api.node.get(event['id'])
            # Add to node data the jobfilter if it exists in event
            if jobfilter and isinstance(jobfilter, list):
                input_node['jobfilter'] = jobfilter
            if platform_filter and isinstance(platform_filter, list):
                input_node['platform_filter'] = platform_filter
            # we cannot use rules, as we need to have info about job too
            if job.params.get('frequency', None):
                if not self._verify_frequency(job, input_node, platform):
                    continue
            if not self._verify_architecture_filter(job, input_node):
                continue
            if self._api_helper.should_create_node(rules, input_node):
                self._run_job(job, runtime, platform, input_node,
                              retry_counter)
413433
0 commit comments