Commit 49a8bd0

Implement graceful switch balancer and other LB-related changes.
General:
- Add Debug to many traits and derive/impl in structs.
- Pass LB config to LB policies via `Option<LbConfig>` instead of `Option<&LbConfig>`. It should be rare that policies want to store a config except for the leaf policy.

Child manager:
The original assumption was that all children would be the same type/configuration, but several policies (including gracefulswitch) will not have that property. So, several changes are made:
- Children are considered unique by both their identifier and their LbPolicyBuilder's name().
- Make it so the sharder can also shard LbConfig and provide it via the ChildUpdate.child_update field in addition to the ResolverUpdate.
- Make ResolverUpdateSharder a generic instead of Box<dyn>.
- Add booleans so users of child manager can easily tell whether any child policies updated themselves, and which ones did.
- Pass &mut self for sharder so that it can maintain and update its state if needed.
- Change the sharder's output ChildUpdate.child_update field to an Option; if None then the child will not be called during the resolver update, but will remain in the child manager.
- Change child_states into children and provide the whole Child struct, exposing the fields it contains.
- Provide mutable access to the sharder.
- Change the LB config to be a flat JSON array to facilitate use within another LB policy that should not need a struct to contain only the children.
- Minor test cleanups.

Graceful switch:
The previous implementation in hyperium#2399 contained a lot of logic to manage child policy delegation. It was intended that only ChildManager should need to have this kind of logic.
- Create a new implementation of this policy that delegates to ChildManager.
- Use a Sharder that simply emits the active policy with no update alongside any new policy in the new LbConfig.
- maybe_swap is called after every call into the ChildManager to determine if child updates necessitate a swap.
  - This logic is simple: if the active policy is not Ready, or if there is a new policy and it is not Connecting, then set the new policy as the active policy and call resolver_update on the ChildManager. The sharder will see that no LbConfig is provided and just emit the active policy with no config, causing the ChildManager to drop the previously active policy. If no swap is needed, update the picker of the active policy if it had an update.
- Minor test cleanups/fixes vs. hyperium#2399.
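
The swap rule described above can be sketched in isolation. This is a minimal illustration, not the code from graceful_switch.rs (that diff is not rendered on this page). `Child`, `Switcher`, and the local `ConnectivityState` enum are hypothetical stand-ins, and the sketch assumes a swap is only possible when a pending policy exists.

```rust
/// Connectivity states, modeled locally for this sketch.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum ConnectivityState {
    Idle,
    Connecting,
    Ready,
    TransientFailure,
}

/// Hypothetical stand-in for a child policy tracked by the balancer.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct Child {
    pub name: &'static str,
    pub state: ConnectivityState,
}

/// Hypothetical, simplified balancer state: one active child and at most one
/// pending child built from the most recent LB config.
#[derive(Debug)]
pub struct Switcher {
    pub active: Child,
    pub pending: Option<Child>,
}

impl Switcher {
    /// Promotes the pending child if the rule from the commit message holds.
    /// Returns true if a swap happened.
    pub fn maybe_swap(&mut self) -> bool {
        // Assumption: without a pending child there is nothing to swap to.
        let Some(pending) = self.pending.as_ref() else {
            return false;
        };
        // Swap if the active policy is not Ready, or the new policy has left
        // Connecting (for example, it became Ready or failed).
        let should_swap = self.active.state != ConnectivityState::Ready
            || pending.state != ConnectivityState::Connecting;
        if !should_swap {
            return false;
        }
        // The previously active child is dropped by this assignment.
        self.active = self.pending.take().expect("pending checked above");
        true
    }
}
```

In this sketch a Ready active policy keeps serving while the new one is still Connecting, which is the "graceful" part of the switch.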
1 parent 6b2c2c7 commit 49a8bd0

10 files changed: +408 -541 lines changed
grpc/src/client/channel.rs

Lines changed: 2 additions & 1 deletion
@@ -453,6 +453,7 @@ impl load_balancing::ChannelController for InternalChannelController {
 }
 
 // A channel that is not idle (connecting, ready, or erroring).
+#[derive(Debug)]
 pub(super) struct GracefulSwitchBalancer {
     pub(super) policy: Mutex<Option<Box<dyn LbPolicy>>>,
     policy_builder: Mutex<Option<Arc<dyn LbPolicyBuilder>>>,
@@ -529,7 +530,7 @@ impl GracefulSwitchBalancer {
 
         p.as_mut()
             .unwrap()
-            .resolver_update(update, config.as_ref(), controller)
+            .resolver_update(update, config, controller)
 
         // TODO: close old LB policy gracefully vs. drop?
     }

grpc/src/client/load_balancing/child_manager.rs

Lines changed: 124 additions & 71 deletions
Large diffs are not rendered by default.

grpc/src/client/load_balancing/graceful_switch.rs

Lines changed: 234 additions & 447 deletions
Large diffs are not rendered by default.

grpc/src/client/load_balancing/mod.rs

Lines changed: 20 additions & 6 deletions
@@ -75,7 +75,7 @@ pub struct LbPolicyOptions {
 /// Used to asynchronously request a call into the LbPolicy's work method if
 /// the LbPolicy needs to provide an update without waiting for an update
 /// from the channel first.
-pub trait WorkScheduler: Send + Sync {
+pub trait WorkScheduler: Send + Sync + Debug {
     // Schedules a call into the LbPolicy's work method. If there is already a
     // pending work call that has not yet started, this may not schedule another
     // call.
@@ -123,7 +123,7 @@ impl ParsedJsonLbConfig {
 
 /// An LB policy factory that produces LbPolicy instances used by the channel
 /// to manage connections and pick connections for RPCs.
-pub(crate) trait LbPolicyBuilder: Send + Sync {
+pub(crate) trait LbPolicyBuilder: Send + Sync + Debug {
     /// Builds and returns a new LB policy instance.
     ///
     /// Note that build must not fail. Any optional configuration is delivered
@@ -153,13 +153,13 @@ pub(crate) trait LbPolicyBuilder: Send + Sync {
 /// LB policies are responsible for creating connections (modeled as
 /// Subchannels) and producing Picker instances for picking connections for
 /// RPCs.
-pub trait LbPolicy: Send {
+pub trait LbPolicy: Send + Debug {
     /// Called by the channel when the name resolver produces a new set of
     /// resolved addresses or a new service config.
     fn resolver_update(
         &mut self,
         update: ResolverUpdate,
-        config: Option<&LbConfig>,
+        config: Option<LbConfig>,
         channel_controller: &mut dyn ChannelController,
     ) -> Result<(), Box<dyn Error + Send + Sync>>;
 
@@ -246,7 +246,7 @@ impl Display for SubchannelState {
 ///
 /// If the ConnectivityState is TransientFailure, the Picker should return an
 /// Err with an error that describes why connections are failing.
-pub trait Picker: Send + Sync {
+pub trait Picker: Send + Sync + Debug {
     /// Picks a connection to use for the request.
     ///
     /// This function should not block. If the Picker needs to do blocking or
@@ -256,6 +256,7 @@ pub trait Picker: Send + Sync {
     fn pick(&self, request: &Request) -> PickResult;
 }
 
+#[derive(Debug)]
 pub enum PickResult {
     /// Indicates the Subchannel in the Pick should be used for the request.
     Pick(Pick),
@@ -317,7 +318,7 @@ impl Display for PickResult {
     }
 }
 /// Data provided by the LB policy.
-#[derive(Clone)]
+#[derive(Clone, Debug)]
 pub struct LbState {
     pub connectivity_state: super::ConnectivityState,
     pub picker: Arc<dyn Picker>,
@@ -347,6 +348,16 @@ pub struct Pick {
     pub on_complete: Option<CompletionCallback>,
 }
 
+impl Debug for Pick {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        f.debug_struct("Pick")
+            .field("subchannel", &self.subchannel)
+            .field("metadata", &self.metadata)
+            .field("on_complete", &format_args!("{:p}", &self.on_complete))
+            .finish()
+    }
+}
+
 pub trait DynHash {
     #[allow(clippy::redundant_allocation)]
     fn dyn_hash(&self, state: &mut Box<&mut dyn Hasher>);
@@ -438,6 +449,7 @@ impl Display for dyn Subchannel {
     }
 }
 
+#[derive(Debug)]
 struct WeakSubchannel(Weak<dyn Subchannel>);
 
 impl From<Arc<dyn Subchannel>> for WeakSubchannel {
@@ -580,6 +592,7 @@ impl<T: ForwardingSubchannel> private::Sealed for T {}
 
 /// QueuingPicker always returns Queue. LB policies that are not actively
 /// Connecting should not use this picker.
+#[derive(Debug)]
 pub struct QueuingPicker {}
 
 impl Picker for QueuingPicker {
@@ -588,6 +601,7 @@ impl Picker for QueuingPicker {
     }
 }
 
+#[derive(Debug)]
 pub struct Failing {
     pub error: String,
 }
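
`Pick` gets a hand-written `Debug` impl rather than a derive, presumably because its callback field does not implement `Debug`. The following sketch shows the same pattern on a hypothetical `Job` type with a hypothetical `Callback` alias (the real `CompletionCallback` definition is not shown in this diff). Unlike the commit, which prints a pointer with `{:p}`, the sketch only records whether a callback is present.

```rust
use std::fmt;

/// Hypothetical callback type; boxed closures do not implement Debug.
pub type Callback = Box<dyn Fn(u32) + Send + Sync>;

/// Hypothetical struct with one field that blocks `#[derive(Debug)]`.
pub struct Job {
    pub id: u32,
    pub on_complete: Option<Callback>,
}

impl fmt::Debug for Job {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_struct("Job")
            .field("id", &self.id)
            // Report only presence, since the closure itself cannot be printed.
            .field("has_on_complete", &self.on_complete.is_some())
            .finish()
    }
}
```

Formatting `Job { id: 7, on_complete: None }` with `{:?}` yields `Job { id: 7, has_on_complete: false }`.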

grpc/src/client/load_balancing/pick_first.rs

Lines changed: 6 additions & 2 deletions
@@ -23,6 +23,7 @@ use super::{
 
 pub static POLICY_NAME: &str = "pick_first";
 
+#[derive(Debug)]
 struct Builder {}
 
 impl LbPolicyBuilder for Builder {
@@ -44,6 +45,7 @@ pub fn reg() {
     super::GLOBAL_LB_REGISTRY.add_builder(Builder {})
 }
 
+#[derive(Debug)]
 struct PickFirstPolicy {
     work_scheduler: Arc<dyn WorkScheduler>,
     subchannel: Option<Arc<dyn Subchannel>>,
@@ -55,7 +57,7 @@ impl LbPolicy for PickFirstPolicy {
     fn resolver_update(
         &mut self,
         update: ResolverUpdate,
-        config: Option<&LbConfig>,
+        config: Option<LbConfig>,
         channel_controller: &mut dyn ChannelController,
     ) -> Result<(), Box<dyn Error + Send + Sync>> {
         let mut addresses = update
@@ -108,6 +110,7 @@ impl LbPolicy for PickFirstPolicy {
     }
 }
 
+#[derive(Debug)]
 struct OneSubchannelPicker {
     sc: Arc<dyn Subchannel>,
 }
@@ -116,8 +119,9 @@ impl Picker for OneSubchannelPicker {
     fn pick(&self, request: &Request) -> PickResult {
         PickResult::Pick(Pick {
             subchannel: self.sc.clone(),
-            on_complete: None,
+            // on_complete: None,
             metadata: MetadataMap::new(),
+            on_complete: None,
         })
     }
 }
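
`#[derive(Debug)]` on `PickFirstPolicy` compiles only because `WorkScheduler` and `Subchannel` trait objects are themselves `Debug`, which is what the added `Debug` supertrait bounds provide. A minimal sketch of that mechanism, using the hypothetical names `Scheduler`, `NoopScheduler`, and `Policy` rather than the crate's real types:

```rust
use std::fmt::Debug;
use std::sync::Arc;

/// Hypothetical trait standing in for `WorkScheduler`. The `Debug` supertrait
/// makes `dyn Scheduler` implement `Debug`.
pub trait Scheduler: Send + Sync + Debug {
    fn schedule(&self);
}

#[derive(Debug)]
pub struct NoopScheduler;

impl Scheduler for NoopScheduler {
    fn schedule(&self) {}
}

/// Deriving Debug works here only because `Arc<dyn Scheduler>` is Debug.
#[derive(Debug)]
pub struct Policy {
    pub scheduler: Arc<dyn Scheduler>,
}
```

Removing `+ Debug` from the trait definition makes the derive on `Policy` fail to compile, which is why the bound has to be added to every trait that appears behind a `dyn` in a derived struct.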

grpc/src/client/load_balancing/test_utils.rs

Lines changed: 12 additions & 2 deletions
@@ -149,6 +149,7 @@ impl ChannelController for TestChannelController {
     }
 }
 
+#[derive(Debug)]
 pub(crate) struct TestWorkScheduler {
     pub(crate) tx_events: mpsc::UnboundedSender<TestEvent>,
 }
@@ -164,7 +165,7 @@ type ResolverUpdateFn = Arc<
     dyn Fn(
             &mut StubPolicyData,
             ResolverUpdate,
-            Option<&LbConfig>,
+            Option<LbConfig>,
             &mut dyn ChannelController,
         ) -> Result<(), Box<dyn Error + Send + Sync>>
         + Send
@@ -186,7 +187,14 @@ pub struct StubPolicyFuncs {
     pub subchannel_update: Option<SubchannelUpdateFn>,
 }
 
+impl Debug for StubPolicyFuncs {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        write!(f, "stub funcs")
+    }
+}
+
 /// Data holds test data that will be passed all to functions in PolicyFuncs
+#[derive(Debug)]
 pub struct StubPolicyData {
     pub test_data: Option<Box<dyn Any + Send + Sync>>,
 }
@@ -199,6 +207,7 @@ impl StubPolicyData {
 }
 
 /// The stub `LbPolicy` that calls the provided functions.
+#[derive(Debug)]
 pub struct StubPolicy {
     funcs: StubPolicyFuncs,
     data: StubPolicyData,
@@ -208,7 +217,7 @@ impl LbPolicy for StubPolicy {
     fn resolver_update(
         &mut self,
         update: ResolverUpdate,
-        config: Option<&LbConfig>,
+        config: Option<LbConfig>,
         channel_controller: &mut dyn ChannelController,
     ) -> Result<(), Box<dyn Error + Send + Sync>> {
         if let Some(f) = &mut self.funcs.resolver_update {
@@ -238,6 +247,7 @@ impl LbPolicy for StubPolicy {
 }
 
 /// StubPolicyBuilder builds a StubLbPolicy.
+#[derive(Debug)]
 pub struct StubPolicyBuilder {
     name: &'static str,
     funcs: StubPolicyFuncs,

grpc/src/client/name_resolution/dns/test.rs

Lines changed: 2 additions & 1 deletion
@@ -252,7 +252,7 @@ pub async fn invalid_target() {
         .contains(&target.to_string()));
 }
 
-#[derive(Clone)]
+#[derive(Clone, Debug)]
 struct FakeDns {
     latency: Duration,
     lookup_result: Result<Vec<std::net::IpAddr>, String>,
@@ -270,6 +270,7 @@ impl rt::DnsResolver for FakeDns {
     }
 }
 
+#[derive(Debug)]
 struct FakeRuntime {
     inner: TokioRuntime,
     dns: FakeDns,

grpc/src/client/service_config.rs

Lines changed: 4 additions & 9 deletions
@@ -29,26 +29,21 @@ use std::{any::Any, error::Error, sync::Arc};
 pub(crate) struct ServiceConfig;
 
 /// A convenience wrapper for an LB policy's configuration object.
-#[derive(Debug)]
+#[derive(Debug, Clone)]
 pub(crate) struct LbConfig {
     config: Arc<dyn Any + Send + Sync>,
 }
 
 impl LbConfig {
     /// Create a new LbConfig wrapper containing the provided config.
-    pub fn new<T: 'static + Send + Sync>(config: T) -> Self {
+    pub fn new(config: impl Any + Send + Sync) -> Self {
         LbConfig {
             config: Arc::new(config),
         }
     }
 
     /// Convenience method to extract the LB policy's configuration object.
-    pub fn convert_to<T: 'static + Send + Sync>(
-        &self,
-    ) -> Result<Arc<T>, Box<dyn Error + Send + Sync>> {
-        match self.config.clone().downcast::<T>() {
-            Ok(c) => Ok(c),
-            Err(e) => Err("failed to downcast to config type".into()),
-        }
+    pub fn convert_to<T: 'static + Send + Sync>(&self) -> Option<Arc<T>> {
+        self.config.clone().downcast::<T>().ok()
     }
 }
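
To see how the new `Clone` derive and the `Option`-returning `convert_to` fit with passing `Option<LbConfig>` by value, here is a self-contained sketch. `LbConfig` is reproduced from the diff above with `pub` visibility so it stands alone. `PickFirstConfig` and `read_shuffle` are hypothetical and exist only for this illustration.

```rust
use std::any::Any;
use std::sync::Arc;

/// Copy of the wrapper from the diff; cloning only bumps the Arc refcount.
#[derive(Debug, Clone)]
pub struct LbConfig {
    config: Arc<dyn Any + Send + Sync>,
}

impl LbConfig {
    pub fn new(config: impl Any + Send + Sync) -> Self {
        LbConfig {
            config: Arc::new(config),
        }
    }

    /// Returns None when the stored config is not of type T.
    pub fn convert_to<T: 'static + Send + Sync>(&self) -> Option<Arc<T>> {
        self.config.clone().downcast::<T>().ok()
    }
}

/// Hypothetical leaf-policy config type, for illustration only.
#[derive(Debug, PartialEq)]
pub struct PickFirstConfig {
    pub shuffle: bool,
}

/// Hypothetical consumer that takes the config by value, as the new
/// `resolver_update` signature does, and falls back to a default when the
/// config is absent or of the wrong type.
pub fn read_shuffle(config: Option<LbConfig>) -> bool {
    config
        .and_then(|c| c.convert_to::<PickFirstConfig>())
        .map(|c| c.shuffle)
        .unwrap_or(false)
}
```

Because `convert_to` now returns an `Option`, it chains with `Option<LbConfig>` through `and_then` without any error conversion. A caller that needs an error can still add one with `ok_or`.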

grpc/src/rt/mod.rs

Lines changed: 3 additions & 2 deletions
@@ -23,6 +23,7 @@
  */
 
 use ::tokio::io::{AsyncRead, AsyncWrite};
+use std::fmt::Debug;
 use std::{future::Future, net::SocketAddr, pin::Pin, sync::Arc, time::Duration};
 
 pub(crate) mod hyper_wrapper;
@@ -39,7 +40,7 @@ pub(crate) type BoxedTaskHandle = Box<dyn TaskHandle>;
 /// time-based operations such as sleeping. It provides a uniform interface
 /// that can be implemented for various async runtimes, enabling pluggable
 /// and testable infrastructure.
-pub(super) trait Runtime: Send + Sync {
+pub(super) trait Runtime: Send + Sync + Debug {
     /// Spawns the given asynchronous task to run in the background.
     fn spawn(&self, task: Pin<Box<dyn Future<Output = ()> + Send + 'static>>) -> BoxedTaskHandle;
 
@@ -98,7 +99,7 @@ pub(crate) trait TcpStream: AsyncRead + AsyncWrite + Send + Unpin {}
 /// # Panics
 ///
 /// Panics if any of its functions are called.
-#[derive(Default)]
+#[derive(Default, Debug)]
 pub(crate) struct NoOpRuntime {}
 
 impl Runtime for NoOpRuntime {

grpc/src/rt/tokio/mod.rs

Lines changed: 1 addition & 0 deletions
@@ -64,6 +64,7 @@ impl DnsResolver for TokioDefaultDnsResolver {
     }
 }
 
+#[derive(Debug)]
 pub(crate) struct TokioRuntime {}
 
 impl TaskHandle for JoinHandle<()> {
