Skip to content
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
141 changes: 141 additions & 0 deletions relay-server/src/processing/client_reports/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,141 @@
use std::sync::Arc;

use relay_quotas::RateLimits;

use crate::envelope::{EnvelopeHeaders, Item, ItemType};
use crate::managed::{Counted, Managed, ManagedEnvelope, OutcomeError, Quantities, Rejected};
use crate::processing::{
self, Context, CountRateLimited, Forward, ForwardContext, Output, QuotaRateLimiter,
};
use crate::services::outcome::Outcome;

mod process;

#[derive(Debug, thiserror::Error)]
pub enum Error {
/// The client-reports are rate limited.
#[error("rate limited")]
RateLimited(RateLimits),
}

impl OutcomeError for Error {
type Error = Self;

fn consume(self) -> (Option<Outcome>, Self::Error) {
let outcome = match &self {
Self::RateLimited(limits) => {
let reason_code = limits.longest().and_then(|limit| limit.reason_code.clone());
Some(Outcome::RateLimited(reason_code))
}
};
(outcome, self)
}
}

impl From<RateLimits> for Error {
fn from(value: RateLimits) -> Self {
Self::RateLimited(value)
}
}

/// A processor for Client-Reports.
pub struct ClientReportsProcessor {
limiter: Arc<QuotaRateLimiter>,
}

impl ClientReportsProcessor {
/// Creates a new [`Self`].
pub fn new(limiter: Arc<QuotaRateLimiter>) -> Self {
Self { limiter }
}
}

impl processing::Processor for ClientReportsProcessor {
type UnitOfWork = SerializedClientReport;
type Output = ClientReportOutput;
type Error = Error;

fn prepare_envelope(
&self,
envelope: &mut ManagedEnvelope,
) -> Option<Managed<Self::UnitOfWork>> {
let headers = envelope.envelope().headers().clone();

let client_reports = envelope
.envelope_mut()
.take_items_by(|item| matches!(*item.ty(), ItemType::ClientReport))
.into_vec();

let work = SerializedClientReport {
headers,
client_reports,
};
Some(Managed::from_envelope(envelope, work))
}

async fn process(
&self,
mut client_reports: Managed<Self::UnitOfWork>,
ctx: Context<'_>,
) -> Result<Output<Self::Output>, Rejected<Self::Error>> {
// FIXME: Decide if we want to make the TrackedOutcomes the output of this processor.
let outcomes =
process::process_client_reports(&mut client_reports, ctx.config, ctx.project_info);

// FIXME: Are there even quotas on a client_report (the old code did check quotas but seems strange)
self.limiter
.enforce_quotas(&mut client_reports, ctx)
.await?;
Comment on lines +91 to +93
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This rate limiter will never do anything (as there is no DataCategory on the items themselves), let's in a follow up replace this call, remove the CountRateLimited implementation and add a comment that client reports aren't rate limited as they do not have a corresponding data category.

For now it's fine to keep as this PR is a 1:1 port of the original code, but since the new processor is a bit more explicit, I think we should change it overall.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

IMO we can remove the call in this PR, no reason to wait. Also because ownership of the outcomes is already passed on inside of process_client_reports, as cursor pointed out.


// FIXME: Looking at the 'old' processing code seems like we might still need to emit some
// metrics here
Ok(Output::just(ClientReportOutput(client_reports)))
}
}

// FIXME: The correct output might actually be the TrackedOutcomes that we generate
/// Output produced by the [`ClientReportsProcessor`].
#[derive(Debug)]
pub struct ClientReportOutput(Managed<SerializedClientReport>);

impl Forward for ClientReportOutput {
fn serialize_envelope(
self,
ctx: ForwardContext<'_>,
) -> Result<Managed<Box<crate::Envelope>>, Rejected<()>> {
// FIXME: Understand what should happen here
todo!()
}

#[cfg(feature = "processing")]
fn forward_store(
self,
s: &relay_system::Addr<crate::services::store::Store>,
ctx: ForwardContext<'_>,
) -> Result<(), Rejected<()>> {
// FIXME: Understand what should happen here
todo!()
}
}

