
Commit f5e009a

fix: fix campaign reset not clearing leader state
Signed-off-by: WenyXu <[email protected]>
1 parent 7f7223a commit f5e009a

File tree

src/meta-srv/src/election/rds/mysql.rs
src/meta-srv/src/election/rds/postgres.rs
src/meta-srv/src/metasrv/builder.rs
src/meta-srv/src/region/supervisor.rs
src/meta-srv/src/state.rs

5 files changed: +67 −15 lines

src/meta-srv/src/election/rds/mysql.rs

Lines changed: 8 additions & 1 deletion
```diff
@@ -17,7 +17,7 @@ use std::sync::atomic::{AtomicBool, Ordering};
 use std::time::Duration;
 
 use common_meta::key::{CANDIDATES_ROOT, ELECTION_KEY};
-use common_telemetry::{error, warn};
+use common_telemetry::{error, info, warn};
 use common_time::Timestamp;
 use snafu::{OptionExt, ResultExt, ensure};
 use sqlx::mysql::{MySqlArguments, MySqlRow};
@@ -645,6 +645,13 @@ impl Election for MySqlElection {
     }
 
     async fn reset_campaign(&self) {
+        info!("Resetting campaign");
+        if self.is_leader.load(Ordering::Relaxed) {
+            if let Err(err) = self.step_down_without_lock().await {
+                error!(err; "Failed to step down without lock");
+            }
+            info!("Step down without lock successfully, due to reset campaign");
+        }
         if let Err(err) = self.client.lock().await.reset_client().await {
             error!(err; "Failed to reset client");
         }
```

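Why this matters: before this change, `reset_campaign` only reset the backend client, so a metasrv that had been leader could keep its local leader state after the campaign was reset. The new code steps down first whenever the `is_leader` flag is still set, and only then resets the client. The sketch below illustrates that ordering with simplified stand-in types; `Elector` and its `step_down_without_lock` are placeholders, not the real `MySqlElection`:

```rust
use std::sync::atomic::{AtomicBool, Ordering};

// Simplified stand-in for an RDS-backed elector; the real type also holds a
// database client and a leader-watcher channel.
struct Elector {
    is_leader: AtomicBool,
}

impl Elector {
    // Placeholder for the real step-down path: clear local leader state and
    // notify watchers, without touching the backend lock record.
    async fn step_down_without_lock(&self) -> Result<(), String> {
        self.is_leader.store(false, Ordering::Release);
        Ok(())
    }

    async fn reset_campaign(&self) {
        // Step down first if this node still believes it is the leader ...
        if self.is_leader.load(Ordering::Relaxed) {
            if let Err(err) = self.step_down_without_lock().await {
                eprintln!("Failed to step down without lock: {err}");
            }
        }
        // ... and only then reset the backend client, as the real method does.
    }
}
```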
src/meta-srv/src/election/rds/postgres.rs

Lines changed: 14 additions & 11 deletions
```diff
@@ -17,7 +17,7 @@ use std::sync::atomic::{AtomicBool, Ordering};
 use std::time::Duration;
 
 use common_meta::key::{CANDIDATES_ROOT, ELECTION_KEY};
-use common_telemetry::{error, warn};
+use common_telemetry::{error, info, warn};
 use common_time::Timestamp;
 use deadpool_postgres::{Manager, Pool};
 use snafu::{OptionExt, ResultExt, ensure};
@@ -477,6 +477,13 @@ impl Election for PgElection {
     }
 
     async fn reset_campaign(&self) {
+        info!("Resetting campaign");
+        if self.is_leader.load(Ordering::Relaxed) {
+            if let Err(err) = self.step_down_without_lock().await {
+                error!(err; "Failed to step down without lock");
+            }
+            info!("Step down without lock successfully, due to reset campaign");
+        }
         if let Err(err) = self.pg_client.write().await.reset_client().await {
             error!(err; "Failed to reset client");
         }
@@ -774,16 +781,12 @@ impl PgElection {
             key: key.clone(),
             ..Default::default()
         };
-        if self
-            .is_leader
-            .compare_exchange(true, false, Ordering::AcqRel, Ordering::Acquire)
-            .is_ok()
-            && let Err(e) = self
-                .leader_watcher
-                .send(LeaderChangeMessage::StepDown(Arc::new(leader_key)))
-        {
-            error!(e; "Failed to send leader change message");
-        }
+        send_leader_change_and_set_flags(
+            &self.is_leader,
+            &self.leader_infancy,
+            &self.leader_watcher,
+            LeaderChangeMessage::StepDown(Arc::new(leader_key)),
+        );
         Ok(())
     }
 
```

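The PostgreSQL election gets the same early step-down, and `step_down_without_lock` now delegates to the shared `send_leader_change_and_set_flags` helper instead of open-coding the `compare_exchange` plus watcher send. The helper's definition is not part of this diff; the sketch below only guesses, from its call site, what the step-down path plausibly does (clear the leader flag, reset the infancy flag, notify watchers), with a std mpsc channel standing in for the real leader-watcher channel and a `String` standing in for the real leader key:

```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::mpsc::Sender;

// Simplified message type; the real LeaderChangeMessage carries a leader key.
#[derive(Debug)]
enum LeaderChangeMessage {
    StepDown(String),
}

// Hypothetical sketch of the step-down path of such a helper; the real
// signature and flag semantics live in the rds election module, not here.
fn send_leader_change_and_set_flags(
    is_leader: &AtomicBool,
    leader_infancy: &AtomicBool,
    leader_watcher: &Sender<LeaderChangeMessage>,
    msg: LeaderChangeMessage,
) {
    // Only the node that still thinks it is the leader performs the step-down.
    if is_leader
        .compare_exchange(true, false, Ordering::AcqRel, Ordering::Acquire)
        .is_ok()
    {
        // Reset the "new leader" flag so a later election starts from a clean
        // state (assumed behavior).
        leader_infancy.store(false, Ordering::Release);
        if let Err(e) = leader_watcher.send(msg) {
            eprintln!("Failed to send leader change message: {e}");
        }
    }
}

fn main() {
    let (tx, rx) = std::sync::mpsc::channel();
    let is_leader = AtomicBool::new(true);
    let leader_infancy = AtomicBool::new(false);
    send_leader_change_and_set_flags(
        &is_leader,
        &leader_infancy,
        &tx,
        LeaderChangeMessage::StepDown("leader-key".to_string()),
    );
    assert!(!is_leader.load(Ordering::Relaxed));
    assert!(matches!(rx.recv().unwrap(), LeaderChangeMessage::StepDown(_)));
}
```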
src/meta-srv/src/metasrv/builder.rs

Lines changed: 2 additions & 1 deletion
```diff
@@ -373,7 +373,8 @@ impl MetasrvBuilder {
             runtime_switch_manager.clone(),
             meta_peer_client.clone(),
             leader_cached_kv_backend.clone(),
-        );
+        )
+        .with_state(state.clone());
 
         Some(RegionFailureHandler::new(
             region_supervisor,
```

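The builder change is the wiring: the `RegionSupervisor` constructed here now receives the shared metasrv state through the chained `.with_state(state.clone())` call, which is what enables the leadership check added in supervisor.rs below. A minimal sketch of this consuming-setter pattern, with placeholder types instead of the real `RegionSupervisor` and `StateRef`:

```rust
use std::sync::{Arc, RwLock};

// Placeholder for the shared metasrv state guarded by a read-write lock.
#[derive(Default)]
struct SharedState;

// Placeholder for RegionSupervisor: the state is optional so existing call
// sites that never set it keep working unchanged.
#[derive(Default)]
struct Supervisor {
    state: Option<Arc<RwLock<SharedState>>>,
}

impl Supervisor {
    // Consuming setter: takes `self` by value and returns it, so it chains
    // directly after the constructor, as in builder.rs above.
    fn with_state(mut self, state: Arc<RwLock<SharedState>>) -> Self {
        self.state = Some(state);
        self
    }
}

fn main() {
    let state = Arc::new(RwLock::new(SharedState));
    let supervisor = Supervisor::default().with_state(state);
    assert!(supervisor.state.is_some());
}
```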
src/meta-srv/src/region/supervisor.rs

Lines changed: 37 additions & 2 deletions
```diff
@@ -52,6 +52,7 @@ use crate::procedure::region_migration::{
 };
 use crate::region::failure_detector::RegionFailureDetector;
 use crate::selector::SelectorOptions;
+use crate::state::StateRef;
 
 /// `DatanodeHeartbeat` represents the heartbeat signal sent from a datanode.
 /// It includes identifiers for the cluster and datanode, a list of regions being monitored,
@@ -129,6 +130,9 @@ pub struct RegionSupervisorTicker {
     /// The [`Option`] wrapper allows us to abort the job while dropping the [`RegionSupervisor`].
     tick_handle: Mutex<Option<JoinHandle<()>>>,
 
+    /// The [`Option`] wrapper allows us to abort the job while dropping the [`RegionSupervisor`].
+    initialization_handler: Mutex<Option<JoinHandle<()>>>,
+
     /// The interval of tick.
     tick_interval: Duration,
 
@@ -172,6 +176,7 @@ impl RegionSupervisorTicker {
         );
         Self {
             tick_handle: Mutex::new(None),
+            initialization_handler: Mutex::new(None),
             tick_interval,
             initialization_delay,
             initialization_retry_period,
@@ -192,7 +197,7 @@ impl RegionSupervisorTicker {
             self.initialization_retry_period,
         );
         initialization_interval.set_missed_tick_behavior(MissedTickBehavior::Skip);
-        common_runtime::spawn_global(async move {
+        let initialization_handler = common_runtime::spawn_global(async move {
            loop {
                initialization_interval.tick().await;
                let (tx, rx) = oneshot::channel();
@@ -208,6 +213,7 @@ impl RegionSupervisorTicker {
                }
            }
        });
+        *self.initialization_handler.lock().unwrap() = Some(initialization_handler);
 
         let sender = self.sender.clone();
         let ticker_loop = tokio::spawn(async move {
@@ -237,6 +243,11 @@ impl RegionSupervisorTicker {
             handle.abort();
             info!("The tick loop is stopped.");
         }
+        let initialization_handler = self.initialization_handler.lock().unwrap().take();
+        if let Some(initialization_handler) = initialization_handler {
+            initialization_handler.abort();
+            info!("The initialization loop is stopped.");
+        }
     }
 }
 
@@ -280,6 +291,8 @@ pub struct RegionSupervisor {
     peer_resolver: PeerResolverRef,
     /// The kv backend.
     kv_backend: KvBackendRef,
+    /// The meta state, used to check if the current metasrv is the leader.
+    state: Option<StateRef>,
 }
 
 /// Controller for managing failure detectors for regions.
@@ -363,12 +376,29 @@ impl RegionSupervisor {
             runtime_switch_manager,
             peer_resolver,
             kv_backend,
+            state: None,
         }
     }
 
+    /// Sets the meta state.
+    pub(crate) fn with_state(mut self, state: StateRef) -> Self {
+        self.state = Some(state);
+        self
+    }
+
     /// Runs the main loop.
     pub(crate) async fn run(&mut self) {
         while let Some(event) = self.receiver.recv().await {
+            if let Some(state) = self.state.as_ref()
+                && !state.read().unwrap().is_leader()
+            {
+                warn!(
+                    "The current metasrv is not the leader, ignore {:?} event",
+                    event
+                );
+                continue;
+            }
+
             match event {
                 Event::InitializeAllRegions(sender) => {
                     match self.is_maintenance_mode_enabled().await {
@@ -403,7 +433,10 @@ impl RegionSupervisor {
                     self.deregister_failure_detectors(detecting_regions).await
                 }
                 Event::HeartbeatArrived(heartbeat) => self.on_heartbeat_arrived(heartbeat),
-                Event::Clear => self.clear(),
+                Event::Clear => {
+                    self.clear();
+                    info!("Region supervisor is initialized.");
+                }
                 #[cfg(test)]
                 Event::Dump(sender) => {
                     let _ = sender.send(self.failure_detector.dump());
@@ -896,6 +929,7 @@ pub(crate) mod tests {
         let (tx, mut rx) = tokio::sync::mpsc::channel(128);
         let ticker = RegionSupervisorTicker {
             tick_handle: Mutex::new(None),
+            initialization_handler: Mutex::new(None),
             tick_interval: Duration::from_millis(10),
             initialization_delay: Duration::from_millis(100),
             initialization_retry_period: Duration::from_millis(100),
@@ -922,6 +956,7 @@ pub(crate) mod tests {
         let (tx, mut rx) = tokio::sync::mpsc::channel(128);
         let ticker = RegionSupervisorTicker {
             tick_handle: Mutex::new(None),
+            initialization_handler: Mutex::new(None),
             tick_interval: Duration::from_millis(1000),
             initialization_delay: Duration::from_millis(50),
             initialization_retry_period: Duration::from_millis(50),
```

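Two behavioral changes in supervisor.rs: the initialization loop's `JoinHandle` is now kept so `stop()` can abort it alongside the tick loop, and every event received by `RegionSupervisor::run` is dropped while this metasrv is not the leader, so a node that has just stepped down (for example after `reset_campaign`) cannot act on stale heartbeats or trigger region failovers. The sketch below mirrors that leader gate with simplified stand-ins for `Event`, `State`, and `StateRef`; it assumes tokio and a Rust edition with let-chains, which the diff itself uses:

```rust
use std::sync::{Arc, RwLock};
use tokio::sync::mpsc;

// Simplified stand-ins for the supervisor's event and the metasrv state.
#[derive(Debug)]
enum Event {
    HeartbeatArrived(u64),
    Clear,
}

enum State {
    Leader,
    Follower,
}

impl State {
    fn is_leader(&self) -> bool {
        matches!(self, State::Leader)
    }
}

type StateRef = Arc<RwLock<State>>;

async fn run(mut receiver: mpsc::Receiver<Event>, state: Option<StateRef>) {
    while let Some(event) = receiver.recv().await {
        // The gate: ignore everything while this node is not the leader, so a
        // stepped-down metasrv never reacts to stale events.
        if let Some(state) = state.as_ref()
            && !state.read().unwrap().is_leader()
        {
            eprintln!("not the leader, ignoring {event:?}");
            continue;
        }
        match event {
            Event::HeartbeatArrived(datanode) => {
                println!("heartbeat from datanode {datanode}")
            }
            Event::Clear => println!("failure detectors cleared"),
        }
    }
}

#[tokio::main]
async fn main() {
    let state: StateRef = Arc::new(RwLock::new(State::Follower));
    let (tx, rx) = mpsc::channel(8);
    tx.send(Event::HeartbeatArrived(1)).await.unwrap();
    tx.send(Event::Clear).await.unwrap();
    drop(tx);
    // Both events are ignored because the node is a follower.
    run(rx, Some(state)).await;
}
```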
src/meta-srv/src/state.rs

Lines changed: 6 additions & 0 deletions
```diff
@@ -75,6 +75,12 @@ impl State {
         })
     }
 
+    /// Returns true if the current state is a leader.
+    pub fn is_leader(&self) -> bool {
+        matches!(self, State::Leader(_))
+    }
+
+    /// Returns true if the leader cache is enabled.
     pub fn enable_leader_cache(&self) -> bool {
         match &self {
             State::Leader(leader) => leader.enable_leader_cache,
```

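`State::is_leader` is the predicate the supervisor gate above calls through `StateRef`. It is a one-line `matches!` check; a usage sketch with a placeholder enum of the same shape (the payload types here are stand-ins, not meta-srv's real leader/follower state types):

```rust
// Placeholder payloads for the two variants.
struct LeaderState;
struct FollowerState;

enum State {
    Leader(LeaderState),
    Follower(FollowerState),
}

impl State {
    /// Returns true if the current state is a leader.
    fn is_leader(&self) -> bool {
        // matches! expands to a match expression that is true only for the
        // Leader variant, regardless of its payload.
        matches!(self, State::Leader(_))
    }
}

fn main() {
    assert!(State::Leader(LeaderState).is_leader());
    assert!(!State::Follower(FollowerState).is_leader());
}
```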