Skip to content

Commit d9f84c6

Browse files
authored
Backend Executor: wait for condition before returning (#14)
* Backend Executor: wait for condition before returning Signed-off-by: joshvanl <[email protected]> * Return bare wrapperspb from runtimestate Signed-off-by: joshvanl <[email protected]> * Fix runtimestate test Signed-off-by: joshvanl <[email protected]> --------- Signed-off-by: joshvanl <[email protected]>
1 parent a1b42ae commit d9f84c6

File tree

3 files changed

+31
-11
lines changed

3 files changed

+31
-11
lines changed

backend/executor.go

Lines changed: 24 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -425,7 +425,10 @@ func (g *grpcExecutor) PurgeInstances(ctx context.Context, req *protos.PurgeInst
425425
if err != nil {
426426
return resp, fmt.Errorf("failed to purge orchestration state: %w", err)
427427
}
428-
return resp, nil
428+
429+
_, err = g.WaitForInstanceCompletion(ctx, &protos.GetInstanceRequest{InstanceId: req.GetInstanceId()})
430+
431+
return resp, err
429432
}
430433

431434
// QueryInstances implements protos.TaskHubSidecarServiceServer
@@ -493,7 +496,10 @@ func (g *grpcExecutor) TerminateInstance(ctx context.Context, req *protos.Termin
493496
if err := g.backend.AddNewOrchestrationEvent(ctx, api.InstanceID(req.InstanceId), e); err != nil {
494497
return nil, fmt.Errorf("failed to submit termination request: %w", err)
495498
}
496-
return &protos.TerminateResponse{}, nil
499+
500+
_, err := g.WaitForInstanceCompletion(ctx, &protos.GetInstanceRequest{InstanceId: req.InstanceId})
501+
502+
return &protos.TerminateResponse{}, err
497503
}
498504

499505
// SuspendInstance implements protos.TaskHubSidecarServiceServer
@@ -515,7 +521,14 @@ func (g *grpcExecutor) SuspendInstance(ctx context.Context, req *protos.SuspendR
515521
return nil, err
516522
}
517523

518-
return &protos.SuspendResponse{}, nil
524+
_, err := g.waitForInstance(ctx, &protos.GetInstanceRequest{
525+
InstanceId: req.InstanceId,
526+
}, func(metadata *OrchestrationMetadata) bool {
527+
return metadata.RuntimeStatus == protos.OrchestrationStatus_ORCHESTRATION_STATUS_SUSPENDED ||
528+
api.OrchestrationMetadataIsComplete(metadata)
529+
})
530+
531+
return &protos.SuspendResponse{}, err
519532
}
520533

521534
// ResumeInstance implements protos.TaskHubSidecarServiceServer
@@ -537,7 +550,14 @@ func (g *grpcExecutor) ResumeInstance(ctx context.Context, req *protos.ResumeReq
537550
return nil, err
538551
}
539552

540-
return &protos.ResumeResponse{}, nil
553+
_, err := g.waitForInstance(ctx, &protos.GetInstanceRequest{
554+
InstanceId: req.InstanceId,
555+
}, func(metadata *OrchestrationMetadata) bool {
556+
return metadata.RuntimeStatus == protos.OrchestrationStatus_ORCHESTRATION_STATUS_RUNNING ||
557+
api.OrchestrationMetadataIsComplete(metadata)
558+
})
559+
560+
return &protos.ResumeResponse{}, err
541561
}
542562

543563
// WaitForInstanceCompletion implements protos.TaskHubSidecarServiceServer

backend/runtimestate/runtimestate.go

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -278,22 +278,22 @@ func Name(s *protos.OrchestrationRuntimeState) (string, error) {
278278
return s.StartEvent.Name, nil
279279
}
280280

281-
func Input(s *protos.OrchestrationRuntimeState) (string, error) {
281+
func Input(s *protos.OrchestrationRuntimeState) (*wrapperspb.StringValue, error) {
282282
if s.StartEvent == nil {
283-
return "", api.ErrNotStarted
283+
return nil, api.ErrNotStarted
284284
}
285285

286286
// REVIEW: Should we distinguish between no input and the empty string?
287-
return s.StartEvent.Input.GetValue(), nil
287+
return s.StartEvent.Input, nil
288288
}
289289

290-
func Output(s *protos.OrchestrationRuntimeState) (string, error) {
290+
func Output(s *protos.OrchestrationRuntimeState) (*wrapperspb.StringValue, error) {
291291
if s.CompletedEvent == nil {
292-
return "", api.ErrNotCompleted
292+
return nil, api.ErrNotCompleted
293293
}
294294

295295
// REVIEW: Should we distinguish between no output and the empty string?
296-
return s.CompletedEvent.Result.GetValue(), nil
296+
return s.CompletedEvent.Result, nil
297297
}
298298

299299
func RuntimeStatus(s *protos.OrchestrationRuntimeState) protos.OrchestrationStatus {

tests/runtimestate_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -231,7 +231,7 @@ func Test_RuntimeState_ContinueAsNew(t *testing.T) {
231231
assert.Equal(t, expectedName, ec.Name)
232232
}
233233
if input, err := runtimestate.Input(state); assert.NoError(t, err) {
234-
assert.Equal(t, continueAsNewInput, input)
234+
assert.Equal(t, continueAsNewInput, input.GetValue())
235235
}
236236
}
237237
assert.NotNil(t, state.NewEvents[2].Timestamp)

0 commit comments

Comments
 (0)