Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
39 commits
Select commit Hold shift + click to select a range
14d5b45
rm produce_span
jjbayer Oct 21, 2025
3b96721
wip: scaffold
jjbayer Oct 21, 2025
6de6c4d
wip: Move away from envelope
jjbayer Oct 22, 2025
8d4f004
wip
jjbayer Oct 22, 2025
8ee760f
wip: start porting over process
jjbayer Oct 23, 2025
e1239ba
wip: Up to profile filter
jjbayer Oct 23, 2025
f6a63c3
reject
jjbayer Oct 28, 2025
e03dc57
handle profile
jjbayer Oct 28, 2025
ba15677
wip
jjbayer Oct 29, 2025
257401a
wip: finalize_event
jjbayer Oct 29, 2025
f6ea5cd
wip: finalize_event
jjbayer Oct 29, 2025
a1fb533
Merge remote-tracking branch 'origin/master' into ref/store-no-envelo…
jjbayer Oct 31, 2025
00fe08a
wip
jjbayer Oct 31, 2025
ad03f63
add module
jjbayer Nov 3, 2025
6d17635
extract_transaction_metrics
jjbayer Nov 3, 2025
d09c0fc
Merge branch 'ref/process-transactions-3' into ref/store-no-envelope-…
jjbayer Nov 3, 2025
f98b8f0
too_many_arguments
jjbayer Nov 3, 2025
5b874c2
Merge branch 'ref/process-transactions-3' into ref/store-no-envelope-…
jjbayer Nov 3, 2025
a719218
Merge remote-tracking branch 'origin/master' into ref/store-no-envelo…
jjbayer Nov 3, 2025
cc19334
wip
jjbayer Nov 3, 2025
5bac4d9
ref: take project_id from scoping
jjbayer Nov 4, 2025
49b1af0
Merge remote-tracking branch 'origin/master' into ref/store-no-envelo…
jjbayer Nov 5, 2025
21b7360
merge
jjbayer Nov 5, 2025
e2409f8
wip
jjbayer Nov 5, 2025
031a5f6
wip
jjbayer Nov 5, 2025
8ab550d
wip
jjbayer Nov 5, 2025
70a714d
wip
jjbayer Nov 6, 2025
ca2a570
wip: profile early return
jjbayer Nov 6, 2025
7f0c182
wip: up to span from event
jjbayer Nov 6, 2025
e484eeb
reached the borrow checker
jjbayer Nov 6, 2025
30b1188
process is green
jjbayer Nov 6, 2025
1a3a588
ref
jjbayer Nov 6, 2025
157b609
earlier
jjbayer Nov 6, 2025
004b913
forgot files
jjbayer Nov 6, 2025
d3bc878
rate limiting
jjbayer Nov 7, 2025
4a2d681
serialization
jjbayer Nov 7, 2025
df18216
wip
jjbayer Nov 7, 2025
0bfb5ac
wip
jjbayer Nov 7, 2025
0d19f06
no errors
jjbayer Nov 7, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 13 additions & 3 deletions relay-server/src/envelope/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -196,6 +196,16 @@ impl<M> EnvelopeHeaders<M> {
}
}

/// Overrides the dynamic sampling context in envelope headers.
pub fn set_dsc(&mut self, dsc: DynamicSamplingContext) {
self.trace = Some(ErrorBoundary::Ok(dsc));
}

/// Removes the dynamic sampling context from envelope headers.
pub fn remove_dsc(&mut self) {
self.trace = None;
}

