Skip to content

Commit d00c697

Browse files
authored
feat: support background runner prunning (#50)
Enables automatic pruning of Docker images.
1 parent a296da2 commit d00c697

File tree

4 files changed

+108
-38
lines changed

4 files changed

+108
-38
lines changed

cmd/engine.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -155,6 +155,7 @@ func initRunner() runner.Runner {
155155
runner.WithHost(config.RunnerHost()),
156156
runner.WithRegistryUser(config.RunnerRegistryUser()),
157157
runner.WithRegistryPass(config.RunnerRegistryPass()),
158+
runner.WithPruneInterval(config.RunnerPruneInterval()),
158159
)
159160
}
160161

internal/engine/clients/runner/docker/runner.go

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ import (
1010
"os"
1111
"strings"
1212
"sync"
13+
"time"
1314

1415
"github.com/docker/docker/api/types/container"
1516
"github.com/docker/docker/api/types/filters"
@@ -310,6 +311,38 @@ func (r *dockerRunner) remove(ctx context.Context, id string) error {
310311
return nil
311312
}
312313

314+
func (r *dockerRunner) pruneImages() {
315+
r.wg.Add(1)
316+
defer r.wg.Done()
317+
318+
ticker := time.NewTicker(r.options.PruneInterval)
319+
defer ticker.Stop()
320+
321+
for {
322+
select {
323+
case <-ticker.C:
324+
ctx := context.Background()
325+
r.prune(ctx)
326+
case <-r.exit:
327+
return
328+
}
329+
}
330+
}
331+
332+
func (r *dockerRunner) prune(ctx context.Context) {
333+
filters := filters.NewArgs()
334+
335+
filters.Add("until", "24h")
336+
337+
pruneReport, err := r.client.ImagesPrune(ctx, filters)
338+
if err != nil {
339+
slog.ErrorContext(ctx, "failed to prune images", "error", err)
340+
return
341+
}
342+
343+
slog.InfoContext(ctx, "pruning complete", "images-deleted", len(pruneReport.ImagesDeleted), "space-reclaimed", pruneReport.SpaceReclaimed)
344+
}
345+
313346
func NewRunner(opts ...runner.Option) runner.Runner {
314347
options := runner.NewOptions(opts...)
315348

@@ -333,5 +366,7 @@ func NewRunner(opts ...runner.Option) runner.Runner {
333366
wg: sync.WaitGroup{},
334367
}
335368

369+
go dr.pruneImages()
370+
336371
return dr
337372
}

internal/engine/clients/runner/options.go

Lines changed: 17 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,18 @@
11
package runner
22

3-
import "context"
3+
import (
4+
"context"
5+
"time"
6+
)
47

58
type Option func(o *Options)
69

710
type Options struct {
8-
Host string
9-
RegistryUser string
10-
RegistryPass string
11-
Context context.Context
11+
Host string
12+
RegistryUser string
13+
RegistryPass string
14+
PruneInterval time.Duration
15+
Context context.Context
1216
}
1317

1418
func WithHost(host string) Option {
@@ -29,9 +33,16 @@ func WithRegistryPass(pass string) Option {
2933
}
3034
}
3135

36+
func WithPruneInterval(interval time.Duration) Option {
37+
return func(o *Options) {
38+
o.PruneInterval = interval
39+
}
40+
}
41+
3242
func NewOptions(opts ...Option) Options {
3343
options := Options{
34-
Context: context.Background(),
44+
Context: context.Background(),
45+
PruneInterval: 24 * time.Hour,
3546
}
3647

3748
for _, fn := range opts {

internal/engine/config/config.go

Lines changed: 55 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ import (
66
"strconv"
77
"strings"
88
"sync"
9+
"time"
910

1011
"github.com/w-h-a/workflow/internal/engine/clients/broker"
1112
"github.com/w-h-a/workflow/internal/engine/clients/readwriter"
@@ -19,43 +20,45 @@ var (
1920
)
2021

2122
type config struct {
22-
env string
23-
name string
24-
version string
25-
mode string
26-
coordinatorHttp string
27-
workerHttp string
28-
workerQueues map[string]int
29-
broker string
30-
brokerLocation string
31-
brokerDurable bool
32-
runner string
33-
runnerHost string
34-
runnerRegistryUser string
35-
runnerRegistryPass string
36-
readwriter string
37-
readwriterLocation string
23+
env string
24+
name string
25+
version string
26+
mode string
27+
coordinatorHttp string
28+
workerHttp string
29+
workerQueues map[string]int
30+
broker string
31+
brokerLocation string
32+
brokerDurable bool
33+
runner string
34+
runnerHost string
35+
runnerRegistryUser string
36+
runnerRegistryPass string
37+
runnerPruneInterval time.Duration
38+
readwriter string
39+
readwriterLocation string
3840
}
3941

4042
func New() {
4143
once.Do(func() {
4244
instance = &config{
43-
env: "dev",
44-
name: "workflow",
45-
version: "0.1.0-alpha.0",
46-
mode: "standalone",
47-
coordinatorHttp: ":4000",
48-
workerHttp: ":4001",
49-
workerQueues: map[string]int{string(task.Scheduled): 1, string(task.Cancelled): 1},
50-
broker: "memory",
51-
brokerLocation: "",
52-
brokerDurable: false,
53-
runner: "docker",
54-
runnerHost: "unix:///var/run/docker.sock",
55-
runnerRegistryUser: "",
56-
runnerRegistryPass: "",
57-
readwriter: "memory",
58-
readwriterLocation: "",
45+
env: "dev",
46+
name: "workflow",
47+
version: "0.1.0-alpha.0",
48+
mode: "standalone",
49+
coordinatorHttp: ":4000",
50+
workerHttp: ":4001",
51+
workerQueues: map[string]int{string(task.Scheduled): 1, string(task.Cancelled): 1},
52+
broker: "memory",
53+
brokerLocation: "",
54+
brokerDurable: false,
55+
runner: "docker",
56+
runnerHost: "unix:///var/run/docker.sock",
57+
runnerRegistryUser: "",
58+
runnerRegistryPass: "",
59+
runnerPruneInterval: 24 * time.Hour,
60+
readwriter: "memory",
61+
readwriterLocation: "",
5962
}
6063

6164
env := os.Getenv("ENV")
@@ -154,6 +157,18 @@ func New() {
154157
instance.runnerRegistryPass = runnerRegistryPass
155158
}
156159

160+
runnerPruneInterval := os.Getenv("RUNNER_PRUNE_INTERVAL")
161+
if len(runnerPruneInterval) > 0 {
162+
dur, err := time.ParseDuration(runnerPruneInterval)
163+
if err != nil {
164+
panic("invalid runner prune interval")
165+
}
166+
if dur <= 0 {
167+
panic("runner prune interval must be a positive duration")
168+
}
169+
instance.runnerPruneInterval = dur
170+
}
171+
157172
rw := os.Getenv("READ_WRITER")
158173
if len(rw) > 0 {
159174
if _, ok := readwriter.ReadWriterTypes[rw]; ok {
@@ -286,6 +301,14 @@ func RunnerRegistryPass() string {
286301
return instance.runnerRegistryPass
287302
}
288303

304+
func RunnerPruneInterval() time.Duration {
305+
if instance == nil {
306+
panic("cfg is nil")
307+
}
308+
309+
return instance.runnerPruneInterval
310+
}
311+
289312
func ReadWriter() string {
290313
if instance == nil {
291314
panic("cfg is nil")

0 commit comments

Comments
 (0)