1+ use futures_util:: FutureExt ;
12use tonic:: IntoRequest ;
2- use tracing:: { error, warn} ;
3+ use tracing:: { debug , error, info , warn} ;
34
45use crate :: {
56 worker:: { grpc:: ActionType , DEFAULT_ACTION_TIMEOUT } ,
@@ -37,6 +38,11 @@ fn step_action_event(
3738 }
3839}
3940
41+ #[ derive( serde:: Deserialize ) ]
42+ struct ActionInput < T > {
43+ input : T ,
44+ }
45+
4046async fn handle_start_step_run < F > (
4147 dispatcher : & mut DispatcherClient <
4248 tonic:: service:: interceptor:: InterceptedService < tonic:: transport:: Channel , F > ,
@@ -59,22 +65,25 @@ where
5965 return Ok ( ( ) ) ;
6066 } ;
6167
68+ debug ! ( "Received a new action: {action:?}." ) ;
69+
6270 dispatcher
6371 . send_step_action_event ( step_action_event (
6472 worker_id,
6573 & action,
6674 StepActionEventType :: StepEventTypeStarted ,
6775 Default :: default ( ) ,
6876 ) )
69- . await ?
77+ . await
78+ . map_err ( crate :: Error :: CouldNotSendStepStatus ) ?
7079 . into_inner ( ) ;
7180
72- let input = serde_json:: from_str ( & action. action_payload )
81+ let input: ActionInput < serde_json :: Value > = serde_json:: from_str ( & action. action_payload )
7382 . map_err ( crate :: Error :: CouldNotDecodeActionPayload ) ?;
7483
7584 // FIXME: Obviously, run this asynchronously rather than blocking the main listening loop.
7685 let action_event =
77- match tokio:: task:: spawn_local ( async move { action_callable ( input) . await } ) . await {
86+ match tokio:: task:: spawn ( async move { action_callable ( input . input ) . await } ) . await {
7887 Ok ( Ok ( output_value) ) => step_action_event (
7988 worker_id,
8089 & action,
97106
98107 dispatcher
99108 . send_step_action_event ( action_event)
100- . await ?
109+ . await
110+ . map_err ( crate :: Error :: CouldNotSendStepStatus ) ?
101111 . into_inner ( ) ;
102112
103113 Ok ( ( ) )
@@ -110,7 +120,7 @@ pub(crate) async fn run<F>(
110120 namespace : & str ,
111121 worker_id : & str ,
112122 workflows : Vec < Workflow > ,
113- listener_v2_timeout : u64 ,
123+ listener_v2_timeout : Option < u64 > ,
114124 mut interrupt_receiver : tokio:: sync:: mpsc:: Receiver < ( ) > ,
115125 _heartbeat_interrupt_sender : tokio:: sync:: mpsc:: Sender < ( ) > ,
116126) -> crate :: Result < ( ) >
@@ -125,6 +135,8 @@ where
125135 let connection_attempt = tokio:: time:: Instant :: now ( ) ;
126136
127137 ' main_loop: loop {
138+ info ! ( "Listening…" ) ;
139+
128140 if connection_attempt. elapsed ( ) > DEFAULT_ACTION_LISTENER_RETRY_INTERVAL {
129141 retries = 0 ;
130142 }
@@ -134,22 +146,41 @@ where
134146 ) ) ;
135147 }
136148
137- let mut stream = match listen_strategy {
149+ let response = match listen_strategy {
138150 ListenStrategy :: V1 => {
151+ info ! ( "Using strategy v1" ) ;
152+
139153 let mut request = WorkerListenRequest {
140154 worker_id : worker_id. to_owned ( ) ,
141155 }
142156 . into_request ( ) ;
143157 request. set_timeout ( DEFAULT_ACTION_TIMEOUT ) ;
144- dispatcher. listen ( request) . await ? . into_inner ( )
158+ Box :: new ( dispatcher. listen ( request) ) . boxed ( )
145159 }
146160 ListenStrategy :: V2 => {
161+ info ! ( "Using strategy v2" ) ;
162+
147163 let mut request = WorkerListenRequest {
148164 worker_id : worker_id. to_owned ( ) ,
149165 }
150166 . into_request ( ) ;
151- request. set_timeout ( std:: time:: Duration :: from_millis ( listener_v2_timeout) ) ;
152- dispatcher. listen_v2 ( request) . await ?. into_inner ( )
167+ if let Some ( listener_v2_timeout) = listener_v2_timeout {
168+ request. set_timeout ( std:: time:: Duration :: from_millis ( listener_v2_timeout) ) ;
169+ }
170+ dispatcher. listen_v2 ( request) . boxed ( )
171+ }
172+ } ;
173+
174+ let mut stream = tokio:: select! {
175+ response = response => {
176+ response
177+ . map_err( crate :: Error :: CouldNotListenToDispatcher ) ?
178+ . into_inner( )
179+ }
180+ result = interrupt_receiver. recv( ) => {
181+ assert!( result. is_some( ) ) ;
182+ warn!( "Interrupt received." ) ;
183+ break ' main_loop;
153184 }
154185 } ;
155186
@@ -200,7 +231,9 @@ where
200231 }
201232 }
202233 }
203- _ = interrupt_receiver. recv( ) => {
234+ result = interrupt_receiver. recv( ) => {
235+ assert!( result. is_some( ) ) ;
236+ warn!( "Interrupt received." ) ;
204237 break ' main_loop;
205238 }
206239 }
0 commit comments