Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
119 changes: 119 additions & 0 deletions relay-server/src/processing/client_reports/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,119 @@
use std::sync::Arc;

use relay_quotas::RateLimits;
use relay_system::Addr;

use crate::envelope::{EnvelopeHeaders, Item, ItemType};
use crate::managed::{Counted, Managed, ManagedEnvelope, OutcomeError, Quantities, Rejected};
use crate::processing::{self, Context, CountRateLimited, Nothing, Output, QuotaRateLimiter};
use crate::services::outcome::{Outcome, TrackOutcome};

mod process;

#[derive(Debug, thiserror::Error)]
pub enum Error {
/// The client-reports are rate limited.
#[error("rate limited")]
RateLimited(RateLimits),
}

impl OutcomeError for Error {
type Error = Self;

fn consume(self) -> (Option<Outcome>, Self::Error) {
let outcome = match &self {
Self::RateLimited(limits) => {
let reason_code = limits.longest().and_then(|limit| limit.reason_code.clone());
Some(Outcome::RateLimited(reason_code))
}
};
(outcome, self)
}
}

impl From<RateLimits> for Error {
fn from(value: RateLimits) -> Self {
Self::RateLimited(value)
}
}

/// A processor for Client-Reports.
pub struct ClientReportsProcessor {
limiter: Arc<QuotaRateLimiter>,
aggregator: Addr<TrackOutcome>,
}

impl ClientReportsProcessor {
/// Creates a new [`Self`].
pub fn new(limiter: Arc<QuotaRateLimiter>, aggregator: Addr<TrackOutcome>) -> Self {
Self {
limiter,
aggregator,
}
}
}

impl processing::Processor for ClientReportsProcessor {
type UnitOfWork = SerializedClientReport;
type Output = Nothing;
type Error = Error;

fn prepare_envelope(
&self,
envelope: &mut ManagedEnvelope,
) -> Option<Managed<Self::UnitOfWork>> {
let headers = envelope.envelope().headers().clone();

let client_reports = envelope
.envelope_mut()
.take_items_by(|item| matches!(*item.ty(), ItemType::ClientReport))
.into_vec();

let work = SerializedClientReport {
headers,
client_reports,
};
Some(Managed::from_envelope(envelope, work))
}

async fn process(
&self,
mut client_reports: Managed<Self::UnitOfWork>,
ctx: Context<'_>,
) -> Result<Output<Self::Output>, Rejected<Self::Error>> {
process::process_client_reports(
&mut client_reports,
ctx.config,
ctx.project_info,
&self.aggregator,
);

self.limiter
.enforce_quotas(&mut client_reports, ctx)
.await?;

Ok(Output::empty())
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Bug: Premature Processing Bypasses Quota Enforcement Inconsistency

The process method calls process_client_reports before enforcing quotas, causing outcomes to be sent to the aggregator even when client reports are subsequently rate limited. This creates an inconsistency where outcomes are tracked for client reports that get rejected, potentially inflating outcome metrics for dropped data.

Fix in Cursor Fix in Web

}
}

/// Client-Reports in their serialized state, as transported in an envelope.
#[derive(Debug)]
pub struct SerializedClientReport {
/// Original envelope headers.
headers: EnvelopeHeaders,

/// A list of client-reports waiting to be processed.
///
/// All items contained here must be client-reports.
client_reports: Vec<Item>,
}

impl Counted for SerializedClientReport {
fn quantities(&self) -> Quantities {
smallvec::smallvec![]
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Taken from:

ItemType::ClientReport => smallvec![],

}
}

impl CountRateLimited for Managed<SerializedClientReport> {
type Error = Error;
}
Loading
Loading