/// Client-Reports in their serialized state, as transported in an envelope.
#[derive(Debug)]
pub struct SerializedClientReport {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
pub struct SerializedClientReport {
pub struct SerializedClientReports {

/// Original envelope headers.
headers: EnvelopeHeaders,

/// A list of client-reports waiting to be processed.
///
/// All items contained here must be client-reports.
client_reports: Vec<Item>,
}

impl Counted for SerializedClientReport {
fn quantities(&self) -> Quantities {
smallvec::smallvec![]
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Taken from:

ItemType::ClientReport => smallvec![],

}
}

impl CountRateLimited for Managed<SerializedClientReport> {
type Error = Error;
}
217 changes: 217 additions & 0 deletions relay-server/src/processing/client_reports/process.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,217 @@
use std::collections::BTreeMap;
use std::error::Error;

use chrono::{Duration as SignedDuration, Utc};
use relay_common::time::UnixTimestamp;
use relay_config::Config;
use relay_event_normalization::ClockDriftProcessor;
use relay_event_schema::protocol::ClientReport;
use relay_filter::FilterStatKey;
use relay_quotas::ReasonCode;
use relay_sampling::evaluation::MatchedRuleIds;

use crate::constants::DEFAULT_EVENT_RETENTION;
use crate::managed::Managed;
use crate::processing::client_reports::SerializedClientReport;
use crate::services::outcome::{Outcome, RuleCategories, TrackOutcome};
use crate::services::processor::MINIMUM_CLOCK_DRIFT;
use crate::services::projects::project::ProjectInfo;

use crate::processing::client_reports;

/// Fields of client reports that map to specific [`Outcome`]s without content.
#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord)]
pub enum ClientReportField {
/// The event has been filtered by an inbound data filter.
Filtered,

/// The event has been filtered by a sampling rule.
FilteredSampling,

/// The event has been rate limited.
RateLimited,

/// The event has already been discarded on the client side.
ClientDiscard,
}

/// Validates and extracts client reports.
///
/// At the moment client reports are primarily used to transfer outcomes from
/// client SDKs. The outcomes are removed here and sent directly to the outcomes
/// system.
pub fn process_client_reports(
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This logic is taken almost 1-1 from the old processing so if we think this is better broken up or rewritten to make it more elegant can still do that.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should do that, left an explaining comment on why here: #5338 (comment)

client_reports: &mut Managed<SerializedClientReport>,
config: &Config,
project_info: &ProjectInfo,
) -> Vec<TrackOutcome> {
// if client outcomes are disabled we leave the client reports unprocessed
// and pass them on.
if !config.emit_outcomes().any() || !config.emit_client_outcomes() {
// if a processing relay has client outcomes disabled we drop them.
if config.processing_enabled() {
// FIXME: Understand how to best drop them here silently
todo!("Drop all the items silently");
}
return vec![];
}
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Bug: Outcomes Emission Fails When Disabled in Non-Processing Mode

The early return condition only triggers when outcomes are disabled AND processing is enabled, but when outcomes are disabled in non-processing mode, the function continues to send outcomes to the aggregator. This violates the emit_client_outcomes configuration setting, causing outcomes to be emitted even when explicitly disabled.

Fix in Cursor Fix in Web

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It seems to me that cursor is correct here—in the old logic, if you had config.emit_client_outcomes() disabled in a non-processing Relay, you would bail here, but now you continue processing them in that case. That seems unintended.


let mut timestamp = None;
let mut output_events = BTreeMap::new();
let received = client_reports.received_at();

let clock_drift_processor =
ClockDriftProcessor::new(client_reports.headers.sent_at(), received)
.at_least(MINIMUM_CLOCK_DRIFT);

// we're going through all client reports but we're effectively just merging
// them into the first one.
client_reports.retain(
|client_reports| &mut client_reports.client_reports,
|item, _| {
match ClientReport::parse(&item.payload()) {
Ok(ClientReport {
timestamp: report_timestamp,
discarded_events,
rate_limited_events,
filtered_events,
filtered_sampling_events,
}) => {
// Glue all discarded events together and give them the appropriate outcome type
let input_events = discarded_events
.into_iter()
.map(|discarded_event| (ClientReportField::ClientDiscard, discarded_event))
.chain(
filtered_events.into_iter().map(|discarded_event| {
(ClientReportField::Filtered, discarded_event)
}),
)
.chain(filtered_sampling_events.into_iter().map(|discarded_event| {
(ClientReportField::FilteredSampling, discarded_event)
}))
.chain(rate_limited_events.into_iter().map(|discarded_event| {
(ClientReportField::RateLimited, discarded_event)
}));

for (outcome_type, discarded_event) in input_events {
if discarded_event.reason.len() > 200 {
relay_log::trace!("ignored client outcome with an overlong reason");
continue;
}
*output_events
.entry((
outcome_type,
discarded_event.reason,
discarded_event.category,
))
.or_insert(0) += discarded_event.quantity;
}
if let Some(ts) = report_timestamp {
timestamp.get_or_insert(ts);
}
}
Err(err) => {
relay_log::trace!(error = &err as &dyn Error, "invalid client report received")
}
}
// FIXME: Understand what the equivalent of just dropping them here silently would be.
Ok::<_, client_reports::Error>(())
},
);

if output_events.is_empty() {
return vec![];
}

let timestamp =
timestamp.get_or_insert_with(|| UnixTimestamp::from_secs(received.timestamp() as u64));

if clock_drift_processor.is_drifted() {
relay_log::trace!("applying clock drift correction to client report");
clock_drift_processor.process_timestamp(timestamp);
}

let retention_days = project_info
.config()
.event_retention
.unwrap_or(DEFAULT_EVENT_RETENTION);
let max_age = SignedDuration::days(retention_days.into());
// also if we unable to parse the timestamp, we assume it's way too old here.
let in_past = timestamp
.as_datetime()
.map(|ts| (received - ts) > max_age)
.unwrap_or(true);
if in_past {
relay_log::trace!(
"skipping client outcomes older than {} days",
max_age.num_days()
);
return vec![];
}

let max_future = SignedDuration::seconds(config.max_secs_in_future());
// also if we unable to parse the timestamp, we assume it's way far in the future here.
let in_future = timestamp
.as_datetime()
.map(|ts| (ts - received) > max_future)
.unwrap_or(true);
if in_future {
relay_log::trace!(
"skipping client outcomes more than {}s in the future",
max_future.num_seconds()
);
return vec![];
}

// FIXME: This can be done more elegantly
let mut outcome_collection: Vec<TrackOutcome> = vec![];
for ((outcome_type, reason, category), quantity) in output_events.into_iter() {
let outcome = match outcome_from_parts(outcome_type, &reason) {
Ok(outcome) => outcome,
Err(_) => {
relay_log::trace!(?outcome_type, reason, "invalid outcome combination");
continue;
}
};

outcome_collection.push(TrackOutcome {
// If we get to this point, the unwrap should not be used anymore, since we know by
// now that the timestamp can be parsed, but just incase we fallback to UTC current
// `DateTime`.
timestamp: timestamp.as_datetime().unwrap_or_else(Utc::now),
scoping: client_reports.scoping(),
outcome,
event_id: None,
remote_addr: None, // omitting the client address allows for better aggregation
category,
quantity,
});
}

outcome_collection
}

/// Parse an outcome from an outcome ID and a reason string.
///
/// Currently only used to reconstruct outcomes encoded in client reports.
fn outcome_from_parts(field: ClientReportField, reason: &str) -> Result<Outcome, ()> {
match field {
ClientReportField::FilteredSampling => match reason.strip_prefix("Sampled:") {
Some(rule_ids) => MatchedRuleIds::parse(rule_ids)
.map(RuleCategories::from)
.map(Outcome::FilteredSampling)
.map_err(|_| ()),
None => Err(()),
},
ClientReportField::ClientDiscard => Ok(Outcome::ClientDiscard(reason.into())),
ClientReportField::Filtered => Ok(Outcome::Filtered(
FilterStatKey::try_from(reason).map_err(|_| ())?,
)),
ClientReportField::RateLimited => Ok(Outcome::RateLimited(match reason {
"" => None,
other => Some(ReasonCode::new(other)),
})),
}
}

// FIXME: Move the test over and adapt them.
2 changes: 2 additions & 0 deletions relay-server/src/processing/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ use crate::Envelope;
use crate::managed::{Managed, Rejected};
use crate::processing::ForwardContext;
use crate::processing::check_ins::CheckInsProcessor;
use crate::processing::client_reports::ClientReportsProcessor;
use crate::processing::logs::LogsProcessor;
use crate::processing::sessions::SessionsProcessor;
use crate::processing::spans::SpansProcessor;
Expand Down Expand Up @@ -57,4 +58,5 @@ outputs!(
TraceMetrics => TraceMetricsProcessor,
Spans => SpansProcessor,
Sessions => SessionsProcessor,
ClientReports => ClientReportsProcessor,
);
1 change: 1 addition & 0 deletions relay-server/src/processing/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ pub use self::forward::*;
pub use self::limits::*;

pub mod check_ins;
pub mod client_reports;
pub mod logs;
pub mod sessions;
pub mod spans;
Expand Down
Loading
Loading