22import threading
33import signal
44import time
5- from multiprocessing import Process
5+ from pathlib import Path
6+
7+ from watchfiles import watch
8+ from watchfiles .filters import PythonFilter
9+ from watchfiles .run import start_process
610
711from connect .eaas .runner .config import ConfigHelper
812from connect .eaas .runner .constants import (
2832logger = logging .getLogger ('connect.eaas' )
2933
3034
35+ HANDLED_SIGNALS = (signal .SIGINT , signal .SIGTERM )
36+
37+
38+ def _display_path (path ):
39+ try :
40+ return f'"{ path .relative_to (Path .cwd ())} "'
41+ except ValueError : # pragma: no cover
42+ return f'"{ path } "'
43+
44+
3145class Master :
3246
3347 HANDLER_CLASSES = {
@@ -44,15 +58,31 @@ class Master:
4458 ANVILAPP_WORKER : start_anvilapp_worker_process ,
4559 }
4660
47- def __init__ (self , secure = True ):
61+ def __init__ (self , secure = True , debug = False , no_rich = False , reload = False ):
4862 self .config = ConfigHelper (secure = secure )
4963 self .handlers = {
5064 worker_type : self .HANDLER_CLASSES [worker_type ](self .config )
5165 for worker_type in WORKER_TYPES
5266 }
67+ self .reload = reload
68+ self .debug = debug
69+ self .no_rich = no_rich
5370 self .workers = {}
54- self .exited_workers = {}
5571 self .stop_event = threading .Event ()
72+ self .monitor_event = threading .Event ()
73+ self .watch_filter = PythonFilter (ignore_paths = None )
74+ self .watcher = watch (
75+ Path .cwd (),
76+ watch_filter = self .watch_filter ,
77+ stop_event = self .stop_event ,
78+ yield_on_timeout = True ,
79+ )
80+ self .monitor_thread = None
81+ self .setup_signals_handler ()
82+
83+ def setup_signals_handler (self ):
84+ for sig in HANDLED_SIGNALS :
85+ signal .signal (sig , self .handle_signal )
5686
5787 def get_available_features (self ):
5888 have_features = False
@@ -67,38 +97,70 @@ def get_available_features(self):
6797 features [handler .__class__ .__name__ ] = worker_info
6898 return have_features , features
6999
70- def start_worker_process (self , worker_type , handler ):
71- p = Process (
72- target = self .PROCESS_TARGETS [worker_type ],
73- args = (handler ,),
74- )
75- self .workers [worker_type ] = p
76- p .start ()
77- logger .info (f'{ worker_type .capitalize ()} worker pid: { p .pid } ' )
100+ def handle_signal (self , * args , ** kwargs ):
101+ self .stop_event .set ()
78102
79- def run (self ):
103+ def start (self ):
80104 for worker_type , handler in self .handlers .items ():
81105 if handler .should_start :
82- self .exited_workers [worker_type ] = False
83106 self .start_worker_process (worker_type , handler )
107+ self .monitor_thread = threading .Thread (target = self .monitor_processes )
108+ self .monitor_event .set ()
109+ self .monitor_thread .start ()
84110
85- def _terminate (* args , ** kwargs ): # pragma: no cover
86- for p in self .workers .values ():
87- p .join ()
88- self .stop_event .set ()
89-
90- signal .signal (signal .SIGINT , _terminate )
91- signal .signal (signal .SIGTERM , _terminate )
111+ def start_worker_process (self , worker_type , handler ):
112+ p = start_process (
113+ self .PROCESS_TARGETS [worker_type ],
114+ 'function' ,
115+ (handler , self .debug , self .no_rich ),
116+ {},
117+ )
118+ self .workers [worker_type ] = p
119+ logger .info (f'{ worker_type .capitalize ()} worker pid: { p .pid } ' )
92120
93- while not (self .stop_event .is_set () or all (self .exited_workers .values ())):
121+ def monitor_processes (self ):
122+ while self .monitor_event .is_set ():
94123 for worker_type , p in self .workers .items ():
95124 if not p .is_alive ():
96125 if p .exitcode != 0 :
97126 notify_process_restarted (worker_type )
98127 logger .info (f'Process of type { worker_type } is dead, restart it' )
99128 self .start_worker_process (worker_type , self .handlers [worker_type ])
100129 else :
101- self .exited_workers [worker_type ] = True
102130 logger .info (f'{ worker_type .capitalize ()} worker exited' )
103131
104132 time .sleep (PROCESS_CHECK_INTERVAL_SECS )
133+
134+ def stop (self ):
135+ self .monitor_event .clear ()
136+ self .monitor_thread .join ()
137+ for process in self .workers .values ():
138+ process .stop (sigint_timeout = 5 , sigkill_timeout = 1 )
139+ logger .info (f'Consumer process with pid { process .pid } stopped.' )
140+
141+ def restart (self ):
142+ self .stop ()
143+ self .start ()
144+
145+ def __iter__ (self ):
146+ return self
147+
148+ def __next__ (self ):
149+ changes = next (self .watcher )
150+ if changes :
151+ return list ({Path (c [1 ]) for c in changes })
152+ return None
153+
154+ def run (self ):
155+ self .start ()
156+ if self .reload :
157+ for files_changed in self :
158+ if files_changed :
159+ logger .warning (
160+ 'Detected changes in %s. Reloading...' ,
161+ ', ' .join (map (_display_path , files_changed )),
162+ )
163+ self .restart ()
164+ else :
165+ self .stop_event .wait ()
166+ self .stop ()
0 commit comments