-
Notifications
You must be signed in to change notification settings - Fork 104
WIP ref(client-reports): Move Client Reports to the new processing #5338
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Changes from 1 commit
91ba01a
5c1fb4b
70eb032
673958e
5505925
709bdaa
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change | ||||
|---|---|---|---|---|---|---|
| @@ -0,0 +1,141 @@ | ||||||
| use std::sync::Arc; | ||||||
|
|
||||||
| use relay_quotas::RateLimits; | ||||||
|
|
||||||
| use crate::envelope::{EnvelopeHeaders, Item, ItemType}; | ||||||
| use crate::managed::{Counted, Managed, ManagedEnvelope, OutcomeError, Quantities, Rejected}; | ||||||
| use crate::processing::{ | ||||||
| self, Context, CountRateLimited, Forward, ForwardContext, Output, QuotaRateLimiter, | ||||||
| }; | ||||||
| use crate::services::outcome::Outcome; | ||||||
|
|
||||||
| mod process; | ||||||
|
|
||||||
| #[derive(Debug, thiserror::Error)] | ||||||
| pub enum Error { | ||||||
| /// The client-reports are rate limited. | ||||||
| #[error("rate limited")] | ||||||
| RateLimited(RateLimits), | ||||||
| } | ||||||
|
|
||||||
| impl OutcomeError for Error { | ||||||
| type Error = Self; | ||||||
|
|
||||||
| fn consume(self) -> (Option<Outcome>, Self::Error) { | ||||||
| let outcome = match &self { | ||||||
| Self::RateLimited(limits) => { | ||||||
| let reason_code = limits.longest().and_then(|limit| limit.reason_code.clone()); | ||||||
| Some(Outcome::RateLimited(reason_code)) | ||||||
| } | ||||||
| }; | ||||||
| (outcome, self) | ||||||
| } | ||||||
| } | ||||||
|
|
||||||
| impl From<RateLimits> for Error { | ||||||
| fn from(value: RateLimits) -> Self { | ||||||
| Self::RateLimited(value) | ||||||
| } | ||||||
| } | ||||||
|
|
||||||
| /// A processor for Client-Reports. | ||||||
| pub struct ClientReportsProcessor { | ||||||
| limiter: Arc<QuotaRateLimiter>, | ||||||
| } | ||||||
|
|
||||||
| impl ClientReportsProcessor { | ||||||
| /// Creates a new [`Self`]. | ||||||
| pub fn new(limiter: Arc<QuotaRateLimiter>) -> Self { | ||||||
| Self { limiter } | ||||||
| } | ||||||
| } | ||||||
|
|
||||||
| impl processing::Processor for ClientReportsProcessor { | ||||||
| type UnitOfWork = SerializedClientReport; | ||||||
| type Output = ClientReportOutput; | ||||||
| type Error = Error; | ||||||
|
|
||||||
| fn prepare_envelope( | ||||||
| &self, | ||||||
| envelope: &mut ManagedEnvelope, | ||||||
| ) -> Option<Managed<Self::UnitOfWork>> { | ||||||
| let headers = envelope.envelope().headers().clone(); | ||||||
|
|
||||||
| let client_reports = envelope | ||||||
| .envelope_mut() | ||||||
| .take_items_by(|item| matches!(*item.ty(), ItemType::ClientReport)) | ||||||
| .into_vec(); | ||||||
|
|
||||||
| let work = SerializedClientReport { | ||||||
| headers, | ||||||
| client_reports, | ||||||
| }; | ||||||
| Some(Managed::from_envelope(envelope, work)) | ||||||
| } | ||||||
|
|
||||||
| async fn process( | ||||||
| &self, | ||||||
| mut client_reports: Managed<Self::UnitOfWork>, | ||||||
| ctx: Context<'_>, | ||||||
| ) -> Result<Output<Self::Output>, Rejected<Self::Error>> { | ||||||
| // FIXME: Decide if we want to make the TrackedOutcomes the output of this processor. | ||||||
| let outcomes = | ||||||
| process::process_client_reports(&mut client_reports, ctx.config, ctx.project_info); | ||||||
|
|
||||||
| // FIXME: Are there even quotas on a client_report (the old code did check quotas but seems strange) | ||||||
| self.limiter | ||||||
| .enforce_quotas(&mut client_reports, ctx) | ||||||
| .await?; | ||||||
|
|
||||||
| // FIXME: Looking at the 'old' processing code seems like we might still need to emit some | ||||||
| // metrics here | ||||||
| Ok(Output::just(ClientReportOutput(client_reports))) | ||||||
| } | ||||||
| } | ||||||
|
|
||||||
| // FIXME: The correct output might actually be the TrackedOutcomes that we generate | ||||||
| /// Output produced by the [`ClientReportsProcessor`]. | ||||||
| #[derive(Debug)] | ||||||
| pub struct ClientReportOutput(Managed<SerializedClientReport>); | ||||||
|
|
||||||
| impl Forward for ClientReportOutput { | ||||||
| fn serialize_envelope( | ||||||
| self, | ||||||
| ctx: ForwardContext<'_>, | ||||||
| ) -> Result<Managed<Box<crate::Envelope>>, Rejected<()>> { | ||||||
| // FIXME: Understand what should happen here | ||||||
| todo!() | ||||||
| } | ||||||
|
|
||||||
| #[cfg(feature = "processing")] | ||||||
| fn forward_store( | ||||||
| self, | ||||||
| s: &relay_system::Addr<crate::services::store::Store>, | ||||||
| ctx: ForwardContext<'_>, | ||||||
| ) -> Result<(), Rejected<()>> { | ||||||
| // FIXME: Understand what should happen here | ||||||
| todo!() | ||||||
| } | ||||||
| } | ||||||
|
|
||||||
| /// Client-Reports in their serialized state, as transported in an envelope. | ||||||
| #[derive(Debug)] | ||||||
| pub struct SerializedClientReport { | ||||||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
|
||||||
| /// Original envelope headers. | ||||||
| headers: EnvelopeHeaders, | ||||||
|
|
||||||
| /// A list of client-reports waiting to be processed. | ||||||
| /// | ||||||
| /// All items contained here must be client-reports. | ||||||
| client_reports: Vec<Item>, | ||||||
| } | ||||||
|
|
||||||
| impl Counted for SerializedClientReport { | ||||||
| fn quantities(&self) -> Quantities { | ||||||
| smallvec::smallvec![] | ||||||
|
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Taken from: relay/relay-server/src/envelope/item.rs Line 142 in 5505925
|
||||||
| } | ||||||
| } | ||||||
|
|
||||||
| impl CountRateLimited for Managed<SerializedClientReport> { | ||||||
| type Error = Error; | ||||||
| } | ||||||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,217 @@ | ||
| use std::collections::BTreeMap; | ||
| use std::error::Error; | ||
|
|
||
| use chrono::{Duration as SignedDuration, Utc}; | ||
| use relay_common::time::UnixTimestamp; | ||
| use relay_config::Config; | ||
| use relay_event_normalization::ClockDriftProcessor; | ||
| use relay_event_schema::protocol::ClientReport; | ||
| use relay_filter::FilterStatKey; | ||
| use relay_quotas::ReasonCode; | ||
| use relay_sampling::evaluation::MatchedRuleIds; | ||
|
|
||
| use crate::constants::DEFAULT_EVENT_RETENTION; | ||
| use crate::managed::Managed; | ||
| use crate::processing::client_reports::SerializedClientReport; | ||
| use crate::services::outcome::{Outcome, RuleCategories, TrackOutcome}; | ||
| use crate::services::processor::MINIMUM_CLOCK_DRIFT; | ||
| use crate::services::projects::project::ProjectInfo; | ||
|
|
||
| use crate::processing::client_reports; | ||
|
|
||
| /// Fields of client reports that map to specific [`Outcome`]s without content. | ||
| #[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord)] | ||
| pub enum ClientReportField { | ||
| /// The event has been filtered by an inbound data filter. | ||
| Filtered, | ||
|
|
||
| /// The event has been filtered by a sampling rule. | ||
| FilteredSampling, | ||
|
|
||
| /// The event has been rate limited. | ||
| RateLimited, | ||
|
|
||
| /// The event has already been discarded on the client side. | ||
| ClientDiscard, | ||
| } | ||
|
|
||
| /// Validates and extracts client reports. | ||
| /// | ||
| /// At the moment client reports are primarily used to transfer outcomes from | ||
| /// client SDKs. The outcomes are removed here and sent directly to the outcomes | ||
| /// system. | ||
| pub fn process_client_reports( | ||
|
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This logic is taken almost 1-1 from the old processing so if we think this is better broken up or rewritten to make it more elegant can still do that.
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We should do that, left an explaining comment on why here: #5338 (comment) |
||
| client_reports: &mut Managed<SerializedClientReport>, | ||
| config: &Config, | ||
| project_info: &ProjectInfo, | ||
| ) -> Vec<TrackOutcome> { | ||
| // if client outcomes are disabled we leave the client reports unprocessed | ||
| // and pass them on. | ||
| if !config.emit_outcomes().any() || !config.emit_client_outcomes() { | ||
| // if a processing relay has client outcomes disabled we drop them. | ||
| if config.processing_enabled() { | ||
| // FIXME: Understand how to best drop them here silently | ||
| todo!("Drop all the items silently"); | ||
| } | ||
| return vec![]; | ||
| } | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Bug: Outcomes Emission Fails When Disabled in Non-Processing ModeThe early return condition only triggers when outcomes are disabled AND processing is enabled, but when outcomes are disabled in non-processing mode, the function continues to send outcomes to the aggregator. This violates the
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It seems to me that cursor is correct here—in the old logic, if you had |
||
|
|
||
| let mut timestamp = None; | ||
| let mut output_events = BTreeMap::new(); | ||
| let received = client_reports.received_at(); | ||
|
|
||
| let clock_drift_processor = | ||
| ClockDriftProcessor::new(client_reports.headers.sent_at(), received) | ||
| .at_least(MINIMUM_CLOCK_DRIFT); | ||
|
|
||
| // we're going through all client reports but we're effectively just merging | ||
| // them into the first one. | ||
| client_reports.retain( | ||
| |client_reports| &mut client_reports.client_reports, | ||
| |item, _| { | ||
| match ClientReport::parse(&item.payload()) { | ||
| Ok(ClientReport { | ||
| timestamp: report_timestamp, | ||
| discarded_events, | ||
| rate_limited_events, | ||
| filtered_events, | ||
| filtered_sampling_events, | ||
| }) => { | ||
| // Glue all discarded events together and give them the appropriate outcome type | ||
| let input_events = discarded_events | ||
| .into_iter() | ||
| .map(|discarded_event| (ClientReportField::ClientDiscard, discarded_event)) | ||
| .chain( | ||
| filtered_events.into_iter().map(|discarded_event| { | ||
| (ClientReportField::Filtered, discarded_event) | ||
| }), | ||
| ) | ||
| .chain(filtered_sampling_events.into_iter().map(|discarded_event| { | ||
| (ClientReportField::FilteredSampling, discarded_event) | ||
| })) | ||
| .chain(rate_limited_events.into_iter().map(|discarded_event| { | ||
| (ClientReportField::RateLimited, discarded_event) | ||
| })); | ||
|
|
||
| for (outcome_type, discarded_event) in input_events { | ||
| if discarded_event.reason.len() > 200 { | ||
| relay_log::trace!("ignored client outcome with an overlong reason"); | ||
| continue; | ||
| } | ||
| *output_events | ||
| .entry(( | ||
| outcome_type, | ||
| discarded_event.reason, | ||
| discarded_event.category, | ||
| )) | ||
| .or_insert(0) += discarded_event.quantity; | ||
| } | ||
| if let Some(ts) = report_timestamp { | ||
| timestamp.get_or_insert(ts); | ||
| } | ||
| } | ||
| Err(err) => { | ||
| relay_log::trace!(error = &err as &dyn Error, "invalid client report received") | ||
| } | ||
| } | ||
| // FIXME: Understand what the equivalent of just dropping them here silently would be. | ||
| Ok::<_, client_reports::Error>(()) | ||
| }, | ||
| ); | ||
|
|
||
| if output_events.is_empty() { | ||
| return vec![]; | ||
| } | ||
|
|
||
| let timestamp = | ||
| timestamp.get_or_insert_with(|| UnixTimestamp::from_secs(received.timestamp() as u64)); | ||
|
|
||
| if clock_drift_processor.is_drifted() { | ||
| relay_log::trace!("applying clock drift correction to client report"); | ||
| clock_drift_processor.process_timestamp(timestamp); | ||
| } | ||
|
|
||
| let retention_days = project_info | ||
| .config() | ||
| .event_retention | ||
| .unwrap_or(DEFAULT_EVENT_RETENTION); | ||
| let max_age = SignedDuration::days(retention_days.into()); | ||
| // also if we unable to parse the timestamp, we assume it's way too old here. | ||
| let in_past = timestamp | ||
| .as_datetime() | ||
| .map(|ts| (received - ts) > max_age) | ||
| .unwrap_or(true); | ||
| if in_past { | ||
| relay_log::trace!( | ||
| "skipping client outcomes older than {} days", | ||
| max_age.num_days() | ||
| ); | ||
| return vec![]; | ||
| } | ||
|
|
||
| let max_future = SignedDuration::seconds(config.max_secs_in_future()); | ||
| // also if we unable to parse the timestamp, we assume it's way far in the future here. | ||
| let in_future = timestamp | ||
| .as_datetime() | ||
| .map(|ts| (ts - received) > max_future) | ||
| .unwrap_or(true); | ||
| if in_future { | ||
| relay_log::trace!( | ||
| "skipping client outcomes more than {}s in the future", | ||
| max_future.num_seconds() | ||
| ); | ||
| return vec![]; | ||
| } | ||
|
|
||
| // FIXME: This can be done more elegantly | ||
| let mut outcome_collection: Vec<TrackOutcome> = vec![]; | ||
| for ((outcome_type, reason, category), quantity) in output_events.into_iter() { | ||
| let outcome = match outcome_from_parts(outcome_type, &reason) { | ||
| Ok(outcome) => outcome, | ||
| Err(_) => { | ||
| relay_log::trace!(?outcome_type, reason, "invalid outcome combination"); | ||
| continue; | ||
| } | ||
| }; | ||
|
|
||
| outcome_collection.push(TrackOutcome { | ||
| // If we get to this point, the unwrap should not be used anymore, since we know by | ||
| // now that the timestamp can be parsed, but just incase we fallback to UTC current | ||
| // `DateTime`. | ||
| timestamp: timestamp.as_datetime().unwrap_or_else(Utc::now), | ||
| scoping: client_reports.scoping(), | ||
| outcome, | ||
| event_id: None, | ||
| remote_addr: None, // omitting the client address allows for better aggregation | ||
| category, | ||
| quantity, | ||
| }); | ||
| } | ||
|
|
||
| outcome_collection | ||
| } | ||
|
|
||
| /// Parse an outcome from an outcome ID and a reason string. | ||
| /// | ||
| /// Currently only used to reconstruct outcomes encoded in client reports. | ||
| fn outcome_from_parts(field: ClientReportField, reason: &str) -> Result<Outcome, ()> { | ||
| match field { | ||
| ClientReportField::FilteredSampling => match reason.strip_prefix("Sampled:") { | ||
| Some(rule_ids) => MatchedRuleIds::parse(rule_ids) | ||
| .map(RuleCategories::from) | ||
| .map(Outcome::FilteredSampling) | ||
| .map_err(|_| ()), | ||
| None => Err(()), | ||
| }, | ||
| ClientReportField::ClientDiscard => Ok(Outcome::ClientDiscard(reason.into())), | ||
| ClientReportField::Filtered => Ok(Outcome::Filtered( | ||
| FilterStatKey::try_from(reason).map_err(|_| ())?, | ||
| )), | ||
| ClientReportField::RateLimited => Ok(Outcome::RateLimited(match reason { | ||
| "" => None, | ||
| other => Some(ReasonCode::new(other)), | ||
| })), | ||
| } | ||
| } | ||
|
|
||
| // FIXME: Move the test over and adapt them. | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This rate limiter will never do anything (as there is no
DataCategoryon the items themselves), let's in a follow up replace this call, remove theCountRateLimitedimplementation and add a comment that client reports aren't rate limited as they do not have a corresponding data category.For now it's fine to keep as this PR is a 1:1 port of the original code, but since the new processor is a bit more explicit, I think we should change it overall.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
IMO we can remove the call in this PR, no reason to wait. Also because ownership of the outcomes is already passed on inside of
process_client_reports, as cursor pointed out.