diff --git a/relay-server/src/processing/client_reports/mod.rs b/relay-server/src/processing/client_reports/mod.rs
new file mode 100644
index 00000000000..7bc30cc5107
--- /dev/null
+++ b/relay-server/src/processing/client_reports/mod.rs
@@ -0,0 +1,81 @@
+use relay_system::Addr;
+
+use crate::envelope::{EnvelopeHeaders, Item, ItemType};
+use crate::managed::{Counted, Managed, ManagedEnvelope, Quantities, Rejected};
+use crate::processing::{self, Context, Nothing, Output};
+use crate::services::outcome::TrackOutcome;
+
+mod process;
+
+// TODO: Not sure there is a cleaner way to do this.
+#[derive(Debug, thiserror::Error)]
+pub enum Error {}
+
+/// A processor for Client-Reports.
+pub struct ClientReportsProcessor {
+    aggregator: Addr<TrackOutcome>,
+}
+
+impl ClientReportsProcessor {
+    /// Creates a new [`Self`].
+    pub fn new(aggregator: Addr<TrackOutcome>) -> Self {
+        Self { aggregator }
+    }
+}
+
+impl processing::Processor for ClientReportsProcessor {
+    type UnitOfWork = SerializedClientReports;
+    type Output = Nothing;
+    type Error = Error;
+
+    fn prepare_envelope(
+        &self,
+        envelope: &mut ManagedEnvelope,
+    ) -> Option<Managed<Self::UnitOfWork>> {
+        let headers = envelope.envelope().headers().clone();
+
+        let client_reports = envelope
+            .envelope_mut()
+            .take_items_by(|item| matches!(*item.ty(), ItemType::ClientReport))
+            .into_vec();
+
+        let work = SerializedClientReports {
+            headers,
+            client_reports,
+        };
+        Some(Managed::from_envelope(envelope, work))
+    }
+
+    async fn process(
+        &self,
+        mut client_reports: Managed<Self::UnitOfWork>,
+        ctx: Context<'_>,
+    ) -> Result<Output<Self::Output>, Rejected<Self::Error>> {
+        process::process_client_reports(
+            &mut client_reports,
+            ctx.config,
+            ctx.project_info,
+            &self.aggregator,
+        );
+
+        Ok(Output::empty())
+    }
+}
+
+/// Client-Reports in their serialized state, as transported in an envelope.
+#[derive(Debug)]
+pub struct SerializedClientReports {
+    /// Original envelope headers.
+    headers: EnvelopeHeaders,
+
+    /// A list of client-reports waiting to be processed.
+    ///
+    /// All items contained here must be client-reports.
+    client_reports: Vec<Item>,
+}
+
+impl Counted for SerializedClientReports {
+    fn quantities(&self) -> Quantities {
+        smallvec::smallvec![]
+    }
+}
diff --git a/relay-server/src/processing/client_reports/process.rs b/relay-server/src/processing/client_reports/process.rs
new file mode 100644
index 00000000000..e88f29d325e
--- /dev/null
+++ b/relay-server/src/processing/client_reports/process.rs
@@ -0,0 +1,307 @@
+use std::collections::BTreeMap;
+use std::error::Error;
+
+use chrono::{Duration as SignedDuration, Utc};
+use relay_common::time::UnixTimestamp;
+use relay_config::Config;
+use relay_event_normalization::ClockDriftProcessor;
+use relay_event_schema::protocol::ClientReport;
+use relay_filter::FilterStatKey;
+use relay_quotas::ReasonCode;
+use relay_sampling::evaluation::MatchedRuleIds;
+use relay_system::Addr;
+
+use crate::constants::DEFAULT_EVENT_RETENTION;
+use crate::managed::Managed;
+use crate::processing::client_reports::SerializedClientReports;
+use crate::services::outcome::{Outcome, RuleCategories, TrackOutcome};
+use crate::services::processor::MINIMUM_CLOCK_DRIFT;
+use crate::services::projects::project::ProjectInfo;
+
+/// Fields of client reports that map to specific [`Outcome`]s without content.
+#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord)]
+pub enum ClientReportField {
+    /// The event has been filtered by an inbound data filter.
+    Filtered,
+
+    /// The event has been filtered by a sampling rule.
+    FilteredSampling,
+
+    /// The event has been rate limited.
+    RateLimited,
+
+    /// The event has already been discarded on the client side.
+    ClientDiscard,
+}
+
+/// Validates and extracts client reports.
+///
+/// At the moment client reports are primarily used to transfer outcomes from
+/// client SDKs. The outcomes are removed here and sent directly to the outcomes
+/// system.
+pub fn process_client_reports(
+    client_reports: &mut Managed<SerializedClientReports>,
+    config: &Config,
+    project_info: &ProjectInfo,
+    outcome_aggregator: &Addr<TrackOutcome>,
+) {
+    if (!config.emit_outcomes().any() || !config.emit_client_outcomes())
+        && config.processing_enabled()
+    {
+        // If a processing relay has client outcomes disabled, we drop them without processing.
+        return;
+    }
+
+    let mut timestamp = None;
+    let mut output_events = BTreeMap::new();
+    let received = client_reports.received_at();
+
+    let clock_drift_processor =
+        ClockDriftProcessor::new(client_reports.headers.sent_at(), received)
+            .at_least(MINIMUM_CLOCK_DRIFT);
+
+    for item in &client_reports.client_reports {
+        match ClientReport::parse(&item.payload()) {
+            Ok(ClientReport {
+                timestamp: report_timestamp,
+                discarded_events,
+                rate_limited_events,
+                filtered_events,
+                filtered_sampling_events,
+            }) => {
+                // Glue all discarded events together and give them the appropriate outcome type
+                let input_events =
+                    discarded_events
+                        .into_iter()
+                        .map(|discarded_event| (ClientReportField::ClientDiscard, discarded_event))
+                        .chain(
+                            filtered_events.into_iter().map(|discarded_event| {
+                                (ClientReportField::Filtered, discarded_event)
+                            }),
+                        )
+                        .chain(filtered_sampling_events.into_iter().map(|discarded_event| {
+                            (ClientReportField::FilteredSampling, discarded_event)
+                        }))
+                        .chain(rate_limited_events.into_iter().map(|discarded_event| {
+                            (ClientReportField::RateLimited, discarded_event)
+                        }));
+
+                for (outcome_type, discarded_event) in input_events {
+                    if discarded_event.reason.len() > 200 {
+                        relay_log::trace!("ignored client outcome with an overlong reason");
+                        continue;
+                    }
+                    *output_events
+                        .entry((
+                            outcome_type,
+                            discarded_event.reason,
+                            discarded_event.category,
+                        ))
+                        .or_insert(0) += discarded_event.quantity;
+                }
+                if let Some(ts) = report_timestamp {
+                    timestamp.get_or_insert(ts);
+                }
+            }
+            Err(err) => {
+                relay_log::trace!(error = &err as &dyn Error, "invalid client report received")
+            }
+        }
+    }
+
+    if output_events.is_empty() {
+        return;
+    }
+
+    let timestamp =
+        timestamp.get_or_insert_with(|| UnixTimestamp::from_secs(received.timestamp() as u64));
+
+    if clock_drift_processor.is_drifted() {
+        relay_log::trace!("applying clock drift correction to client report");
+        clock_drift_processor.process_timestamp(timestamp);
+    }
+
+    let retention_days = project_info
+        .config()
+        .event_retention
+        .unwrap_or(DEFAULT_EVENT_RETENTION);
+    let max_age = SignedDuration::days(retention_days.into());
+    // Also, if we are unable to parse the timestamp, we assume it's way too old here.
+    let in_past = timestamp
+        .as_datetime()
+        .map(|ts| (received - ts) > max_age)
+        .unwrap_or(true);
+    if in_past {
+        relay_log::trace!(
+            "skipping client outcomes older than {} days",
+            max_age.num_days()
+        );
+        return;
+    }
+
+    let max_future = SignedDuration::seconds(config.max_secs_in_future());
+    // Also, if we are unable to parse the timestamp, we assume it's too far in the future here.
+    let in_future = timestamp
+        .as_datetime()
+        .map(|ts| (ts - received) > max_future)
+        .unwrap_or(true);
+    if in_future {
+        relay_log::trace!(
+            "skipping client outcomes more than {}s in the future",
+            max_future.num_seconds()
+        );
+        return;
+    }
+
+    for ((outcome_type, reason, category), quantity) in output_events.into_iter() {
+        let Ok(outcome) = outcome_from_parts(outcome_type, &reason) else {
+            relay_log::trace!(?outcome_type, reason, "invalid outcome combination");
+            continue;
+        };
+
+        outcome_aggregator.send(TrackOutcome {
+            // If we get to this point, the unwrap should not be used anymore, since we know by
+            // now that the timestamp can be parsed, but just in case we fall back to the current
+            // UTC `DateTime`.
+            timestamp: timestamp.as_datetime().unwrap_or_else(Utc::now),
+            scoping: client_reports.scoping(),
+            outcome,
+            event_id: None,
+            remote_addr: None, // omitting the client address allows for better aggregation
+            category,
+            quantity,
+        });
+    }
+}
+
+/// Parse an outcome from an outcome ID and a reason string.
+///
+/// Currently only used to reconstruct outcomes encoded in client reports.
+fn outcome_from_parts(field: ClientReportField, reason: &str) -> Result<Outcome, ()> {
+    match field {
+        ClientReportField::FilteredSampling => match reason.strip_prefix("Sampled:") {
+            Some(rule_ids) => MatchedRuleIds::parse(rule_ids)
+                .map(RuleCategories::from)
+                .map(Outcome::FilteredSampling)
+                .map_err(|_| ()),
+            None => Err(()),
+        },
+        ClientReportField::ClientDiscard => Ok(Outcome::ClientDiscard(reason.into())),
+        ClientReportField::Filtered => Ok(Outcome::Filtered(
+            FilterStatKey::try_from(reason).map_err(|_| ())?,
+        )),
+        ClientReportField::RateLimited => Ok(Outcome::RateLimited(match reason {
+            "" => None,
+            other => Some(ReasonCode::new(other)),
+        })),
+    }
+}
+
+#[cfg(test)]
+mod tests {
+
+    use crate::services::outcome::RuleCategory;
+
+    use super::*;
+
+    #[test]
+    fn test_from_outcome_type_sampled() {
+        assert!(outcome_from_parts(ClientReportField::FilteredSampling, "adsf").is_err());
+
+        assert!(outcome_from_parts(ClientReportField::FilteredSampling, "Sampled:").is_err());
+
+        assert!(outcome_from_parts(ClientReportField::FilteredSampling, "Sampled:foo").is_err());
+
+        assert!(matches!(
+            outcome_from_parts(ClientReportField::FilteredSampling, "Sampled:"),
+            Err(())
+        ));
+
+        assert!(matches!(
+            outcome_from_parts(ClientReportField::FilteredSampling, "Sampled:;"),
+            Err(())
+        ));
+
+        assert!(matches!(
+            outcome_from_parts(ClientReportField::FilteredSampling, "Sampled:ab;12"),
+            Err(())
+        ));
+
+        assert_eq!(
+            outcome_from_parts(ClientReportField::FilteredSampling, "Sampled:123,456"),
+            Ok(Outcome::FilteredSampling(RuleCategories(
+                [RuleCategory::Other].into()
+            )))
+        );
+
+        assert_eq!(
+            outcome_from_parts(ClientReportField::FilteredSampling, "Sampled:123"),
+            Ok(Outcome::FilteredSampling(RuleCategories(
+                [RuleCategory::Other].into()
+            )))
+        );
+
+        assert_eq!(
+            outcome_from_parts(ClientReportField::FilteredSampling, "Sampled:123"),
+            Ok(Outcome::FilteredSampling(RuleCategories(
+                [RuleCategory::Other].into()
+            )))
+        );
+
+        assert_eq!(
+            outcome_from_parts(ClientReportField::FilteredSampling, "Sampled:1001"),
+            Ok(Outcome::FilteredSampling(RuleCategories(
+                [RuleCategory::BoostEnvironments].into()
+            )))
+        );
+
+        assert_eq!(
+            outcome_from_parts(
+                ClientReportField::FilteredSampling,
+                "Sampled:1001,1456,1567,3333,4444"
+            ),
+            Ok(Outcome::FilteredSampling(RuleCategories(
+                [
+                    RuleCategory::BoostEnvironments,
+                    RuleCategory::BoostLowVolumeTransactions,
+                    RuleCategory::BoostLatestReleases,
+                    RuleCategory::Custom
+                ]
+                .into()
+            )))
+        );
+    }
+
+    #[test]
+    fn test_from_outcome_type_filtered() {
+        assert!(matches!(
+            outcome_from_parts(ClientReportField::Filtered, "error-message"),
+            Ok(Outcome::Filtered(FilterStatKey::ErrorMessage))
+        ));
+
+        assert!(matches!(
+            outcome_from_parts(ClientReportField::Filtered, "hydration-error"),
+            Ok(Outcome::Filtered(FilterStatKey::GenericFilter(_)))
+        ));
+    }
+
+    #[test]
+    fn test_from_outcome_type_client_discard() {
+        assert_eq!(
+            outcome_from_parts(ClientReportField::ClientDiscard, "foo_reason").unwrap(),
+            Outcome::ClientDiscard("foo_reason".into())
+        );
+    }
+
+    #[test]
+    fn test_from_outcome_type_rate_limited() {
+        assert!(matches!(
+            outcome_from_parts(ClientReportField::RateLimited, ""),
+            Ok(Outcome::RateLimited(None))
+        ));
+        assert_eq!(
+            outcome_from_parts(ClientReportField::RateLimited, "foo_reason").unwrap(),
+            Outcome::RateLimited(Some(ReasonCode::new("foo_reason")))
+        );
+    }
+}
diff --git a/relay-server/src/processing/mod.rs b/relay-server/src/processing/mod.rs
index ae05848688d..e4dcb53a6aa 100644
--- a/relay-server/src/processing/mod.rs
+++ b/relay-server/src/processing/mod.rs
@@ -23,6 +23,7 @@
 pub use self::forward::*;
 pub use self::limits::*;
 pub mod check_ins;
