1+ use futures_util:: FutureExt ;
2+ use tokio:: { task:: LocalSet , task_local} ;
13use tonic:: IntoRequest ;
2- use tracing:: { error, warn} ;
4+ use tracing:: { debug , error, info , warn} ;
35
46use crate :: {
57 worker:: { grpc:: ActionType , DEFAULT_ACTION_TIMEOUT } ,
@@ -37,10 +39,16 @@ fn step_action_event(
3739 }
3840}
3941
42+ #[ derive( serde:: Deserialize ) ]
43+ struct ActionInput < T > {
44+ input : T ,
45+ }
46+
4047async fn handle_start_step_run < F > (
4148 dispatcher : & mut DispatcherClient <
4249 tonic:: service:: interceptor:: InterceptedService < tonic:: transport:: Channel , F > ,
4350 > ,
51+ local_set : & tokio:: task:: LocalSet ,
4452 namespace : & str ,
4553 worker_id : & str ,
4654 workflows : & [ Workflow ] ,
@@ -59,45 +67,53 @@ where
5967 return Ok ( ( ) ) ;
6068 } ;
6169
70+ debug ! ( "Received a new action: {action:?}." ) ;
71+
6272 dispatcher
6373 . send_step_action_event ( step_action_event (
6474 worker_id,
6575 & action,
6676 StepActionEventType :: StepEventTypeStarted ,
6777 Default :: default ( ) ,
6878 ) )
69- . await ?
79+ . await
80+ . map_err ( crate :: Error :: CouldNotSendStepStatus ) ?
7081 . into_inner ( ) ;
7182
72- let input = serde_json:: from_str ( & action. action_payload )
83+ let input: ActionInput < serde_json :: Value > = serde_json:: from_str ( & action. action_payload )
7384 . map_err ( crate :: Error :: CouldNotDecodeActionPayload ) ?;
7485
7586 // FIXME: Obviously, run this asynchronously rather than blocking the main listening loop.
76- let action_event =
77- match tokio:: task:: spawn_local ( async move { action_callable ( input) . await } ) . await {
78- Ok ( Ok ( output_value) ) => step_action_event (
79- worker_id,
80- & action,
81- StepActionEventType :: StepEventTypeCompleted ,
82- serde_json:: to_string ( & output_value) . expect ( "must succeed" ) ,
83- ) ,
84- Ok ( Err ( error) ) => step_action_event (
85- worker_id,
86- & action,
87- StepActionEventType :: StepEventTypeFailed ,
88- error. to_string ( ) ,
89- ) ,
90- Err ( join_error) => step_action_event (
91- worker_id,
92- & action,
93- StepActionEventType :: StepEventTypeFailed ,
94- join_error. to_string ( ) ,
95- ) ,
96- } ;
87+ let action_event = match local_set
88+ . run_until ( async move {
89+ tokio:: task:: spawn_local ( async move { action_callable ( input. input ) . await } ) . await
90+ } )
91+ . await
92+ {
93+ Ok ( Ok ( output_value) ) => step_action_event (
94+ worker_id,
95+ & action,
96+ StepActionEventType :: StepEventTypeCompleted ,
97+ serde_json:: to_string ( & output_value) . expect ( "must succeed" ) ,
98+ ) ,
99+ Ok ( Err ( error) ) => step_action_event (
100+ worker_id,
101+ & action,
102+ StepActionEventType :: StepEventTypeFailed ,
103+ error. to_string ( ) ,
104+ ) ,
105+ Err ( join_error) => step_action_event (
106+ worker_id,
107+ & action,
108+ StepActionEventType :: StepEventTypeFailed ,
109+ join_error. to_string ( ) ,
110+ ) ,
111+ } ;
97112
98113 dispatcher
99114 . send_step_action_event ( action_event)
100- . await ?
115+ . await
116+ . map_err ( crate :: Error :: CouldNotSendStepStatus ) ?
101117 . into_inner ( ) ;
102118
103119 Ok ( ( ) )
@@ -110,7 +126,7 @@ pub(crate) async fn run<F>(
110126 namespace : & str ,
111127 worker_id : & str ,
112128 workflows : Vec < Workflow > ,
113- listener_v2_timeout : u64 ,
129+ listener_v2_timeout : Option < u64 > ,
114130 mut interrupt_receiver : tokio:: sync:: mpsc:: Receiver < ( ) > ,
115131 _heartbeat_interrupt_sender : tokio:: sync:: mpsc:: Sender < ( ) > ,
116132) -> crate :: Result < ( ) >
@@ -125,6 +141,8 @@ where
125141 let connection_attempt = tokio:: time:: Instant :: now ( ) ;
126142
127143 ' main_loop: loop {
144+ info ! ( "Listening…" ) ;
145+
128146 if connection_attempt. elapsed ( ) > DEFAULT_ACTION_LISTENER_RETRY_INTERVAL {
129147 retries = 0 ;
130148 }
@@ -134,25 +152,46 @@ where
134152 ) ) ;
135153 }
136154
137- let mut stream = match listen_strategy {
155+ let response = match listen_strategy {
138156 ListenStrategy :: V1 => {
157+ info ! ( "Using strategy v1" ) ;
158+
139159 let mut request = WorkerListenRequest {
140160 worker_id : worker_id. to_owned ( ) ,
141161 }
142162 . into_request ( ) ;
143163 request. set_timeout ( DEFAULT_ACTION_TIMEOUT ) ;
144- dispatcher. listen ( request) . await ? . into_inner ( )
164+ Box :: new ( dispatcher. listen ( request) ) . boxed ( )
145165 }
146166 ListenStrategy :: V2 => {
167+ info ! ( "Using strategy v2" ) ;
168+
147169 let mut request = WorkerListenRequest {
148170 worker_id : worker_id. to_owned ( ) ,
149171 }
150172 . into_request ( ) ;
151- request. set_timeout ( std:: time:: Duration :: from_millis ( listener_v2_timeout) ) ;
152- dispatcher. listen_v2 ( request) . await ?. into_inner ( )
173+ if let Some ( listener_v2_timeout) = listener_v2_timeout {
174+ request. set_timeout ( std:: time:: Duration :: from_millis ( listener_v2_timeout) ) ;
175+ }
176+ dispatcher. listen_v2 ( request) . boxed ( )
153177 }
154178 } ;
155179
180+ let mut stream = tokio:: select! {
181+ response = response => {
182+ response
183+ . map_err( crate :: Error :: CouldNotListenToDispatcher ) ?
184+ . into_inner( )
185+ }
186+ result = interrupt_receiver. recv( ) => {
187+ assert!( result. is_some( ) ) ;
188+ warn!( "Interrupt received." ) ;
189+ break ' main_loop;
190+ }
191+ } ;
192+
193+ let local_set = LocalSet :: new ( ) ;
194+
156195 loop {
157196 tokio:: select! {
158197 element = stream. next( ) => {
@@ -190,7 +229,7 @@ where
190229
191230 match action_type {
192231 ActionType :: StartStepRun => {
193- handle_start_step_run( & mut dispatcher, namespace, worker_id, & workflows, action) . await ?;
232+ handle_start_step_run( & mut dispatcher, & local_set , namespace, worker_id, & workflows, action) . await ?;
194233 }
195234 ActionType :: CancelStepRun => {
196235 todo!( )
@@ -200,7 +239,9 @@ where
200239 }
201240 }
202241 }
203- _ = interrupt_receiver. recv( ) => {
242+ result = interrupt_receiver. recv( ) => {
243+ assert!( result. is_some( ) ) ;
244+ warn!( "Interrupt received." ) ;
204245 break ' main_loop;
205246 }
206247 }
0 commit comments