@@ -2,10 +2,11 @@ package rueidis
22
33import (
44 "context"
5- "runtime"
65 "sync"
6+ "sync/atomic"
77 "time"
8- "unsafe"
8+
9+ "github.com/redis/rueidis/internal/cache"
910)
1011
1112// NewCacheStoreFn can be provided in ClientOption for using a custom CacheStore implementation
@@ -191,250 +192,73 @@ func (a *adapterEntry) Wait(ctx context.Context) (RedisMessage, error) {
191192 }
192193}
193194
194- type flatentry struct {
195- ovfl * flatentry
196- next unsafe.Pointer
197- prev unsafe.Pointer
198- cmd string
199- key string
200- val []byte
201- ttl int64
202- size int64
203- mark int64
204- mu sync.RWMutex
205- }
206-
207- func (f * flatentry ) insert (e * flatentry ) {
208- f .size += e .size
209- f .ttl = e .ttl
210- f .mu .Lock ()
211- e .ovfl = f .ovfl
212- f .ovfl = e
213- f .mu .Unlock ()
214- }
215-
216- func (f * flatentry ) find (cmd string , ts int64 ) ([]byte , bool ) {
217- if f != nil && ts >= f .ttl {
218- return nil , true
219- }
220- for next := f ; next != nil ; {
221- if cmd == next .cmd {
222- return next .val , false
223- }
224- next .mu .RLock ()
225- ovfl := next .ovfl
226- next .mu .RUnlock ()
227- next = ovfl
228- }
229- return nil , false
230- }
231-
232- const lrBatchSize = 64
233- const flattEntrySize = unsafe .Sizeof (flatentry {})
234-
235- type lrBatch struct {
236- m map [* flatentry ]struct {}
237- }
238-
239195func NewFlattenCache (limit int ) CacheStore {
240- f := & flatten {
241- flights : make (map [string ]* adapterEntry ),
242- cache : make (map [string ]* flatentry ),
243- head : & flatentry {},
244- tail : & flatentry {},
245- size : 0 ,
246- limit : int64 (limit ),
196+ return & flatten {
197+ flights : cache.NewDoubleMap [* adapterEntry ](64 ),
198+ cache : cache .NewLRUDoubleMap [[]byte ](64 , int64 (limit )),
247199 }
248- f .head .next = unsafe .Pointer (f .tail )
249- f .tail .prev = unsafe .Pointer (f .head )
250- f .lrup = sync.Pool {New : func () any {
251- b := & lrBatch {m : make (map [* flatentry ]struct {}, lrBatchSize )}
252- runtime .SetFinalizer (b , func (b * lrBatch ) {
253- if len (b .m ) >= 0 {
254- f .mu .Lock ()
255- f .llTailBatch (b )
256- f .mu .Unlock ()
257- }
258- })
259- return b
260- }}
261- return f
262200}
263201
264202type flatten struct {
265- flights map [string ]* adapterEntry
266- cache map [string ]* flatentry
267- head * flatentry
268- tail * flatentry
269- lrup sync.Pool
270- mark int64
271- size int64
272- limit int64
273- mu sync.RWMutex
274- }
275-
276- func (f * flatten ) llAdd (e * flatentry ) {
277- e .mark = f .mark
278- e .prev = f .tail .prev
279- e .next = unsafe .Pointer (f .tail )
280- f .tail .prev = unsafe .Pointer (e )
281- (* flatentry )(e .prev ).next = unsafe .Pointer (e )
282- }
283-
284- func (f * flatten ) llDel (e * flatentry ) {
285- (* flatentry )(e .prev ).next = e .next
286- (* flatentry )(e .next ).prev = e .prev
287- e .mark = - 1
288- }
289-
290- func (f * flatten ) llTail (e * flatentry ) {
291- f .llDel (e )
292- f .llAdd (e )
293- }
294-
295- func (f * flatten ) llTailBatch (b * lrBatch ) {
296- for e := range b .m {
297- if e .mark == f .mark {
298- f .llTail (e )
299- }
300- }
301- clear (b .m )
302- }
303-
304- func (f * flatten ) remove (e * flatentry ) {
305- f .size -= e .size
306- f .llDel (e )
307- delete (f .cache , e .key )
203+ flights * cache.DoubleMap [* adapterEntry ]
204+ cache * cache.LRUDoubleMap [[]byte ]
205+ close int32
308206}
309207
310208func (f * flatten ) Flight (key , cmd string , ttl time.Duration , now time.Time ) (RedisMessage , CacheEntry ) {
311- f .mu .RLock ()
312- e := f .cache [key ]
313- f .mu .RUnlock ()
314- ts := now .UnixMilli ()
315- if v , _ := e .find (cmd , ts ); v != nil {
316- batch := f .lrup .Get ().(* lrBatch )
317- batch .m [e ] = struct {}{}
318- if len (batch .m ) >= lrBatchSize {
319- f .mu .Lock ()
320- f .llTailBatch (batch )
321- f .mu .Unlock ()
322- }
323- f .lrup .Put (batch )
324- var ret RedisMessage
325- _ = ret .CacheUnmarshalView (v )
326- return ret , nil
209+ if atomic .LoadInt32 (& f .close ) == 1 {
210+ return RedisMessage {}, nil
327211 }
328- fk := key + cmd
329- f .mu .RLock ()
330- af := f .flights [fk ]
331- f .mu .RUnlock ()
332- if af != nil {
333- return RedisMessage {}, af
334- }
335- f .mu .Lock ()
336- e = f .cache [key ]
337- v , expired := e .find (cmd , ts )
338- if v != nil {
339- f .llTail (e )
340- f .mu .Unlock ()
212+ ts := now .UnixMilli ()
213+ if e , ok := f .cache .Find (key , cmd , ts ); ok {
341214 var ret RedisMessage
342- _ = ret .CacheUnmarshalView (v )
215+ _ = ret .CacheUnmarshalView (e )
343216 return ret , nil
344217 }
345- defer f .mu .Unlock ()
346- if expired {
347- f .remove (e )
348- }
349- if af = f .flights [fk ]; af != nil {
218+ xat := ts + ttl .Milliseconds ()
219+ if af , ok := f .flights .FindOrInsert (key , cmd , func () * adapterEntry {
220+ return & adapterEntry {ch : make (chan struct {}), xat : xat }
221+ }); ok {
350222 return RedisMessage {}, af
351223 }
352- if f .flights != nil {
353- f .flights [fk ] = & adapterEntry {ch : make (chan struct {}), xat : ts + ttl .Milliseconds ()}
354- }
355224 return RedisMessage {}, nil
356225}
357226
358227func (f * flatten ) Update (key , cmd string , val RedisMessage ) (sxat int64 ) {
359- fk := key + cmd
360- f .mu .RLock ()
361- af := f .flights [fk ]
362- f .mu .RUnlock ()
363- if af != nil {
228+ if af , ok := f .flights .Find (key , cmd ); ok {
364229 sxat = val .getExpireAt ()
365230 if af .xat < sxat || sxat == 0 {
366231 sxat = af .xat
367232 val .setExpireAt (sxat )
368233 }
369234 bs := val .CacheMarshal (nil )
370- fe := & flatentry {cmd : cmd , val : bs , ttl : sxat , size : int64 (len (bs )+ len (key )+ len (cmd )) + int64 (flattEntrySize ) + 64 } // 64 for 2 map entries
371- f .mu .Lock ()
372- if f .flights != nil {
373- delete (f .flights , fk )
374- f .size += fe .size
375- for ep := f .head .next ; f .size > f .limit && ep != unsafe .Pointer (f .tail ); {
376- e := (* flatentry )(ep )
377- f .remove (e )
378- ep = e .next
379- }
380- e := f .cache [key ]
381- if e != nil && e .cmd == cmd {
382- f .size -= e .size
383- f .llDel (e )
384- e = nil
385- }
386- if e == nil {
387- fe .key = key
388- f .cache [key ] = fe
389- f .llAdd (fe )
390- } else {
391- e .insert (fe )
392- }
393- }
394- f .mu .Unlock ()
235+ f .cache .Insert (key , cmd , int64 (len (bs )+ len (key )+ len (cmd ))+ int64 (cache .LRUEntrySize )+ 64 , sxat , bs )
236+ f .flights .Delete (key , cmd )
395237 af .setVal (val )
396238 }
397239 return sxat
398240}
399241
400242func (f * flatten ) Cancel (key , cmd string , err error ) {
401- fk := key + cmd
402- f .mu .Lock ()
403- defer f .mu .Unlock ()
404- if af := f .flights [fk ]; af != nil {
405- delete (f .flights , fk )
243+ if af , ok := f .flights .Find (key , cmd ); ok {
244+ f .flights .Delete (key , cmd )
406245 af .setErr (err )
407246 }
408247}
409248
410249func (f * flatten ) Delete (keys []RedisMessage ) {
411- f .mu .Lock ()
412- defer f .mu .Unlock ()
413250 if keys == nil {
414- f .cache = make (map [string ]* flatentry , len (f .cache ))
415- f .head .next = unsafe .Pointer (f .tail )
416- f .tail .prev = unsafe .Pointer (f .head )
417- f .mark ++
418- f .size = 0
251+ f .cache .DeleteAll ()
419252 } else {
420253 for _ , k := range keys {
421- if e := f .cache [k .string ]; e != nil {
422- f .remove (e )
423- }
254+ f .cache .Delete (k .string )
424255 }
425256 }
426257}
427258
428259func (f * flatten ) Close (err error ) {
429- f .mu .Lock ()
430- flights := f .flights
431- f .flights = nil
432- f .cache = nil
433- f .tail = nil
434- f .head = nil
435- f .mark ++
436- f .mu .Unlock ()
437- for _ , entry := range flights {
260+ atomic .StoreInt32 (& f .close , 1 )
261+ f .flights .Iterate (func (entry * adapterEntry ) {
438262 entry .setErr (err )
439- }
263+ })
440264}
0 commit comments