+pub mod client_reports;
 pub mod logs;
 pub mod sessions;
 pub mod spans;
@@ -155,6 +156,14 @@ impl<T> Output<T> {
         }
     }
 
+    /// Creates a new output with neither main nor metrics.
+    pub fn empty() -> Self {
+        Self {
+            main: None,
+            metrics: None,
+        }
+    }
+
     /// Maps an `Output<T>` to `Output<U>` by applying a function to [`Self::main`].
     pub fn map<U, F>(self, f: F) -> Output<U>
     where
diff --git a/relay-server/src/services/processor.rs b/relay-server/src/services/processor.rs
index c14bfa5433a..4afb63bd28c 100644
--- a/relay-server/src/services/processor.rs
+++ b/relay-server/src/services/processor.rs
@@ -48,6 +48,7 @@ use crate::metrics::{MetricOutcomes, MetricsLimiter, MinimalTrackableBucket};
 use crate::metrics_extraction::transactions::ExtractedMetrics;
 use crate::metrics_extraction::transactions::types::ExtractMetricsError;
 use crate::processing::check_ins::CheckInsProcessor;
+use crate::processing::client_reports::ClientReportsProcessor;
 use crate::processing::logs::LogsProcessor;
 use crate::processing::sessions::SessionsProcessor;
 use crate::processing::spans::SpansProcessor;
