22import logging
33import multiprocessing
44import signal
5+ import socket
56import time
67from concurrent .futures .thread import ThreadPoolExecutor
78
1415import salt .master
1516import salt .transport .client
1617import salt .transport .server
18+ import salt .transport .tcp
1719import salt .transport .zeromq
20+ import salt .utils .msgpack
1821import salt .utils .platform
1922import salt .utils .process
2023import salt .utils .stringutils
2528log = logging .getLogger (__name__ )
2629
2730
class RecvError(Exception):
    """
    Raised by the Collector's _recv method when there is a problem
    getting publishes from the publisher.
    """
36+
37+
2838class Collector (salt .utils .process .SignalHandlingProcess ):
2939 def __init__ (
30- self , minion_config , pub_uri , aes_key , timeout = 30 , zmq_filtering = False
40+ self , minion_config , interface , port , aes_key , timeout = 300 , zmq_filtering = False
3141 ):
3242 super ().__init__ ()
3343 self .minion_config = minion_config
34- self .pub_uri = pub_uri
44+ self .interface = interface
45+ self .port = port
3546 self .aes_key = aes_key
3647 self .timeout = timeout
3748 self .hard_timeout = time .time () + timeout + 30
@@ -41,6 +52,16 @@ def __init__(
4152 self .stopped = multiprocessing .Event ()
4253 self .started = multiprocessing .Event ()
4354 self .running = multiprocessing .Event ()
55+ if salt .utils .msgpack .version >= (0 , 5 , 2 ):
56+ # Under Py2 we still want raw to be set to True
57+ msgpack_kwargs = {"raw" : False }
58+ else :
59+ msgpack_kwargs = {"encoding" : "utf-8" }
60+ self .unpacker = salt .utils .msgpack .Unpacker (** msgpack_kwargs )
61+
@property
def transport(self):
    """
    Return the ``transport`` entry of the minion configuration.
    """
    return self.minion_config["transport"]
4465
4566 def _rotate_secrets (self , now = None ):
4667 salt .master .SMaster .secrets ["aes" ] = {
@@ -57,47 +78,104 @@ def _rotate_secrets(self, now=None):
5778 "rotate_master_key" : self ._rotate_secrets ,
5879 }
5980
60- def run (self ):
61- """
62- Gather results until then number of seconds specified by timeout passes
63- without receiving a message
64- """
65- ctx = zmq .Context ()
66- sock = ctx .socket (zmq .SUB )
67- sock .setsockopt (zmq .LINGER , - 1 )
68- sock .setsockopt (zmq .SUBSCRIBE , b"" )
69- sock .connect (self .pub_uri )
81+ def _setup_listener (self ):
82+ if self .transport == "zeromq" :
83+ ctx = zmq .Context ()
84+ self .sock = ctx .socket (zmq .SUB )
85+ self .sock .setsockopt (zmq .LINGER , - 1 )
86+ self .sock .setsockopt (zmq .SUBSCRIBE , b"" )
87+ pub_uri = "tcp://{}:{}" .format (self .interface , self .port )
88+ self .sock .connect (pub_uri )
89+ else :
90+ end = time .time () + 300
91+ while True :
92+ sock = socket .socket (socket .AF_INET , socket .SOCK_STREAM )
93+ try :
94+ sock .connect ((self .interface , self .port ))
95+ except ConnectionRefusedError :
96+ if time .time () >= end :
97+ raise
98+ time .sleep (1 )
99+ else :
100+ break
101+ self .sock = salt .ext .tornado .iostream .IOStream (sock )
102+
@salt.ext.tornado.gen.coroutine
def _recv(self):
    """
    Coroutine yielding the next deserialized publish read from ``self.sock``.

    The result is delivered through ``salt.ext.tornado.gen.Return``.

    :raises RecvError: When the non-blocking ZeroMQ receive fails or its
        payload cannot be deserialized, or when the TCP stream did not
        produce a complete message after one additional read.
    """
    if self.transport == "zeromq":
        # test_zeromq_filtering requires catching the
        # SaltDeserializationError in order to pass.
        try:
            frame = self.sock.recv(zmq.NOBLOCK)
            decoded = salt.payload.Serial({}).loads(frame)
        except (zmq.ZMQError, salt.exceptions.SaltDeserializationError):
            failure = RecvError("ZMQ Error")
        else:
            raise salt.ext.tornado.gen.Return(decoded)
    else:
        # Pass 0 drains whatever the unpacker already holds; pass 1 reads
        # more bytes from the stream first and then looks again.
        for needs_read in (False, True):
            if needs_read:
                chunk = yield self.sock.read_bytes(8096, partial=True)
                self.unpacker.feed(chunk)
            for message in self.unpacker:
                decoded = salt.payload.Serial({}).loads(message["body"])
                raise salt.ext.tornado.gen.Return(decoded)
        failure = RecvError("TCP Error")
    raise failure
126+
@salt.ext.tornado.gen.coroutine
def _run(self, loop):
    """
    Coroutine that collects published jids until told to stop.

    ``self.started`` is set once listener setup has been attempted, whether
    it succeeded or not.  Each received payload is decrypted with the AES
    key; a payload containing ``start`` sets ``self.running``, one
    containing ``stop`` ends the collection, and any other non-empty
    payload has its ``jid`` appended to ``self.results``.  Collection also
    ends when ``self.hard_timeout`` is passed, when no result has been
    collected for ``self.timeout`` seconds, or on a deserialization error
    while ``self.zmq_filtering`` is false.

    :param loop: The IOLoop driving this coroutine.  It is stopped when the
        coroutine finishes for any reason, so that the ``loop.start()``
        call which scheduled it can return.
    """
    try:
        try:
            self._setup_listener()
        except Exception:  # pylint: disable=broad-except
            self.started.set()
            log.exception("Failed to start listening")
            return
        self.started.set()
        last_msg = time.time()
        crypticle = salt.crypt.Crypticle(self.minion_config, self.aes_key)
        while True:
            curr_time = time.time()
            if curr_time > self.hard_timeout:
                log.error("Hard timeout reached in test collector!")
                break
            if curr_time - last_msg >= self.timeout:
                log.error("Receive timeout reached in test collector!")
                break
            try:
                payload = yield self._recv()
            except RecvError:
                # Nothing usable was received; back off briefly and poll again.
                time.sleep(0.01)
                continue
            try:
                payload = crypticle.loads(payload["load"])
                if not payload:
                    continue
                if "start" in payload:
                    log.info("Collector started")
                    self.running.set()
                    continue
                if "stop" in payload:
                    log.info("Collector stopped")
                    break
                last_msg = time.time()
                self.results.append(payload["jid"])
            except salt.exceptions.SaltDeserializationError:
                log.error("Deserializer Error")
                if not self.zmq_filtering:
                    log.exception("Failed to deserialize...")
                    break
    finally:
        # Always stop the loop: without this an early return or an
        # unexpected exception would leave loop.start() blocked forever.
        loop.stop()
170+
def run(self):
    """
    Gather results until the number of seconds specified by timeout passes
    without receiving a message

    A dedicated IOLoop is created to drive the ``_run`` coroutine.  The
    loop is closed once ``start()`` returns so its resources are released.
    """
    loop = salt.ext.tornado.ioloop.IOLoop()
    loop.add_callback(self._run, loop)
    try:
        loop.start()
    finally:
        # close() is only valid after start() has returned.
        loop.close()
101179
102180 def __enter__ (self ):
103181 self .manager .__enter__ ()
@@ -140,18 +218,21 @@ def __init__(self, master_config, minion_config, **collector_kwargs):
140218 self .process_manager = salt .utils .process .ProcessManager (
141219 name = "ZMQ-PubServer-ProcessManager"
142220 )
143- self .pub_server_channel = salt .transport .zeromq . ZeroMQPubServerChannel (
221+ self .pub_server_channel = salt .transport .server . PubServerChannel . factory (
144222 self .master_config
145223 )
146224 self .pub_server_channel .pre_fork (
147225 self .process_manager ,
148226 kwargs = {"log_queue" : salt .log .setup .get_multiprocessing_logging_queue ()},
149227 )
150- self .pub_uri = "tcp://{interface}:{publish_port}" .format (** self .master_config )
151228 self .queue = multiprocessing .Queue ()
152229 self .stopped = multiprocessing .Event ()
153230 self .collector = Collector (
154- self .minion_config , self .pub_uri , self .aes_key , ** self .collector_kwargs
231+ self .minion_config ,
232+ self .master_config ["interface" ],
233+ self .master_config ["publish_port" ],
234+ self .aes_key ,
235+ ** self .collector_kwargs
155236 )
156237
157238 def run (self ):
@@ -179,8 +260,8 @@ def close(self):
179260 return
180261 self .process_manager .stop_restarting ()
181262 self .process_manager .send_signal_to_processes (signal .SIGTERM )
182- self .pub_server_channel . pub_close ()
183- self .process_manager . kill_children ()
263+ if hasattr ( self .pub_server_channel , " pub_close" ):
264+ self .pub_server_channel . pub_close ()
184265 # Really terminate any process still left behind
185266 for pid in self .process_manager ._process_map :
186267 terminate_process (pid = pid , kill_children = True , slow_stop = False )
@@ -192,7 +273,7 @@ def publish(self, payload):
192273 def __enter__ (self ):
193274 self .start ()
194275 self .collector .__enter__ ()
195- attempts = 30
276+ attempts = 300
196277 while attempts > 0 :
197278 self .publish ({"tgt_type" : "glob" , "tgt" : "*" , "jid" : - 1 , "start" : True })
198279 if self .collector .running .wait (1 ) is True :
@@ -210,33 +291,43 @@ def __exit__(self, *args):
210291 # We can safely wait here without a timeout because the Collector instance has a
211292 # hard timeout set, so eventually Collector.stopped will be set
212293 self .collector .stopped .wait ()
294+ self .collector .join ()
213295 # Stop our own processing
214296 self .queue .put (None )
215297 # Wait at most 10 secs for the above `None` in the queue to be processed
216298 self .stopped .wait (10 )
217299 self .close ()
218300 self .terminate ()
301+ self .join ()
219302 log .info ("The PubServerChannelProcess has terminated" )
220303
221304
@pytest.fixture(params=["tcp", "zeromq"])
def transport(request):
    """
    Parametrized fixture providing each transport name in turn.
    """
    return request.param
308+
309+
@pytest.mark.skip_on_windows
@pytest.mark.slow_test
def test_publish_to_pubserv_ipc(salt_master, salt_minion, transport):
    """
    Test sending 10K messages to the publish server channel using IPC transport

    ZMQ's ipc transport not supported on Windows
    """
    opts = dict(
        salt_master.config.copy(), ipc_mode="ipc", pub_hwm=0, transport=transport
    )
    minion_opts = dict(salt_minion.config.copy(), transport=transport)
    with PubServerChannelProcess(opts, minion_opts) as server_channel:
        send_num = 10000
        expect = []
        for idx in range(send_num):
            expect.append(idx)
            load = {"tgt_type": "glob", "tgt": "*", "jid": idx}
            server_channel.publish(load)
    # NOTE(review): results are read after the ``with`` block exits, which is
    # assumed to be the original layout (the source's indentation was lost) --
    # confirm against the real file.
    results = server_channel.collector.results
    # ``!s`` converts the set to text before the ``.40`` truncation is applied;
    # a bare ``{:.40}`` raises TypeError because set.__format__ rejects any
    # non-empty format spec, which would hide the real assertion message.
    assert len(results) == send_num, "{} != {}, difference: {!s:.40}".format(
        len(results), send_num, set(expect).difference(results)
    )
242333
@@ -252,15 +343,15 @@ def test_issue_36469_tcp(salt_master, salt_minion):
252343 """
253344
254345 def _send_small (opts , sid , num = 10 ):
255- server_channel = salt .transport .zeromq . ZeroMQPubServerChannel (opts )
346+ server_channel = salt .transport .server . PubServerChannel . factory (opts )
256347 for idx in range (num ):
257348 load = {"tgt_type" : "glob" , "tgt" : "*" , "jid" : "{}-s{}" .format (sid , idx )}
258349 server_channel .publish (load )
259350 time .sleep (0.3 )
260351 server_channel .pub_close ()
261352
262353 def _send_large (opts , sid , num = 10 , size = 250000 * 3 ):
263- server_channel = salt .transport .zeromq . ZeroMQPubServerChannel (opts )
354+ server_channel = salt .transport .server . PubServerChannel . factory (opts )
264355 for idx in range (num ):
265356 load = {
266357 "tgt_type" : "glob" ,
@@ -269,7 +360,7 @@ def _send_large(opts, sid, num=10, size=250000 * 3):
269360 "xdata" : "0" * size ,
270361 }
271362 server_channel .publish (load )
272- time .sleep (0. 3 )
363+ time .sleep (3 )
273364 server_channel .pub_close ()
274365
275366 opts = dict (salt_master .config .copy (), ipc_mode = "tcp" , pub_hwm = 0 )
0 commit comments