Skip to content

Commit a76ed88

Browse files
committed
fix child manager
1 parent 1bdeab1 commit a76ed88

File tree

2 files changed

+414
-128
lines changed

2 files changed

+414
-128
lines changed

grpc/src/client/load_balancing/child_manager.rs

Lines changed: 33 additions & 128 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
// policy in use. Complete tests must be written before it can be used in
2424
// production. Also, support for the work scheduler is missing.
2525

26+
use std::{fmt::Debug, sync::atomic::{AtomicUsize, Ordering}};
2627
use std::collections::HashSet;
2728
use std::fmt::Display;
2829
use std::sync::Mutex;
@@ -47,19 +48,14 @@ pub struct ChildManager<T> {
4748
children: Vec<Child<T>>,
4849
update_sharder: Box<dyn ResolverUpdateSharder<T>>,
4950
pending_work: Arc<Mutex<HashSet<usize>>>,
50-
updated: bool, // true iff a child has updated its state since the last call to has_updated.
51-
// work_requests: Arc<Mutex<HashSet<Arc<T>>>>,
52-
// work_scheduler: Arc<dyn WorkScheduler>,
51+
updated: bool, // true if a child has updated its state since the last call to has_updated.
5352
sent_connecting_state: bool,
5453
aggregated_state: ConnectivityState,
5554
last_ready_pickers: Vec<Arc<dyn Picker>>,
56-
57-
}
58-
59-
use std::{sync::{atomic::{AtomicUsize, Ordering}}};
6055

61-
pub trait ChildIdentifier: PartialEq + Hash + Eq + Send + Sync + Display + 'static {}
56+
}
6257

58+
/// Bound alias for the type `T` that identifies a child in `ChildManager<T>`
/// (stored as `Child::identifier`). The trait declares no methods of its own;
/// it only bundles the listed supertrait bounds.
pub trait ChildIdentifier: PartialEq + Hash + Eq + Send + Sync + Debug + 'static {}
6359

