Skip to content

Commit b5da373

Browse files
committed
ref(server): Switches common endpoint to use Managed
1 parent 879dbbb commit b5da373

File tree

25 files changed

+336
-155
lines changed

25 files changed

+336
-155
lines changed

relay-server/src/endpoints/attachments.rs

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -39,8 +39,10 @@ pub async fn handle(
3939
meta: RequestMeta,
4040
Path(path): Path<AttachmentPath>,
4141
multipart: ConstrainedMultipart,
42-
) -> Result<impl IntoResponse, BadStoreRequest> {
42+
) -> axum::response::Result<impl IntoResponse> {
4343
let envelope = extract_envelope(meta, path, multipart, state.config()).await?;
44-
common::handle_envelope(&state, envelope).await?;
44+
common::handle_envelope(&state, envelope)
45+
.await?
46+
.ignore_rate_limits();
4547
Ok(StatusCode::CREATED)
4648
}

relay-server/src/endpoints/common.rs

Lines changed: 97 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ use relay_statsd::metric;
99
use serde::Deserialize;
1010

1111
use crate::envelope::{AttachmentType, Envelope, EnvelopeError, Item, ItemType, Items};
12-
use crate::managed::ManagedEnvelope;
12+
use crate::managed::{Managed, Rejected};
1313
use crate::service::ServiceState;
1414
use crate::services::buffer::ProjectKeyPair;
1515
use crate::services::outcome::{DiscardItemType, DiscardReason, Outcome};
@@ -269,31 +269,40 @@ pub fn event_id_from_items(items: &Items) -> Result<Option<EventId>, BadStoreReq
269269
/// returned and the envelope is not queued.
270270
fn queue_envelope(
271271
state: &ServiceState,
272-
mut managed_envelope: ManagedEnvelope,
273-
) -> Result<(), BadStoreRequest> {
274-
let envelope = managed_envelope.envelope_mut();
275-
272+
mut envelope: Managed<Box<Envelope>>,
273+
) -> Result<(), Rejected<BadStoreRequest>> {
276274
if state.config().relay_mode() != RelayMode::Proxy {
277275
// Remove metrics from the envelope and queue them directly on the project's `Aggregator`.
278276
// In proxy mode, we cannot aggregate metrics because we may not have a project ID.
279277
let is_metric = |i: &Item| matches!(i.ty(), ItemType::Statsd | ItemType::MetricBuckets);
280-
let metric_items = envelope.take_items_by(is_metric);
281278

282-
if !metric_items.is_empty() {
283-
relay_log::trace!("sending metrics into processing queue");
279+
let metrics;
280+
(envelope, metrics) = envelope.split_once(|mut envelope| {
281+
let metrics = envelope.take_items_by(is_metric).into_vec();
282+
(envelope, metrics)
283+
});
284+
285+
metrics.accept(|metrics| {
286+
if metrics.is_empty() {
287+
return;
288+
}
289+
284290
state.processor().send(ProcessMetrics {
285-
data: MetricData::Raw(metric_items.into_vec()),
291+
data: MetricData::Raw(metrics),
286292
received_at: envelope.received_at(),
287293
sent_at: envelope.sent_at(),
288294
project_key: envelope.meta().public_key(),
289295
source: BucketSource::from_meta(envelope.meta()),
290296
});
291-
}
297+
})
292298
}
293299

294-
let pkp = ProjectKeyPair::from_envelope(&*envelope);
295-
if !state.envelope_buffer(pkp).try_push(managed_envelope) {
296-
return Err(BadStoreRequest::QueueFailed);
300+
let pkp = ProjectKeyPair::from_envelope(&envelope);
301+
if let Err(envelope) = state.envelope_buffer(pkp).try_push(envelope) {
302+
return Err(envelope.reject_err((
303+
Outcome::Invalid(DiscardReason::Internal),
304+
BadStoreRequest::QueueFailed,
305+
)));
297306
}
298307

299308
Ok(())
@@ -310,66 +319,110 @@ fn queue_envelope(
310319
pub async fn handle_envelope(
311320
state: &ServiceState,
312321
envelope: Box<Envelope>,
313-
) -> Result<Option<EventId>, BadStoreRequest> {
322+
) -> Result<HandledEnvelope, Rejected<BadStoreRequest>> {
314323
emit_envelope_metrics(&envelope);
315324

325+
let mut envelope = Managed::from_envelope(envelope, state.outcome_aggregator().clone());
326+
316327
if state.memory_checker().check_memory().is_exceeded() {
317-
return Err(BadStoreRequest::QueueFailed);
328+
return Err(envelope.reject_err((
329+
Outcome::Invalid(DiscardReason::Internal),
330+
BadStoreRequest::QueueFailed,
331+
)));
318332
};
319333

320-
let mut managed_envelope = ManagedEnvelope::new(envelope, state.outcome_aggregator().clone());
321-
322334
// If configured, remove unknown items at the very beginning. If the envelope is
323335
// empty, we fail the request with a special control flow error to skip checks and
324336
// queueing, that still results in a `200 OK` response.
325-
utils::remove_unknown_items(state.config(), &mut managed_envelope);
326-
327-
let event_id = managed_envelope.envelope().event_id();
328-
if managed_envelope.envelope().is_empty() {
329-
managed_envelope.reject(Outcome::Invalid(DiscardReason::EmptyEnvelope));
330-
return Ok(event_id);
337+
utils::remove_unknown_items(state.config(), &mut envelope);
338+
339+
let event_id = envelope.event_id();
340+
if envelope.is_empty() {
341+
return Ok(HandledEnvelope {
342+
event_id,
343+
rate_limits: Default::default(),
344+
});
331345
}
332346

333-
let project_key = managed_envelope.envelope().meta().public_key();
347+
let project_key = envelope.meta().public_key();
334348

335349
// Prefetch sampling project key, current spooling implementations rely on this behavior.
336350
//
337351
// To be changed once spool v1 has been removed.
338-
if let Some(sampling_project_key) = managed_envelope.envelope().sampling_key()
352+
if let Some(sampling_project_key) = envelope.sampling_key()
339353
&& sampling_project_key != project_key
340354
{
341355
state.project_cache_handle().fetch(sampling_project_key);
342356
}
343357

344-
let checked = state
358+
let rate_limits = state
345359
.project_cache_handle()
346360
.get(project_key)
347-
.check_envelope(managed_envelope)
361+
.check_envelope(&mut envelope)
348362
.await
349-
.map_err(BadStoreRequest::EventRejected)?;
363+
.map_err(|err| err.map(BadStoreRequest::EventRejected))?;
350364

351-
let Some(mut managed_envelope) = checked.envelope else {
352-
// All items have been removed from the envelope.
353-
return Err(BadStoreRequest::RateLimited(checked.rate_limits));
354-
};
365+
if envelope.is_empty() {
366+
return Err(envelope.reject_err((None, BadStoreRequest::RateLimited(rate_limits))));
367+
}
355368

356-
if let Err(offender) =
357-
utils::check_envelope_size_limits(state.config(), managed_envelope.envelope())
358-
{
359-
managed_envelope.reject(Outcome::Invalid(DiscardReason::TooLarge(offender)));
360-
return Err(BadStoreRequest::Overflow(offender));
369+
if let Err(offender) = utils::check_envelope_size_limits(state.config(), &envelope) {
370+
return Err(envelope.reject_err((
371+
Outcome::Invalid(DiscardReason::TooLarge(offender)),
372+
BadStoreRequest::Overflow(offender),
373+
)));
361374
}
362375

363-
queue_envelope(state, managed_envelope)?;
376+
queue_envelope(state, envelope)?;
364377

365-
if checked.rate_limits.is_limited() {
378+
Ok(HandledEnvelope {
379+
event_id,
366380
// Even if some envelope items have been queued, there might be active rate limits on
367381
// other items. Communicate these rate limits to the downstream (Relay or SDK).
368-
//
369-
// See `IntoResponse` implementation of `BadStoreRequest`.
370-
Err(BadStoreRequest::RateLimited(checked.rate_limits))
371-
} else {
372-
Ok(event_id)
382+
rate_limits,
383+
})
384+
}
385+
386+
/// A successfully handled envelope, returned by [`handle_envelope`].
387+
#[derive(Debug)]
388+
#[must_use = "rate limits of a handled envelope must be used"]
389+
pub struct HandledEnvelope {
390+
/// The event id of the envelope.
391+
pub event_id: Option<EventId>,
392+
/// All active, but not necessarily enforced, rate limits.
393+
///
394+
/// These rate limits should always be communicated to the client on envelope endpoints.
395+
///
396+
/// See also: [`BadStoreRequest::RateLimited`].
397+
pub rate_limits: RateLimits,
398+
}
399+
400+
impl HandledEnvelope {
401+
/// Ensures all active rate limits are handled as an error.
402+
///
403+
/// This is legacy behaviour where active rate limits are returned as an error, instead of
404+
/// being added to the usual response.
405+
/// The event id in this legacy behaviour is only returned when there are no active rate
406+
/// limits.
407+
///
408+
/// The functions simplifies this legacy handling by turning rate limits into an error again.
409+
pub fn ensure_rate_limits(self) -> Result<Option<EventId>, BadStoreRequest> {
410+
if self.rate_limits.is_limited() {
411+
return Err(BadStoreRequest::RateLimited(self.rate_limits));
412+
}
413+
Ok(self.event_id)
414+
}
415+
416+
/// Explicitly ignores contained active rate limits.
417+
///
418+
/// Endpoints which choose to not propagate active rate limits, should use this method to
419+
/// explicitly state the fact they do not propagate the rate limits.
420+
///
421+
/// Most endpoints ignore active rate limits, they are mostly used in envelope based endpoints.
422+
///
423+
/// Note: enforced rate limits are still returned as an error from [`handle_envelope`].
424+
pub fn ignore_rate_limits(self) -> Option<EventId> {
425+
self.event_id
373426
}
374427
}
375428

relay-server/src/endpoints/envelope.rs

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -116,9 +116,11 @@ struct StoreResponse {
116116
async fn handle(
117117
state: ServiceState,
118118
params: EnvelopeParams,
119-
) -> Result<impl IntoResponse, BadStoreRequest> {
119+
) -> axum::response::Result<impl IntoResponse> {
120120
let envelope = params.extract_envelope()?;
121-
let id = common::handle_envelope(&state, envelope).await?;
121+
let id = common::handle_envelope(&state, envelope)
122+
.await?
123+
.ensure_rate_limits()?;
122124
Ok(Json(StoreResponse { id }))
123125
}
124126

relay-server/src/endpoints/integrations/otlp.rs

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,9 @@ mod traces {
4343
.with_required_feature(Feature::OtelTracesEndpoint)
4444
.build();
4545

46-
common::handle_envelope(&state, envelope).await?;
46+
common::handle_envelope(&state, envelope)
47+
.await?
48+
.ignore_rate_limits();
4749

4850
Ok(StatusCode::ACCEPTED)
4951
}
@@ -72,7 +74,9 @@ mod logs {
7274
.with_required_feature(Feature::OtelLogsEndpoint)
7375
.build();
7476

75-
common::handle_envelope(&state, envelope).await?;
77+
common::handle_envelope(&state, envelope)
78+
.await?
79+
.ignore_rate_limits();
7680

7781
Ok(StatusCode::ACCEPTED)
7882
}

relay-server/src/endpoints/integrations/vercel.rs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,9 @@ mod logs {
4040
.with_required_feature(Feature::VercelLogDrainEndpoint)
4141
.build();
4242

43-
common::handle_envelope(&state, envelope).await?;
43+
common::handle_envelope(&state, envelope)
44+
.await?
45+
.ignore_rate_limits();
4446

4547
Ok(StatusCode::ACCEPTED)
4648
}

relay-server/src/endpoints/minidump.rs

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -233,7 +233,10 @@ async fn handle(
233233
let id = envelope.event_id();
234234

235235
// Never respond with a 429 since clients often retry these
236-
match common::handle_envelope(&state, envelope).await {
236+
match common::handle_envelope(&state, envelope)
237+
.await
238+
.map_err(|err| err.into_inner())
239+
{
237240
Ok(_) | Err(BadStoreRequest::RateLimited(_)) => (),
238241
Err(error) => return Err(error.into()),
239242
};

relay-server/src/endpoints/monitor.rs

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -66,7 +66,10 @@ async fn handle(
6666
envelope.add_item(item);
6767

6868
// Never respond with a 429
69-
match common::handle_envelope(&state, envelope).await {
69+
match common::handle_envelope(&state, envelope)
70+
.await
71+
.map_err(|err| err.into_inner())
72+
{
7073
Ok(_) | Err(BadStoreRequest::RateLimited(_)) => (),
7174
Err(error) => return Err(error.into()),
7275
};

relay-server/src/endpoints/nel.rs

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,7 @@ async fn handle(
4040
state: ServiceState,
4141
mime: Mime,
4242
params: NelReportParams,
43-
) -> Result<impl IntoResponse, BadStoreRequest> {
43+
) -> axum::response::Result<impl IntoResponse> {
4444
if !is_nel_mime(mime) {
4545
return Ok(StatusCode::UNSUPPORTED_MEDIA_TYPE.into_response());
4646
}
@@ -55,7 +55,10 @@ async fn handle(
5555
envelope.add_item(report_item);
5656
}
5757

58-
common::handle_envelope(&state, envelope).await?;
58+
common::handle_envelope(&state, envelope)
59+
.await?
60+
.ignore_rate_limits();
61+
5962
Ok(().into_response())
6063
}
6164

relay-server/src/endpoints/playstation.rs

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -121,7 +121,10 @@ async fn handle(
121121
let id = envelope.event_id();
122122

123123
// Never respond with a 429 since clients often retry these
124-
match common::handle_envelope(&state, envelope).await {
124+
match common::handle_envelope(&state, envelope)
125+
.await
126+
.map_err(|err| err.into_inner())
127+
{
125128
Ok(_) | Err(BadStoreRequest::RateLimited(_)) => (),
126129
Err(error) => return Err(error.into()),
127130
};

relay-server/src/endpoints/security_report.rs

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -95,13 +95,16 @@ async fn handle(
9595
state: ServiceState,
9696
mime: Mime,
9797
params: SecurityReportParams,
98-
) -> Result<impl IntoResponse, BadStoreRequest> {
98+
) -> axum::response::Result<impl IntoResponse> {
9999
if !is_security_mime(mime) {
100100
return Ok(StatusCode::UNSUPPORTED_MEDIA_TYPE.into_response());
101101
}
102102

103103
let envelope = params.extract_envelope()?;
104-
common::handle_envelope(&state, envelope).await?;
104+
common::handle_envelope(&state, envelope)
105+
.await?
106+
.ignore_rate_limits();
107+
105108
Ok(().into_response())
106109
}
107110

0 commit comments

Comments
 (0)