Skip to content

Commit 79b3916

Browse files
authored
Merge pull request #22 from JoshVanL/workflow-rerun
Adds RerunWorkflowFromEvento
2 parents e60161a + 3410b32 commit 79b3916

File tree

14 files changed

+788
-345
lines changed

14 files changed

+788
-345
lines changed

api/orchestration.go

Lines changed: 36 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,10 @@ type OrchestrationIdReusePolicy = protos.OrchestrationIdReusePolicy
4747
// InstanceID is a unique identifier for an orchestration instance.
4848
type InstanceID string
4949

50+
func (i InstanceID) String() string {
51+
return string(i)
52+
}
53+
5054
// NewOrchestrationOptions configures options for starting a new orchestration.
5155
type NewOrchestrationOptions func(*protos.CreateInstanceRequest) error
5256

@@ -62,6 +66,8 @@ type TerminateOptions func(*protos.TerminateRequest) error
6266
// PurgeOptions is a set of options for purging an orchestration.
6367
type PurgeOptions func(*protos.PurgeInstancesRequest) error
6468

69+
type RerunOptions func(*protos.RerunWorkflowFromEventRequest) error
70+
6571
// WithInstanceID configures an explicit orchestration instance ID. If not specified,
6672
// a random UUID value will be used for the orchestration instance ID.
6773
func WithInstanceID(id InstanceID) NewOrchestrationOptions {
@@ -181,8 +187,34 @@ func OrchestrationMetadataIsRunning(o *protos.OrchestrationMetadata) bool {
181187
}
182188

183189
func OrchestrationMetadataIsComplete(o *protos.OrchestrationMetadata) bool {
184-
return o.RuntimeStatus == protos.OrchestrationStatus_ORCHESTRATION_STATUS_COMPLETED ||
185-
o.RuntimeStatus == protos.OrchestrationStatus_ORCHESTRATION_STATUS_FAILED ||
186-
o.RuntimeStatus == protos.OrchestrationStatus_ORCHESTRATION_STATUS_TERMINATED ||
187-
o.RuntimeStatus == protos.OrchestrationStatus_ORCHESTRATION_STATUS_CANCELED
190+
return o.GetRuntimeStatus() == protos.OrchestrationStatus_ORCHESTRATION_STATUS_COMPLETED ||
191+
o.GetRuntimeStatus() == protos.OrchestrationStatus_ORCHESTRATION_STATUS_FAILED ||
192+
o.GetRuntimeStatus() == protos.OrchestrationStatus_ORCHESTRATION_STATUS_TERMINATED ||
193+
o.GetRuntimeStatus() == protos.OrchestrationStatus_ORCHESTRATION_STATUS_CANCELED
194+
}
195+
196+
func WithRerunInput(input any) RerunOptions {
197+
return func(req *protos.RerunWorkflowFromEventRequest) error {
198+
req.OverwriteInput = true
199+
200+
if input == nil {
201+
return nil
202+
}
203+
204+
bytes, err := json.Marshal(input)
205+
if err != nil {
206+
return err
207+
}
208+
209+
req.Input = wrapperspb.String(string(bytes))
210+
211+
return nil
212+
}
213+
}
214+
215+
func WithRerunNewInstanceID(id InstanceID) RerunOptions {
216+
return func(req *protos.RerunWorkflowFromEventRequest) error {
217+
req.NewInstanceID = id.String()
218+
return nil
219+
}
188220
}

api/protos/orchestrator_service.pb.go

Lines changed: 517 additions & 323 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

api/protos/orchestrator_service_grpc.pb.go

Lines changed: 39 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

backend/backend.go

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@ type (
3232
DurableTimer = protos.DurableTimer
3333
OrchestrationRuntimeState = protos.OrchestrationRuntimeState
3434
OrchestrationRuntimeStateMessage = protos.OrchestrationRuntimeStateMessage
35+
RerunWorkflowFromEventRequest = protos.RerunWorkflowFromEventRequest
3536
)
3637

3738
type OrchestrationIdReusePolicyOptions func(*protos.OrchestrationIdReusePolicy) error
@@ -68,6 +69,12 @@ type Backend interface {
6869
// wraps a ExecutionStarted event.
6970
CreateOrchestrationInstance(context.Context, *HistoryEvent, ...OrchestrationIdReusePolicyOptions) error
7071

72+
// RerunWorkflowFromEvent reruns a workflow from a specific event ID of some
73+
// source instance ID. If not given, a random new instance ID will be
74+
// generated and returned. Can optionally give a new input to the target
75+
// event ID to rerun from.
76+
RerunWorkflowFromEvent(ctx context.Context, req *protos.RerunWorkflowFromEventRequest) (api.InstanceID, error)
77+
7178
// AddNewEvent adds a new orchestration event to the specified orchestration instance.
7279
AddNewOrchestrationEvent(context.Context, api.InstanceID, *HistoryEvent) error
7380

backend/client.go

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ type TaskHubClient interface {
2828
SuspendOrchestration(ctx context.Context, id api.InstanceID, reason string) error
2929
ResumeOrchestration(ctx context.Context, id api.InstanceID, reason string) error
3030
PurgeOrchestrationState(ctx context.Context, id api.InstanceID, opts ...api.PurgeOptions) error
31+
RerunWorkflowFromEvent(ctx context.Context, source api.InstanceID, eventID uint32, opts ...api.RerunOptions) (api.InstanceID, error)
3132
}
3233

3334
type backendClient struct {
@@ -257,3 +258,19 @@ func (c *backendClient) PurgeOrchestrationState(ctx context.Context, id api.Inst
257258
}
258259
return nil
259260
}
261+
262+
// RerunWorkflowFromEvent reruns a workflow from a specific event ID of some
263+
// source instance ID. If not given, a random new instance ID will be generated
264+
// and returned. Can optionally give a new input to the target event ID to
265+
// rerun from.
266+
func (c *backendClient) RerunWorkflowFromEvent(ctx context.Context, id api.InstanceID, eventID uint32, opts ...api.RerunOptions) (api.InstanceID, error) {
267+
req := &protos.RerunWorkflowFromEventRequest{SourceInstanceID: string(id), EventID: eventID}
268+
for _, configure := range opts {
269+
if err := configure(req); err != nil {
270+
return "", fmt.Errorf("failed to configure rerun request: %w", err)
271+
}
272+
}
273+
274+
id, err := c.be.RerunWorkflowFromEvent(ctx, req)
275+
return id, err
276+
}

backend/executor.go

Lines changed: 61 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,7 @@ type Executor interface {
4848

4949
type grpcExecutor struct {
5050
protos.UnimplementedTaskHubSidecarServiceServer
51+
5152
workItemQueue chan *protos.WorkItem
5253
pendingOrchestrators *sync.Map // map[api.InstanceID]*ExecutionResults
5354
pendingActivities *sync.Map // map[string]*activityExecutionResult
@@ -488,6 +489,24 @@ func (g *grpcExecutor) StartInstance(ctx context.Context, req *protos.CreateInst
488489
return &protos.CreateInstanceResponse{InstanceId: instanceID}, nil
489490
}
490491

492+
// RerunWorkflowFromEvent reruns a workflow from a specific event ID of some
493+
// source instance ID. If not given, a random new instance ID will be
494+
// generated and returned. Can optionally give a new input to the target
495+
// event ID to rerun from.
496+
func (g *grpcExecutor) RerunWorkflowFromEvent(ctx context.Context, req *protos.RerunWorkflowFromEventRequest) (*protos.RerunWorkflowFromEventResponse, error) {
497+
newInstanceID, err := g.backend.RerunWorkflowFromEvent(ctx, req)
498+
if err != nil {
499+
return nil, err
500+
}
501+
502+
_, err = g.WaitForInstanceStart(ctx, &protos.GetInstanceRequest{InstanceId: newInstanceID.String()})
503+
if err != nil {
504+
return nil, err
505+
}
506+
507+
return &protos.RerunWorkflowFromEventResponse{NewInstanceID: newInstanceID.String()}, nil
508+
}
509+
491510
// TerminateInstance implements protos.TaskHubSidecarServiceServer
492511
func (g *grpcExecutor) TerminateInstance(ctx context.Context, req *protos.TerminateRequest) (*protos.TerminateResponse, error) {
493512
e := &protos.HistoryEvent{
@@ -613,10 +632,6 @@ func (g *grpcExecutor) waitForInstance(ctx context.Context, req *protos.GetInsta
613632
return createGetInstanceResponse(req, metadata), nil
614633
}
615634

616-
// mustEmbedUnimplementedTaskHubSidecarServiceServer implements protos.TaskHubSidecarServiceServer
617-
func (grpcExecutor) mustEmbedUnimplementedTaskHubSidecarServiceServer() {
618-
}
619-
620635
func createGetInstanceResponse(req *protos.GetInstanceRequest, metadata *OrchestrationMetadata) *protos.GetInstanceResponse {
621636
state := &protos.OrchestrationState{
622637
InstanceId: req.InstanceId,
@@ -635,3 +650,45 @@ func createGetInstanceResponse(req *protos.GetInstanceRequest, metadata *Orchest
635650

636651
return &protos.GetInstanceResponse{Exists: true, OrchestrationState: state}
637652
}
653+
654+
func (g *grpcExecutor) AbandonTaskActivityWorkItem(ctx context.Context, req *protos.AbandonActivityTaskRequest) (*protos.AbandonActivityTaskResponse, error) {
655+
return nil, status.Error(codes.Unimplemented, "")
656+
}
657+
658+
func (g *grpcExecutor) AbandonTaskEntityWorkItem(ctx context.Context, req *protos.AbandonEntityTaskRequest) (*protos.AbandonEntityTaskResponse, error) {
659+
return nil, status.Error(codes.Unimplemented, "")
660+
}
661+
662+
func (g *grpcExecutor) AbandonTaskOrchestratorWorkItem(ctx context.Context, req *protos.AbandonOrchestrationTaskRequest) (*protos.AbandonOrchestrationTaskResponse, error) {
663+
return nil, status.Error(codes.Unimplemented, "")
664+
}
665+
666+
func (g *grpcExecutor) CleanEntityStorage(ctx context.Context, req *protos.CleanEntityStorageRequest) (*protos.CleanEntityStorageResponse, error) {
667+
return nil, status.Error(codes.Unimplemented, "")
668+
}
669+
670+
func (g *grpcExecutor) CompleteEntityTask(ctx context.Context, req *protos.EntityBatchResult) (*protos.CompleteTaskResponse, error) {
671+
return nil, status.Error(codes.Unimplemented, "")
672+
}
673+
674+
func (g *grpcExecutor) GetEntity(ctx context.Context, req *protos.GetEntityRequest) (*protos.GetEntityResponse, error) {
675+
return nil, status.Error(codes.Unimplemented, "")
676+
}
677+
678+
func (g *grpcExecutor) QueryEntities(ctx context.Context, req *protos.QueryEntitiesRequest) (*protos.QueryEntitiesResponse, error) {
679+
return nil, status.Error(codes.Unimplemented, "")
680+
}
681+
682+
func (g *grpcExecutor) RewindInstance(ctx context.Context, req *protos.RewindInstanceRequest) (*protos.RewindInstanceResponse, error) {
683+
return nil, status.Error(codes.Unimplemented, "")
684+
}
685+
686+
func (g *grpcExecutor) SignalEntity(ctx context.Context, req *protos.SignalEntityRequest) (*protos.SignalEntityResponse, error) {
687+
return nil, status.Error(codes.Unimplemented, "")
688+
}
689+
690+
func (g *grpcExecutor) StreamInstanceHistory(req *protos.StreamInstanceHistoryRequest, stream protos.TaskHubSidecarService_StreamInstanceHistoryServer) error {
691+
return status.Error(codes.Unimplemented, "")
692+
}
693+
694+
func (grpcExecutor) mustEmbedUnimplementedTaskHubSidecarServiceServer() {}

backend/postgres/postgres.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,8 @@ import (
1616
"github.com/dapr/durabletask-go/backend"
1717
"github.com/dapr/durabletask-go/backend/runtimestate"
1818
"github.com/google/uuid"
19+
"google.golang.org/grpc/codes"
20+
"google.golang.org/grpc/status"
1921
"google.golang.org/protobuf/proto"
2022
"google.golang.org/protobuf/types/known/timestamppb"
2123
"google.golang.org/protobuf/types/known/wrapperspb"
@@ -1123,3 +1125,7 @@ func (be *postgresBackend) String() string {
11231125
connectionURI := fmt.Sprintf("postgresql://%s:%s@%s:%d/%s", be.options.PgOptions.ConnConfig.User, be.options.PgOptions.ConnConfig.Password, be.options.PgOptions.ConnConfig.Host, be.options.PgOptions.ConnConfig.Port, be.options.PgOptions.ConnConfig.Database)
11241126
return connectionURI
11251127
}
1128+
1129+
func (be *postgresBackend) RerunWorkflowFromEvent(ctx context.Context, req *backend.RerunWorkflowFromEventRequest) (api.InstanceID, error) {
1130+
return "", status.Error(codes.Unimplemented, "not implemented")
1131+
}

backend/sqlite/sqlite.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,8 @@ import (
1717
"github.com/dapr/durabletask-go/backend"
1818
"github.com/dapr/durabletask-go/backend/runtimestate"
1919
"github.com/google/uuid"
20+
"google.golang.org/grpc/codes"
21+
"google.golang.org/grpc/status"
2022
"google.golang.org/protobuf/proto"
2123
"google.golang.org/protobuf/types/known/timestamppb"
2224
"google.golang.org/protobuf/types/known/wrapperspb"
@@ -1108,3 +1110,7 @@ func (be *sqliteBackend) ensureDB() error {
11081110
func (be *sqliteBackend) String() string {
11091111
return fmt.Sprintf("sqlite::%s", be.options.FilePath)
11101112
}
1113+
1114+
func (be *sqliteBackend) RerunWorkflowFromEvent(ctx context.Context, req *backend.RerunWorkflowFromEventRequest) (api.InstanceID, error) {
1115+
return "", status.Error(codes.Unimplemented, "not implemented")
1116+
}

client/client_grpc.go

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -210,6 +210,32 @@ func (c *TaskHubGrpcClient) PurgeOrchestrationState(ctx context.Context, id api.
210210
return nil
211211
}
212212

213+
// RerunWorkflowFromEvent reruns a workflow from a specific event ID of some
214+
// source instance ID. If not given, a random new instance ID will be
215+
// generated and returned. Can optionally give a new input to the target
216+
// event ID to rerun from.
217+
func (c *TaskHubGrpcClient) RerunWorkflowFromEvent(ctx context.Context, id api.InstanceID, eventID uint32, opts ...api.RerunOptions) (api.InstanceID, error) {
218+
req := &protos.RerunWorkflowFromEventRequest{
219+
SourceInstanceID: string(id),
220+
EventID: eventID,
221+
}
222+
for _, configure := range opts {
223+
if err := configure(req); err != nil {
224+
return "", fmt.Errorf("failed to configure rerun request: %w", err)
225+
}
226+
}
227+
228+
resp, err := c.client.RerunWorkflowFromEvent(ctx, req)
229+
if err != nil {
230+
if ctx.Err() != nil {
231+
return "", ctx.Err()
232+
}
233+
return "", err
234+
}
235+
236+
return api.InstanceID(resp.GetNewInstanceID()), nil
237+
}
238+
213239
func makeGetInstanceRequest(id api.InstanceID, opts []api.FetchOrchestrationMetadataOptions) *protos.GetInstanceRequest {
214240
req := &protos.GetInstanceRequest{
215241
InstanceId: string(id),

0 commit comments

Comments
 (0)