6460
struct Child<T> {
6561
identifier: T,
@@ -114,13 +110,11 @@ impl<T: ChildIdentifier> ChildManager<T> {
114110
.iter()
115111
.map(|child| (&child.identifier, &child.state))
116112
}
117-
113+
118114
/// Returns the current value of `self.updated` and resets the flag to
/// `false` (the `bool` default left behind by `mem::take`), so a second call
/// with no intervening child update returns `false`.
pub fn has_updated(&mut self) -> bool {
119115
mem::take(&mut self.updated)
120116
}
121117

122-
123-
124118
// Called to update all accounting in the ChildManager from operations
125119
// performed by a child policy on the WrappedController that was created for
126120
// it. child_idx is an index into the `children` vector for the relevant child.
@@ -130,7 +124,7 @@ impl<T: ChildIdentifier> ChildManager<T> {
130124
// which way is better.
131125
fn resolve_child_controller(
132126
&mut self,
133-
channel_controller: &mut WrappedController,
127+
channel_controller: &mut WrappedController,
134128
child_idx: usize,
135129
) {
136130
// Add all created subchannels into the subchannel_child_map.
@@ -143,51 +137,47 @@ impl<T: ChildIdentifier> ChildManager<T> {
143137
self.updated = true;
144138
};
145139

146-
147140
if self.has_updated() {
148141
self.aggregate_states(channel_controller);
149-
150142
}
151-
152-
153143
}
144+
154145
// Called to aggregate states from pick first children. Sends a picker to
155146
// the channel based on aggregation
156147
fn aggregate_states (& mut self, channel_controller: &mut WrappedController) {
157148
let current_connectivity_state = self.aggregated_state.clone();
158149
let child_states_vec: Vec<_> = self.child_states().collect();
159150
// Constructing pickers to return
160-
let mut transient_failure_picker = TransientFailurePickers::new("error string I guess".to_string());
161-
let mut connecting_pickers = ConnectingPickers::new();
151+
// let mut transient_failure_picker = TransientFailurePickers::new("error string I guess".to_string());
152+
// let mut connecting_pickers = ConnectingPickers::new();
162153
let mut ready_pickers = ReadyPickers::new();
163154

164155
let mut has_idle = false;
156+
let mut has_connecting = false;
165157
let mut is_transient_failure = true;
166158

167159
for (child_id, state) in &child_states_vec {
168160
match state.connectivity_state {
169161
ConnectivityState::Idle => {
170-
connecting_pickers.add_picker(state.picker.clone());
171162
has_idle = true;
163+
has_connecting = true;
172164
is_transient_failure = false;
173165
}
174166
ConnectivityState::Connecting => {
175-
connecting_pickers.add_picker(state.picker.clone());
167+
has_connecting = true;
176168
is_transient_failure = false;
177169
}
178170
ConnectivityState::Ready => {
179171
ready_pickers.add_picker(state.picker.clone());
180172
is_transient_failure = false;
181173
}
182-
ConnectivityState::TransientFailure =>{
183-
transient_failure_picker.add_picker(state.picker.clone());
174+
_ =>{
175+
184176
}
185177
}
186178
}
187179

188180
let has_ready = ready_pickers.has_any();
189-
let has_connecting = connecting_pickers.pickers.len() >= 1;
190-
191181

192182
// Decide the new aggregate state
193183
let new_state = if has_ready {
@@ -202,16 +192,13 @@ impl<T: ChildIdentifier> ChildManager<T> {
202192

203193
self.aggregated_state = new_state;
204194

205-
206-
207195
// Now update state and send picker as appropriate
208196
match new_state {
209197
ConnectivityState::Ready => {
210198
let pickers_vec = ready_pickers.pickers.clone();
211199
let picker: Arc<dyn Picker> = Arc::new(ready_pickers);
212200
let should_update = !self.compare_prev_to_new_pickers(&self.last_ready_pickers, &pickers_vec);
213201

214-
215202
if should_update || self.aggregated_state != ConnectivityState::Ready {
216203
println!("child manager sends ready picker update");
217204
self.aggregated_state = ConnectivityState::Ready;
@@ -234,14 +221,13 @@ impl<T: ChildIdentifier> ChildManager<T> {
234221
}
235222
if !self.sent_connecting_state {
236223
println!("child manager sends connecting picker update");
237-
224+
238225
channel_controller.need_to_reach();
239226
let picker = Arc::new(QueuingPicker{});
240227
self.move_to_connecting(has_idle, channel_controller, picker);
241228
} else{
242229
println!("child manager is not sending connecting picker as it has alrady sent connecting state");
243230
}
244-
// Don't send another Connecting picker if already sent
245231
}
246232
ConnectivityState::Idle => {
247233
self.aggregated_state = ConnectivityState::Connecting;
@@ -251,57 +237,48 @@ impl<T: ChildIdentifier> ChildManager<T> {
251237
connectivity_state: ConnectivityState::Connecting,
252238
picker,
253239
});
254-
255240
self.sent_connecting_state = true;
256241
}
257242
ConnectivityState::TransientFailure => {
258243
if current_connectivity_state != ConnectivityState::TransientFailure{
259244
println!("child manager sends transient failure picker update");
260-
let picker = Arc::new(transient_failure_picker);
261245
channel_controller.need_to_reach();
246+
let picker = Arc::new(Failing {error: "No children available".to_string()});
262247
self.move_to_transient_failure(channel_controller, picker);
263248
}
264249
}
265-
250+
266251
}
267252
}
268-
269-
270253
}
271254

272255
impl<T: ChildIdentifier> ChildManager<T> {
273256
/// Switches the aggregated state to `TransientFailure`.
///
/// Records the new state, publishes `picker` to `channel_controller` with
/// `ConnectivityState::TransientFailure`, prints a log line, asks the
/// controller to re-resolve, and clears `sent_connecting_state`.
fn move_to_transient_failure(&mut self, channel_controller: &mut dyn ChannelController, picker: Arc<dyn Picker>) {
274257
self.aggregated_state = ConnectivityState::TransientFailure;
275258
channel_controller.update_picker(LbState {
276-
connectivity_state: ConnectivityState::TransientFailure,
259+
connectivity_state: ConnectivityState::TransientFailure,
277260
picker: picker,
278261
});
279262
println!("requesting resolution");
280263
channel_controller.request_resolution();
281264
// Cleared so the `!self.sent_connecting_state` check in aggregate_states
// lets a later Connecting transition publish a connecting picker again.
self.sent_connecting_state = false;
282265
}
283266

284-
285-
286267
/// Switches the aggregated state to `Connecting`.
///
/// Records the new state, publishes `picker` to `channel_controller` with
/// `ConnectivityState::Connecting`, and sets `sent_connecting_state`.
///
/// NOTE(review): `is_idle` is not read by the live code below; its only use
/// is in the commented-out lines — confirm whether the parameter is still
/// needed.
fn move_to_connecting(&mut self, is_idle: bool, channel_controller: &mut dyn ChannelController, picker: Arc<dyn Picker>) {
287268
self.aggregated_state = ConnectivityState::Connecting;
288269
channel_controller.update_picker(LbState {
289270
connectivity_state: ConnectivityState::Connecting,
290271
picker: picker,
291272
});
292273
// Remembered so aggregate_states does not publish a second connecting picker.
self.sent_connecting_state = true;
293-
// if is_idle {
294-
// println!("requesting resolution");
295-
// channel_controller.request_resolution();
296-
// }
297274
}
298275

299276
fn compare_prev_to_new_pickers(& self, old_pickers: &[Arc<dyn Picker>], new_pickers: &[Arc<dyn Picker>]) -> bool {
300277
//if length is different, then definitely not the same picker
301278
if old_pickers.len() != new_pickers.len() {
302279
return false;
303280
}
304-
//compares two vectors of pickers by pointer equality and returns true if all pickers are the same
281+
//compares two vectors of pickers by pointer equality and returns true if all pickers are the same
305282
for (x, y) in old_pickers.iter().zip(new_pickers.iter()) {
306283
if !Arc::ptr_eq(x, y) {
307284
return false;
@@ -423,7 +400,6 @@ impl<T: ChildIdentifier> LbPolicy for ChildManager<T> {
423400
}
424401
Ok(())
425402
}
426-
427403

428404
fn subchannel_update(
429405
&mut self,
@@ -454,14 +430,16 @@ impl<T: ChildIdentifier> LbPolicy for ChildManager<T> {
454430
self.resolve_child_controller(&mut channel_controller, child_idx);
455431
}
456432
}
457-
433+
458434
/// Forwards `exit_idle` to child policies.
///
/// In this diff the `todo!()` placeholder and the commented-out sketch are
/// the removed side; the statements after them are the replacement body.
/// That body drains the index set held in `pending_work`, and for each index
/// calls the child's `policy.exit_idle` through a fresh `WrappedController`,
/// then runs `resolve_child_controller` for that child.
///
/// Panics if the `pending_work` mutex is poisoned (`lock().unwrap()`) or if a
/// drained index is out of bounds for `children`.
///
/// NOTE(review): only children whose index is in `pending_work` are visited —
/// confirm that this, rather than every child, is the intended set.
fn exit_idle(&mut self, channel_controller: &mut dyn ChannelController) {
459-
todo!()
460-
// let policy = &mut self.children.get_mut(&child_id.clone()).unwrap().policy;
461-
// let mut channel_controller = WrappedController::new(channel_controller);
462-
// // Call the proper child.
463-
// policy.exit_idle(&mut channel_controller);
464-
// self.resolve_child_controller(channel_controller, child_id.clone());
435+
// `mem::take` leaves an empty set behind; the lock guard is a temporary
// of this statement, so it is released before the loop starts.
let child_idxes = mem::take(&mut *self.pending_work.lock().unwrap());
436+
for child_idx in child_idxes {
437+
// Shadows the parameter for this iteration only.
let mut channel_controller = WrappedController::new(channel_controller);
438+
self.children[child_idx]
439+
.policy
440+
.exit_idle(&mut channel_controller);
441+
self.resolve_child_controller(&mut channel_controller, child_idx);
442+
}
465443
}
466444
}
467445

@@ -495,12 +473,12 @@ impl ChannelController for WrappedController<'_> {
495473
}
496474

497475
/// Records a picker update produced by a child policy.
///
/// A clone of `update` is always stored in `self.picker_update`. When
/// `need_to_reach` is set, the update is additionally forwarded to the
/// wrapped `channel_controller` and the flag is cleared, so at most one
/// update is forwarded per time the flag is set.
///
/// The removed and added sides of this hunk contain the same statements;
/// the visible change is limited to layout.
fn update_picker(&mut self, update: LbState) {
498-
self.picker_update = Some(update.clone());
499-
if self.need_to_reach{
500-
self.channel_controller.update_picker(update);
501-
self.need_to_reach = false;
502-
}
476+
self.picker_update = Some(update.clone());
477+
if self.need_to_reach{
478+
self.channel_controller.update_picker(update);
479+
self.need_to_reach = false;
503480
}
481+
}
504482

505483
fn request_resolution(&mut self) {
506484
self.channel_controller.request_resolution();
@@ -526,8 +504,6 @@ struct ReadyPickers {
526504
next: AtomicUsize,
527505
}
528506

529-
530-
531507
impl ReadyPickers {
532508
pub fn new() -> Self {
533509
Self {
@@ -543,8 +519,6 @@ impl ReadyPickers {
543519
/// Returns `true` when `self.pickers` holds at least one picker.
pub fn has_any(&self) -> bool {
544520
!self.pickers.is_empty()
545521
}
546-
547-
548522
}
549523

550524
impl Picker for ReadyPickers {
@@ -556,74 +530,5 @@ impl Picker for ReadyPickers {
556530
let idx = self.next.fetch_add(1, Ordering::Relaxed) % len;
557531
self.pickers[idx].pick(request)
558532
}
559-
560-
561-
}
562-
struct ConnectingPickers {
563-
pickers: Vec<Arc<dyn Picker>>,
564-
// next: AtomicUsize,
565-
}
566-
567-
impl Picker for ConnectingPickers {
568-
fn pick(&self, request: &Request) -> PickResult {
569-
570-
return PickResult::Queue;
571-
572-
}
573-
}
574-
575-
impl ConnectingPickers {
576-
pub fn new() -> Self {
577-
Self {
578-
pickers: vec![],
579-
}
580-
}
581-
582-
fn add_picker(&mut self, picker: Arc<dyn Picker>) {
583-
self.pickers.push(picker);
584-
}
585-
}
586-
587-
struct TransientFailurePickers {
588-
pickers: Vec<Arc<dyn Picker>>,
589-
error: String,
590-
591-
}
592-
593-
impl Picker for TransientFailurePickers {
594-
fn pick(&self, request: &Request) -> PickResult {
595-
596-
return PickResult::Fail(Status::unavailable(self.error.clone()))
597-
598-
}
599-
}
600-
601-
impl TransientFailurePickers {
602-
pub fn new(error: String) -> Self {
603-
Self {
604-
pickers: vec![],
605-
error,
606-
}
607-
}
608-
609-
fn add_picker(&mut self, picker: Arc<dyn Picker>) {
610-
self.pickers.push(picker);
611-
}
612-
}
613-
struct IdlePickers {
614-
pickers: Vec<Arc<dyn Picker>>,
615-
work_scheduler: Arc<dyn WorkScheduler>,
616-
// next: AtomicUsize,
617-
}
618-
619-
struct SchedulingIdlePicker {
620-
work_scheduler: Arc<dyn WorkScheduler>,
621-
}
622-
623-
impl Picker for SchedulingIdlePicker {
624-
fn pick(&self, _request: &Request) -> PickResult {
625-
self.work_scheduler.schedule_work();
626-
PickResult::Queue
627-
}
628533
}
629534

0 commit comments

Comments
 (0)