/// Returns the timestamp when the event has been sent, according to the SDK.
pub fn sent_at(&self) -> Option<DateTime<Utc>> {
self.sent_at
Expand Down Expand Up @@ -441,12 +451,12 @@ impl Envelope {

/// Overrides the dynamic sampling context in envelope headers.
pub fn set_dsc(&mut self, dsc: DynamicSamplingContext) {
self.headers.trace = Some(ErrorBoundary::Ok(dsc));
self.headers.set_dsc(dsc);
}

/// Removes the dynamic sampling context from envelope headers.
pub fn remove_dsc(&mut self) {
self.headers.trace = None;
self.headers.remove_dsc();
}

/// Features required to process this envelope.
Expand Down Expand Up @@ -1221,7 +1231,7 @@ mod tests {
);
*Envelope::parse_bytes(bytes).unwrap()
};
envelope.set_dsc(dsc.clone());
envelope.headers.set_dsc(dsc.clone());

assert_eq!(
envelope.dsc().unwrap().transaction.as_ref().unwrap(),
Expand Down
47 changes: 47 additions & 0 deletions relay-server/src/managed/counted.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
use std::collections::BTreeMap;

use relay_event_schema::protocol::{
OurLog, SessionAggregateItem, SessionAggregates, SessionUpdate, Span, SpanV2, TraceMetric,
};
Expand Down Expand Up @@ -29,6 +31,27 @@ impl Counted for () {
}
}

impl<T: Counted> Counted for Option<T> {
fn quantities(&self) -> Quantities {
match self {
Some(inner) => inner.quantities(),
None => Quantities::new(),
}
}
}

impl<T, S> Counted for (T, S)
where
T: Counted,
S: Counted,
{
fn quantities(&self) -> Quantities {
let mut v = self.0.quantities();
v.extend(self.1.quantities());
v
}
}

impl Counted for Item {
fn quantities(&self) -> Quantities {
self.quantities()
Expand Down Expand Up @@ -175,3 +198,27 @@ where
self.as_ref().quantities()
}
}

impl<T: Counted> Counted for Vec<T> {
fn quantities(&self) -> Quantities {
let mut quantities = BTreeMap::new();
for element in self {
for (category, size) in element.quantities() {
*quantities.entry(category).or_default() += size;
}
}
quantities.into_iter().collect()
}
}

impl<T: Counted, const N: usize> Counted for SmallVec<[T; N]> {
fn quantities(&self) -> Quantities {
let mut quantities = BTreeMap::new();
for element in self {
for (category, size) in element.quantities() {
*quantities.entry(category).or_default() += size;
}
}
quantities.into_iter().collect()
}
}
31 changes: 31 additions & 0 deletions relay-server/src/managed/managed.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,9 @@ use relay_system::Addr;
use smallvec::SmallVec;

use crate::Envelope;
use crate::envelope::Item;
use crate::managed::{Counted, ManagedEnvelope, Quantities};
use crate::processing::CountRateLimited;
use crate::services::outcome::{DiscardReason, Outcome, TrackOutcome};
use crate::services::processor::ProcessingError;

Expand Down Expand Up @@ -152,6 +154,21 @@ impl<T: Counted> Managed<T> {
Managed::from_parts(other, Arc::clone(&self.meta))
}

/// Merge the current managed item with another managed item.
///
/// The merged tuple uses the meta of `self`. It is the responsibility of the caller to make sure
/// that this matches the `meta` of `other`.
pub fn merge<S>(self, other: Managed<S>) -> Managed<(T, S)>
where
S: Counted,
{
let (this, meta) = self.destructure();
let (other, other_meta) = other.destructure();
debug_assert!(Arc::ptr_eq(&meta, &other_meta));

Managed::from_parts((this, other), meta)
}

/// Original received timestamp.
pub fn received_at(&self) -> DateTime<Utc> {
self.meta.received_at
Expand All @@ -162,6 +179,13 @@ impl<T: Counted> Managed<T> {
self.meta.scoping
}

/// Get the address of the outcome aggregator.
///
/// NOTE: This should not be exposed, only here for the transition period.
pub fn outcome_addr(&self) -> &Addr<TrackOutcome> {
&self.meta.outcome_aggregator
}

/// Splits [`Self`] into two other [`Managed`] items.
///
/// The two resulting managed instances together are expected to have the same outcomes as the original instance..
Expand Down Expand Up @@ -560,6 +584,13 @@ impl<T: Counted + fmt::Debug> fmt::Debug for Managed<T> {
}
}

impl<T: Counted> Managed<Option<T>> {
pub fn transpose(self) -> Option<Managed<T>> {
let (o, meta) = self.destructure();
o.map(|t| Managed::from_parts(t, meta))
}
}

impl From<Managed<Box<Envelope>>> for ManagedEnvelope {
fn from(value: Managed<Box<Envelope>>) -> Self {
let (value, meta) = value.destructure();
Expand Down
5 changes: 4 additions & 1 deletion relay-server/src/processing/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ pub mod logs;
pub mod sessions;
pub mod spans;
pub mod trace_metrics;
pub mod transactions;
pub mod utils;

/// A processor, for an arbitrary unit of work extracted from an envelope.
Expand All @@ -47,7 +48,9 @@ pub trait Processor {
/// Extracts a [`Self::UnitOfWork`] from a [`ManagedEnvelope`].
///
/// This is infallible, if a processor wants to report an error,
/// it should return a [`Self::UnitOfWork`] which later, can produce an error when being
/// it should return a [`Self::UnitOfWork`] which later, can produce an error when being processed.
///
/// Returns `None` if nothing in the envelope concerns this processor.
fn prepare_envelope(&self, envelope: &mut ManagedEnvelope)
-> Option<Managed<Self::UnitOfWork>>;

Expand Down
2 changes: 1 addition & 1 deletion relay-server/src/processing/spans/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ mod filter;
mod integrations;
mod process;
#[cfg(feature = "processing")]
mod store;
pub mod store;
mod validate;

type Result<T, E = Error> = std::result::Result<T, E>;
Expand Down
Loading