Skip to content

Commit ac06b4b

Browse files
authored
feat: serialize docker image pulls (#39)
1 parent 615b52d commit ac06b4b

File tree

4 files changed

+69
-21
lines changed

4 files changed

+69
-21
lines changed

internal/engine/clients/runner/docker/runner.go

Lines changed: 65 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -21,9 +21,13 @@ import (
2121
type dockerRunner struct {
2222
options runner.Options
2323
client *client.Client
24+
sem chan struct{}
2425
images map[string]bool
2526
tasks map[string]string
2627
mtx sync.RWMutex
28+
ctx context.Context
29+
cancel context.CancelFunc
30+
wg sync.WaitGroup
2731
}
2832

2933
func (r *dockerRunner) Run(ctx context.Context, opts ...runner.RunOption) (string, error) {
@@ -158,40 +162,74 @@ func (r *dockerRunner) DeleteVolume(ctx context.Context, opts ...runner.DeleteVo
158162
return nil
159163
}
160164

165+
func (r *dockerRunner) Close() {
166+
r.cancel()
167+
r.wg.Wait()
168+
close(r.sem)
169+
}
170+
161171
func (r *dockerRunner) pullImage(ctx context.Context, tag string) error {
162-
r.mtx.Lock()
163-
defer r.mtx.Unlock()
172+
if r.ctx.Err() != nil {
173+
return r.ctx.Err()
174+
}
164175

176+
r.mtx.RLock()
165177
if _, ok := r.images[tag]; ok {
178+
r.mtx.RUnlock()
166179
return nil
167180
}
181+
r.mtx.RUnlock()
168182

169-
images, err := r.client.ImageList(ctx, image.ListOptions{All: true})
170-
if err != nil {
171-
return err
172-
}
183+
select {
184+
case <-ctx.Done():
185+
return ctx.Err()
186+
case <-r.ctx.Done():
187+
return r.ctx.Err()
188+
case r.sem <- struct{}{}:
189+
r.wg.Add(1)
190+
defer func() {
191+
r.wg.Done()
192+
<-r.sem
193+
}()
194+
195+
r.mtx.RLock()
196+
if _, ok := r.images[tag]; ok {
197+
r.mtx.RUnlock()
198+
return nil
199+
}
200+
r.mtx.RUnlock()
201+
202+
images, err := r.client.ImageList(ctx, image.ListOptions{All: true})
203+
if err != nil {
204+
return err
205+
}
173206

174-
for _, img := range images {
175-
for _, t := range img.RepoTags {
176-
if t == tag {
177-
r.images[t] = true
178-
return nil
207+
for _, img := range images {
208+
for _, t := range img.RepoTags {
209+
if t == tag {
210+
r.mtx.Lock()
211+
r.images[t] = true
212+
r.mtx.Unlock()
213+
return nil
214+
}
179215
}
180216
}
181-
}
182217

183-
reader, err := r.client.ImagePull(ctx, tag, image.PullOptions{})
184-
if err != nil {
185-
return err
186-
}
218+
reader, err := r.client.ImagePull(ctx, tag, image.PullOptions{})
219+
if err != nil {
220+
return err
221+
}
187222

188-
defer reader.Close()
223+
defer reader.Close()
189224

190-
if _, err := io.Copy(os.Stdout, reader); err != nil {
191-
return err
192-
}
225+
if _, err := io.Copy(os.Stdout, reader); err != nil {
226+
return err
227+
}
193228

194-
r.images[tag] = true
229+
r.mtx.Lock()
230+
r.images[tag] = true
231+
r.mtx.Unlock()
232+
}
195233

196234
return nil
197235
}
@@ -228,12 +266,18 @@ func NewRunner(opts ...runner.Option) runner.Runner {
228266
panic(err)
229267
}
230268

269+
ctx, cancel := context.WithCancel(context.Background())
270+
231271
dr := &dockerRunner{
232272
options: options,
233273
client: c,
274+
sem: make(chan struct{}, 1),
234275
images: map[string]bool{},
235276
tasks: map[string]string{},
236277
mtx: sync.RWMutex{},
278+
ctx: ctx,
279+
cancel: cancel,
280+
wg: sync.WaitGroup{},
237281
}
238282

239283
return dr
Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
package mock

internal/engine/clients/runner/runner.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,4 +18,5 @@ type Runner interface {
1818
Run(ctx context.Context, opts ...RunOption) (string, error)
1919
CreateVolume(ctx context.Context, opts ...CreateVolumeOption) error
2020
DeleteVolume(ctx context.Context, opts ...DeleteVolumeOption) error
21+
Close()
2122
}

internal/engine/services/worker/service.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,8 @@ func (s *Service) Start(ch chan struct{}) error {
5050
close(exit)
5151
}
5252

53+
s.runner.Close()
54+
5355
return nil
5456
}
5557

0 commit comments

Comments
 (0)