@@ -926,6 +926,24 @@ impl BucketKey {
     }
 }
 
+/// Configuration value for [`AggregatorConfig::shift_key`].
+#[derive(Clone, Debug, Default, Deserialize, Serialize)]
+#[serde(rename_all = "lowercase")]
+pub enum ShiftKey {
+    /// Shifts the flush time by an offset based on the [`ProjectKey`].
+    ///
+    /// This allows buckets from the same project to be flushed together.
+    #[default]
+    Project,
+
+    /// Shifts the flush time by an offset based on the bucket key itself.
+    ///
+    /// This allows for a completely random distribution of bucket flush times.
+    ///
+    /// Only for use in processing Relays.
+    Bucket,
+}
+
 /// Parameters used by the [`AggregatorService`].
 #[derive(Clone, Debug, Deserialize, Serialize)]
 #[serde(default)]
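With `rename_all = "lowercase"`, the new variants deserialize from the strings `"project"` and `"bucket"` (the `test_parse_shift_key` test at the bottom of this diff exercises exactly that). A minimal standalone sketch of the serde behavior, re-declaring only the enum outside the Relay codebase:

```rust
use serde::{Deserialize, Serialize};

// Re-declaration of the ShiftKey enum above, reduced to what serde needs.
#[derive(Clone, Debug, Default, Deserialize, Serialize)]
#[serde(rename_all = "lowercase")]
pub enum ShiftKey {
    #[default]
    Project,
    Bucket,
}

fn main() {
    // The lowercase variant names are the on-the-wire spellings.
    let key: ShiftKey = serde_json::from_str(r#""bucket""#).unwrap();
    assert!(matches!(key, ShiftKey::Bucket));
    assert_eq!(
        serde_json::to_string(&ShiftKey::Project).unwrap(),
        r#""project""#
    );
}
```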
@@ -1010,6 +1028,12 @@ pub struct AggregatorConfig {
     ///
     /// Defaults to `None`, i.e. no limit.
     pub max_project_key_bucket_bytes: Option<usize>,
+
+    /// Key used to shift the flush time of a bucket.
+    ///
+    /// This prevents flushing all buckets from a bucket interval at the same
+    /// time by computing an offset from the hash of the given key.
+    pub shift_key: ShiftKey,
 }
 
 impl AggregatorConfig {
@@ -1068,28 +1092,21 @@ impl AggregatorConfig {
     ///
     /// Recent buckets are flushed after a grace period of `initial_delay`. Backdated buckets, that
     /// is, buckets that lie in the past, are flushed after the shorter `debounce_delay`.
-    fn get_flush_time(&self, bucket_timestamp: UnixTimestamp, project_key: ProjectKey) -> Instant {
+    fn get_flush_time(&self, bucket_key: &BucketKey) -> Instant {
         let now = Instant::now();
         let mut flush = None;
 
-        if let MonotonicResult::Instant(instant) = bucket_timestamp.to_instant() {
+        if let MonotonicResult::Instant(instant) = bucket_key.timestamp.to_instant() {
             let instant = Instant::from_std(instant);
             let bucket_end = instant + self.bucket_interval();
             let initial_flush = bucket_end + self.initial_delay();
             // If the initial flush is still pending, use that.
             if initial_flush > now {
-                // Shift deterministically within one bucket interval based on the project key. This
-                // distributes buckets over time while also flushing all buckets of the same project
-                // key together.
-                let mut hasher = FnvHasher::default();
-                hasher.write(project_key.as_str().as_bytes());
-                let shift_millis = hasher.finish() % (self.bucket_interval * 1000);
-
-                flush = Some(initial_flush + Duration::from_millis(shift_millis));
+                flush = Some(initial_flush + self.flush_time_shift(bucket_key));
             }
         }
 
-        let delay = UnixTimestamp::now().as_secs() as i64 - bucket_timestamp.as_secs() as i64;
+        let delay = UnixTimestamp::now().as_secs() as i64 - bucket_key.timestamp.as_secs() as i64;
         relay_statsd::metric!(
             histogram(MetricHistograms::BucketsDelay) = delay as f64,
             backdated = if flush.is_none() { "true" } else { "false" },
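To make the two flush paths concrete: a bucket's initial flush lands at `timestamp + bucket_interval + initial_delay` (plus the shift), and anything whose initial flush has already passed is treated as backdated. A simplified, self-contained model of that decision, using `std::time::Instant` and placeholder delay values rather than anything from this diff:

```rust
use std::time::{Duration, Instant};

/// Simplified model of `get_flush_time` above. `shift` stands in for the
/// value returned by `flush_time_shift`.
fn flush_time(
    now: Instant,
    bucket_start: Instant, // the bucket timestamp, converted to an Instant
    bucket_interval: Duration,
    initial_delay: Duration,
    debounce_delay: Duration,
    shift: Duration,
) -> Instant {
    let initial_flush = bucket_start + bucket_interval + initial_delay;
    if initial_flush > now {
        // Recent bucket: flush once the grace period ends, shifted to
        // spread flushes across the interval.
        initial_flush + shift
    } else {
        // Backdated bucket: flush after the shorter debounce delay.
        now + debounce_delay
    }
}

fn main() {
    let now = Instant::now();
    let flush = flush_time(
        now,
        now,                      // a bucket that just started
        Duration::from_secs(10),  // hypothetical bucket_interval
        Duration::from_secs(30),  // hypothetical initial_delay
        Duration::from_secs(10),  // hypothetical debounce_delay
        Duration::from_millis(1234),
    );
    assert!(flush > now); // recent path: grace period plus shift
}
```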
@@ -1102,6 +1119,23 @@ impl AggregatorConfig {
             None => now + self.debounce_delay(),
         }
     }
+
+    // Shift deterministically within one bucket interval based on the project or bucket key.
+    //
+    // This distributes buckets over time to prevent peaks.
+    fn flush_time_shift(&self, bucket: &BucketKey) -> Duration {
+        let hash_value = match self.shift_key {
+            ShiftKey::Project => {
+                let mut hasher = FnvHasher::default();
+                hasher.write(bucket.project_key.as_str().as_bytes());
+                hasher.finish()
+            }
+            ShiftKey::Bucket => bucket.hash64(),
+        };
+        let shift_millis = hash_value % (self.bucket_interval * 1000);
+
+        Duration::from_millis(shift_millis)
+    }
 }
 
 impl Default for AggregatorConfig {
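The shift itself is a deterministic hash reduced modulo one bucket interval, so a given project (or bucket) always gets the same offset. A standalone sketch of the `ShiftKey::Project` arm, using the `fnv` crate as the code above does; the project key string and the 10-second interval are made-up example values:

```rust
use std::hash::Hasher;

use fnv::FnvHasher;

/// Mirrors the `ShiftKey::Project` arm of `flush_time_shift` above.
fn project_shift_millis(project_key: &str, bucket_interval_secs: u64) -> u64 {
    let mut hasher = FnvHasher::default();
    hasher.write(project_key.as_bytes());
    // Reduce modulo one bucket interval (in milliseconds) so the shift
    // never pushes the flush past the next interval.
    hasher.finish() % (bucket_interval_secs * 1000)
}

fn main() {
    // Hypothetical 32-char hex project key and a 10-second interval.
    let shift = project_shift_millis("a94ae32be2584e0bbd7a4cbb95971fee", 10);
    assert!(shift < 10_000);
    println!("deterministic shift: {shift} ms");
}
```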
@@ -1119,6 +1153,7 @@ impl Default for AggregatorConfig {
             max_tag_value_length: 200,
             max_total_bucket_bytes: None,
             max_project_key_bucket_bytes: None,
+            shift_key: ShiftKey::default(),
         }
     }
 }
@@ -1671,7 +1706,6 @@ impl AggregatorService {
         key: BucketKey,
         value: T,
     ) -> Result<(), AggregateMetricsError> {
-        let timestamp = key.timestamp;
         let project_key = key.project_key;
 
         let key = Self::validate_bucket_key(key, &self.config)?;
@@ -1734,7 +1768,7 @@ impl AggregatorService {
                     metric_name = metric_name_tag(&entry.key().metric_name),
                 );
 
-                let flush_at = self.config.get_flush_time(timestamp, project_key);
+                let flush_at = self.config.get_flush_time(entry.key());
                 let bucket = value.into();
                 added_cost = entry.key().cost() + bucket.cost();
                 entry.insert(QueuedBucket::new(flush_at, bucket));
@@ -3241,4 +3275,11 @@ mod tests {
     fn test_capped_iter_completeness_100() {
         test_capped_iter_completeness(100, 4);
     }
+
+    #[test]
+    fn test_parse_shift_key() {
+        let json = r#"{"shift_key": "bucket"}"#;
+        let parsed: AggregatorConfig = serde_json::from_str(json).unwrap();
+        assert!(matches!(parsed.shift_key, ShiftKey::Bucket));
+    }
 }