Skip to content

Commit 6380a63

Browse files
committed
Implements durabletask ListInstanceIDs & GetInstanceHistory
Today, there is no ways of discovering the list of workflow instances that are currently running or have completed in the past without using external storage queries. The Dapr CLI [introduced list and workflow history commands](dapr/cli#1560) to get information about running and completed workflows, however these commands rely on direct queries to the underlying storage provider. By introducing this functionality into the durabletask framework itself, these commands need only talk to Daprd, removing the requirement for direct access to the storage provider as well as authentication. Daprd can make these queries itself, and use the Actor State Store component to access the underlying storage. Implements the new durabletask APIs according to dapr/proposals#93 Signed-off-by: joshvanl <[email protected]>
1 parent 8e1a73b commit 6380a63

File tree

32 files changed

+1086
-89
lines changed

32 files changed

+1086
-89
lines changed

go.mod

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -525,3 +525,7 @@ replace (
525525
//
526526
// Then, run `make modtidy-all` in this repository.
527527
// This ensures that go.mod and go.sum are up-to-date for each go.mod file.
528+
529+
replace github.com/dapr/durabletask-go => github.com/joshvanl/durabletask-go v0.0.0-20251105162850-0096c0be3cae
530+
531+
replace github.com/dapr/components-contrib => github.com/joshvanl/components-contrib v0.0.0-20251105114938-38d2fb353cc5

go.sum

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -515,10 +515,6 @@ github.com/dancannon/gorethink v4.0.0+incompatible h1:KFV7Gha3AuqT+gr0B/eKvGhbjm
515515
github.com/dancannon/gorethink v4.0.0+incompatible/go.mod h1:BLvkat9KmZc1efyYwhz3WnybhRZtgF1K929FD8z1avU=
516516
github.com/danieljoos/wincred v1.1.2 h1:QLdCxFs1/Yl4zduvBdcHB8goaYk9RARS2SgLLRuAyr0=
517517
github.com/danieljoos/wincred v1.1.2/go.mod h1:GijpziifJoIBfYh+S7BbkdUTU4LfM+QnGqR5Vl2tAx0=
518-
github.com/dapr/components-contrib v1.16.1 h1:EpntHk5qUXTbB55kec97cMQbC2QXBwbBU6QMI3ljV8g=
519-
github.com/dapr/components-contrib v1.16.1/go.mod h1:1AufCWqZwBj//UkyS7FesOEmp5/E6Xgy1tyCn8peiR4=
520-
github.com/dapr/durabletask-go v0.10.2-0.20251113171253-87ecdf8f0547 h1:bD4JBlXDHURsgvhIB1HQ1q0k8kYfVo/iNSBi0guSoe0=
521-
github.com/dapr/durabletask-go v0.10.2-0.20251113171253-87ecdf8f0547/go.mod h1:0Ts4rXp74JyG19gDWPcwNo5V6NBZzhARzHF5XynmA7Q=
522518
github.com/dapr/kit v0.16.2-0.20251117143824-2fd5d0c93524 h1:SQ7VeWGnypENpGjsL94wN2IgH+oYHx9ULpYrpFoRExQ=
523519
github.com/dapr/kit v0.16.2-0.20251117143824-2fd5d0c93524/go.mod h1:40ZWs5P6xfYf7O59XgwqZkIyDldTIXlhTQhGop8QoSM=
524520
github.com/dave/jennifer v1.4.0/go.mod h1:fIb+770HOpJ2fmN9EPPKOqm1vMGhB+TwXKMZhrIygKg=
@@ -1132,6 +1128,10 @@ github.com/jonboulle/clockwork v0.5.0 h1:Hyh9A8u51kptdkR+cqRpT1EebBwTn1oK9YfGYbd
11321128
github.com/jonboulle/clockwork v0.5.0/go.mod h1:3mZlmanh0g2NDKO5TWZVJAfofYk64M7XN3SzBPjZF60=
11331129
github.com/josharian/intern v1.0.0 h1:vlS4z54oSdjm0bgjRigI+G1HpF+tI+9rE5LLzOg8HmY=
11341130
github.com/josharian/intern v1.0.0/go.mod h1:5DoeVV0s6jJacbCEi61lwdGj/aVlrQvzHFFd8Hwg//Y=
1131+
github.com/joshvanl/components-contrib v0.0.0-20251105114938-38d2fb353cc5 h1:IgQW5uO5mvT2jTfpAeVecJ9aTSuRPS/y9g1SU8XmPn0=
1132+
github.com/joshvanl/components-contrib v0.0.0-20251105114938-38d2fb353cc5/go.mod h1:1AufCWqZwBj//UkyS7FesOEmp5/E6Xgy1tyCn8peiR4=
1133+
github.com/joshvanl/durabletask-go v0.0.0-20251105162850-0096c0be3cae h1:cWA+L6X+Kq3CzzwB3U4kQ8nmPs7e6UwhAEnTIh8hSOo=
1134+
github.com/joshvanl/durabletask-go v0.0.0-20251105162850-0096c0be3cae/go.mod h1:0Ts4rXp74JyG19gDWPcwNo5V6NBZzhARzHF5XynmA7Q=
11351135
github.com/jpillora/backoff v1.0.0/go.mod h1:J/6gKK9jxlEcS3zixgDgUAsiuZ7yrSoa/FX5e0EB2j4=
11361136
github.com/json-iterator/go v1.1.5/go.mod h1:+SdeFBvtyEkXs7REEP0seUULqWtbJapLOCVDaaPEHmU=
11371137
github.com/json-iterator/go v1.1.6/go.mod h1:+SdeFBvtyEkXs7REEP0seUULqWtbJapLOCVDaaPEHmU=

pkg/runtime/runtime.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -307,6 +307,7 @@ func newDaprRuntime(ctx context.Context,
307307
Resiliency: resiliencyProvider,
308308
EventSink: runtimeConfig.workflowEventSink,
309309
EnableClusteredDeployment: globalConfig.IsFeatureEnabled(config.WorkflowsClusteredDeployment),
310+
ComponentStore: compStore,
310311
})
311312

312313
jobsManager, err := scheduler.New(scheduler.Options{

pkg/runtime/wfengine/backends/actors/actors.go

Lines changed: 46 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,9 @@ import (
4343
invokev1 "github.com/dapr/dapr/pkg/messaging/v1"
4444
internalsv1pb "github.com/dapr/dapr/pkg/proto/internals/v1"
4545
"github.com/dapr/dapr/pkg/resiliency"
46+
"github.com/dapr/dapr/pkg/runtime/compstore"
4647
"github.com/dapr/dapr/pkg/runtime/wfengine/state"
48+
"github.com/dapr/dapr/pkg/runtime/wfengine/state/list"
4749
"github.com/dapr/dapr/pkg/runtime/wfengine/todo"
4850
"github.com/dapr/dapr/utils"
4951
"github.com/dapr/durabletask-go/api"
@@ -66,11 +68,12 @@ const (
6668
)
6769

6870
type Options struct {
69-
AppID string
70-
Namespace string
71-
Actors actors.Interface
72-
Resiliency resiliency.Provider
73-
EventSink orchestrator.EventSink
71+
AppID string
72+
Namespace string
73+
Actors actors.Interface
74+
Resiliency resiliency.Provider
75+
EventSink orchestrator.EventSink
76+
ComponentStore *compstore.ComponentStore
7477
// experimental feature
7578
// enabling this will use the cluster tasks backend for pending tasks, instead of the default local implementation
7679
// the cluster tasks backend uses actors to share the state of pending tasks
@@ -90,6 +93,7 @@ type Actors struct {
9093
resiliency resiliency.Provider
9194
actors actors.Interface
9295
eventSink orchestrator.EventSink
96+
compStore *compstore.ComponentStore
9397

9498
orchestrationWorkItemChan chan *backend.OrchestrationWorkItem
9599
activityWorkItemChan chan *backend.ActivityWorkItem
@@ -115,6 +119,7 @@ func New(opts Options) *Actors {
115119
resiliency: opts.Resiliency,
116120
pendingTasksBackend: pendingTasksBackend,
117121
enableClusteredDeployment: opts.EnableClusteredDeployment,
122+
compStore: opts.ComponentStore,
118123
orchestrationWorkItemChan: make(chan *backend.OrchestrationWorkItem, 1),
119124
activityWorkItemChan: make(chan *backend.ActivityWorkItem, 1),
120125
eventSink: opts.EventSink,
@@ -630,6 +635,42 @@ func (abe *Actors) WaitForOrchestratorCompletion(ctx context.Context, request *p
630635
return abe.pendingTasksBackend.WaitForOrchestratorCompletion(ctx, request)
631636
}
632637

638+
func (abe *Actors) ListInstanceIDs(ctx context.Context, req *protos.ListInstanceIDsRequest) (*protos.ListInstanceIDsResponse, error) {
639+
resp, err := list.ListInstanceIDs(ctx, list.ListOptions{
640+
ComponentStore: abe.compStore,
641+
Namespace: abe.namespace,
642+
AppID: abe.appID,
643+
PageSize: req.PageSize,
644+
ContinuationToken: req.ContinuationToken,
645+
})
646+
if err != nil {
647+
return nil, err
648+
}
649+
650+
return &protos.ListInstanceIDsResponse{
651+
InstanceIds: resp.Keys,
652+
ContinuationToken: resp.ContinuationToken,
653+
}, nil
654+
}
655+
656+
func (abe *Actors) GetInstanceHistory(ctx context.Context, req *protos.GetInstanceHistoryRequest) (*protos.GetInstanceHistoryResponse, error) {
657+
ss, err := abe.actors.State(ctx)
658+
if err != nil {
659+
return nil, err
660+
}
661+
662+
resp, err := state.LoadWorkflowState(ctx, ss, req.GetInstanceId(), state.Options{
663+
AppID: abe.appID,
664+
WorkflowActorType: abe.workflowActorType,
665+
ActivityActorType: abe.activityActorType,
666+
})
667+
if err != nil {
668+
return nil, err
669+
}
670+
671+
return &protos.GetInstanceHistoryResponse{Events: resp.History}, nil
672+
}
673+
633674
func (abe *Actors) purgeWorkflow(ctx context.Context, id api.InstanceID) error {
634675
req := internalsv1pb.
635676
NewInternalInvokeRequest(todo.PurgeWorkflowStateMethod).
@@ -702,11 +743,3 @@ func (abe *Actors) purgeWorkflowForce(ctx context.Context, id api.InstanceID) er
702743
},
703744
)
704745
}
705-
706-
func (abe *Actors) GetInstanceHistory(ctx context.Context, req *protos.GetInstanceHistoryRequest) (*protos.GetInstanceHistoryResponse, error) {
707-
return nil, status.Error(codes.Unimplemented, "GetInstanceHistory is not implemented in the Actors backend")
708-
}
709-
710-
func (abe *Actors) ListInstanceIDs(ctx context.Context, req *protos.ListInstanceIDsRequest) (*protos.ListInstanceIDsResponse, error) {
711-
return nil, status.Error(codes.Unimplemented, "ListInstanceIDs is not implemented in the Actors backend")
712-
}
Lines changed: 76 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,76 @@
1+
/*
2+
Copyright 2025 The Dapr Authors
3+
Licensed under the Apache License, Version 2.0 (the "License");
4+
you may not use this file except in compliance with the License.
5+
You may obtain a copy of the License at
6+
http://www.apache.org/licenses/LICENSE-2.0
7+
Unless required by applicable law or agreed to in writing, software
8+
distributed under the License is distributed on an "AS IS" BASIS,
9+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
10+
See the License for the specific language governing permissions and
11+
limitations under the License.
12+
*/
13+
14+
package list
15+
16+
import (
17+
"context"
18+
"errors"
19+
"fmt"
20+
"strings"
21+
22+
"github.com/dapr/components-contrib/state"
23+
"github.com/dapr/dapr/pkg/runtime/compstore"
24+
)
25+
26+
type ListOptions struct {
27+
ComponentStore *compstore.ComponentStore
28+
Namespace string
29+
AppID string
30+
31+
PageSize *uint32
32+
ContinuationToken *string
33+
}
34+
35+
type ListInstanceIDsResult struct {
36+
Keys []string
37+
ContinuationToken *string
38+
}
39+
40+
func ListInstanceIDs(ctx context.Context, opts ListOptions) (*ListInstanceIDsResult, error) {
41+
store, _, ok := opts.ComponentStore.GetStateStoreActor()
42+
if !ok {
43+
return nil, errors.New("no state store with actor support found")
44+
}
45+
46+
ks, ok := store.(state.KeysLiker)
47+
if !ok {
48+
return nil, fmt.Errorf("state store %T does not support listing keys", store)
49+
}
50+
51+
like := opts.AppID + "||dapr.internal." + opts.Namespace + "." + opts.AppID + ".workflow||%||metadata"
52+
53+
resp, err := ks.KeysLike(ctx, &state.KeysLikeRequest{
54+
Pattern: like,
55+
ContinueToken: opts.ContinuationToken,
56+
PageSize: opts.PageSize,
57+
})
58+
if err != nil {
59+
return nil, err
60+
}
61+
62+
keys := make([]string, 0, len(resp.Keys))
63+
for _, key := range resp.Keys {
64+
split := strings.Split(key, "||")
65+
if len(split) != 4 {
66+
return nil, fmt.Errorf("invalid key format: %s", key)
67+
}
68+
69+
keys = append(keys, split[2])
70+
}
71+
72+
return &ListInstanceIDsResult{
73+
Keys: keys,
74+
ContinuationToken: resp.ContinueToken,
75+
}, nil
76+
}

pkg/runtime/wfengine/wfengine.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ import (
3030
"github.com/dapr/dapr/pkg/config"
3131
runtimev1pb "github.com/dapr/dapr/pkg/proto/runtime/v1"
3232
"github.com/dapr/dapr/pkg/resiliency"
33+
"github.com/dapr/dapr/pkg/runtime/compstore"
3334
"github.com/dapr/dapr/pkg/runtime/processor"
3435
backendactors "github.com/dapr/dapr/pkg/runtime/wfengine/backends/actors"
3536
"github.com/dapr/durabletask-go/backend"
@@ -59,6 +60,7 @@ type Options struct {
5960
Resiliency resiliency.Provider
6061
EventSink orchestrator.EventSink
6162
EnableClusteredDeployment bool
63+
ComponentStore *compstore.ComponentStore
6264
}
6365

6466
type engine struct {
@@ -83,6 +85,7 @@ func New(opts Options) Interface {
8385
Resiliency: opts.Resiliency,
8486
EventSink: opts.EventSink,
8587
EnableClusteredDeployment: opts.EnableClusteredDeployment,
88+
ComponentStore: opts.ComponentStore,
8689
})
8790

8891
var getWorkItemsCount atomic.Int32

tests/integration/framework/process/workflow/workflow.go

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -84,6 +84,11 @@ func New(t *testing.T, fopts ...Option) *Workflow {
8484
}
8585
}
8686

87+
// TODO: @joshvanl
88+
//if i > 0 {
89+
// dopts = append(dopts, daprd.WithAppID(daprds[0].AppID()))
90+
//}
91+
8792
daprds[i] = daprd.New(t, dopts...)
8893
}
8994

@@ -177,6 +182,17 @@ func (w *Workflow) BackendClient(t *testing.T, ctx context.Context) *client.Task
177182
return w.BackendClientN(t, ctx, 0)
178183
}
179184

185+
func (w *Workflow) WorkflowClient(t *testing.T, ctx context.Context) *workflow.Client {
186+
t.Helper()
187+
return workflow.NewClient(w.Dapr().GRPCConn(t, ctx))
188+
}
189+
190+
func (w *Workflow) WorkflowClientN(t *testing.T, ctx context.Context, index int) *workflow.Client {
191+
t.Helper()
192+
require.Less(t, index, len(w.daprds), "index out of range")
193+
return workflow.NewClient(w.DaprN(index).GRPCConn(t, ctx))
194+
}
195+
180196
// BackendClient returns a backend client for the specified index
181197
func (w *Workflow) BackendClientN(t *testing.T, ctx context.Context, index int) *client.TaskHubGrpcClient {
182198
t.Helper()

tests/integration/suite/daprd/hotreload/operator/actorstate.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -176,7 +176,7 @@ func (a *actorstate) Run(t *testing.T, ctx context.Context) {
176176
require.ElementsMatch(t, []*rtv1.RegisteredComponents{
177177
{
178178
Name: "mystore", Type: "state.in-memory", Version: "v1",
179-
Capabilities: []string{"ETAG", "TRANSACTIONAL", "TTL", "DELETE_WITH_PREFIX", "ACTOR"},
179+
Capabilities: []string{"ETAG", "TRANSACTIONAL", "TTL", "DELETE_WITH_PREFIX", "KEYS_LIKE", "ACTOR"},
180180
},
181181
}, comps)
182182
inmemStore.Spec.Metadata = []common.NameValuePair{}
@@ -188,7 +188,7 @@ func (a *actorstate) Run(t *testing.T, ctx context.Context) {
188188
require.ElementsMatch(t, []*rtv1.RegisteredComponents{
189189
{
190190
Name: "mystore", Type: "state.in-memory", Version: "v1",
191-
Capabilities: []string{"ETAG", "TRANSACTIONAL", "TTL", "DELETE_WITH_PREFIX", "ACTOR"},
191+
Capabilities: []string{"ETAG", "TRANSACTIONAL", "TTL", "DELETE_WITH_PREFIX", "KEYS_LIKE", "ACTOR"},
192192
},
193193
}, comps)
194194
a.operatorDelete.SetComponents()

tests/integration/suite/daprd/hotreload/operator/crypto.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -217,7 +217,7 @@ func (c *crypto) Run(t *testing.T, ctx context.Context) {
217217
{Name: "crypto3", Type: "crypto.dapr.localstorage", Version: "v1"},
218218
{
219219
Name: "crypto2", Type: "state.in-memory", Version: "v1",
220-
Capabilities: []string{"ETAG", "TRANSACTIONAL", "TTL", "DELETE_WITH_PREFIX", "ACTOR"},
220+
Capabilities: []string{"ETAG", "TRANSACTIONAL", "TTL", "DELETE_WITH_PREFIX", "KEYS_LIKE", "ACTOR"},
221221
},
222222
}, resp.GetRegisteredComponents())
223223
}, time.Second*10, time.Millisecond*10)
@@ -255,7 +255,7 @@ func (c *crypto) Run(t *testing.T, ctx context.Context) {
255255
assert.ElementsMatch(c, []*rtv1.RegisteredComponents{
256256
{
257257
Name: "crypto3", Type: "state.in-memory", Version: "v1",
258-
Capabilities: []string{"ETAG", "TRANSACTIONAL", "TTL", "DELETE_WITH_PREFIX", "ACTOR"},
258+
Capabilities: []string{"ETAG", "TRANSACTIONAL", "TTL", "DELETE_WITH_PREFIX", "KEYS_LIKE", "ACTOR"},
259259
},
260260
}, resp.GetRegisteredComponents())
261261
}, time.Second*10, time.Millisecond*10)

tests/integration/suite/daprd/hotreload/operator/informer/components.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -141,7 +141,7 @@ func (c *components) Run(t *testing.T, ctx context.Context) {
141141
exp := []*rtv1.RegisteredComponents{
142142
{
143143
Name: "123", Type: "state.in-memory", Version: "v1",
144-
Capabilities: []string{"ETAG", "TRANSACTIONAL", "TTL", "DELETE_WITH_PREFIX", "ACTOR"},
144+
Capabilities: []string{"ETAG", "TRANSACTIONAL", "TTL", "DELETE_WITH_PREFIX", "KEYS_LIKE", "ACTOR"},
145145
},
146146
}
147147
require.EventuallyWithT(t, func(ct *assert.CollectT) {
@@ -179,7 +179,7 @@ func (c *components) Run(t *testing.T, ctx context.Context) {
179179
exp := []*rtv1.RegisteredComponents{
180180
{
181181
Name: "123", Type: "state.sqlite", Version: "v1",
182-
Capabilities: []string{"ETAG", "TRANSACTIONAL", "TTL", "ACTOR"},
182+
Capabilities: []string{"ETAG", "TRANSACTIONAL", "TTL", "KEYS_LIKE", "ACTOR"},
183183
},
184184
}
185185
assert.ElementsMatch(ct, exp, c.daprd1.GetMetaRegisteredComponents(ct, ctx))

0 commit comments

Comments
 (0)