@@ -37,7 +37,7 @@ class RecvError(Exception):
3737
3838class Collector (salt .utils .process .SignalHandlingProcess ):
3939 def __init__ (
40- self , minion_config , interface , port , aes_key , timeout = 30 , zmq_filtering = False
40+ self , minion_config , interface , port , aes_key , timeout = 300 , zmq_filtering = False
4141 ):
4242 super ().__init__ ()
4343 self .minion_config = minion_config
@@ -87,9 +87,9 @@ def _setup_listener(self):
8787 pub_uri = "tcp://{}:{}" .format (self .interface , self .port )
8888 self .sock .connect (pub_uri )
8989 else :
90- sock = socket .socket (socket .AF_INET , socket .SOCK_STREAM )
91- end = time .time () + 60
90+ end = time .time () + 300
9291 while True :
92+ sock = socket .socket (socket .AF_INET , socket .SOCK_STREAM )
9393 try :
9494 sock .connect ((self .interface , self .port ))
9595 except ConnectionRefusedError :
@@ -102,6 +102,7 @@ def _setup_listener(self):
102102
103103 @salt .ext .tornado .gen .coroutine
104104 def _recv (self ):
105+ exc = None
105106 if self .transport == "zeromq" :
106107 # test_zeromq_filtering requires catching the
107108 # SaltDeserializationError in order to pass.
@@ -110,7 +111,7 @@ def _recv(self):
110111 serial_payload = salt .payload .Serial ({}).loads (payload )
111112 raise salt .ext .tornado .gen .Return (serial_payload )
112113 except (zmq .ZMQError , salt .exceptions .SaltDeserializationError ):
113- raise RecvError ("ZMQ Error" )
114+ exc = RecvError ("ZMQ Error" )
114115 else :
115116 for msg in self .unpacker :
116117 serial_payload = salt .payload .Serial ({}).loads (msg ["body" ])
@@ -120,19 +121,27 @@ def _recv(self):
120121 for msg in self .unpacker :
121122 serial_payload = salt .payload .Serial ({}).loads (msg ["body" ])
122123 raise salt .ext .tornado .gen .Return (serial_payload )
123- raise RecvError ("TCP Error" )
124+ exc = RecvError ("TCP Error" )
125+ raise exc
124126
125127 @salt .ext .tornado .gen .coroutine
126128 def _run (self , loop ):
127- self ._setup_listener ()
129+ try :
130+ self ._setup_listener ()
131+ except Exception : # pylint: disable=broad-except
132+ self .started .set ()
133+ log .exception ("Failed to start listening" )
134+ return
135+ self .started .set ()
128136 last_msg = time .time ()
129137 crypticle = salt .crypt .Crypticle (self .minion_config , self .aes_key )
130- self .started .set ()
131138 while True :
132139 curr_time = time .time ()
133140 if time .time () > self .hard_timeout :
141+ log .error ("Hard timeout reaced in test collector!" )
134142 break
135143 if curr_time - last_msg >= self .timeout :
144+ log .error ("Receive timeout reaced in test collector!" )
136145 break
137146 try :
138147 payload = yield self ._recv ()
@@ -144,13 +153,16 @@ def _run(self, loop):
144153 if not payload :
145154 continue
146155 if "start" in payload :
156+ log .info ("Collector started" )
147157 self .running .set ()
148158 continue
149159 if "stop" in payload :
160+ log .info ("Collector stopped" )
150161 break
151162 last_msg = time .time ()
152163 self .results .append (payload ["jid" ])
153164 except salt .exceptions .SaltDeserializationError :
165+ log .error ("Deserializer Error" )
154166 if not self .zmq_filtering :
155167 log .exception ("Failed to deserialize..." )
156168 break
@@ -261,7 +273,7 @@ def publish(self, payload):
261273 def __enter__ (self ):
262274 self .start ()
263275 self .collector .__enter__ ()
264- attempts = 60
276+ attempts = 300
265277 while attempts > 0 :
266278 self .publish ({"tgt_type" : "glob" , "tgt" : "*" , "jid" : - 1 , "start" : True })
267279 if self .collector .running .wait (1 ) is True :
@@ -279,12 +291,14 @@ def __exit__(self, *args):
279291 # We can safely wait here without a timeout because the Collector instance has a
280292 # hard timeout set, so eventually Collector.stopped will be set
281293 self .collector .stopped .wait ()
294+ self .collector .join ()
282295 # Stop our own processing
283296 self .queue .put (None )
284297 # Wait at most 10 secs for the above `None` in the queue to be processed
285298 self .stopped .wait (10 )
286299 self .close ()
287300 self .terminate ()
301+ self .join ()
288302 log .info ("The PubServerChannelProcess has terminated" )
289303
290304
0 commit comments