@@ -1146,6 +1147,7 @@ struct Processing {
     spans: SpansProcessor,
     check_ins: CheckInsProcessor,
     sessions: SessionsProcessor,
+    client_report: ClientReportsProcessor,
 }
 
 impl EnvelopeProcessorService {
@@ -1204,6 +1206,7 @@ impl EnvelopeProcessorService {
         #[cfg(feature = "processing")]
         let rate_limiter = rate_limiter.map(Arc::new);
 
+        let outcome_aggregator = addrs.outcome_aggregator.clone();
         let inner = InnerProcessor {
             pool,
             global_config,
@@ -1232,7 +1235,8 @@
                 trace_metrics: TraceMetricsProcessor::new(Arc::clone(&quota_limiter)),
                 spans: SpansProcessor::new(Arc::clone(&quota_limiter), geoip_lookup.clone()),
                 check_ins: CheckInsProcessor::new(Arc::clone(&quota_limiter)),
-                sessions: SessionsProcessor::new(quota_limiter),
+                sessions: SessionsProcessor::new(Arc::clone(&quota_limiter)),
+                client_report: ClientReportsProcessor::new(outcome_aggregator),
             },
             geoip_lookup,
             config,
@@ -1708,32 +1712,6 @@
         Ok(Some(extracted_metrics))
     }
 
-    /// Processes user and client reports.
-    async fn process_client_reports(
-        &self,
-        managed_envelope: &mut TypedEnvelope<ClientReportGroup>,
-        ctx: processing::Context<'_>,
-    ) -> Result<Option<ProcessingExtractedMetrics>, ProcessingError> {
-        let mut extracted_metrics = ProcessingExtractedMetrics::new();
-
-        self.enforce_quotas(
-            managed_envelope,
-            Annotated::empty(),
-            &mut extracted_metrics,
-            ctx,
-        )
-        .await?;
-
-        report::process_client_reports(
-            managed_envelope,
-            ctx.config,
-            ctx.project_info,
-            self.inner.addrs.outcome_aggregator.clone(),
-        );
-
-        Ok(Some(extracted_metrics))
-    }
-
     /// Processes replays.
     async fn process_replays(
         &self,
@@ -1931,7 +1909,14 @@
                 .await
             }
             ProcessingGroup::Standalone => run!(process_standalone, project_id, ctx),
-            ProcessingGroup::ClientReport => run!(process_client_reports, ctx),
+            ProcessingGroup::ClientReport => {
+                self.process_with_processor(
+                    &self.inner.processing.client_report,
+                    managed_envelope,
+                    ctx,
+                )
+                .await
+            }
             ProcessingGroup::Replay => {
                 run!(process_replays, ctx)
             }
diff --git a/relay-server/src/services/processor/report.rs b/relay-server/src/services/processor/report.rs
index ad5b7f6dc7d..6459e4de409 100644
--- a/relay-server/src/services/processor/report.rs
+++ b/relay-server/src/services/processor/report.rs
@@ -1,195 +1,12 @@
 //! Contains code related to validation and normalization of the user and client reports.
 
-use std::collections::BTreeMap;
 use std::error::Error;
 
-use chrono::{Duration as SignedDuration, Utc};
-use relay_common::time::UnixTimestamp;
-use relay_config::Config;
-use relay_event_normalization::ClockDriftProcessor;
-use relay_event_schema::protocol::{ClientReport, UserReport};
-use relay_filter::FilterStatKey;
-use relay_quotas::ReasonCode;
-use relay_sampling::evaluation::MatchedRuleIds;
-use relay_system::Addr;
-
-use crate::constants::DEFAULT_EVENT_RETENTION;
+use relay_event_schema::protocol::UserReport;
+
 use crate::envelope::{ContentType, ItemType};
 use crate::managed::{ItemAction, TypedEnvelope};
-use crate::services::outcome::{DiscardReason, Outcome, RuleCategories, TrackOutcome};
-use crate::services::processor::{ClientReportGroup, MINIMUM_CLOCK_DRIFT};
-use crate::services::projects::project::ProjectInfo;
-
-/// Fields of client reports that map to specific [`Outcome`]s without content.
-#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord)]
-pub enum ClientReportField {
-    /// The event has been filtered by an inbound data filter.
-    Filtered,
-
-    /// The event has been filtered by a sampling rule.
-    FilteredSampling,
-
-    /// The event has been rate limited.
-    RateLimited,
-
-    /// The event has already been discarded on the client side.
-    ClientDiscard,
-}
-
-/// Validates and extracts client reports.
-///
-/// At the moment client reports are primarily used to transfer outcomes from
-/// client SDKs. The outcomes are removed here and sent directly to the outcomes
-/// system.
-pub fn process_client_reports(
-    managed_envelope: &mut TypedEnvelope<ClientReportGroup>,
-    config: &Config,
-    project_info: &ProjectInfo,
-    outcome_aggregator: Addr<TrackOutcome>,
-) {
-    // if client outcomes are disabled we leave the client reports unprocessed
-    // and pass them on.
-    if !config.emit_outcomes().any() || !config.emit_client_outcomes() {
-        // if a processing relay has client outcomes disabled we drop them.
-        if config.processing_enabled() {
-            managed_envelope.retain_items(|item| match item.ty() {
-                ItemType::ClientReport => ItemAction::DropSilently,
-                _ => ItemAction::Keep,
-            });
-        }
-        return;
-    }
-
-    let mut timestamp = None;
-    let mut output_events = BTreeMap::new();
-    let received = managed_envelope.received_at();
-
-    let clock_drift_processor =
-        ClockDriftProcessor::new(managed_envelope.envelope().sent_at(), received)
-            .at_least(MINIMUM_CLOCK_DRIFT);
-
-    // we're going through all client reports but we're effectively just merging
-    // them into the first one.
-    managed_envelope.retain_items(|item| {
-        if item.ty() != &ItemType::ClientReport {
-            return ItemAction::Keep;
-        };
-        match ClientReport::parse(&item.payload()) {
-            Ok(ClientReport {
-                timestamp: report_timestamp,
-                discarded_events,
-                rate_limited_events,
-                filtered_events,
-                filtered_sampling_events,
-            }) => {
-                // Glue all discarded events together and give them the appropriate outcome type
-                let input_events =
-                    discarded_events
-                        .into_iter()
-                        .map(|discarded_event| (ClientReportField::ClientDiscard, discarded_event))
-                        .chain(
-                            filtered_events.into_iter().map(|discarded_event| {
-                                (ClientReportField::Filtered, discarded_event)
-                            }),
-                        )
-                        .chain(filtered_sampling_events.into_iter().map(|discarded_event| {
-                            (ClientReportField::FilteredSampling, discarded_event)
-                        }))
-                        .chain(rate_limited_events.into_iter().map(|discarded_event| {
-                            (ClientReportField::RateLimited, discarded_event)
-                        }));
-
-                for (outcome_type, discarded_event) in input_events {
-                    if discarded_event.reason.len() > 200 {
-                        relay_log::trace!("ignored client outcome with an overlong reason");
-                        continue;
-                    }
-                    *output_events
-                        .entry((
-                            outcome_type,
-                            discarded_event.reason,
-                            discarded_event.category,
-                        ))
-                        .or_insert(0) += discarded_event.quantity;
-                }
-                if let Some(ts) = report_timestamp {
-                    timestamp.get_or_insert(ts);
-                }
-            }
-            Err(err) => {
-                relay_log::trace!(error = &err as &dyn Error, "invalid client report received")
-            }
-        }
-        ItemAction::DropSilently
-    });
-
-    if output_events.is_empty() {
-        return;
-    }
-
-    let timestamp =
-        timestamp.get_or_insert_with(|| UnixTimestamp::from_secs(received.timestamp() as u64));
-
-    if clock_drift_processor.is_drifted() {
-        relay_log::trace!("applying clock drift correction to client report");
-        clock_drift_processor.process_timestamp(timestamp);
-    }
-
-    let retention_days = project_info
-        .config()
-        .event_retention
-        .unwrap_or(DEFAULT_EVENT_RETENTION);
-    let max_age = SignedDuration::days(retention_days.into());
-    // also if we unable to parse the timestamp, we assume it's way too old here.
-    let in_past = timestamp
-        .as_datetime()
-        .map(|ts| (received - ts) > max_age)
-        .unwrap_or(true);
-    if in_past {
-        relay_log::trace!(
-            "skipping client outcomes older than {} days",
-            max_age.num_days()
-        );
-        return;
-    }
-
-    let max_future = SignedDuration::seconds(config.max_secs_in_future());
-    // also if we unable to parse the timestamp, we assume it's way far in the future here.
-    let in_future = timestamp
-        .as_datetime()
-        .map(|ts| (ts - received) > max_future)
-        .unwrap_or(true);
-    if in_future {
-        relay_log::trace!(
-            "skipping client outcomes more than {}s in the future",
-            max_future.num_seconds()
-        );
-        return;
-    }
-
-    for ((outcome_type, reason, category), quantity) in output_events.into_iter() {
-        let outcome = match outcome_from_parts(outcome_type, &reason) {
-            Ok(outcome) => outcome,
-            Err(_) => {
-                relay_log::trace!(?outcome_type, reason, "invalid outcome combination");
-                continue;
-            }
-        };
-
-        outcome_aggregator.send(TrackOutcome {
-            // If we get to this point, the unwrap should not be used anymore, since we know by
-            // now that the timestamp can be parsed, but just incase we fallback to UTC current
-            // `DateTime`.
-            timestamp: timestamp.as_datetime().unwrap_or_else(Utc::now),
-            scoping: managed_envelope.scoping(),
-            outcome,
-            event_id: None,
-            remote_addr: None, // omitting the client address allows for better aggregation
-            category,
-            quantity,
-        });
-    }
-}
+use crate::services::outcome::{DiscardReason, Outcome};
 
 /// Validates and normalizes all user report items in the envelope.
 ///
@@ -241,46 +58,24 @@ fn trim_whitespaces(data: &[u8]) -> &[u8] {
     &data[from..to + 1]
 }
 
-/// Parse an outcome from an outcome ID and a reason string.
-///
-/// Currently only used to reconstruct outcomes encoded in client reports.
-fn outcome_from_parts(field: ClientReportField, reason: &str) -> Result<Outcome, ()> {
-    match field {
-        ClientReportField::FilteredSampling => match reason.strip_prefix("Sampled:") {
-            Some(rule_ids) => MatchedRuleIds::parse(rule_ids)
-                .map(RuleCategories::from)
-                .map(Outcome::FilteredSampling)
-                .map_err(|_| ()),
-            None => Err(()),
-        },
-        ClientReportField::ClientDiscard => Ok(Outcome::ClientDiscard(reason.into())),
-        ClientReportField::Filtered => Ok(Outcome::Filtered(
-            FilterStatKey::try_from(reason).map_err(|_| ())?,
-        )),
-        ClientReportField::RateLimited => Ok(Outcome::RateLimited(match reason {
-            "" => None,
-            other => Some(ReasonCode::new(other)),
-        })),
-    }
-}
-
 #[cfg(test)]
 mod tests {
     use relay_cogs::Token;
     use relay_config::Config;
     use relay_event_schema::protocol::EventId;
     use relay_sampling::evaluation::ReservoirCounters;
+    use relay_system::Addr;
 
     use crate::envelope::{Envelope, Item};
     use crate::extractors::RequestMeta;
     use crate::managed::ManagedEnvelope;
-    use crate::processing;
-    use crate::services::outcome::RuleCategory;
+    use crate::processing::{self};
     use crate::services::processor::{ProcessEnvelopeGrouped, ProcessingGroup, Submit};
     use crate::testutils::create_test_processor;
 
     use super::*;
 
+    // FIXME: Ask if moving the tests over is worth the changes to ProcessEnvelopeGrouped and Submit or if they should just stay here (hard to find).
     #[tokio::test]
     async fn test_client_report_removal() {
         relay_test::setup();
@@ -340,70 +135,6 @@
         assert!(envelope.is_none());
     }
 
-    #[tokio::test]
-    async fn test_client_report_forwarding() {
-        relay_test::setup();
-        let outcome_aggregator = Addr::dummy();
-
-        let config = Config::from_json_value(serde_json::json!({
-            "outcomes": {
-                "emit_outcomes": false,
-                // a relay need to emit outcomes at all to not process.
- "emit_client_outcomes": true - } - })) - .unwrap(); - - let processor = create_test_processor(Default::default()).await; - - let dsn = "https://e12d836b15bb49d7bbf99e64295d995b:@sentry.io/42" - .parse() - .unwrap(); - - let request_meta = RequestMeta::new(dsn); - let mut envelope = Envelope::from_request(None, request_meta); - - envelope.add_item({ - let mut item = Item::new(ItemType::ClientReport); - item.set_payload( - ContentType::Json, - r#" - { - "discarded_events": [ - ["queue_full", "error", 42] - ] - } - "#, - ); - item - }); - - let mut envelopes = ProcessingGroup::split_envelope(*envelope, &Default::default()); - assert_eq!(envelopes.len(), 1); - let (group, envelope) = envelopes.pop().unwrap(); - let envelope = ManagedEnvelope::new(envelope, outcome_aggregator); - - let message = ProcessEnvelopeGrouped { - group, - envelope, - ctx: processing::Context { - config: &config, - ..processing::Context::for_test() - }, - reservoir_counters: &ReservoirCounters::default(), - }; - - let Ok(Some(Submit::Envelope(new_envelope))) = - processor.process(&mut Token::noop(), message).await - else { - panic!(); - }; - let item = new_envelope.envelope().items().next().unwrap(); - assert_eq!(item.ty(), &ItemType::ClientReport); - - new_envelope.accept(); // do not try to capture or emit outcomes - } - #[tokio::test] #[cfg(feature = "processing")] async fn test_client_report_removal_in_processing() { @@ -566,107 +297,6 @@ mod tests { assert_eq!(new_envelope.items().next().unwrap().ty(), &ItemType::Event); } - #[test] - fn test_from_outcome_type_sampled() { - assert!(outcome_from_parts(ClientReportField::FilteredSampling, "adsf").is_err()); - - assert!(outcome_from_parts(ClientReportField::FilteredSampling, "Sampled:").is_err()); - - assert!(outcome_from_parts(ClientReportField::FilteredSampling, "Sampled:foo").is_err()); - - assert!(matches!( - outcome_from_parts(ClientReportField::FilteredSampling, "Sampled:"), - Err(()) - )); - - assert!(matches!( - outcome_from_parts(ClientReportField::FilteredSampling, "Sampled:;"), - Err(()) - )); - - assert!(matches!( - outcome_from_parts(ClientReportField::FilteredSampling, "Sampled:ab;12"), - Err(()) - )); - - assert_eq!( - outcome_from_parts(ClientReportField::FilteredSampling, "Sampled:123,456"), - Ok(Outcome::FilteredSampling(RuleCategories( - [RuleCategory::Other].into() - ))) - ); - - assert_eq!( - outcome_from_parts(ClientReportField::FilteredSampling, "Sampled:123"), - Ok(Outcome::FilteredSampling(RuleCategories( - [RuleCategory::Other].into() - ))) - ); - - assert_eq!( - outcome_from_parts(ClientReportField::FilteredSampling, "Sampled:123"), - Ok(Outcome::FilteredSampling(RuleCategories( - [RuleCategory::Other].into() - ))) - ); - - assert_eq!( - outcome_from_parts(ClientReportField::FilteredSampling, "Sampled:1001"), - Ok(Outcome::FilteredSampling(RuleCategories( - [RuleCategory::BoostEnvironments].into() - ))) - ); - - assert_eq!( - outcome_from_parts( - ClientReportField::FilteredSampling, - "Sampled:1001,1456,1567,3333,4444" - ), - Ok(Outcome::FilteredSampling(RuleCategories( - [ - RuleCategory::BoostEnvironments, - RuleCategory::BoostLowVolumeTransactions, - RuleCategory::BoostLatestReleases, - RuleCategory::Custom - ] - .into() - ))) - ); - } - - #[test] - fn test_from_outcome_type_filtered() { - assert!(matches!( - outcome_from_parts(ClientReportField::Filtered, "error-message"), - Ok(Outcome::Filtered(FilterStatKey::ErrorMessage)) - )); - - assert!(matches!( - outcome_from_parts(ClientReportField::Filtered, "hydration-error"), - 
-            Ok(Outcome::Filtered(FilterStatKey::GenericFilter(_)))
-        ));
-    }
-
-    #[test]
-    fn test_from_outcome_type_client_discard() {
-        assert_eq!(
-            outcome_from_parts(ClientReportField::ClientDiscard, "foo_reason").unwrap(),
-            Outcome::ClientDiscard("foo_reason".into())
-        );
-    }
-
-    #[test]
-    fn test_from_outcome_type_rate_limited() {
-        assert!(matches!(
-            outcome_from_parts(ClientReportField::RateLimited, ""),
-            Ok(Outcome::RateLimited(None))
-        ));
-        assert_eq!(
-            outcome_from_parts(ClientReportField::RateLimited, "foo_reason").unwrap(),
-            Outcome::RateLimited(Some(ReasonCode::new("foo_reason")))
-        );
-    }
-
     #[test]
     fn test_trim_whitespaces() {
         assert_eq!(trim_whitespaces(b""), b"");