Skip to content
This repository was archived by the owner on Jun 12, 2024. It is now read-only.

Commit cbc72c3

Browse files
authored
Use INSERT ON CONFLICT for retention task upsert (#60)
* Use INSERT ON CONFLICT for retention task upsert **What** - When deploying some of the apps with the retention policy, we saw the startup get stuck during the retention task assertion. Moving a an INSERT ON CONFLICT upsert method avoids the table lock and has the same effect. - Add a partial unique index to the schedules table that is required for the upsert flow - Any project using the retention policies will need to add this migration to their startup ```sql CREATE UNIQUE INDEX IF NOT EXISTS unique_retention_idx ON schedules (task_queue, task_type, (task_spec->>'queueName'), (task_spec->>'taskType'), (task_spec->>'status')) WHERE task_type = 'retention'; ``` Signed-off-by: Lucas Roesler <[email protected]>
1 parent ede17e2 commit cbc72c3

File tree

4 files changed

+89
-70
lines changed

4 files changed

+89
-70
lines changed

pkg/queue/postgres/retention.go

Lines changed: 33 additions & 55 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,9 @@ import (
1212
cdb "github.com/contiamo/go-base/v2/pkg/db"
1313
"github.com/contiamo/go-base/v2/pkg/queue"
1414
"github.com/contiamo/go-base/v2/pkg/queue/handlers"
15+
cvalidation "github.com/contiamo/go-base/v2/pkg/validation"
1516
"github.com/opentracing/opentracing-go"
17+
uuid "github.com/satori/go.uuid"
1618
)
1719

1820
const (
@@ -59,67 +61,43 @@ func AssertRetentionSchedule(ctx context.Context, db *sql.DB, queueName string,
5961
},
6062
CronSchedule: fmt.Sprintf("%d * * * *", when), // every hour at minute "when"
6163
}
62-
tx, err := db.BeginTx(ctx, nil)
63-
if err != nil {
64-
return fmt.Errorf("can not start transaction for scheduling: %w", err)
65-
}
66-
defer func() {
67-
if err != nil {
68-
_ = tx.Rollback()
69-
return
70-
}
71-
err = tx.Commit()
72-
}()
73-
74-
_, err = tx.ExecContext(ctx, `LOCK TABLE schedules IN ACCESS EXCLUSIVE MODE;`)
64+
65+
err = cvalidation.CronTab(retentionSchedule.CronSchedule)
7566
if err != nil {
76-
return fmt.Errorf("failed to lock `schedules`: %w", err)
67+
return err
7768
}
7869

7970
builder := squirrel.StatementBuilder.
8071
PlaceholderFormat(squirrel.Dollar).
81-
RunWith(cdb.WrapWithTracing(tx))
82-
83-
var exists int
84-
// use a unique error name here otherwise the sql.ErrNoRows might shadow
85-
// us and things will break. This is also handled by the named error return
86-
// variable, but this makes the code easier to copy and paste
87-
existsErr := builder.Select("1").
88-
From("schedules").
89-
Where(squirrel.Eq{
90-
"task_queue": MaintenanceTaskQueue,
91-
"task_type": RetentionTask,
92-
"task_spec->>'queueName'": queueName,
93-
"task_spec->>'taskType'": taskType,
94-
"task_spec->>'status'": status,
95-
}).ScanContext(ctx, &exists)
96-
if existsErr != nil && existsErr != sql.ErrNoRows {
97-
return fmt.Errorf("can not verify existing schedule: %w", existsErr)
98-
}
99-
100-
// will only non-zero if err is nil and task is not found
101-
if exists == 0 {
102-
span.SetTag("created", true)
103-
// pass nil db because it doesn't need the raw db
104-
return NewScheduler(nil).Schedule(ctx, builder, retentionSchedule)
105-
}
106-
107-
span.SetTag("updated", true)
108-
res, err := builder.Update("schedules").
109-
Where(squirrel.Eq{
110-
"task_queue": MaintenanceTaskQueue,
111-
"task_type": RetentionTask,
112-
"task_spec->>'queueName'": queueName,
113-
"task_spec->>'taskType'": taskType,
114-
"task_spec->>'status'": status,
115-
}).
116-
Set("updated_at", time.Now()).
117-
Set("task_spec", retentionSchedule.Spec).
118-
Set("cron_schedule", retentionSchedule.CronSchedule).
119-
Set("next_execution_time", time.Now()).
120-
ExecContext(ctx)
72+
RunWith(cdb.WrapWithTracing(db))
73+
74+
scheduleID := uuid.NewV4().String()
75+
res, err := builder.Insert("schedules").
76+
Columns(
77+
"schedule_id",
78+
"task_queue",
79+
"task_type",
80+
"task_spec",
81+
"cron_schedule",
82+
"next_execution_time",
83+
).
84+
Values(
85+
scheduleID,
86+
retentionSchedule.Queue,
87+
retentionSchedule.Type,
88+
retentionSchedule.Spec,
89+
retentionSchedule.CronSchedule,
90+
time.Now(), // the schedule will enqueue the task immediately
91+
).Suffix(`
92+
ON CONFLICT (task_queue,task_type,(task_spec->>'queueName'),(task_spec->>'taskType'),(task_spec->>'status')) WHERE task_type='retention'
93+
DO UPDATE SET
94+
updated_at=now(),
95+
next_execution_time=now(),
96+
task_spec=EXCLUDED.task_spec,
97+
cron_schedule=EXCLUDED.cron_schedule
98+
`).ExecContext(ctx)
12199
if err != nil {
122-
return fmt.Errorf("can not update existing schdule: %w", err)
100+
return fmt.Errorf("can not upsert retention schedule: %w", err)
123101
}
124102

125103
updated, err := res.RowsAffected()

pkg/queue/postgres/scheduler.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -175,7 +175,7 @@ func (q *scheduler) AssertSchedule(ctx context.Context, schedule queue.TaskSched
175175
PlaceholderFormat(squirrel.Dollar).
176176
RunWith(cdb.WrapWithTracing(tx))
177177

178-
_, err = tx.ExecContext(ctx, `LOCK TABLE schedules IN ACCESS EXCLUSIVE MODE;`)
178+
_, err = tx.ExecContext(ctx, `LOCK TABLE schedules IN SHARE MODE;`)
179179
if err != nil {
180180
return fmt.Errorf("failed to lock `schedules`: %w", err)
181181
}

pkg/queue/postgres/setup.go

Lines changed: 40 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -102,6 +102,13 @@ var (
102102
Table: SchedulesTable,
103103
Columns: []string{"created_at DESC", "updated_at DESC"},
104104
},
105+
{
106+
Table: SchedulesTable,
107+
Name: "unique_retention_idx",
108+
Columns: []string{"task_queue", "task_type", "(task_spec->>'queueName')", "(task_spec->>'taskType')", "(task_spec->>'status')"},
109+
Unique: true,
110+
Condition: fmt.Sprintf("task_type='%s'", RetentionTask),
111+
},
105112

106113
// tasks
107114
{
@@ -310,9 +317,10 @@ func syncTable(ctx context.Context, db db.SQLDB, tableName string, initColumns t
310317
}
311318

312319
func listColumns(ctx context.Context, db db.SQLDB, tableName string) (columns map[string]nothing, err error) {
313-
rows, err := db.QueryContext(ctx, `
314-
SELECT column_name FROM information_schema.columns WHERE table_name = $1;
315-
`, tableName)
320+
rows, err := db.QueryContext(ctx,
321+
`SELECT column_name FROM information_schema.columns WHERE table_name = $1;`,
322+
tableName,
323+
)
316324
if err != nil {
317325
return nil, err
318326
}
@@ -349,9 +357,12 @@ func (s tableColumnSet) generateStatements() []string {
349357

350358
// index describes all important properties of an index
351359
type index struct {
352-
Table string
353-
Columns []string
354-
Type string
360+
Name string
361+
Table string
362+
Columns []string
363+
Type string
364+
Unique bool
365+
Condition string
355366
}
356367
type indexList []index
357368

@@ -361,22 +372,37 @@ func (l indexList) generateStatements() []string {
361372
indexDefinitions := make([]string, 0, len(l))
362373
for _, index := range l {
363374
stmt := strings.Builder{}
364-
stmt.WriteString("CREATE INDEX IF NOT EXISTS ")
375+
stmt.WriteString("CREATE ")
376+
if index.Unique {
377+
stmt.WriteString("UNIQUE ")
378+
}
379+
stmt.WriteString("INDEX IF NOT EXISTS ")
365380

366381
columnList := strings.Join(index.Columns, ",")
367382

368-
stmt.WriteString(fmt.Sprintf(
369-
"%s_%s_idx ON %s",
370-
index.Table,
371-
cstrings.ToUnderscoreCase(columnList),
372-
index.Table,
373-
))
383+
// is a noop when string is empty
384+
stmt.WriteString(index.Name)
385+
if index.Name == "" {
386+
stmt.WriteString(fmt.Sprintf(
387+
"%s_%s_idx",
388+
index.Table,
389+
cstrings.ToUnderscoreCase(columnList),
390+
))
391+
}
392+
393+
stmt.WriteString(" ON ")
394+
stmt.WriteString(index.Table)
374395

375396
if index.Type != "" {
376397
stmt.WriteString(fmt.Sprintf(" USING %s ", index.Type))
377398
}
378399

379-
stmt.WriteString("(" + columnList + ");")
400+
stmt.WriteString("(" + columnList + ")")
401+
if index.Condition != "" {
402+
stmt.WriteString(" WHERE ")
403+
stmt.WriteString(index.Condition)
404+
}
405+
stmt.WriteString(";")
380406
indexDefinitions = append(indexDefinitions, stmt.String())
381407
}
382408

pkg/queue/postgres/setup_test.go

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ import (
77
"testing"
88
"time"
99

10+
"github.com/Masterminds/squirrel"
1011
dbtest "github.com/contiamo/go-base/v2/pkg/db/test"
1112
"github.com/contiamo/go-base/v2/pkg/queue"
1213
"github.com/sirupsen/logrus"
@@ -27,6 +28,20 @@ func TestSetupTables(t *testing.T) {
2728
_, db := dbtest.GetDatabase(t)
2829
defer db.Close()
2930
require.NoError(t, SetupTables(ctx, db, nil))
31+
32+
dbtest.EqualCount(t, db, 1, "pg_indexes", squirrel.And{
33+
squirrel.Eq{
34+
"indexname": "unique_retention_idx",
35+
"tablename": "schedules",
36+
},
37+
squirrel.Like{"indexdef": "CREATE UNIQUE INDEX%"},
38+
squirrel.Like{"indexdef": "%task_queue%"},
39+
squirrel.Like{"indexdef": "%task_spec%"},
40+
squirrel.Like{"indexdef": "%queueName%"},
41+
squirrel.Like{"indexdef": "%taskType%"},
42+
squirrel.Like{"indexdef": "%status%"},
43+
squirrel.Like{"indexdef": "%WHERE%"},
44+
})
3045
})
3146

3247
t.Run("bootstraps with references", func(t *testing.T) {

0 commit comments

Comments
 (0)