@@ -44,8 +44,6 @@ type CuckooTraceChecker struct {
4444const (
4545 // This is how many items can be in the Add Queue before we start blocking on Add.
4646 AddQueueDepth = 1000
47- // This is how long we'll sleep between possible lock cycles.
48- AddQueueSleepTime = 100 * time .Microsecond
4947)
5048
5149var cuckooTraceCheckerMetrics = []metrics.Metadata {
@@ -56,6 +54,12 @@ var cuckooTraceCheckerMetrics = []metrics.Metadata{
5654 {Name : AddQueueLockTime , Type : metrics .Histogram , Unit : metrics .Microseconds , Description : "the time spent holding the add queue lock" },
5755}
5856
57+ var batchPool = sync.Pool {
58+ New : func () interface {} {
59+ return make ([]string , 0 , AddQueueDepth )
60+ },
61+ }
62+
5963func NewCuckooTraceChecker (capacity uint , m metrics.Metrics ) * CuckooTraceChecker {
6064 c := & CuckooTraceChecker {
6165 capacity : capacity ,
@@ -75,16 +79,21 @@ func NewCuckooTraceChecker(capacity uint, m metrics.Metrics) *CuckooTraceChecker
7579 go func () {
7680 defer c .shutdownWG .Done ()
7781
78- ticker := time .NewTicker (AddQueueSleepTime )
7982 for {
8083 select {
81- case <- ticker .C :
82- for len (c .addch ) > 0 {
83- c .drain ()
84+ case t := <- c .addch :
85+ batch := batchPool .Get ().([]string )
86+ batch = append (batch , t )
87+ // insert 100 traceIDs at a time to reduce
88+ // the amount of time this goroutine holding the write lock
89+ for len (batch ) < 100 && len (c .addch ) > 0 {
90+ batch = append (batch , <- c .addch )
8491 }
92+
93+ c .insertBatch (batch )
94+
8595 case <- c .done :
8696 return
87-
8897 }
8998 }
9099 }()
@@ -104,46 +113,59 @@ func (c *CuckooTraceChecker) Stop() {
104113 }
105114}
106115
107- // This function records all the traces that were in the channel at the start of
108- // the call. The idea is to add as many as possible under a single lock. We do
109- // limit our lock hold time to 1ms, so if we can't add them all in that time, we
110- // stop and let the next call pick up the rest. We track a histogram metric
111- // about lock time.
116+ // insertBatch inserts a batch of traceIDs into the filters.
117+ // once the batch has been inserted, the batch slice is reset and returned
118+ // to the sync.Pool
119+ func (c * CuckooTraceChecker ) insertBatch (batch []string ) {
120+ if len (batch ) == 0 {
121+ batchPool .Put (batch )
122+ return
123+ }
124+
125+ c .mut .Lock ()
126+ lockStart := time .Now ()
127+
128+ for _ , b := range batch {
129+ c .current .Insert ([]byte (b ))
130+ // don't add anything to future if it doesn't exist yet
131+ if c .future != nil {
132+ c .future .Insert ([]byte (b ))
133+ }
134+ }
135+ c .mut .Unlock ()
136+
137+ qlt := time .Since (lockStart )
138+ c .met .Histogram (AddQueueLockTime , float64 (qlt .Microseconds ()))
139+
140+ // Reset batch and return to pool
141+ batch = batch [:0 ]
142+ batchPool .Put (batch )
143+ }
144+
145+ // drain collects all pending items from the channel and inserts them.
146+ // This is called by Maintain() and Stop() to ensure all queued items are processed.
112147func (c * CuckooTraceChecker ) drain () {
113148 n := len (c .addch )
114149 if n == 0 {
115150 return
116151 }
117- c .mut .Lock ()
118- // we don't start the timer until we have the lock, because we don't want to be counting
119- // the time we're waiting for the lock.
120- lockStart := time .Now ()
121- timeout := time .NewTimer (1 * time .Millisecond )
122- outer:
152+
153+ batch := batchPool .Get ().([]string )
123154 for i := 0 ; i < n ; i ++ {
124155 select {
125156 case t , ok := <- c .addch :
126157 // if the channel is closed, we will stop processing
127158 if ! ok {
128- break outer
159+ break
129160 }
130- s := []byte (t )
131- c .current .Insert (s )
132- // don't add anything to future if it doesn't exist yet
133- if c .future != nil {
134- c .future .Insert (s )
135- }
136- case <- timeout .C :
137- break outer
161+ batch = append (batch , t )
138162 default :
139163 // if the channel is empty, stop
140- break outer
164+ break
141165 }
142166 }
143- c .mut .Unlock ()
144- timeout .Stop ()
145- qlt := time .Since (lockStart )
146- c .met .Histogram (AddQueueLockTime , float64 (qlt .Microseconds ()))
167+
168+ c .insertBatch (batch )
147169}
148170
149171// Add puts a traceID into the filter. We need this to be fast
@@ -162,10 +184,9 @@ func (c *CuckooTraceChecker) Add(traceID string) {
162184
163185// Check tests if a traceID is (very probably) in the filter.
164186func (c * CuckooTraceChecker ) Check (traceID string ) bool {
165- b := []byte (traceID )
166187 c .mut .RLock ()
167188 defer c .mut .RUnlock ()
168- return c .current .Lookup (b )
189+ return c .current .Lookup ([] byte ( traceID ) )
169190}
170191
171192// Maintain should be called periodically; if the current filter is full, it replaces
@@ -175,17 +196,17 @@ func (c *CuckooTraceChecker) Maintain() {
175196 // possible before we start messing with the filters.
176197 c .drain ()
177198
199+ var futureLoadFactor float64
178200 c .mut .RLock ()
179201 currentLoadFactor := c .current .LoadFactor ()
180- c .met .Gauge (CurrentLoadFactor , currentLoadFactor )
181202 if c .future != nil {
182- c . met . Gauge ( FutureLoadFactor , c .future .LoadFactor () )
203+ futureLoadFactor = c .future .LoadFactor ()
183204 }
184- c . met . Gauge ( CurrentCapacity , float64 ( c .capacity ))
205+ currentCapacity := c .capacity
185206 c .mut .RUnlock ()
186207
187208 // once the current one is half loaded, we can start using the future one too
188- if c . future == nil && currentLoadFactor > 0.5 {
209+ if futureLoadFactor != 0 && currentLoadFactor > 0.5 {
189210 c .mut .Lock ()
190211 c .future = cuckoo .NewFilter (c .capacity )
191212 c .mut .Unlock ()
@@ -194,10 +215,15 @@ func (c *CuckooTraceChecker) Maintain() {
194215 // if the current one is full, cycle the filters
195216 if currentLoadFactor > 0.99 {
196217 c .mut .Lock ()
197- defer c .mut .Unlock ()
198218 c .current = c .future
199219 c .future = cuckoo .NewFilter (c .capacity )
220+ c .mut .Unlock ()
200221 }
222+
223+ c .met .Gauge (FutureLoadFactor , futureLoadFactor )
224+ c .met .Gauge (CurrentLoadFactor , currentLoadFactor )
225+ c .met .Gauge (CurrentCapacity , float64 (currentCapacity ))
226+
201227}
202228
203229// SetNextCapacity adjusts the capacity that will be set for the future filter on the next replacement.
0 commit comments