Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
- Normalize deprecated attribute names according to `sentry-conventions` for logs and V2 spans. ([#5257](https://github.com/getsentry/relay/pull/5257))
- Allow sample rate per trace metric. ([#5317](https://github.com/getsentry/relay/pull/5317))
- Replace `is_remote` with `is_segment` on the Span V2 schema. ([#5306](https://github.com/getsentry/relay/pull/5306))
- Add `response_timeout` config setting for Redis. ([#5329](https://github.com/getsentry/relay/pull/5329))

**Breaking Changes**:

Expand Down
11 changes: 0 additions & 11 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 0 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,6 @@ criterion = "0.5.1"
crossbeam-channel = "0.5.15"
data-encoding = "2.9.0"
deadpool = "0.12.3"
deadpool-redis = "0.20.0"
debugid = "0.8.0"
dialoguer = "0.11.0"
dynfmt = "0.1.5"
Expand Down
7 changes: 7 additions & 0 deletions relay-config/src/redis.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,11 @@ pub struct PartialRedisConfigOptions {
/// blocking when the pool is exhausted.
#[serde(skip_serializing_if = "Option::is_none")]
pub wait_timeout: Option<u64>,
/// Sets the maximum time in seconds to wait for a result when sending a Redis command.
///
/// If a command exceeds this timeout, the connection will be recycled.
#[serde(skip_serializing_if = "Option::is_none")]
pub response_timeout: Option<u64>,
/// Sets the number of times after which the connection will check whether it is active when
/// being recycled.
///
Expand All @@ -55,6 +60,7 @@ impl Default for PartialRedisConfigOptions {
create_timeout: Some(3),
recycle_timeout: Some(2),
wait_timeout: None,
response_timeout: Some(6),
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This seems very low for the total command time — how did you arrive at this number?

I suspect that some cardinality limiting commands take longer than this.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Shoot, you're totally right. I was only considering a subset of Redis operations. For cardinality limits the maximum usually hovers between 50 and 60s; should we make the timeout 90s to be safe?

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Bug: Redis operations timeout too soon for long tasks

The default response_timeout of 6 seconds is too low for real Redis workloads. Based on the PR discussion, cardinality-limiting commands can take 50–60 seconds to complete. With a 6-second timeout, these legitimate long-running operations will time out, causing the connection to be detached (per the logic at pool.rs:50) and the operation to fail — a functional regression for cardinality limiting.

Fix in Cursor Fix in Web

recycle_check_frequency: 100,
}
}
Expand Down Expand Up @@ -224,6 +230,7 @@ fn build_redis_config_options(
create_timeout: options.create_timeout,
recycle_timeout: options.recycle_timeout,
wait_timeout: options.wait_timeout,
response_timeout: options.response_timeout,
recycle_check_frequency: options.recycle_check_frequency,
}
}
Expand Down
5 changes: 2 additions & 3 deletions relay-redis/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,7 @@ publish = false
workspace = true

[dependencies]
deadpool = { workspace = true, optional = true }
deadpool-redis = { workspace = true, optional = true, features = ["rt_tokio_1", "cluster", "script"] }
deadpool = { workspace = true, optional = true, features = ["rt_tokio_1"] }
futures = { workspace = true }
redis = { workspace = true, optional = true, features = [
"cluster",
Expand All @@ -35,4 +34,4 @@ relay-system = { workspace = true }

[features]
default = []
impl = ["dep:deadpool", "dep:deadpool-redis", "dep:relay-statsd", "dep:redis"]
impl = ["dep:deadpool", "dep:relay-statsd", "dep:redis"]
6 changes: 6 additions & 0 deletions relay-redis/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,11 @@ pub struct RedisConfigOptions {
/// will fail with a timeout error. This setting helps prevent indefinite
/// blocking when the pool is exhausted.
pub wait_timeout: Option<u64>,
/// Sets the maximum time in seconds to wait for a result when sending a Redis command.
///
/// If a command exceeds this timeout, the connection will be recycled.
#[serde(skip_serializing_if = "Option::is_none")]
pub response_timeout: Option<u64>,
/// Sets the number of times after which the connection will check whether it is active when
/// being recycled.
///
Expand All @@ -43,6 +48,7 @@ impl Default for RedisConfigOptions {
create_timeout: Some(3),
recycle_timeout: Some(2),
wait_timeout: None,
response_timeout: Some(6),
recycle_check_frequency: 100,
}
}
Expand Down
88 changes: 68 additions & 20 deletions relay-redis/src/pool.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,12 @@
use deadpool::managed::{Manager, Metrics, Object, Pool, RecycleError, RecycleResult};
use deadpool_redis::Manager as SingleManager;
use deadpool_redis::cluster::Manager as ClusterManager;
use futures::FutureExt;
use redis::AsyncConnectionConfig;
use redis::cluster::ClusterClientBuilder;
use std::ops::{Deref, DerefMut};
use std::sync::atomic::{AtomicUsize, Ordering};
use std::time::Duration;

use crate::RedisConfigOptions;
use crate::redis;
use crate::redis::aio::MultiplexedConnection;
use crate::redis::cluster_async::ClusterConnection;
Expand Down Expand Up @@ -41,9 +44,10 @@ impl<C> TrackedConnection<C> {
/// from the pool.
///
/// An `Ok` result never leads to the connection being detached.
/// A [`RedisError`] leads to the connection being detached if it is unrecoverable.
/// A [`RedisError`] leads to the connection being detached if it is either unrecoverable or a timeout.
/// In case of timeout, we would rather close a potentially dead connection and establish a new one.
fn should_be_detached<T>(result: Result<T, &RedisError>) -> bool {
result.is_err_and(|error| error.is_unrecoverable_error())
result.is_err_and(|error| error.is_unrecoverable_error() || error.is_timeout())
}
}

Expand Down Expand Up @@ -135,9 +139,9 @@ impl redis::aio::ConnectionLike for CustomClusterConnection {
/// This manager handles the creation and recycling of Redis cluster connections,
/// ensuring proper connection health through periodic PING checks. It supports both
/// primary and replica nodes, with optional read-from-replicas functionality.
#[derive(Debug)]
pub struct CustomClusterManager {
manager: ClusterManager,
client: redis::cluster::ClusterClient,
ping_number: AtomicUsize,
recycle_check_frequency: usize,
}

Expand All @@ -149,11 +153,21 @@ impl CustomClusterManager {
pub fn new<T: IntoConnectionInfo>(
params: Vec<T>,
read_from_replicas: bool,
recycle_check_frequency: usize,
options: RedisConfigOptions,
) -> RedisResult<Self> {
let mut client = ClusterClientBuilder::new(params);

if read_from_replicas {
client = client.read_from_replicas();
}
if let Some(response_timeout) = options.response_timeout {
client = client.response_timeout(Duration::from_secs(response_timeout));
}

Ok(Self {
manager: ClusterManager::new(params, read_from_replicas)?,
recycle_check_frequency,
client: client.build()?,
ping_number: AtomicUsize::new(0),
recycle_check_frequency: options.recycle_check_frequency,
})
}
}
Expand All @@ -163,7 +177,8 @@ impl Manager for CustomClusterManager {
type Error = RedisError;

async fn create(&self) -> Result<TrackedConnection<ClusterConnection>, RedisError> {
self.manager.create().await.map(TrackedConnection::from)
let conn = self.client.get_async_connection().await?;
Ok(TrackedConnection::new(conn))
}

async fn recycle(
Expand All @@ -188,7 +203,17 @@ impl Manager for CustomClusterManager {
return Ok(());
}

self.manager.recycle(conn, metrics).await
// Copied from deadpool_redis::cluster::Manager
let ping_number = self.ping_number.fetch_add(1, Ordering::Relaxed).to_string();
let n = redis::cmd("PING")
.arg(&ping_number)
.query_async::<String>(conn)
.await?;
if n == ping_number {
Ok(())
} else {
Err(RecycleError::message("Invalid PING response"))
}
}
}

Expand Down Expand Up @@ -229,7 +254,9 @@ impl redis::aio::ConnectionLike for CustomSingleConnection {
/// ensuring proper connection health through periodic PING checks. It supports
/// multiplexed connections for efficient handling of multiple operations.
pub struct CustomSingleManager {
manager: SingleManager,
client: redis::Client,
ping_number: AtomicUsize,
connection_config: AsyncConnectionConfig,
recycle_check_frequency: usize,
}

Expand All @@ -238,13 +265,17 @@ impl CustomSingleManager {
///
/// The manager will establish and maintain connections to the specified Redis
/// instance, handling connection lifecycle and health checks.
pub fn new<T: IntoConnectionInfo>(
params: T,
recycle_check_frequency: usize,
) -> RedisResult<Self> {
pub fn new<T: IntoConnectionInfo>(params: T, options: RedisConfigOptions) -> RedisResult<Self> {
let mut connection_config = AsyncConnectionConfig::new();
if let Some(response_timeout) = options.response_timeout {
connection_config =
connection_config.set_response_timeout(Duration::from_secs(response_timeout));
}
Ok(Self {
manager: SingleManager::new(params)?,
recycle_check_frequency,
client: redis::Client::open(params)?,
ping_number: AtomicUsize::new(0),
connection_config,
recycle_check_frequency: options.recycle_check_frequency,
})
}
}
Expand All @@ -254,7 +285,10 @@ impl Manager for CustomSingleManager {
type Error = RedisError;

async fn create(&self) -> Result<TrackedConnection<MultiplexedConnection>, RedisError> {
self.manager.create().await.map(TrackedConnection::from)
self.client
.get_multiplexed_async_connection_with_config(&self.connection_config)
.await
.map(TrackedConnection::from)
}

async fn recycle(
Expand All @@ -279,7 +313,21 @@ impl Manager for CustomSingleManager {
return Ok(());
}

self.manager.recycle(conn, metrics).await
// Copied from deadpool_redis::Manager::recycle
let ping_number = self.ping_number.fetch_add(1, Ordering::Relaxed).to_string();
// Using pipeline to avoid roundtrip for UNWATCH
let (n,) = redis::Pipeline::with_capacity(2)
.cmd("UNWATCH")
.ignore()
.cmd("PING")
.arg(&ping_number)
.query_async::<(String,)>(conn)
.await?;
if n == ping_number {
Ok(())
} else {
Err(RecycleError::message("Invalid PING response"))
}
}
}

Expand Down
12 changes: 4 additions & 8 deletions relay-redis/src/real.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use deadpool::Runtime;
use deadpool::managed::{BuildError, Manager, Metrics, Object, Pool, PoolError};
use deadpool_redis::{ConfigError, Runtime};
use redis::{Cmd, Pipeline, RedisFuture, Value};
use std::time::Duration;
use thiserror::Error;
Expand Down Expand Up @@ -34,10 +34,6 @@ pub enum RedisError {
/// An error that occurs when creating a Redis connection pool.
#[error("failed to create redis pool: {0}")]
CreatePool(#[from] BuildError),

/// An error that occurs when configuring Redis.
#[error("failed to configure redis: {0}")]
ConfigError(#[from] ConfigError),
Comment on lines -37 to -40
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This variant and From impl were already unused.

}

/// A collection of Redis clients used by Relay for different purposes.
Expand Down Expand Up @@ -104,7 +100,7 @@ impl AsyncRedisClient {

// We use our custom cluster manager which performs recycling in a different way from the
// default manager.
let manager = pool::CustomClusterManager::new(servers, false, opts.recycle_check_frequency)
let manager = pool::CustomClusterManager::new(servers, false, opts.clone())
.map_err(RedisError::Redis)?;

let pool = Self::build_pool(manager, opts)?;
Expand All @@ -122,8 +118,8 @@ impl AsyncRedisClient {
pub fn single(server: &str, opts: &RedisConfigOptions) -> Result<Self, RedisError> {
// We use our custom single manager which performs recycling in a different way from the
// default manager.
let manager = pool::CustomSingleManager::new(server, opts.recycle_check_frequency)
.map_err(RedisError::Redis)?;
let manager =
pool::CustomSingleManager::new(server, opts.clone()).map_err(RedisError::Redis)?;

let pool = Self::build_pool(manager, opts)?;

Expand Down
2 changes: 1 addition & 1 deletion relay-redis/src/scripts.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use deadpool_redis::redis::Script;
use redis::Script;
use std::sync::OnceLock;

/// A collection of static methods to load predefined Redis scripts.
Expand Down
Loading