Commit 5fcda12

feat(spanv2): Implement AI normalizations
1 parent 29ea47c commit 5fcda12

File tree

relay-conventions/src/consts.rs
relay-event-normalization/src/eap/ai.rs
relay-event-normalization/src/eap/mod.rs
relay-event-normalization/src/normalize/span/ai.rs
relay-server/src/processing/spans/mod.rs
relay-server/src/processing/spans/process.rs

6 files changed, +144 -4 lines changed

relay-conventions/src/consts.rs

Lines changed: 11 additions & 0 deletions
@@ -21,7 +21,18 @@ convention_attributes!(
     DB_SYSTEM_NAME => "db.system.name",
     DESCRIPTION => "sentry.description",
     FAAS_TRIGGER => "faas.trigger",
+    GEN_AI_COST_INPUT_TOKENS => "gen_ai.cost.input_tokens",
+    GEN_AI_COST_OUTPUT_TOKENS => "gen_ai.cost.output_tokens",
+    GEN_AI_COST_TOTAL_TOKENS => "gen_ai.cost.total_tokens",
+    GEN_AI_REQUEST_MODEL => "gen_ai.request.model",
+    GEN_AI_RESPONSE_MODEL => "gen_ai.response.model",
+    GEN_AI_RESPONSE_TPS => "gen_ai.response.tokens_per_second",
     GEN_AI_SYSTEM => "gen_ai.system",
+    GEN_AI_USAGE_INPUT_CACHED_TOKENS => "gen_ai.usage.input_tokens.cached",
+    GEN_AI_USAGE_INPUT_TOKENS => "gen_ai.usage.input_tokens",
+    GEN_AI_USAGE_OUTPUT_REASONING_TOKENS => "gen_ai.usage.output_tokens.reasoning",
+    GEN_AI_USAGE_OUTPUT_TOKENS => "gen_ai.usage.output_tokens",
+    GEN_AI_USAGE_TOTAL_TOKENS => "gen_ai.usage.total_tokens",
     HTTP_PREFETCH => "sentry.http.prefetch",
     HTTP_REQUEST_METHOD => "http.request.method",
     HTTP_RESPONSE_STATUS_CODE => "http.response.status_code",
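
For orientation (not part of the commit): the entries above are assumed to expand to plain string constants whose values are the attribute keys on the right-hand side, which is how the normalization code further down uses them. A minimal sketch under that assumption:

use relay_conventions::{GEN_AI_USAGE_INPUT_TOKENS, GEN_AI_USAGE_TOTAL_TOKENS};

fn main() {
    // Assumption: `convention_attributes!` generates `&str` constants holding the key names.
    assert_eq!(GEN_AI_USAGE_INPUT_TOKENS, "gen_ai.usage.input_tokens");
    assert_eq!(GEN_AI_USAGE_TOTAL_TOKENS, "gen_ai.usage.total_tokens");
}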

relay-event-normalization/src/eap/ai.rs

Lines changed: 108 additions & 0 deletions
@@ -0,0 +1,108 @@
+use std::time::Duration;
+
+use relay_conventions::{
+    GEN_AI_COST_INPUT_TOKENS, GEN_AI_COST_OUTPUT_TOKENS, GEN_AI_COST_TOTAL_TOKENS,
+    GEN_AI_REQUEST_MODEL, GEN_AI_RESPONSE_MODEL, GEN_AI_RESPONSE_TPS,
+    GEN_AI_USAGE_INPUT_CACHED_TOKENS, GEN_AI_USAGE_INPUT_TOKENS,
+    GEN_AI_USAGE_OUTPUT_REASONING_TOKENS, GEN_AI_USAGE_OUTPUT_TOKENS, GEN_AI_USAGE_TOTAL_TOKENS,
+};
+use relay_event_schema::protocol::Attributes;
+use relay_protocol::Annotated;
+
+use crate::ModelCosts;
+use crate::span::ai;
+
+/// Normalizes AI attributes.
+pub fn normalize_ai(
+    attributes: &mut Annotated<Attributes>,
+    duration: Option<Duration>,
+    costs: Option<&ModelCosts>,
+) {
+    let Some(attributes) = attributes.value_mut() else {
+        return;
+    };
+
+    normalize_total_tokens(attributes);
+    normalize_tokens_per_second(attributes, duration);
+    normalize_ai_costs(attributes, costs);
+}
+
+/// Calculates the [`GEN_AI_USAGE_TOTAL_TOKENS`] attribute.
+fn normalize_total_tokens(attributes: &mut Attributes) {
+    if attributes.contains_key(GEN_AI_USAGE_TOTAL_TOKENS) {
+        return;
+    }
+
+    let input_tokens = attributes
+        .get_value(GEN_AI_USAGE_INPUT_TOKENS)
+        .and_then(|v| v.as_f64());
+
+    let output_tokens = attributes
+        .get_value(GEN_AI_USAGE_OUTPUT_TOKENS)
+        .and_then(|v| v.as_f64());
+
+    if input_tokens.is_none() && output_tokens.is_none() {
+        return;
+    }
+
+    let total_tokens = input_tokens.unwrap_or(0.0) + output_tokens.unwrap_or(0.0);
+    attributes.insert(GEN_AI_USAGE_TOTAL_TOKENS, total_tokens);
+}
+
+/// Calculates the [`GEN_AI_RESPONSE_TPS`] attribute.
+fn normalize_tokens_per_second(attributes: &mut Attributes, duration: Option<Duration>) {
+    let Some(duration) = duration.filter(|d| !d.is_zero()) else {
+        return;
+    };
+
+    if attributes.contains_key(GEN_AI_RESPONSE_TPS) {
+        return;
+    }
+
+    let output_tokens = attributes
+        .get_value(GEN_AI_USAGE_OUTPUT_TOKENS)
+        .and_then(|v| v.as_f64())
+        .filter(|v| *v > 0.0);
+
+    if let Some(output_tokens) = output_tokens {
+        let tps = output_tokens / duration.as_secs_f64();
+        attributes.insert(GEN_AI_RESPONSE_TPS, tps);
+    }
+}
+
+/// Calculates model costs and serializes them into attributes.
+fn normalize_ai_costs(attributes: &mut Attributes, model_costs: Option<&ModelCosts>) {
+    if attributes.contains_key(GEN_AI_COST_TOTAL_TOKENS) {
+        return;
+    }
+
+    let model_cost = attributes
+        .get_value(GEN_AI_REQUEST_MODEL)
+        .or_else(|| attributes.get_value(GEN_AI_RESPONSE_MODEL))
+        .and_then(|v| v.as_str())
+        .and_then(|model| model_costs?.cost_per_token(model));
+
+    let Some(model_cost) = model_cost else { return };
+
+    let get_tokens = |key| {
+        attributes
+            .get_value(key)
+            .and_then(|v| v.as_f64())
+            .unwrap_or(0.0)
+    };
+
+    let tokens = ai::UsedTokens {
+        input_tokens: get_tokens(GEN_AI_USAGE_INPUT_TOKENS),
+        input_cached_tokens: get_tokens(GEN_AI_USAGE_INPUT_CACHED_TOKENS),
+        output_tokens: get_tokens(GEN_AI_USAGE_OUTPUT_TOKENS),
+        output_reasoning_tokens: get_tokens(GEN_AI_USAGE_OUTPUT_REASONING_TOKENS),
+    };
+
+    let Some(costs) = ai::calculate_costs(model_cost, tokens) else {
+        return;
+    };
+
+    attributes.insert(GEN_AI_COST_INPUT_TOKENS, costs.input);
+    attributes.insert(GEN_AI_COST_OUTPUT_TOKENS, costs.output);
+    attributes.insert(GEN_AI_COST_TOTAL_TOKENS, costs.total());
+}
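
To make the backfill behavior concrete, here is a rough test-style sketch (not part of the commit) that calls the `normalize_ai` added above, e.g. from a unit test in the same module. It assumes `Attributes` can be constructed via `Default` and that `Annotated::new`, `insert`, and `get_value` behave as used in the code; the cost path is skipped by passing `None`:

use std::time::Duration;

use relay_conventions::{
    GEN_AI_RESPONSE_TPS, GEN_AI_USAGE_INPUT_TOKENS, GEN_AI_USAGE_OUTPUT_TOKENS,
    GEN_AI_USAGE_TOTAL_TOKENS,
};
use relay_event_schema::protocol::Attributes;
use relay_protocol::Annotated;

fn example() {
    // Assumed constructor: `Attributes::default()`.
    let mut attributes = Annotated::new(Attributes::default());
    if let Some(attrs) = attributes.value_mut() {
        attrs.insert(GEN_AI_USAGE_INPUT_TOKENS, 300.0);
        attrs.insert(GEN_AI_USAGE_OUTPUT_TOKENS, 500.0);
    }

    // 500 output tokens over a 2.5 s span: total = 300 + 500 = 800, tps = 500 / 2.5 = 200.
    normalize_ai(&mut attributes, Some(Duration::from_millis(2_500)), None);

    let attrs = attributes.value().unwrap();
    let total = attrs
        .get_value(GEN_AI_USAGE_TOTAL_TOKENS)
        .and_then(|v| v.as_f64());
    let tps = attrs
        .get_value(GEN_AI_RESPONSE_TPS)
        .and_then(|v| v.as_f64());
    assert_eq!(total, Some(800.0));
    assert_eq!(tps, Some(200.0));
}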

relay-event-normalization/src/eap/mod.rs

Lines changed: 4 additions & 0 deletions
@@ -16,6 +16,10 @@ use relay_protocol::{Annotated, ErrorKind, Meta, Remark, RemarkType, Value};

 use crate::{ClientHints, FromUserAgentInfo as _, RawUserAgentInfo};

+mod ai;
+
+pub use self::ai::normalize_ai;
+
 /// Normalizes/validates all attribute types.
 ///
 /// Removes and marks all attributes with an error for which the specified [`AttributeType`]

relay-event-normalization/src/normalize/span/ai.rs

Lines changed: 6 additions & 0 deletions
@@ -144,6 +144,10 @@ fn map_ai_measurements_to_data(span: &mut Span) {
         &mut data.gen_ai_usage_output_tokens,
         "ai_completion_tokens_used",
     );
+}
+
+fn set_total_tokens(span: &mut Span) {
+    let data = span.data.get_or_insert_with(SpanData::default);

     // It might be that 'total_tokens' is not set in which case we need to calculate it
     if data.gen_ai_usage_total_tokens.value().is_none() {
@@ -214,6 +218,8 @@ pub fn enrich_ai_span_data(
     }

     map_ai_measurements_to_data(span);
+    set_total_tokens(span);
+
     if let Some(model_costs) = model_costs {
         extract_ai_data(span, model_costs);
     }

relay-server/src/processing/spans/mod.rs

Lines changed: 1 addition & 1 deletion
@@ -159,7 +159,7 @@ impl processing::Processor for SpansProcessor {

         dynamic_sampling::validate_dsc(&spans).reject(&spans)?;

-        process::normalize(&mut spans, &self.geo_lookup);
+        process::normalize(&mut spans, &self.geo_lookup, ctx);
         filter::filter(&mut spans, ctx);

         self.limiter.enforce_quotas(&mut spans, ctx).await?;

relay-server/src/processing/spans/process.rs

Lines changed: 14 additions & 3 deletions
@@ -1,3 +1,5 @@
+use std::time::Duration;
+
 use relay_event_normalization::{
     GeoIpLookup, RequiredMode, SchemaProcessor, TimestampProcessor, TrimmingProcessor, eap,
 };
@@ -67,11 +69,11 @@ fn expand_legacy_span(item: &Item) -> Result<WithHeader<SpanV2>> {
 }

 /// Normalizes individual spans.
-pub fn normalize(spans: &mut Managed<ExpandedSpans>, geo_lookup: &GeoIpLookup) {
+pub fn normalize(spans: &mut Managed<ExpandedSpans>, geo_lookup: &GeoIpLookup, ctx: Context<'_>) {
     spans.retain_with_context(
         |spans| (&mut spans.spans, spans.headers.meta()),
         |span, meta, _| {
-            normalize_span(span, meta, geo_lookup).inspect_err(|err| {
+            normalize_span(span, meta, geo_lookup, ctx).inspect_err(|err| {
                 relay_log::debug!("failed to normalize span: {err}");
             })
         },
@@ -82,6 +84,7 @@ fn normalize_span(
     span: &mut Annotated<SpanV2>,
     meta: &RequestMeta,
     geo_lookup: &GeoIpLookup,
+    ctx: Context<'_>,
 ) -> Result<()> {
     process_value(span, &mut TimestampProcessor, ProcessingState::root())?;

@@ -97,7 +100,9 @@ fn normalize_span(
             meta.client_addr().and_then(|ip| geo_lookup.lookup(ip))
         });

-        // TODO: ai model costs
+        let duration = span_duration(span);
+        let model_costs = ctx.global_config.ai_model_costs.as_ref().ok();
+        eap::normalize_ai(&mut span.attributes, duration, model_costs);
     };

     process_value(span, &mut TrimmingProcessor::new(), ProcessingState::root())?;
@@ -144,6 +149,12 @@ fn scrub_span(span: &mut Annotated<SpanV2>, ctx: Context<'_>) -> Result<()> {
     Ok(())
 }

+fn span_duration(span: &SpanV2) -> Option<Duration> {
+    let start_timestamp = *span.start_timestamp.value()?;
+    let timestamp = *span.end_timestamp.value()?;
+    (timestamp - start_timestamp).to_std().ok()
+}
+
 #[cfg(test)]
 mod tests {
     use relay_pii::{DataScrubbingConfig, PiiConfig};
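
One behavioral note on `span_duration` (illustration only, not part of the commit): subtracting the two timestamps yields a chrono duration, and `to_std()` only succeeds for non-negative values, so missing or reversed timestamps produce `None` and the tokens-per-second backfill in `eap::normalize_ai` is simply skipped. A standalone sketch of that conversion, assuming the timestamps are `chrono::DateTime<Utc>` values as the `to_std()` call suggests:

use chrono::{Duration as ChronoDuration, TimeZone, Utc};

fn main() {
    let start = Utc.with_ymd_and_hms(2024, 1, 1, 0, 0, 0).unwrap();
    let end = start + ChronoDuration::milliseconds(2_500);

    // A forward range converts cleanly into a std::time::Duration.
    assert_eq!(
        (end - start).to_std().ok(),
        Some(std::time::Duration::from_millis(2_500))
    );

    // A negative range fails `to_std()`, so `span_duration` would return `None`.
    assert_eq!((start - end).to_std().ok(), None);
}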
