Skip to content
This repository was archived by the owner on Jun 12, 2024. It is now read-only.

Commit 0ba02e4

Browse files
authored
Change the way queue metrics are initialized (#44)
* There are now default metrics, so the queue does not panic at runtime. * The service name label of the metrics can be switched by calling `SwitchMetricsServiceName` (the executed binary's name is used by default). * The `Setup` function is renamed to `SetupTables` because that is all it is supposed to do. * Queue metrics are now used the same way as in the HTTP middleware.
1 parent da4774a commit 0ba02e4

File tree

8 files changed

+146
-93
lines changed

8 files changed

+146
-93
lines changed

pkg/queue/manager_test.go

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -15,8 +15,6 @@ func TestQueueMetrics(t *testing.T) {
1515
defer goleak.VerifyNone(t)
1616
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
1717
defer cancel()
18-
SetupTaskQueueMetrics("test")
19-
SetupSchedulerMetrics("test")
2018

2119
numOfTasks := 2
2220
qCh := make(chan *Task, numOfTasks)

pkg/queue/metrics.go

Lines changed: 130 additions & 70 deletions
Original file line numberDiff line numberDiff line change
@@ -1,20 +1,12 @@
11
package queue
22

33
import (
4-
"sync"
4+
"os"
5+
"path/filepath"
56

67
"github.com/prometheus/client_golang/prometheus"
78
"github.com/prometheus/client_golang/prometheus/promauto"
8-
)
9-
10-
// largest bucket is 5 seconds
11-
var durationMsBuckets = []float64{10, 50, 100, 200, 300, 500, 1000, 2000, 3000, 5000}
12-
13-
var (
14-
queueMetricLabels = []string{"queue"}
15-
taskMetricLabels = []string{"queue", "type"}
16-
oneTaskQueueMetricsSetup sync.Once
17-
oneSchedulerMetricsSetup sync.Once
9+
"github.com/sirupsen/logrus"
1810
)
1911

2012
// TaskQueueMetricsType provides access to the prometheus metric objects for the task queue
@@ -24,70 +16,138 @@ type TaskQueueMetricsType struct {
2416
EnqueueDuration *prometheus.HistogramVec
2517
}
2618

27-
// TaskQueueMetrics is the global metrics instance for the task queue of this instance
28-
var TaskQueueMetrics TaskQueueMetricsType
29-
30-
// SetupTaskQueueMetrics must be called before any other call to the metric subsystem happens
31-
func SetupTaskQueueMetrics(namespace string) {
32-
oneTaskQueueMetricsSetup.Do(func() {
33-
TaskQueueMetrics = TaskQueueMetricsType{
34-
Labels: queueMetricLabels,
35-
TaskCounter: promauto.NewCounterVec(
36-
prometheus.CounterOpts{
37-
Namespace: namespace,
38-
Subsystem: "queue",
39-
Name: "task",
40-
Help: "count of tasks that have been enqueued",
41-
},
42-
taskMetricLabels,
43-
),
44-
EnqueueDuration: promauto.NewHistogramVec(
45-
prometheus.HistogramOpts{
46-
Namespace: namespace,
47-
Subsystem: "queue",
48-
Name: "enqueue_duration_ms",
49-
Help: "duration the enqueue action in ms",
50-
Buckets: durationMsBuckets,
51-
},
52-
queueMetricLabels,
53-
),
54-
}
55-
})
56-
}
57-
5819
// SchedulerMetricsType provides access to the prometheus metric objects for the scheduler
5920
type SchedulerMetricsType struct {
6021
Labels []string
6122
ScheduleCounter *prometheus.CounterVec
6223
ErrorCounter *prometheus.CounterVec
6324
}
6425

65-
// SchedulerMetrics is the global metrics instance for the scheduler of this instance
66-
var SchedulerMetrics SchedulerMetricsType
67-
68-
// SetupSchedulerMetrics must be called before any other call to the metric subsystem happens
69-
func SetupSchedulerMetrics(namespace string) {
70-
oneSchedulerMetricsSetup.Do(func() {
71-
SchedulerMetrics = SchedulerMetricsType{
72-
Labels: queueMetricLabels,
73-
ScheduleCounter: promauto.NewCounterVec(
74-
prometheus.CounterOpts{
75-
Namespace: namespace,
76-
Subsystem: "scheduler",
77-
Name: "task",
78-
Help: "count of tasks that have been scheduled",
79-
},
80-
taskMetricLabels,
81-
),
82-
ErrorCounter: promauto.NewCounterVec(
83-
prometheus.CounterOpts{
84-
Namespace: namespace,
85-
Subsystem: "scheduler",
86-
Name: "error",
87-
Help: "count of errors while scheduling",
88-
},
89-
taskMetricLabels,
90-
),
91-
}
92-
})
26+
const (
27+
instanceKey = "instance"
28+
serviceKey = "service"
29+
)
30+
31+
var (
32+
// largest bucket is 5 seconds
33+
durationMsBuckets = []float64{10, 50, 100, 200, 300, 500, 1000, 2000, 3000, 5000}
34+
processName = filepath.Base(os.Args[0])
35+
constLabels = prometheus.Labels{
36+
serviceKey: processName,
37+
instanceKey: getHostname(),
38+
}
39+
queueMetricLabels = []string{"queue"}
40+
taskMetricLabels = []string{"queue", "type"}
41+
42+
defTaskCounterOpts = prometheus.CounterOpts{
43+
Namespace: "queue",
44+
Subsystem: "task",
45+
Name: "total_count",
46+
Help: "count of tasks that have been enqueued",
47+
ConstLabels: constLabels,
48+
}
49+
defEnqueueDurationOpts = prometheus.HistogramOpts{
50+
Namespace: "queue",
51+
Subsystem: "task",
52+
Name: "enqueue_duration_ms",
53+
Help: "duration the enqueue action in ms",
54+
Buckets: durationMsBuckets,
55+
ConstLabels: constLabels,
56+
}
57+
defScheduleCounterOpts = prometheus.CounterOpts{
58+
Namespace: "queue",
59+
Subsystem: "scheduler",
60+
Name: "total_scheduled",
61+
Help: "count of tasks that have been scheduled",
62+
ConstLabels: constLabels,
63+
}
64+
65+
defSchedulerErrorCounterOpts = prometheus.CounterOpts{
66+
Namespace: "queue",
67+
Subsystem: "scheduler",
68+
Name: "total_errors",
69+
Help: "count of errors while scheduling",
70+
ConstLabels: constLabels,
71+
}
72+
73+
// TaskQueueMetrics is the global metrics instance for the task queue of this instance
74+
TaskQueueMetrics = TaskQueueMetricsType{
75+
Labels: queueMetricLabels,
76+
TaskCounter: promauto.NewCounterVec(
77+
defTaskCounterOpts,
78+
taskMetricLabels,
79+
),
80+
EnqueueDuration: promauto.NewHistogramVec(
81+
defEnqueueDurationOpts,
82+
queueMetricLabels,
83+
),
84+
}
85+
86+
// SchedulerMetrics is the global metrics instance for the scheduler of this instance
87+
SchedulerMetrics = SchedulerMetricsType{
88+
Labels: queueMetricLabels,
89+
ScheduleCounter: promauto.NewCounterVec(
90+
defScheduleCounterOpts,
91+
taskMetricLabels,
92+
),
93+
ErrorCounter: promauto.NewCounterVec(
94+
defSchedulerErrorCounterOpts,
95+
taskMetricLabels,
96+
),
97+
}
98+
)
99+
100+
// SwitchMetricsServiceName changes the service label used in the metrics,
101+
// so it can be customized
102+
func SwitchMetricsServiceName(serviceName string) {
103+
newConstLabels := prometheus.Labels{
104+
instanceKey: constLabels[instanceKey],
105+
serviceKey: serviceName,
106+
}
107+
newTaskCounterOpts := defTaskCounterOpts
108+
newTaskCounterOpts.ConstLabels = newConstLabels
109+
110+
newEnqueueDurationOpts := defEnqueueDurationOpts
111+
newEnqueueDurationOpts.ConstLabels = newConstLabels
112+
113+
newScheduleCounterOpts := defScheduleCounterOpts
114+
newScheduleCounterOpts.ConstLabels = newConstLabels
115+
116+
newSchedulerErrorCounterOpts := defSchedulerErrorCounterOpts
117+
newSchedulerErrorCounterOpts.ConstLabels = newConstLabels
118+
119+
TaskQueueMetrics = TaskQueueMetricsType{
120+
Labels: queueMetricLabels,
121+
TaskCounter: promauto.NewCounterVec(
122+
newTaskCounterOpts,
123+
taskMetricLabels,
124+
),
125+
EnqueueDuration: promauto.NewHistogramVec(
126+
newEnqueueDurationOpts,
127+
queueMetricLabels,
128+
),
129+
}
130+
131+
// replace the global SchedulerMetrics instance with one that carries the new service label
132+
SchedulerMetrics = SchedulerMetricsType{
133+
Labels: queueMetricLabels,
134+
ScheduleCounter: promauto.NewCounterVec(
135+
newScheduleCounterOpts,
136+
taskMetricLabels,
137+
),
138+
ErrorCounter: promauto.NewCounterVec(
139+
newSchedulerErrorCounterOpts,
140+
taskMetricLabels,
141+
),
142+
}
143+
}
144+
145+
func getHostname() string {
146+
hostname, err := os.Hostname()
147+
if err != nil {
148+
logrus.Errorf("unable to retrieve hostname - setting to unknown")
149+
hostname = "unknown"
150+
}
151+
152+
return hostname
93153
}

pkg/queue/postgres/dequeuer_test.go

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ func TestFinish(t *testing.T) {
2828

2929
name, db := dbtest.GetDatabase(t)
3030
defer db.Close()
31-
require.NoError(t, Setup(ctx, "test", db, nil))
31+
require.NoError(t, SetupTables(ctx, db, nil))
3232

3333
taskID := uuid.NewV4().String()
3434
connStr := "user=contiamo_test password=localdev sslmode=disable dbname=" + name
@@ -95,7 +95,7 @@ func TestFail(t *testing.T) {
9595

9696
name, db := dbtest.GetDatabase(t)
9797
defer db.Close()
98-
require.NoError(t, Setup(ctx, "test", db, nil))
98+
require.NoError(t, SetupTables(ctx, db, nil))
9999

100100
taskID := uuid.NewV4().String()
101101

@@ -211,7 +211,7 @@ func TestHeartbeat(t *testing.T) {
211211

212212
name, db := dbtest.GetDatabase(t)
213213
defer db.Close()
214-
require.NoError(t, Setup(ctx, "test", db, nil))
214+
require.NoError(t, SetupTables(ctx, db, nil))
215215

216216
connStr := "user=contiamo_test password=localdev sslmode=disable dbname=" + name
217217
dbListener := pq.NewListener(
@@ -352,7 +352,7 @@ func TestDequeueTicker(t *testing.T) {
352352
t.Run(tt.name, func(t *testing.T) {
353353
name, db := dbtest.GetDatabase(t)
354354
defer db.Close()
355-
require.NoError(t, Setup(ctx, "test", db, nil))
355+
require.NoError(t, SetupTables(ctx, db, nil))
356356

357357
connStr := "user=contiamo_test password=localdev sslmode=disable dbname=" + name
358358
dbListener := pq.NewListener(
@@ -477,7 +477,7 @@ func TestDequeue(t *testing.T) {
477477
t.Run(tc.name, func(t *testing.T) {
478478
name, db := dbtest.GetDatabase(t)
479479
defer db.Close()
480-
require.NoError(t, Setup(ctx, "test", db, nil))
480+
require.NoError(t, SetupTables(ctx, db, nil))
481481

482482
connStr := "user=contiamo_test password=localdev sslmode=disable dbname=" + name
483483
dbListener := pq.NewListener(
@@ -630,7 +630,7 @@ func TestQueueList(t *testing.T) {
630630

631631
name, db := dbtest.GetDatabase(t)
632632
defer db.Close()
633-
require.NoError(t, Setup(ctx, "test", db, nil))
633+
require.NoError(t, SetupTables(ctx, db, nil))
634634
connStr := "user=contiamo_test password=localdev sslmode=disable dbname=" + name
635635
dbListener := pq.NewListener(
636636
connStr,

pkg/queue/postgres/queuer_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ func TestEnqueue(t *testing.T) {
2323

2424
_, db := dbtest.GetDatabase(t)
2525
defer db.Close()
26-
require.NoError(t, Setup(ctx, "test", db, nil))
26+
require.NoError(t, SetupTables(ctx, db, nil))
2727
_, err := db.ExecContext(ctx, `ALTER TABLE tasks ADD column test_id uuid;`)
2828
require.NoError(t, err)
2929

pkg/queue/postgres/scheduler_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ func TestSchedule(t *testing.T) {
2424

2525
_, db := dbtest.GetDatabase(t)
2626
defer db.Close()
27-
require.NoError(t, Setup(ctx, "test", db, nil))
27+
require.NoError(t, SetupTables(ctx, db, nil))
2828
_, err := db.ExecContext(ctx, `ALTER TABLE schedules ADD column test_id uuid;`)
2929
require.NoError(t, err)
3030

@@ -161,7 +161,7 @@ func TestEnsure(t *testing.T) {
161161

162162
_, db := dbtest.GetDatabase(t)
163163
defer db.Close()
164-
require.NoError(t, Setup(ctx, "test", db, nil))
164+
require.NoError(t, SetupTables(ctx, db, nil))
165165
_, err := db.ExecContext(ctx, `ALTER TABLE schedules ADD column test_id uuid;`)
166166
require.NoError(t, err)
167167

pkg/queue/postgres/setup.go

Lines changed: 1 addition & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,6 @@ import (
66
"text/template"
77

88
"github.com/contiamo/go-base/pkg/db"
9-
"github.com/contiamo/go-base/pkg/queue"
109
)
1110

1211
type ForeignReference struct {
@@ -16,11 +15,7 @@ type ForeignReference struct {
1615
ReferencedColumn string
1716
}
1817

19-
func Setup(ctx context.Context, metricsNamespace string, db db.SQLDB, references []ForeignReference) error {
20-
// setup metrics
21-
queue.SetupTaskQueueMetrics(metricsNamespace)
22-
queue.SetupSchedulerMetrics(metricsNamespace)
23-
18+
func SetupTables(ctx context.Context, db db.SQLDB, references []ForeignReference) error {
2419
// setup database
2520
buf := &bytes.Buffer{}
2621
err := dbSetupTemplate.Execute(buf, references)

pkg/queue/postgres/setup_test.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@ import (
1313
"github.com/stretchr/testify/require"
1414
)
1515

16-
func TestSetupWithoutReferences(t *testing.T) {
16+
func TestSetupTablesWithoutReferences(t *testing.T) {
1717
logrus.SetOutput(ioutil.Discard)
1818
defer logrus.SetOutput(os.Stdout)
1919

@@ -22,7 +22,7 @@ func TestSetupWithoutReferences(t *testing.T) {
2222

2323
_, db := dbtest.GetDatabase(t)
2424
defer db.Close()
25-
require.NoError(t, Setup(ctx, "test", db, nil))
25+
require.NoError(t, SetupTables(ctx, db, nil))
2626
}
2727

2828
func TestStupWithReferences(t *testing.T) {
@@ -41,7 +41,7 @@ func TestStupWithReferences(t *testing.T) {
4141
require.NoError(t, err)
4242

4343
// setup with foreign reference
44-
err = Setup(ctx, "test", db, []ForeignReference{
44+
err = SetupTables(ctx, db, []ForeignReference{
4545
{
4646
ColumnName: "entity_id",
4747
ColumnType: "UUID",

pkg/queue/workers/schedule_worker_test.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ func TestScheduleTask(t *testing.T) {
2424
t.Run("Takes a schedule from the queue and enqueues tasks", func(t *testing.T) {
2525
_, db := dbtest.GetDatabase(t)
2626
defer db.Close()
27-
require.NoError(t, postgres.Setup(ctx, "test", db, nil))
27+
require.NoError(t, postgres.SetupTables(ctx, db, nil))
2828

2929
scheduleID1 := uuid.NewV4().String()
3030
scheduleID2 := uuid.NewV4().String()
@@ -114,7 +114,7 @@ func TestScheduleTask(t *testing.T) {
114114
t.Run("Returns ErrScheduleQueueIsEmpty if there is no task to schedule", func(t *testing.T) {
115115
_, db := dbtest.GetDatabase(t)
116116
defer db.Close()
117-
require.NoError(t, postgres.Setup(ctx, "test", db, nil))
117+
require.NoError(t, postgres.SetupTables(ctx, db, nil))
118118

119119
qm := &queueMock{}
120120
w := newScheduleWorker(db, qm, time.Second)
@@ -136,7 +136,7 @@ func TestMetrics(t *testing.T) {
136136

137137
_, db := dbtest.GetDatabase(t)
138138
defer db.Close()
139-
require.NoError(t, postgres.Setup(context.TODO(), "test", db, nil))
139+
require.NoError(t, postgres.SetupTables(context.TODO(), db, nil))
140140

141141
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
142142
defer cancel()

0 commit comments

Comments
 (0)