22//! proxies other requests through
33
44use crate :: node_client:: { NodeClient , NodeClientExt } ;
5- use async_lock:: { Mutex as AsyncMutex , RwLock as AsyncRwLock } ;
5+ use async_lock:: {
6+ Mutex as AsyncMutex , RwLock as AsyncRwLock ,
7+ RwLockUpgradableReadGuardArc as AsyncRwLockUpgradableReadGuard ,
8+ RwLockWriteGuardArc as AsyncRwLockWriteGuard ,
9+ } ;
610use async_trait:: async_trait;
711use futures:: { FutureExt , Stream , StreamExt , select} ;
812use std:: pin:: Pin ;
@@ -29,13 +33,18 @@ struct SegmentHeaders {
2933}
3034
3135impl SegmentHeaders {
36+ /// Push a new segment header to the cache, if it is the next segment header.
37+ /// Otherwise, skip the push.
3238 fn push ( & mut self , archived_segment_header : SegmentHeader ) {
3339 if self . segment_headers . len ( ) == u64:: from ( archived_segment_header. segment_index ( ) ) as usize
3440 {
3541 self . segment_headers . push ( archived_segment_header) ;
3642 }
3743 }
3844
45+ /// Get cached segment headers for the given segment indices.
46+ ///
47+ /// Returns `None` for segment indices that are not in the cache.
3948 fn get_segment_headers ( & self , segment_indices : & [ SegmentIndex ] ) -> Vec < Option < SegmentHeader > > {
4049 segment_indices
4150 . iter ( )
@@ -47,6 +56,7 @@ impl SegmentHeaders {
4756 . collect :: < Vec < _ > > ( )
4857 }
4958
59+ /// Get the last `limit` segment headers from the cache.
5060 fn last_segment_headers ( & self , limit : u32 ) -> Vec < Option < SegmentHeader > > {
5161 self . segment_headers
5262 . iter ( )
@@ -58,17 +68,24 @@ impl SegmentHeaders {
5868 . collect ( )
5969 }
6070
61- async fn sync < NC > ( & mut self , client : & NC ) -> anyhow:: Result < ( ) >
71+ /// Get uncached headers from the node, if we're not rate-limited.
72+ /// This only requires a read lock.
73+ ///
74+ /// Returns any extra segment headers if the download succeeds, or an error if it fails.
75+ /// The caller must write the returned segment headers to the cache, and reset the sync
76+ /// rate-limit timer.
77+ async fn request_uncached_headers < NC > ( & self , client : & NC ) -> anyhow:: Result < Vec < SegmentHeader > >
6278 where
6379 NC : NodeClient ,
6480 {
81+ // Skip the sync if we're still within the sync rate limit.
6582 if let Some ( last_synced) = & self . last_synced
6683 && last_synced. elapsed ( ) < SEGMENT_HEADERS_SYNC_INTERVAL
6784 {
68- return Ok ( ( ) ) ;
85+ return Ok ( Vec :: new ( ) ) ;
6986 }
70- self . last_synced . replace ( Instant :: now ( ) ) ;
7187
88+ let mut extra_segment_headers = Vec :: new ( ) ;
7289 let mut segment_index_offset = SegmentIndex :: from ( self . segment_headers . len ( ) as u64 ) ;
7390 let segment_index_step = SegmentIndex :: from ( MAX_SEGMENT_HEADERS_PER_REQUEST as u64 ) ;
7491
@@ -91,13 +108,19 @@ impl SegmentHeaders {
91108 break ' outer;
92109 } ;
93110
94- self . push ( segment_header) ;
111+ extra_segment_headers . push ( segment_header) ;
95112 }
96113
97114 segment_index_offset += segment_index_step;
98115 }
99116
100- Ok ( ( ) )
117+ Ok ( extra_segment_headers)
118+ }
119+
120+ /// Write the sync results to the cache, and reset the sync rate-limit timer.
121+ fn write_cache ( & mut self , extra_segment_headers : Vec < SegmentHeader > ) {
122+ self . segment_headers . extend ( extra_segment_headers) ;
123+ self . last_synced . replace ( Instant :: now ( ) ) ;
101124 }
102125}
103126
@@ -130,7 +153,9 @@ where
130153 client. subscribe_archived_segment_headers ( ) . await ?;
131154
132155 info ! ( "Downloading all segment headers from node..." ) ;
133- segment_headers. sync ( & client) . await ?;
156+ // No locking is needed, we are the first and only instance right now.
157+ let headers = segment_headers. request_uncached_headers ( & client) . await ?;
158+ segment_headers. write_cache ( headers) ;
134159 info ! ( "Downloaded all segment headers from node successfully" ) ;
135160
136161 let segment_headers = Arc :: new ( AsyncRwLock :: new ( segment_headers) ) ;
@@ -307,6 +332,10 @@ where
307332 ) )
308333 }
309334
335+ /// Gets segment headers for the given segment indices, updating the cache from the node if
336+ /// needed.
337+ ///
338+ /// Returns `None` for segment indices that are not in the cache.
310339 async fn segment_headers (
311340 & self ,
312341 segment_indices : Vec < SegmentIndex > ,
@@ -320,11 +349,41 @@ where
320349 if retrieved_segment_headers. iter ( ) . all ( Option :: is_some) {
321350 Ok ( retrieved_segment_headers)
322351 } else {
323- // Re-sync segment headers
324- let mut segment_headers = self . segment_headers . write ( ) . await ;
325- segment_headers. sync ( & self . inner ) . await ?;
352+ // We might be missing a requested segment header.
353+ // Sync the cache with the node, applying a rate limit, and return cached segment headers.
354+
355+ // If we took a write lock here, a queue of writers could starve all the readers, even if
356+ // those writers would be rate-limited. So we take an upgradable read lock for the rate
357+ // limit check.
358+ let segment_headers = self . segment_headers . upgradable_read_arc ( ) . await ;
359+
360+ // Try again after acquiring the upgradeable read lock, in case another caller already
361+ // synced the headers.
362+ let retrieved_segment_headers = segment_headers. get_segment_headers ( & segment_indices) ;
363+ if retrieved_segment_headers. iter ( ) . all ( Option :: is_some) {
364+ return Ok ( retrieved_segment_headers) ;
365+ }
366+
367+ // Try to sync the cache with the node.
368+ let extra_segment_headers = segment_headers
369+ . request_uncached_headers ( & self . inner )
370+ . await ?;
371+
372+ if extra_segment_headers. is_empty ( ) {
373+ // No extra segment headers on the node, or we are rate-limited.
374+ // So just return what we have in the cache.
375+ return Ok ( retrieved_segment_headers) ;
376+ }
326377
327- Ok ( segment_headers. get_segment_headers ( & segment_indices) )
378+ // We need to update the cached segment headers, so take the write lock.
379+ let mut segment_headers =
380+ AsyncRwLockUpgradableReadGuard :: upgrade ( segment_headers) . await ;
381+ segment_headers. write_cache ( extra_segment_headers) ;
382+
383+ // Downgrade the write lock to a read lock to get the updated segment headers for the
384+ // query.
385+ Ok ( AsyncRwLockWriteGuard :: downgrade ( segment_headers)
386+ . get_segment_headers ( & segment_indices) )
328387 }
329388 }
330389
@@ -346,6 +405,19 @@ impl<NC> NodeClientExt for CachingProxyNodeClient<NC>
346405where
347406 NC : NodeClientExt ,
348407{
408+ async fn cached_segment_headers (
409+ & self ,
410+ segment_indices : Vec < SegmentIndex > ,
411+ ) -> anyhow:: Result < Vec < Option < SegmentHeader > > > {
412+ // To avoid remote denial of service, we don't update the cache here, because it is called
413+ // from network code.
414+ Ok ( self
415+ . segment_headers
416+ . read ( )
417+ . await
418+ . get_segment_headers ( & segment_indices) )
419+ }
420+
349421 async fn last_segment_headers ( & self , limit : u32 ) -> anyhow:: Result < Vec < Option < SegmentHeader > > > {
350422 Ok ( self
351423 . segment_headers
0 commit comments