Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 8 additions & 1 deletion src/meta-srv/src/election/rds/mysql.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ use std::sync::atomic::{AtomicBool, Ordering};
use std::time::Duration;

use common_meta::key::{CANDIDATES_ROOT, ELECTION_KEY};
use common_telemetry::{error, warn};
use common_telemetry::{error, info, warn};
use common_time::Timestamp;
use snafu::{OptionExt, ResultExt, ensure};
use sqlx::mysql::{MySqlArguments, MySqlRow};
Expand Down Expand Up @@ -645,6 +645,13 @@ impl Election for MySqlElection {
}

async fn reset_campaign(&self) {
info!("Resetting campaign");
// Relinquish leadership before the client below is reset. The success
// message is emitted only when the step-down actually succeeded, so the log
// never reports success right after a failure has been logged.
if self.is_leader.load(Ordering::Relaxed) {
    match self.step_down_without_lock().await {
        Ok(_) => {
            info!("Step down without lock successfully, due to reset campaign");
        }
        Err(err) => {
            error!(err; "Failed to step down without lock");
        }
    }
}
if let Err(err) = self.client.lock().await.reset_client().await {
error!(err; "Failed to reset client");
}
Expand Down
25 changes: 14 additions & 11 deletions src/meta-srv/src/election/rds/postgres.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ use std::sync::atomic::{AtomicBool, Ordering};
use std::time::Duration;

use common_meta::key::{CANDIDATES_ROOT, ELECTION_KEY};
use common_telemetry::{error, warn};
use common_telemetry::{error, info, warn};
use common_time::Timestamp;
use deadpool_postgres::{Manager, Pool};
use snafu::{OptionExt, ResultExt, ensure};
Expand Down Expand Up @@ -477,6 +477,13 @@ impl Election for PgElection {
}

async fn reset_campaign(&self) {
info!("Resetting campaign");
// Relinquish leadership before the client below is reset. The success
// message is emitted only when the step-down actually succeeded, so the log
// never reports success right after a failure has been logged.
if self.is_leader.load(Ordering::Relaxed) {
    match self.step_down_without_lock().await {
        Ok(_) => {
            info!("Step down without lock successfully, due to reset campaign");
        }
        Err(err) => {
            error!(err; "Failed to step down without lock");
        }
    }
}
if let Err(err) = self.pg_client.write().await.reset_client().await {
error!(err; "Failed to reset client");
}
Expand Down Expand Up @@ -774,16 +781,12 @@ impl PgElection {
key: key.clone(),
..Default::default()
};
if self
.is_leader
.compare_exchange(true, false, Ordering::AcqRel, Ordering::Acquire)
.is_ok()
&& let Err(e) = self
.leader_watcher
.send(LeaderChangeMessage::StepDown(Arc::new(leader_key)))
{
error!(e; "Failed to send leader change message");
}
send_leader_change_and_set_flags(
&self.is_leader,
&self.leader_infancy,
&self.leader_watcher,
LeaderChangeMessage::StepDown(Arc::new(leader_key)),
);
Ok(())
}

Expand Down
65 changes: 37 additions & 28 deletions src/meta-srv/src/handler/region_lease_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ use api::v1::meta::{HeartbeatRequest, RegionLease, Role};
use async_trait::async_trait;
use common_meta::key::TableMetadataManagerRef;
use common_meta::region_keeper::MemoryRegionKeeperRef;
use common_telemetry::error;
use store_api::region_engine::GrantedRegion;
use store_api::storage::RegionId;

Expand Down Expand Up @@ -83,36 +84,44 @@ impl HeartbeatHandler for RegionLeaseHandler {
let regions = stat.regions();
let datanode_id = stat.id;

let RenewRegionLeasesResponse {
non_exists,
renewed,
} = self
match self
.region_lease_keeper
.renew_region_leases(datanode_id, &regions)
.await?;

let renewed = if let Some(renewer) = &self.customized_region_lease_renewer {
renewer
.renew(ctx, renewed)
.into_iter()
.map(|region| region.into())
.collect()
} else {
renewed
.into_iter()
.map(|(region_id, region_lease_info)| {
GrantedRegion::new(region_id, region_lease_info.role).into()
})
.collect::<Vec<_>>()
};

acc.region_lease = Some(RegionLease {
regions: renewed,
duration_since_epoch: req.duration_since_epoch,
lease_seconds: self.region_lease_seconds,
closeable_region_ids: non_exists.iter().map(|region| region.as_u64()).collect(),
});
acc.inactive_region_ids = non_exists;
.await
{
Ok(RenewRegionLeasesResponse {
non_exists,
renewed,
}) => {
let renewed = if let Some(renewer) = &self.customized_region_lease_renewer {
renewer
.renew(ctx, renewed)
.into_iter()
.map(|region| region.into())
.collect()
} else {
renewed
.into_iter()
.map(|(region_id, region_lease_info)| {
GrantedRegion::new(region_id, region_lease_info.role).into()
})
.collect::<Vec<_>>()
};

acc.region_lease = Some(RegionLease {
regions: renewed,
duration_since_epoch: req.duration_since_epoch,
lease_seconds: self.region_lease_seconds,
closeable_region_ids: non_exists.iter().map(|region| region.as_u64()).collect(),
});
acc.inactive_region_ids = non_exists;
}
Err(e) => {
error!(e; "Failed to renew region leases for datanode: {datanode_id:?}, regions: {:?}", regions);
// Returning the error here would cause the region failure handler to mark the datanode as failed,
// so we only log the error and continue.
}
}

Ok(HandleControl::Continue)
}
Expand Down
3 changes: 2 additions & 1 deletion src/meta-srv/src/metasrv/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -373,7 +373,8 @@ impl MetasrvBuilder {
runtime_switch_manager.clone(),
meta_peer_client.clone(),
leader_cached_kv_backend.clone(),
);
)
.with_state(state.clone());

