Skip to content

Commit 94eaea9

Browse files
authored
[libbeat] Fix a shutdown race in the memory queue (#47248)
Fix a race in the memory queue where an event that was reported as unpublished during queue shutdown could still be enqueued, ingested, and have its acknowledgment callback triggered. In specific circumstances this could cause a panic during Filebeat shutdown, see #47246. A deterministic test is impossible since the behavior depended on precise timing of an internal select statement with multiple valid paths, but the accompanying unit test fails 90% of the time on the old code, and passes 100% of the time with this change.
1 parent 2deef7c commit 94eaea9

File tree

3 files changed

+190
-49
lines changed

3 files changed

+190
-49
lines changed
Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
# Kind can be one of:
2+
# - breaking-change: a change to previously-documented behavior
3+
# - deprecation: functionality that is being removed in a later release
4+
# - bug-fix: fixes a problem in a previous version
5+
# - enhancement: extends functionality but does not break or fix existing behavior
6+
# - feature: new functionality
7+
# - known-issue: problems that we are aware of in a given version
8+
# - security: impacts on the security of a product or a user’s deployment.
9+
# - upgrade: important information for someone upgrading from a prior version
10+
# - other: does not fit into any of the other categories
11+
kind: bug-fix
12+
13+
# Change summary; a 80ish characters long description of the change.
14+
summary: Fix potential Filebeat panic during memory queue shutdown
15+
16+
# Long description; in case the summary is not enough to describe the change
17+
# this field accommodate a description without length limits.
18+
# NOTE: This field will be rendered only for breaking-change and known-issue kinds at the moment.
19+
#description:
20+
21+
# Affected component; usually one of "elastic-agent", "fleet-server", "filebeat", "metricbeat", "auditbeat", "all", etc.
22+
component: filebeat
23+
24+
# PR URL; optional; the PR number that added the changeset.
25+
# If not present is automatically filled by the tooling finding the PR where this changelog fragment has been added.
26+
# NOTE: the tooling supports backports, so it's able to fill the original PR number instead of the backport PR number.
27+
# Please provide it if you are adding a fragment for a different PR.
28+
#pr: https://github.com/owner/repo/1234
29+
30+
# Issue URL; optional; the GitHub issue related to this changeset (either closes or is part of).
31+
# If not present is automatically filled by the tooling with the issue linked to the PR number.
32+
#issue: https://github.com/owner/repo/1234

libbeat/publisher/queue/memqueue/produce.go

Lines changed: 30 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -134,17 +134,7 @@ func (st *openState) publish(req pushRequest) (queue.EntryID, bool) {
134134
}
135135
select {
136136
case st.events <- req:
137-
// The events channel is buffered, which means we may successfully
138-
// write to it even if the queue is shutting down. To avoid blocking
139-
// forever during shutdown, we also have to wait on the queue's
140-
// shutdown channel.
141-
select {
142-
case resp := <-req.resp:
143-
return resp, true
144-
case <-st.queueClosing:
145-
st.events = nil
146-
return 0, false
147-
}
137+
return st.handlePendingResponse(req.resp)
148138
case <-st.done:
149139
st.events = nil
150140
return 0, false
@@ -162,17 +152,7 @@ func (st *openState) tryPublish(req pushRequest) (queue.EntryID, bool) {
162152
}
163153
select {
164154
case st.events <- req:
165-
// The events channel is buffered, which means we may successfully
166-
// write to it even if the queue is shutting down. To avoid blocking
167-
// forever during shutdown, we also have to wait on the queue's
168-
// shutdown channel.
169-
select {
170-
case resp := <-req.resp:
171-
return resp, true
172-
case <-st.queueClosing:
173-
st.events = nil
174-
return 0, false
175-
}
155+
return st.handlePendingResponse(req.resp)
176156
case <-st.done:
177157
st.events = nil
178158
return 0, false
@@ -181,3 +161,31 @@ func (st *openState) tryPublish(req pushRequest) (queue.EntryID, bool) {
181161
return 0, false
182162
}
183163
}
164+
165+
func (st *openState) handlePendingResponse(respChan chan queue.EntryID) (queue.EntryID, bool) {
166+
// The events channel is buffered, which means we may successfully
167+
// write to it even if the queue is shutting down. To avoid blocking
168+
// forever during shutdown, we also have to wait on the queue's
169+
// shutdown channel.
170+
select {
171+
case resp := <-respChan:
172+
return resp, true
173+
case <-st.queueClosing:
174+
}
175+
176+
// Clear the request channel so we can't write to it again
177+
st.events = nil
178+
179+
// Once the queue starts closing, it will not handle any more push
180+
// requests, however it may have handled ours before the closing
181+
// channel was triggered (and both may have arrived concurrently
182+
// at the select statement above). So to know whether our entry was
183+
// accepted we also need to check if there's a buffered response in
184+
// our channel.
185+
select {
186+
case resp := <-respChan:
187+
return resp, true
188+
default:
189+
}
190+
return 0, false
191+
}

libbeat/publisher/queue/memqueue/queue_test.go

Lines changed: 128 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -18,10 +18,10 @@
1818
package memqueue
1919

2020
import (
21+
"context"
2122
"encoding/binary"
2223
"flag"
2324
"fmt"
24-
"math"
2525
"math/rand/v2"
2626
"sync"
2727
"sync/atomic"
@@ -278,32 +278,6 @@ func makeTestQueue(sz, minEvents int, flushTimeout time.Duration) queuetest.Queu
278278
}
279279
}
280280

281-
func TestAdjustInputQueueSize(t *testing.T) {
282-
t.Run("zero yields default value (main queue size=0)", func(t *testing.T) {
283-
assert.Equal(t, minInputQueueSize, AdjustInputQueueSize(0, 0))
284-
})
285-
t.Run("zero yields default value (main queue size=10)", func(t *testing.T) {
286-
assert.Equal(t, minInputQueueSize, AdjustInputQueueSize(0, 10))
287-
})
288-
t.Run("can't go below min", func(t *testing.T) {
289-
assert.Equal(t, minInputQueueSize, AdjustInputQueueSize(1, 0))
290-
})
291-
t.Run("can set any value within bounds", func(t *testing.T) {
292-
for q, mainQueue := minInputQueueSize+1, 4096; q < int(float64(mainQueue)*maxInputQueueSizeRatio); q += 10 {
293-
assert.Equal(t, q, AdjustInputQueueSize(q, mainQueue))
294-
}
295-
})
296-
t.Run("can set any value if no upper bound", func(t *testing.T) {
297-
for q := minInputQueueSize + 1; q < math.MaxInt32; q *= 2 {
298-
assert.Equal(t, q, AdjustInputQueueSize(q, 0))
299-
}
300-
})
301-
t.Run("can't go above upper bound", func(t *testing.T) {
302-
mainQueue := 4096
303-
assert.Equal(t, int(float64(mainQueue)*maxInputQueueSizeRatio), AdjustInputQueueSize(mainQueue, mainQueue))
304-
})
305-
}
306-
307281
func TestBatchFreeEntries(t *testing.T) {
308282
const queueSize = 10
309283
const batchSize = 5
@@ -341,3 +315,130 @@ func TestBatchFreeEntries(t *testing.T) {
341315
require.Nilf(t, testQueue.buf[i].event, "Queue index %v: all events should be nil after calling FreeEntries on both batches")
342316
}
343317
}
318+
319+
func TestProducerShutdown(t *testing.T) {
320+
// Test that the number of acknowledgment callbacks exactly matches the
321+
// number of published events when many goroutines are publishing during
322+
// queue shutdown.
323+
//
324+
// The numbers here (queue size, number of publisher workers, etc.) are
325+
// kind of magic since there's no deterministic way to verify this, but they
326+
// were chosen so that, when there _was_ a race in the queue shutdown that
327+
// could send an extra acknowledgment
328+
// (https://github.com/elastic/beats/issues/47246), this test failed about
329+
// 90% of the time.
330+
const queueSize = 1000
331+
const publishWorkers = 50
332+
var ackedCount atomic.Int64
333+
var publishedCount atomic.Int64
334+
testQueue := NewQueue(
335+
logp.NewNopLogger(),
336+
nil,
337+
Settings{
338+
Events: queueSize,
339+
MaxGetRequest: queueSize,
340+
FlushTimeout: time.Second},
341+
0,
342+
nil)
343+
344+
var wg sync.WaitGroup
345+
// Start workers to continuously publish events to the queue
346+
publishWorker := func() {
347+
defer wg.Done()
348+
// Continuously publish events until Publish returns false indicating queue
349+
// shutdown.
350+
producer := testQueue.Producer(
351+
queue.ProducerConfig{
352+
ACK: func(count int) { ackedCount.Add(int64(count)) },
353+
})
354+
for {
355+
_, published := producer.Publish(0)
356+
if published {
357+
publishedCount.Add(1)
358+
} else {
359+
return
360+
}
361+
}
362+
}
363+
for range publishWorkers {
364+
wg.Add(1)
365+
go publishWorker()
366+
}
367+
// Start a reader to continuously drain the queue and acknowledge the events
368+
wg.Add(1)
369+
go func() {
370+
defer wg.Done()
371+
// Continuously read and acknowledge events from the queue
372+
for {
373+
batch, err := testQueue.Get(queueSize)
374+
if err == nil {
375+
batch.Done()
376+
} else {
377+
return
378+
}
379+
}
380+
}()
381+
382+
// Wait for the queue to go through at least one full rotation
383+
require.Eventually(
384+
t,
385+
func() bool { return publishedCount.Load() > queueSize },
386+
time.Second,
387+
time.Millisecond,
388+
"events are not flowing through the queue")
389+
390+
// Trigger queue shutdown
391+
testQueue.Close(false)
392+
393+
// Wait for queue context to finish
394+
select {
395+
case <-testQueue.Done():
396+
case <-time.After(5 * time.Second):
397+
require.Fail(t, "queue never shut down")
398+
}
399+
400+
// Wait for helper routines to finish
401+
wg.Wait()
402+
403+
// Wait for the ack loop to finish processing callbacks
404+
testQueue.wg.Wait()
405+
406+
require.Equal(t, publishedCount.Load(), ackedCount.Load(), "published and acknowledged event counts should match")
407+
}
408+
409+
func BenchmarkProducerThroughput(b *testing.B) {
410+
const queueSize = 10000
411+
const publishWorkers = 10
412+
testQueue := NewQueue(
413+
logp.NewNopLogger(),
414+
nil,
415+
Settings{
416+
Events: queueSize,
417+
MaxGetRequest: queueSize,
418+
FlushTimeout: time.Second},
419+
0,
420+
nil)
421+
422+
ctx, cancel := context.WithCancel(context.Background())
423+
publishWorker := func() {
424+
producer := testQueue.Producer(queue.ProducerConfig{})
425+
for ctx.Err() == nil {
426+
producer.Publish(0)
427+
}
428+
}
429+
for range publishWorkers {
430+
go publishWorker()
431+
}
432+
for b.Loop() {
433+
// With a flush timeout of a second, we can confidently expect we'll get
434+
// a full batch each time, so each iteration is measuring the time for the
435+
// publish workers to fill the queue.
436+
batch, err := testQueue.Get(queueSize)
437+
if err != nil {
438+
b.Fatal("Fetching queue batch should succeed")
439+
}
440+
batch.Done()
441+
}
442+
cancel()
443+
testQueue.Close(true)
444+
}

0 commit comments

Comments
 (0)