Skip to content
This repository was archived by the owner on Jun 12, 2024. It is now read-only.

Commit 15cb624

Browse files
authored
Add AssertSchedule functionality (#46)
Now you can use this single function to upsert a schedule.
1 parent d49a368 commit 15cb624

File tree

4 files changed

+119
-1
lines changed

4 files changed

+119
-1
lines changed

pkg/queue/mock_test.go

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,9 @@ type SchedulerMock struct {
6060

6161
EnsureError error
6262
EnsureCount int
63+
64+
AssertError error
65+
AssertCount int
6366
}
6467

6568
func (s *SchedulerMock) Schedule(ctx context.Context, builder cdb.SQLBuilder, task TaskScheduleRequest) (err error) {
@@ -71,3 +74,8 @@ func (s *SchedulerMock) EnsureSchedule(ctx context.Context, builder cdb.SQLBuild
7174
s.EnsureCount = s.EnsureCount + 1
7275
return s.EnsureError
7376
}
77+
78+
func (s *SchedulerMock) AssertSchedule(ctx context.Context, task TaskScheduleRequest) error {
79+
s.AssertCount = s.AssertCount + 1
80+
return s.AssertError
81+
}

pkg/queue/postgres/scheduler.go

Lines changed: 53 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,20 +3,23 @@ package postgres
33
import (
44
"context"
55
"database/sql"
6+
"fmt"
67
"time"
78

89
"github.com/Masterminds/squirrel"
10+
"github.com/contiamo/go-base/pkg/db"
911
cdb "github.com/contiamo/go-base/pkg/db"
12+
"github.com/contiamo/go-base/pkg/queue"
1013
"github.com/contiamo/go-base/pkg/tracing"
1114
cvalidation "github.com/contiamo/go-base/pkg/validation"
12-
"github.com/contiamo/go-base/pkg/queue"
1315
uuid "github.com/satori/go.uuid"
1416
)
1517

1618
// NewScheduler creates a new postgres task scheduler
1719
func NewScheduler(db *sql.DB) queue.Scheduler {
1820
return &scheduler{
1921
Tracer: tracing.NewTracer("queue", "PostgresScheduler"),
22+
db: db,
2023
}
2124
}
2225

@@ -28,6 +31,7 @@ func NewSchedulerWithMetrics(db *sql.DB) queue.Scheduler {
2831
// scheduler is a postgres backed implementation of the task scheduler
2932
type scheduler struct {
3033
tracing.Tracer
34+
db *sql.DB
3135
}
3236

3337
func (q *scheduler) Schedule(ctx context.Context, builder cdb.SQLBuilder, task queue.TaskScheduleRequest) (err error) {
@@ -142,3 +146,51 @@ func (q *scheduler) EnsureSchedule(ctx context.Context, builder cdb.SQLBuilder,
142146
// either value was 0 or err == sql.ErrorNoRows
143147
return queue.ErrNotScheduled
144148
}
149+
150+
func (q *scheduler) AssertSchedule(ctx context.Context, schedule queue.TaskScheduleRequest) (err error) {
151+
span, ctx := q.StartSpan(ctx, "AssertSchedule")
152+
defer func() {
153+
q.FinishSpan(span, err)
154+
}()
155+
156+
span.SetTag("queue", schedule.Queue)
157+
span.SetTag("type", schedule.Type)
158+
span.SetTag("cron", schedule.CronSchedule)
159+
160+
tx, err := q.db.BeginTx(ctx, nil)
161+
if err != nil {
162+
return err
163+
}
164+
defer func() {
165+
if err != nil {
166+
_ = tx.Rollback()
167+
return
168+
}
169+
err = tx.Commit()
170+
}()
171+
172+
builder := squirrel.StatementBuilder.
173+
PlaceholderFormat(squirrel.Dollar).
174+
RunWith(db.WrapWithTracing(tx))
175+
176+
_, err = tx.ExecContext(ctx, `LOCK TABLE schedules IN ACCESS EXCLUSIVE MODE;`)
177+
if err != nil {
178+
return fmt.Errorf("failed to lock `schedules`: %w", err)
179+
}
180+
181+
err = q.EnsureSchedule(ctx, builder, schedule)
182+
if err == nil {
183+
span.SetTag("existed", true)
184+
return nil
185+
}
186+
if err == queue.ErrNotScheduled {
187+
span.SetTag("existed", false)
188+
err = q.Schedule(ctx, builder, schedule)
189+
}
190+
191+
if err != nil {
192+
return err
193+
}
194+
195+
return nil
196+
}

pkg/queue/postgres/scheduler_test.go

Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -271,3 +271,55 @@ func TestEnsure(t *testing.T) {
271271
})
272272
}
273273
}
274+
275+
func TestAssertSchedule(t *testing.T) {
276+
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
277+
defer cancel()
278+
279+
logrus.SetOutput(ioutil.Discard)
280+
defer logrus.SetOutput(os.Stdout)
281+
282+
schedule := queue.TaskScheduleRequest{
283+
TaskBase: queue.TaskBase{
284+
Queue: "test",
285+
Type: "type",
286+
Spec: queue.Spec(`{"some":"value"}`),
287+
},
288+
CronSchedule: "* * * * *", // every minute
289+
}
290+
291+
t.Run("asserts and schedules a non existent schedule", func(t *testing.T) {
292+
_, db := dbtest.GetDatabase(t)
293+
defer db.Close()
294+
require.NoError(t, SetupTables(ctx, db, nil))
295+
296+
q := NewScheduler(db)
297+
err := q.AssertSchedule(ctx, schedule)
298+
require.NoError(t, err)
299+
dbtest.EqualCount(t, db, 1, "schedules", squirrel.Eq{
300+
"task_queue": schedule.Queue,
301+
"task_type": schedule.Type,
302+
"cron_schedule": schedule.CronSchedule,
303+
"task_spec": []byte(schedule.Spec),
304+
})
305+
})
306+
307+
t.Run("asserts and does not create a schedule if exists", func(t *testing.T) {
308+
_, db := dbtest.GetDatabase(t)
309+
defer db.Close()
310+
require.NoError(t, SetupTables(ctx, db, nil))
311+
312+
q := NewScheduler(db)
313+
err := q.AssertSchedule(ctx, schedule)
314+
require.NoError(t, err)
315+
err = q.AssertSchedule(ctx, schedule)
316+
require.NoError(t, err)
317+
dbtest.EqualCount(t, db, 1, "schedules", squirrel.Eq{
318+
"task_queue": schedule.Queue,
319+
"task_type": schedule.Type,
320+
"cron_schedule": schedule.CronSchedule,
321+
"task_spec": []byte(schedule.Spec),
322+
})
323+
})
324+
325+
}

pkg/queue/scheduler.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,9 @@ type Scheduler interface {
2323
// currently exists. An error ErrNotScheduled is returned if a current task cannot be found.
2424
// implementation and validation errors may also be returned and should be checked for.
2525
EnsureSchedule(ctx context.Context, builder cdb.SQLBuilder, task TaskScheduleRequest) (err error)
26+
// AssertSchedule makes sure a schedule with the given parameters exists, if it does not
27+
// this function will create one.
28+
AssertSchedule(ctx context.Context, task TaskScheduleRequest) (err error)
2629
}
2730

2831
type schedulerWithMetrics struct {
@@ -48,3 +51,6 @@ func (s *schedulerWithMetrics) Schedule(ctx context.Context, builder cdb.SQLBuil
4851
func (s *schedulerWithMetrics) EnsureSchedule(ctx context.Context, builder cdb.SQLBuilder, task TaskScheduleRequest) error {
4952
return s.s.EnsureSchedule(ctx, builder, task)
5053
}
54+
func (s *schedulerWithMetrics) AssertSchedule(ctx context.Context, task TaskScheduleRequest) error {
55+
return s.s.AssertSchedule(ctx, task)
56+
}

0 commit comments

Comments
 (0)