@@ -2,8 +2,10 @@ package rueidis
22
33import (
44 "context"
5+ "runtime"
56 "sync"
67 "time"
8+ "unsafe"
79)
810
911// NewCacheStoreFn can be provided in ClientOption for using a custom CacheStore implementation
@@ -178,3 +180,230 @@ func (a *adapterEntry) Wait(ctx context.Context) (RedisMessage, error) {
178180 return a .val , a .err
179181 }
180182}
183+
184+ type flatentry struct {
185+ ovfl * flatentry
186+ next unsafe.Pointer
187+ prev unsafe.Pointer
188+ cmd string
189+ key string
190+ val []byte
191+ ttl int64
192+ size int64
193+ mark int64
194+ mu sync.Mutex
195+ }
196+
197+ func (f * flatentry ) insert (e * flatentry ) {
198+ f .size += e .size
199+ f .mu .Lock ()
200+ defer f .mu .Unlock ()
201+ e .ovfl = f .ovfl
202+ f .ovfl = e
203+ }
204+
205+ func (f * flatentry ) find (cmd string , ts int64 ) (ret RedisMessage , expired bool ) {
206+ if f == nil {
207+ return
208+ }
209+ if ts >= f .ttl {
210+ expired = true
211+ return
212+ }
213+ if cmd == f .cmd {
214+ _ = ret .CacheUnmarshalView (f .val )
215+ return
216+ }
217+ f .mu .Lock ()
218+ ovfl := f .ovfl
219+ f .mu .Unlock ()
220+ return ovfl .find (cmd , ts )
221+ }
222+
223+ const lrBatchSize = 64
224+
225+ type lrBatch struct {
226+ m map [* flatentry ]struct {}
227+ }
228+
229+ func NewFlattenCache (limit int64 ) CacheStore {
230+ f := & flatten {
231+ flights : make (map [string ]* adapterEntry ),
232+ cache : make (map [string ]* flatentry ),
233+ head : & flatentry {},
234+ tail : & flatentry {},
235+ size : 0 ,
236+ limit : limit ,
237+ }
238+ f .head .next = unsafe .Pointer (f .tail )
239+ f .tail .prev = unsafe .Pointer (f .head )
240+ f .lrup = sync.Pool {New : func () any {
241+ b := & lrBatch {m : make (map [* flatentry ]struct {}, lrBatchSize )}
242+ runtime .SetFinalizer (b , func () {
243+ f .llTailBatch (b )
244+ })
245+ return b
246+ }}
247+ return f
248+ }
249+
250+ type flatten struct {
251+ flights map [string ]* adapterEntry
252+ cache map [string ]* flatentry
253+ head * flatentry
254+ tail * flatentry
255+ lrup sync.Pool
256+ mark int64
257+ size int64
258+ limit int64
259+ mu sync.RWMutex
260+ }
261+
262+ func (f * flatten ) llAdd (e * flatentry ) {
263+ e .mark = f .mark
264+ e .prev = f .tail .prev
265+ e .next = unsafe .Pointer (f .tail )
266+ f .tail .prev = unsafe .Pointer (e )
267+ (* flatentry )(e .prev ).next = unsafe .Pointer (e )
268+ }
269+
270+ func (f * flatten ) llDel (e * flatentry ) {
271+ (* flatentry )(e .prev ).next = e .next
272+ (* flatentry )(e .next ).prev = e .prev
273+ e .mark = 0
274+ }
275+
276+ func (f * flatten ) llTail (e * flatentry ) {
277+ if e .mark == f .mark {
278+ f .llDel (e )
279+ f .llAdd (e )
280+ }
281+ }
282+
283+ func (f * flatten ) llTailBatch (b * lrBatch ) {
284+ f .mu .Lock ()
285+ for e := range b .m {
286+ f .llTail (e )
287+ }
288+ f .mu .Unlock ()
289+ clear (b .m )
290+ }
291+
292+ func (f * flatten ) remove (e * flatentry ) {
293+ f .size -= e .size
294+ f .llDel (e )
295+ delete (f .cache , e .key )
296+ }
297+
298+ func (f * flatten ) Flight (key , cmd string , ttl time.Duration , now time.Time ) (RedisMessage , CacheEntry ) {
299+ f .mu .RLock ()
300+ e := f .cache [key ]
301+ f .mu .RUnlock ()
302+ ts := now .UnixMilli ()
303+ if v , _ := e .find (cmd , ts ); v .typ != 0 {
304+ batch := f .lrup .Get ().(* lrBatch )
305+ batch .m [e ] = struct {}{}
306+ if len (batch .m ) == lrBatchSize {
307+ f .llTailBatch (batch )
308+ }
309+ f .lrup .Put (batch )
310+ return v , nil
311+ }
312+ fk := key + cmd
313+ f .mu .RLock ()
314+ af := f .flights [fk ]
315+ f .mu .RUnlock ()
316+ if af != nil {
317+ return RedisMessage {}, af
318+ }
319+ f .mu .Lock ()
320+ defer f .mu .Unlock ()
321+ e = f .cache [key ]
322+ v , expired := e .find (cmd , ts )
323+ if v .typ != 0 {
324+ f .llTail (e )
325+ return v , nil
326+ }
327+ if expired {
328+ f .remove (e )
329+ }
330+ if af = f .flights [fk ]; af != nil {
331+ return RedisMessage {}, af
332+ }
333+ f .flights [fk ] = & adapterEntry {ch : make (chan struct {}), xat : ts + ttl .Milliseconds ()}
334+ return RedisMessage {}, nil
335+ }
336+
337+ func (f * flatten ) Update (key , cmd string , val RedisMessage ) int64 {
338+ fk := key + cmd
339+ bs := val .CacheMarshal (nil )
340+ fe := & flatentry {cmd : cmd , val : bs , ttl : val .CachePXAT (), size : int64 (len (bs )+ len (key )+ len (cmd )) + int64 (unsafe .Sizeof (flatentry {}))}
341+ f .mu .Lock ()
342+ af := f .flights [fk ]
343+ if af != nil {
344+ delete (f .flights , fk )
345+ if af .xat < fe .ttl {
346+ fe .ttl = af .xat
347+ }
348+ }
349+ f .size += fe .size
350+ for ep := f .head .next ; f .size > f .limit && ep != unsafe .Pointer (f .tail ); {
351+ e := (* flatentry )(ep )
352+ f .remove (e )
353+ ep = e .next
354+ }
355+ if e := f .cache [key ]; e == nil {
356+ fe .key = key
357+ f .cache [key ] = fe
358+ f .llAdd (fe )
359+ } else {
360+ e .insert (fe )
361+ }
362+ f .mu .Unlock ()
363+ if af != nil {
364+ af .set (val , nil )
365+ }
366+ return fe .ttl
367+ }
368+
369+ func (f * flatten ) Cancel (key , cmd string , err error ) {
370+ fk := key + cmd
371+ f .mu .Lock ()
372+ defer f .mu .Unlock ()
373+ if af := f .flights [fk ]; af != nil {
374+ delete (f .flights , fk )
375+ af .set (RedisMessage {}, err )
376+ }
377+ }
378+
379+ func (f * flatten ) Delete (keys []RedisMessage ) {
380+ f .mu .Lock ()
381+ defer f .mu .Unlock ()
382+ if keys == nil {
383+ f .cache = make (map [string ]* flatentry , len (f .cache ))
384+ f .head .next = unsafe .Pointer (f .tail )
385+ f .tail .prev = unsafe .Pointer (f .head )
386+ f .mark ++
387+ f .size = 0
388+ } else {
389+ for _ , k := range keys {
390+ if e := f .cache [k .string ]; e != nil {
391+ f .remove (e )
392+ }
393+ }
394+ }
395+ }
396+
397+ func (f * flatten ) Close (err error ) {
398+ f .mu .Lock ()
399+ flights := f .flights
400+ f .flights = nil
401+ f .cache = nil
402+ f .tail = nil
403+ f .head = nil
404+ f .mark ++
405+ f .mu .Unlock ()
406+ for _ , entry := range flights {
407+ entry .set (RedisMessage {}, err )
408+ }
409+ }
0 commit comments