Some(RegionFailureHandler::new(
region_supervisor,
Expand Down
61 changes: 24 additions & 37 deletions src/meta-srv/src/procedure/region_migration.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ use common_meta::key::table_route::TableRouteValue;
use common_meta::key::topic_region::{ReplayCheckpoint, TopicRegionKey};
use common_meta::key::{DeserializedValueWithBytes, TableMetadataManagerRef};
use common_meta::kv_backend::{KvBackendRef, ResettableKvBackendRef};
use common_meta::lock_key::{CatalogLock, RegionLock, SchemaLock};
use common_meta::lock_key::{CatalogLock, RegionLock, SchemaLock, TableLock};
use common_meta::peer::Peer;
use common_meta::region_keeper::{MemoryRegionKeeperRef, OperatingRegionGuard};
use common_procedure::error::{
Expand Down Expand Up @@ -231,8 +231,6 @@ pub struct VolatileContext {
/// the corresponding [RegionRoute](common_meta::rpc::router::RegionRoute) of the opening region
/// was written into [TableRouteValue](common_meta::key::table_route::TableRouteValue).
opening_region_guard: Option<OperatingRegionGuard>,
/// `table_route` is stored via previous steps for future use.
table_route: Option<DeserializedValueWithBytes<TableRouteValue>>,
/// `datanode_table` is stored via previous steps for future use.
from_peer_datanode_table: Option<DatanodeTableValue>,
/// `table_info` is stored via previous steps for future use.
Expand Down Expand Up @@ -399,29 +397,23 @@ impl Context {
/// Retry:
/// - Failed to retrieve the metadata of table.
pub async fn get_table_route_value(
&mut self,
) -> Result<&DeserializedValueWithBytes<TableRouteValue>> {
let table_route_value = &mut self.volatile_ctx.table_route;

if table_route_value.is_none() {
let table_id = self.persistent_ctx.region_id.table_id();
let table_route = self
.table_metadata_manager
.table_route_manager()
.table_route_storage()
.get_with_raw_bytes(table_id)
.await
.context(error::TableMetadataManagerSnafu)
.map_err(BoxedError::new)
.with_context(|_| error::RetryLaterWithSourceSnafu {
reason: format!("Failed to get TableRoute: {table_id}"),
})?
.context(error::TableRouteNotFoundSnafu { table_id })?;

*table_route_value = Some(table_route);
}

Ok(table_route_value.as_ref().unwrap())
&self,
) -> Result<DeserializedValueWithBytes<TableRouteValue>> {
let table_id = self.persistent_ctx.region_id.table_id();
let table_route = self
.table_metadata_manager
.table_route_manager()
.table_route_storage()
.get_with_raw_bytes(table_id)
.await
.context(error::TableMetadataManagerSnafu)
.map_err(BoxedError::new)
.with_context(|_| error::RetryLaterWithSourceSnafu {
reason: format!("Failed to get TableRoute: {table_id}"),
})?
.context(error::TableRouteNotFoundSnafu { table_id })?;

Ok(table_route)
}

/// Notifies the RegionSupervisor to register failure detectors of failed region.
Expand Down Expand Up @@ -463,12 +455,6 @@ impl Context {
.await;
}

/// Clears the `table_route` held in [VolatileContext].
///
/// Returns `true` when a value was present before the call, `false` otherwise.
pub fn remove_table_route_value(&mut self) -> bool {
    self.volatile_ctx.table_route.take().is_some()
}

/// Returns the `table_info` of [VolatileContext] if any.
/// Otherwise, returns the value retrieved from remote.
///
Expand Down Expand Up @@ -663,14 +649,13 @@ impl RegionMigrationProcedure {
})
}

async fn rollback_inner(&mut self) -> Result<()> {
async fn rollback_inner(&mut self, procedure_ctx: &ProcedureContext) -> Result<()> {
let _timer = METRIC_META_REGION_MIGRATION_EXECUTE
.with_label_values(&["rollback"])
.start_timer();

let table_id = self.context.region_id().table_id();
let region_id = self.context.region_id();
self.context.remove_table_route_value();
let table_metadata_manager = self.context.table_metadata_manager.clone();
let table_route = self.context.get_table_route_value().await?;

Expand All @@ -683,9 +668,11 @@ impl RegionMigrationProcedure {
.any(|route| route.is_leader_downgrading());

if downgraded {
let table_lock = TableLock::Write(region_id.table_id()).into();
let _guard = procedure_ctx.provider.acquire_lock(&table_lock).await;
info!("Rollbacking downgraded region leader table route, region: {region_id}");
table_metadata_manager
.update_leader_region_status(table_id, table_route, |route| {
.update_leader_region_status(table_id, &table_route, |route| {
if route.region.id == region_id {
Some(None)
} else {
Expand All @@ -712,8 +699,8 @@ impl Procedure for RegionMigrationProcedure {
Self::TYPE_NAME
}

async fn rollback(&mut self, _ctx: &ProcedureContext) -> ProcedureResult<()> {
self.rollback_inner()
async fn rollback(&mut self, ctx: &ProcedureContext) -> ProcedureResult<()> {
self.rollback_inner(ctx)
.await
.map_err(ProcedureError::external)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ impl UpdateMetadata {

// TODO(weny): ensures the leader region peer is the `from_peer`.
if let Err(err) = table_metadata_manager
.update_leader_region_status(table_id, current_table_route_value, |route| {
.update_leader_region_status(table_id, &current_table_route_value, |route| {
if route.region.id == region_id
&& route
.leader_peer
Expand All @@ -61,16 +61,13 @@ impl UpdateMetadata {
.await
.context(error::TableMetadataManagerSnafu)
{
ctx.remove_table_route_value();
return Err(BoxedError::new(err)).context(error::RetryLaterWithSourceSnafu {
reason: format!(
"Failed to update the table route during the downgrading leader region, region_id: {region_id}, from_peer_id: {from_peer_id}"
),
});
}

ctx.remove_table_route_value();

Ok(())
}
}
Expand All @@ -81,7 +78,7 @@ mod tests {

use common_meta::key::test_utils::new_test_table_info;
use common_meta::peer::Peer;
use common_meta::rpc::router::{LeaderState, Region, RegionRoute};
use common_meta::rpc::router::{Region, RegionRoute};
use store_api::storage::RegionId;

use crate::error::Error;
Expand Down Expand Up @@ -115,63 +112,6 @@ mod tests {
assert!(!err.is_retryable());
}

/// Checks that `downgrade_leader_region` fails with a retryable error when the
/// table route held in the context's volatile state predates an update that
/// was already applied through the metadata manager, and that the held route
/// is dropped afterwards.
#[tokio::test]
async fn test_failed_to_update_table_route_error() {
let state = UpdateMetadata::Downgrade;
let persistent_context = new_persistent_context();
let from_peer = persistent_context.from_peer.clone();

let env = TestingEnv::new();
let mut ctx = env.context_factory().new_context(persistent_context);
let table_id = ctx.region_id().table_id();

// Table 1024 with two regions: region 1 is led by `from_peer`, region 2 by peer 4.
let table_info = new_test_table_info(1024, vec![1, 2]).into();
let region_routes = vec![
RegionRoute {
region: Region::new_test(RegionId::new(1024, 1)),
leader_peer: Some(from_peer.clone()),
..Default::default()
},
RegionRoute {
region: Region::new_test(RegionId::new(1024, 2)),
leader_peer: Some(Peer::empty(4)),
..Default::default()
},
];

env.create_physical_table_metadata(table_info, region_routes)
.await;

// Snapshot the table route as it is stored right after creation.
let table_metadata_manager = env.table_metadata_manager();
let original_table_route = table_metadata_manager
.table_route_manager()
.table_route_storage()
.get_with_raw_bytes(table_id)
.await
.unwrap()
.unwrap();

// Modify the stored table route: mark the leader of region 2 as downgrading,
// passing the snapshot taken above as the current route.
table_metadata_manager
.update_leader_region_status(table_id, &original_table_route, |route| {
if route.region.id == RegionId::new(1024, 2) {
Some(Some(LeaderState::Downgrading))
} else {
None
}
})
.await
.unwrap();

// Hand the context the pre-update snapshot, which was taken before the
// modification above was applied.
ctx.volatile_ctx.table_route = Some(original_table_route);

// The downgrade must fail, drop the held table route, and report a retryable
// error that mentions the table route update.
let err = state.downgrade_leader_region(&mut ctx).await.unwrap_err();
assert!(ctx.volatile_ctx.table_route.is_none());
assert!(err.is_retryable());
assert!(format!("{err:?}").contains("Failed to update the table route"));
}

#[tokio::test]
async fn test_only_downgrade_from_peer() {
let mut state = Box::new(UpdateMetadata::Downgrade);
Expand Down Expand Up @@ -212,7 +152,6 @@ mod tests {
// It should remain unchanged.
assert_eq!(latest_table_route.version().unwrap(), 0);
assert!(!latest_table_route.region_routes().unwrap()[0].is_leader_downgrading());
assert!(ctx.volatile_ctx.table_route.is_none());
}

#[tokio::test]
Expand Down Expand Up @@ -254,6 +193,5 @@ mod tests {
.unwrap();

assert!(latest_table_route.region_routes().unwrap()[0].is_leader_downgrading());
assert!(ctx.volatile_ctx.table_route.is_none());
}
}
Loading
Loading