diff --git a/config/config.md b/config/config.md index 37cd5d1bacb1..3250728e9e81 100644 --- a/config/config.md +++ b/config/config.md @@ -323,6 +323,7 @@ | `selector` | String | `round_robin` | Datanode selector type.
- `round_robin` (default value)
- `lease_based`
- `load_based`
For details, please see "https://docs.greptime.com/developer-guide/metasrv/selector". | | `use_memory_store` | Bool | `false` | Store data in memory. | | `enable_region_failover` | Bool | `false` | Whether to enable region failover.
This feature is only available on GreptimeDB running in cluster mode and
- Using Remote WAL
- Using shared storage (e.g., s3). | +| `region_failure_detector_initialization_delay` | String | `10m` | Delay before initializing region failure detectors.
This delay helps prevent region failure detectors from being initialized before cluster maintenance
mode can be enabled shortly after metasrv starts, especially when the cluster
is not deployed via the recommended GreptimeDB Operator. Without this delay, early detector registration
may trigger unnecessary region failovers during datanode startup. | | `allow_region_failover_on_local_wal` | Bool | `false` | Whether to allow region failover on local WAL.
**This option is not recommended to be set to true, because it may lead to data loss during failover.** | | `node_max_idle_time` | String | `24hours` | Max allowed idle time before removing node info from metasrv memory. | | `enable_telemetry` | Bool | `true` | Whether to enable greptimedb telemetry. Enabled by default. | diff --git a/config/metasrv.example.toml b/config/metasrv.example.toml index 81213b2a1268..63ff0155520b 100644 --- a/config/metasrv.example.toml +++ b/config/metasrv.example.toml @@ -43,6 +43,13 @@ use_memory_store = false ## - Using shared storage (e.g., s3). enable_region_failover = false +## Delay before initializing region failure detectors. +## This delay helps prevent premature initialization of region failure detectors in cases where +## cluster maintenance mode is enabled right after metasrv starts, especially when the cluster +## is not deployed via the recommended GreptimeDB Operator. Without this delay, early detector registration +## may trigger unnecessary region failovers during datanode startup. +region_failure_detector_initialization_delay = '10m' + ## Whether to allow region failover on local WAL. ## **This option is not recommended to be set to true, because it may lead to data loss during failover.** allow_region_failover_on_local_wal = false diff --git a/src/common/meta/src/key/table_route.rs b/src/common/meta/src/key/table_route.rs index 94d2a0bf07b3..dbf87adf2f9e 100644 --- a/src/common/meta/src/key/table_route.rs +++ b/src/common/meta/src/key/table_route.rs @@ -48,6 +48,11 @@ impl TableRouteKey { pub fn new(table_id: TableId) -> Self { Self { table_id } } + + /// Returns the range prefix of the table route key. + pub fn range_prefix() -> Vec { + format!("{}/", TABLE_ROUTE_PREFIX).into_bytes() + } } #[derive(Debug, PartialEq, Serialize, Deserialize, Clone)] diff --git a/src/meta-srv/src/error.rs b/src/meta-srv/src/error.rs index b1ea799e5fce..7abf5193f6f0 100644 --- a/src/meta-srv/src/error.rs +++ b/src/meta-srv/src/error.rs @@ -54,14 +54,6 @@ pub enum Error { peer_id: u64, }, - #[snafu(display("Failed to lookup peer: {}", peer_id))] - LookupPeer { - #[snafu(implicit)] - location: Location, - source: common_meta::error::Error, - peer_id: u64, - }, - #[snafu(display("Another migration procedure is running for region: {}", region_id))] MigrationRunning { #[snafu(implicit)] @@ -1033,7 +1025,6 @@ impl ErrorExt for Error { } Error::Other { source, .. } => source.status_code(), - Error::LookupPeer { source, .. } => source.status_code(), Error::NoEnoughAvailableNode { .. } => StatusCode::RuntimeResourcesExhausted, #[cfg(feature = "pg_kvbackend")] diff --git a/src/meta-srv/src/metasrv.rs b/src/meta-srv/src/metasrv.rs index 50797d44c480..91d0b22caf63 100644 --- a/src/meta-srv/src/metasrv.rs +++ b/src/meta-srv/src/metasrv.rs @@ -110,6 +110,14 @@ pub struct MetasrvOptions { pub use_memory_store: bool, /// Whether to enable region failover. pub enable_region_failover: bool, + /// Delay before initializing region failure detectors. + /// + /// This delay helps prevent premature initialization of region failure detectors in cases where + /// cluster maintenance mode is enabled right after metasrv starts, especially when the cluster + /// is not deployed via the recommended GreptimeDB Operator. Without this delay, early detector registration + /// may trigger unnecessary region failovers during datanode startup. + #[serde(with = "humantime_serde")] + pub region_failure_detector_initialization_delay: Duration, /// Whether to allow region failover on local WAL. 
/// /// If it's true, the region failover will be allowed even if the local WAL is used. @@ -219,6 +227,7 @@ impl Default for MetasrvOptions { selector: SelectorType::default(), use_memory_store: false, enable_region_failover: false, + region_failure_detector_initialization_delay: Duration::from_secs(10 * 60), allow_region_failover_on_local_wal: false, grpc: GrpcOptions { bind_addr: format!("127.0.0.1:{}", DEFAULT_METASRV_ADDR_PORT), diff --git a/src/meta-srv/src/metasrv/builder.rs b/src/meta-srv/src/metasrv/builder.rs index 167c5afd8ea3..85e50c8669f4 100644 --- a/src/meta-srv/src/metasrv/builder.rs +++ b/src/meta-srv/src/metasrv/builder.rs @@ -64,7 +64,7 @@ use crate::procedure::wal_prune::manager::{WalPruneManager, WalPruneTicker}; use crate::procedure::wal_prune::Context as WalPruneContext; use crate::region::supervisor::{ HeartbeatAcceptor, RegionFailureDetectorControl, RegionSupervisor, RegionSupervisorSelector, - RegionSupervisorTicker, DEFAULT_TICK_INTERVAL, + RegionSupervisorTicker, DEFAULT_INITIALIZATION_RETRY_PERIOD, DEFAULT_TICK_INTERVAL, }; use crate::selector::lease_based::LeaseBasedSelector; use crate::selector::round_robin::RoundRobinSelector; @@ -299,6 +299,8 @@ impl MetasrvBuilder { Arc::new(RegionFailureDetectorControl::new(tx.clone())) as _, Some(Arc::new(RegionSupervisorTicker::new( DEFAULT_TICK_INTERVAL, + options.region_failure_detector_initialization_delay, + DEFAULT_INITIALIZATION_RETRY_PERIOD, tx.clone(), ))), ) @@ -341,6 +343,7 @@ impl MetasrvBuilder { region_migration_manager.clone(), maintenance_mode_manager.clone(), peer_lookup_service.clone(), + leader_cached_kv_backend.clone(), ); Some(RegionFailureHandler::new( diff --git a/src/meta-srv/src/procedure/region_migration/manager.rs b/src/meta-srv/src/procedure/region_migration/manager.rs index b277bd3e23b6..adf6c0732b7e 100644 --- a/src/meta-srv/src/procedure/region_migration/manager.rs +++ b/src/meta-srv/src/procedure/region_migration/manager.rs @@ -23,7 +23,7 @@ use common_meta::key::table_route::TableRouteValue; use common_meta::peer::Peer; use common_meta::rpc::router::RegionRoute; use common_procedure::{watcher, ProcedureId, ProcedureManagerRef, ProcedureWithId}; -use common_telemetry::{error, info}; +use common_telemetry::{error, info, warn}; use snafu::{ensure, OptionExt, ResultExt}; use store_api::storage::RegionId; use table::table_name::TableName; @@ -253,10 +253,12 @@ impl RegionMigrationManager { } /// Throws an error if `leader_peer` is not the `from_peer`. + /// + /// If `from_peer` is unknown, use the leader peer as the `from_peer`. fn verify_region_leader_peer( &self, region_route: &RegionRoute, - task: &RegionMigrationProcedureTask, + task: &mut RegionMigrationProcedureTask, ) -> Result<()> { let leader_peer = region_route .leader_peer @@ -275,6 +277,15 @@ impl RegionMigrationManager { } ); + if task.from_peer.addr.is_empty() { + warn!( + "The `from_peer` is unknown, use the leader peer({}) as the `from_peer`, region: {}", + leader_peer, task.region_id + ); + // The peer id is the same as the leader peer id. + task.from_peer = leader_peer.clone(); + } + Ok(()) } @@ -300,7 +311,7 @@ impl RegionMigrationManager { /// Submits a new region migration procedure. 
pub async fn submit_procedure( &self, - task: RegionMigrationProcedureTask, + mut task: RegionMigrationProcedureTask, ) -> Result> { let Some(guard) = self.insert_running_procedure(&task) else { return error::MigrationRunningSnafu { @@ -333,7 +344,7 @@ impl RegionMigrationManager { .fail(); } - self.verify_region_leader_peer(®ion_route, &task)?; + self.verify_region_leader_peer(®ion_route, &mut task)?; self.verify_region_follower_peers(®ion_route, &task)?; let table_info = self.retrieve_table_info(region_id).await?; let TableName { @@ -341,12 +352,6 @@ impl RegionMigrationManager { schema_name, .. } = table_info.table_name(); - METRIC_META_REGION_MIGRATION_DATANODES - .with_label_values(&["src", &task.from_peer.id.to_string()]) - .inc(); - METRIC_META_REGION_MIGRATION_DATANODES - .with_label_values(&["desc", &task.to_peer.id.to_string()]) - .inc(); let RegionMigrationProcedureTask { region_id, from_peer, @@ -377,6 +382,12 @@ impl RegionMigrationManager { return; } }; + METRIC_META_REGION_MIGRATION_DATANODES + .with_label_values(&["src", &task.from_peer.id.to_string()]) + .inc(); + METRIC_META_REGION_MIGRATION_DATANODES + .with_label_values(&["desc", &task.to_peer.id.to_string()]) + .inc(); if let Err(e) = watcher::wait(watcher).await { error!(e; "Failed to wait region migration procedure {procedure_id} for {task}"); diff --git a/src/meta-srv/src/region/supervisor.rs b/src/meta-srv/src/region/supervisor.rs index edbe29d85caf..6d4e67cd2844 100644 --- a/src/meta-srv/src/region/supervisor.rs +++ b/src/meta-srv/src/region/supervisor.rs @@ -15,23 +15,30 @@ use std::collections::{HashMap, HashSet}; use std::fmt::Debug; use std::sync::{Arc, Mutex}; -use std::time::Duration; +use std::time::{Duration, Instant}; use async_trait::async_trait; use common_meta::datanode::Stat; use common_meta::ddl::{DetectingRegion, RegionFailureDetectorController}; use common_meta::key::maintenance::MaintenanceModeManagerRef; +use common_meta::key::table_route::{TableRouteKey, TableRouteValue}; +use common_meta::key::{MetadataKey, MetadataValue}; +use common_meta::kv_backend::KvBackendRef; use common_meta::leadership_notifier::LeadershipChangeListener; use common_meta::peer::{Peer, PeerLookupServiceRef}; +use common_meta::range_stream::{PaginationStream, DEFAULT_PAGE_SIZE}; +use common_meta::rpc::store::RangeRequest; use common_meta::DatanodeId; use common_runtime::JoinHandle; use common_telemetry::{debug, error, info, warn}; use common_time::util::current_time_millis; use error::Error::{LeaderPeerChanged, MigrationRunning, RegionMigrated, TableRouteNotFound}; -use snafu::{ensure, OptionExt, ResultExt}; +use futures::{StreamExt, TryStreamExt}; +use snafu::{ensure, ResultExt}; use store_api::storage::RegionId; use tokio::sync::mpsc::{Receiver, Sender}; -use tokio::time::{interval, MissedTickBehavior}; +use tokio::sync::oneshot; +use tokio::time::{interval, interval_at, MissedTickBehavior}; use crate::error::{self, Result}; use crate::failure_detector::PhiAccrualFailureDetectorOptions; @@ -70,6 +77,9 @@ impl From<&Stat> for DatanodeHeartbeat { /// /// Variants: /// - `Tick`: This event is used to trigger region failure detection periodically. +/// - `InitializeAllRegions`: This event is used to initialize all region failure detectors. +/// - `RegisterFailureDetectors`: This event is used to register failure detectors for regions. +/// - `DeregisterFailureDetectors`: This event is used to deregister failure detectors for regions. 
/// - `HeartbeatArrived`: This event presents the metasrv received [`DatanodeHeartbeat`] from the datanodes. /// - `Clear`: This event is used to reset the state of the supervisor, typically used /// when a system-wide reset or reinitialization is needed. @@ -78,6 +88,7 @@ impl From<&Stat> for DatanodeHeartbeat { /// of the supervisor during tests. pub(crate) enum Event { Tick, + InitializeAllRegions(tokio::sync::oneshot::Sender<()>), RegisterFailureDetectors(Vec), DeregisterFailureDetectors(Vec), HeartbeatArrived(DatanodeHeartbeat), @@ -102,6 +113,7 @@ impl Debug for Event { Self::Tick => write!(f, "Tick"), Self::HeartbeatArrived(arg0) => f.debug_tuple("HeartbeatArrived").field(arg0).finish(), Self::Clear => write!(f, "Clear"), + Self::InitializeAllRegions(_) => write!(f, "InspectAndRegisterRegions"), Self::RegisterFailureDetectors(arg0) => f .debug_tuple("RegisterFailureDetectors") .field(arg0) @@ -127,6 +139,12 @@ pub struct RegionSupervisorTicker { /// The interval of tick. tick_interval: Duration, + /// The delay before initializing all region failure detectors. + initialization_delay: Duration, + + /// The retry period for initializing all region failure detectors. + initialization_retry_period: Duration, + /// Sends [Event]s. sender: Sender, } @@ -149,10 +167,21 @@ impl LeadershipChangeListener for RegionSupervisorTicker { } impl RegionSupervisorTicker { - pub(crate) fn new(tick_interval: Duration, sender: Sender) -> Self { + pub(crate) fn new( + tick_interval: Duration, + initialization_delay: Duration, + initialization_retry_period: Duration, + sender: Sender, + ) -> Self { + info!( + "RegionSupervisorTicker is created, tick_interval: {:?}, initialization_delay: {:?}, initialization_retry_period: {:?}", + tick_interval, initialization_delay, initialization_retry_period + ); Self { tick_handle: Mutex::new(None), tick_interval, + initialization_delay, + initialization_retry_period, sender, } } @@ -163,15 +192,39 @@ impl RegionSupervisorTicker { if handle.is_none() { let sender = self.sender.clone(); let tick_interval = self.tick_interval; + let initialization_delay = self.initialization_delay; + + let mut initialization_interval = interval_at( + tokio::time::Instant::now() + initialization_delay, + self.initialization_retry_period, + ); + initialization_interval.set_missed_tick_behavior(MissedTickBehavior::Skip); + common_runtime::spawn_global(async move { + loop { + initialization_interval.tick().await; + let (tx, rx) = oneshot::channel(); + if sender.send(Event::InitializeAllRegions(tx)).await.is_err() { + info!("EventReceiver is dropped, region failure detectors initialization loop is stopped"); + break; + } + if rx.await.is_ok() { + info!("All region failure detectors are initialized."); + break; + } + } + }); + + let sender = self.sender.clone(); let ticker_loop = tokio::spawn(async move { - let mut interval = interval(tick_interval); - interval.set_missed_tick_behavior(MissedTickBehavior::Skip); + let mut tick_interval = interval(tick_interval); + tick_interval.set_missed_tick_behavior(MissedTickBehavior::Skip); + if let Err(err) = sender.send(Event::Clear).await { warn!(err; "EventReceiver is dropped, failed to send Event::Clear"); return; } loop { - interval.tick().await; + tick_interval.tick().await; if sender.send(Event::Tick).await.is_err() { info!("EventReceiver is dropped, tick loop is stopped"); break; @@ -202,6 +255,8 @@ pub type RegionSupervisorRef = Arc; /// The default tick interval. 
pub const DEFAULT_TICK_INTERVAL: Duration = Duration::from_secs(1); +/// The default initialization retry period. +pub const DEFAULT_INITIALIZATION_RETRY_PERIOD: Duration = Duration::from_secs(60); /// Selector for region supervisor. pub enum RegionSupervisorSelector { @@ -228,6 +283,8 @@ pub struct RegionSupervisor { maintenance_mode_manager: MaintenanceModeManagerRef, /// Peer lookup service peer_lookup: PeerLookupServiceRef, + /// The kv backend. + kv_backend: KvBackendRef, } /// Controller for managing failure detectors for regions. @@ -290,6 +347,7 @@ impl RegionSupervisor { tokio::sync::mpsc::channel(1024) } + #[allow(clippy::too_many_arguments)] pub(crate) fn new( event_receiver: Receiver, options: PhiAccrualFailureDetectorOptions, @@ -298,6 +356,7 @@ impl RegionSupervisor { region_migration_manager: RegionMigrationManagerRef, maintenance_mode_manager: MaintenanceModeManagerRef, peer_lookup: PeerLookupServiceRef, + kv_backend: KvBackendRef, ) -> Self { Self { failure_detector: RegionFailureDetector::new(options), @@ -308,6 +367,7 @@ impl RegionSupervisor { region_migration_manager, maintenance_mode_manager, peer_lookup, + kv_backend, } } @@ -315,6 +375,26 @@ impl RegionSupervisor { pub(crate) async fn run(&mut self) { while let Some(event) = self.receiver.recv().await { match event { + Event::InitializeAllRegions(sender) => { + match self.is_maintenance_mode_enabled().await { + Ok(false) => {} + Ok(true) => { + warn!("Skipping initialize all regions since maintenance mode is enabled."); + continue; + } + Err(err) => { + error!(err; "Failed to check maintenance mode during initialize all regions."); + continue; + } + } + + if let Err(err) = self.initialize_all().await { + error!(err; "Failed to initialize all regions."); + } else { + // Ignore the error. + let _ = sender.send(()); + } + } Event::Tick => { let regions = self.detect_region_failure(); self.handle_region_failures(regions).await; @@ -336,6 +416,59 @@ impl RegionSupervisor { info!("RegionSupervisor is stopped!"); } + async fn initialize_all(&self) -> Result<()> { + let now = Instant::now(); + let regions = self.regions(); + let req = RangeRequest::new().with_prefix(TableRouteKey::range_prefix()); + let stream = PaginationStream::new(self.kv_backend.clone(), req, DEFAULT_PAGE_SIZE, |kv| { + TableRouteKey::from_bytes(&kv.key).map(|v| (v.table_id, kv.value)) + }) + .into_stream(); + + let mut stream = stream + .map_ok(|(_, value)| { + TableRouteValue::try_from_raw_value(&value) + .context(error::TableMetadataManagerSnafu) + }) + .boxed(); + let mut detecting_regions = Vec::new(); + while let Some(route) = stream + .try_next() + .await + .context(error::TableMetadataManagerSnafu)? 
+ { + let route = route?; + if !route.is_physical() { + continue; + } + + let physical_table_route = route.into_physical_table_route(); + physical_table_route + .region_routes + .iter() + .for_each(|region_route| { + if !regions.contains(®ion_route.region.id) { + if let Some(leader_peer) = ®ion_route.leader_peer { + detecting_regions.push((leader_peer.id, region_route.region.id)); + } + } + }); + } + + let num_detecting_regions = detecting_regions.len(); + if !detecting_regions.is_empty() { + self.register_failure_detectors(detecting_regions).await; + } + + info!( + "Initialize {} region failure detectors, elapsed: {:?}", + num_detecting_regions, + now.elapsed() + ); + + Ok(()) + } + async fn register_failure_detectors(&self, detecting_regions: Vec) { let ts_millis = current_time_millis(); for region in detecting_regions { @@ -497,12 +630,10 @@ impl RegionSupervisor { .peer_lookup .datanode(from_peer_id) .await - .context(error::LookupPeerSnafu { - peer_id: from_peer_id, - })? - .context(error::PeerUnavailableSnafu { - peer_id: from_peer_id, - })?; + .ok() + .flatten() + .unwrap_or_else(|| Peer::empty(from_peer_id)); + let region_peers = self .select_peers(from_peer_id, regions, failed_datanodes) .await?; @@ -599,6 +730,14 @@ impl RegionSupervisor { .collect::>() } + /// Returns all regions that registered in the failure detector. + fn regions(&self) -> HashSet { + self.failure_detector + .iter() + .map(|e| e.region_ident().1) + .collect::>() + } + /// Updates the state of corresponding failure detectors. fn on_heartbeat_arrived(&self, heartbeat: DatanodeHeartbeat) { for region_id in heartbeat.regions { @@ -618,13 +757,22 @@ impl RegionSupervisor { #[cfg(test)] pub(crate) mod tests { use std::assert_matches::assert_matches; + use std::collections::HashMap; use std::sync::{Arc, Mutex}; use std::time::Duration; + use common_meta::ddl::test_util::{ + test_create_logical_table_task, test_create_physical_table_task, + }; use common_meta::ddl::RegionFailureDetectorController; - use common_meta::key::maintenance; + use common_meta::key::table_route::{ + LogicalTableRouteValue, PhysicalTableRouteValue, TableRouteValue, + }; + use common_meta::key::{maintenance, TableMetadataManager}; use common_meta::peer::Peer; + use common_meta::rpc::router::{Region, RegionRoute}; use common_meta::test_util::NoopPeerLookupService; + use common_telemetry::info; use common_time::util::current_time_millis; use rand::Rng; use store_api::storage::RegionId; @@ -654,6 +802,7 @@ pub(crate) mod tests { Arc::new(maintenance::MaintenanceModeManager::new(env.kv_backend())); let peer_lookup = Arc::new(NoopPeerLookupService); let (tx, rx) = RegionSupervisor::channel(); + let kv_backend = env.kv_backend(); ( RegionSupervisor::new( @@ -664,6 +813,7 @@ pub(crate) mod tests { region_migration_manager, maintenance_mode_manager, peer_lookup, + kv_backend, ), tx, ) @@ -748,6 +898,8 @@ pub(crate) mod tests { let ticker = RegionSupervisorTicker { tick_handle: Mutex::new(None), tick_interval: Duration::from_millis(10), + initialization_delay: Duration::from_millis(100), + initialization_retry_period: Duration::from_millis(100), sender: tx, }; // It's ok if we start the ticker again. 
@@ -757,11 +909,116 @@ pub(crate) mod tests { ticker.stop(); assert!(!rx.is_empty()); while let Ok(event) = rx.try_recv() { - assert_matches!(event, Event::Tick | Event::Clear); + assert_matches!( + event, + Event::Tick | Event::Clear | Event::InitializeAllRegions(_) + ); + } + } + } + + #[tokio::test] + async fn test_initialize_all_regions_event_handling() { + common_telemetry::init_default_ut_logging(); + let (tx, mut rx) = tokio::sync::mpsc::channel(128); + let ticker = RegionSupervisorTicker { + tick_handle: Mutex::new(None), + tick_interval: Duration::from_millis(1000), + initialization_delay: Duration::from_millis(50), + initialization_retry_period: Duration::from_millis(50), + sender: tx, + }; + ticker.start(); + sleep(Duration::from_millis(60)).await; + let handle = tokio::spawn(async move { + let mut counter = 0; + while let Some(event) = rx.recv().await { + if let Event::InitializeAllRegions(tx) = event { + if counter == 0 { + // Ignore the first event + counter += 1; + continue; + } + tx.send(()).unwrap(); + info!("Responded initialize all regions event"); + break; + } } + rx + }); + + let rx = handle.await.unwrap(); + for _ in 0..3 { + sleep(Duration::from_millis(100)).await; + assert!(rx.is_empty()); } } + #[tokio::test] + async fn test_initialize_all_regions() { + common_telemetry::init_default_ut_logging(); + let (mut supervisor, sender) = new_test_supervisor(); + let table_metadata_manager = TableMetadataManager::new(supervisor.kv_backend.clone()); + + // Create a physical table metadata + let table_id = 1024; + let mut create_physical_table_task = test_create_physical_table_task("my_physical_table"); + create_physical_table_task.set_table_id(table_id); + let table_info = create_physical_table_task.table_info; + let table_route = PhysicalTableRouteValue::new(vec![RegionRoute { + region: Region { + id: RegionId::new(table_id, 0), + ..Default::default() + }, + leader_peer: Some(Peer::empty(1)), + ..Default::default() + }]); + let table_route_value = TableRouteValue::Physical(table_route); + table_metadata_manager + .create_table_metadata(table_info, table_route_value, HashMap::new()) + .await + .unwrap(); + + // Create a logical table metadata + let logical_table_id = 1025; + let mut test_create_logical_table_task = test_create_logical_table_task("my_logical_table"); + test_create_logical_table_task.set_table_id(logical_table_id); + let table_info = test_create_logical_table_task.table_info; + let table_route = LogicalTableRouteValue::new(1024, vec![RegionId::new(1025, 0)]); + let table_route_value = TableRouteValue::Logical(table_route); + table_metadata_manager + .create_table_metadata(table_info, table_route_value, HashMap::new()) + .await + .unwrap(); + tokio::spawn(async move { supervisor.run().await }); + let (tx, rx) = oneshot::channel(); + sender.send(Event::InitializeAllRegions(tx)).await.unwrap(); + assert!(rx.await.is_ok()); + + let (tx, rx) = oneshot::channel(); + sender.send(Event::Dump(tx)).await.unwrap(); + let detector = rx.await.unwrap(); + assert_eq!(detector.len(), 1); + assert!(detector.contains(&(1, RegionId::new(1024, 0)))); + } + + #[tokio::test] + async fn test_initialize_all_regions_with_maintenance_mode() { + common_telemetry::init_default_ut_logging(); + let (mut supervisor, sender) = new_test_supervisor(); + + supervisor + .maintenance_mode_manager + .set_maintenance_mode() + .await + .unwrap(); + tokio::spawn(async move { supervisor.run().await }); + let (tx, rx) = oneshot::channel(); + sender.send(Event::InitializeAllRegions(tx)).await.unwrap(); 
+ // The sender is dropped, so the receiver will receive an error. + assert!(rx.await.is_err()); + } + #[tokio::test] async fn test_region_failure_detector_controller() { let (mut supervisor, sender) = new_test_supervisor();
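For reference, below is a minimal, self-contained sketch (not the patch's actual code) of the delayed-initialization loop that `RegionSupervisorTicker::start` gains in this diff: the first `InitializeAllRegions` request is sent only after `region_failure_detector_initialization_delay`, and it is retried every `DEFAULT_INITIALIZATION_RETRY_PERIOD` until the supervisor acknowledges it over a oneshot channel. The `Event` enum, channel size, and durations here are simplified stand-ins, not the real metasrv types.

```rust
// Requires tokio 1.x with the "time", "sync", "macros" and "rt-multi-thread" features.
use std::time::Duration;

use tokio::sync::{mpsc, oneshot};
use tokio::time::{interval_at, Instant, MissedTickBehavior};

/// Simplified stand-in for the supervisor's `Event` enum; only the variant
/// relevant to the initialization loop is modeled here.
enum Event {
    InitializeAllRegions(oneshot::Sender<()>),
}

#[tokio::main]
async fn main() {
    let (tx, mut rx) = mpsc::channel::<Event>(16);

    // Hypothetical values for a quick run; the defaults in the patch are
    // 10 minutes (`region_failure_detector_initialization_delay`) and
    // 60 seconds (`DEFAULT_INITIALIZATION_RETRY_PERIOD`).
    let initialization_delay = Duration::from_millis(50);
    let initialization_retry_period = Duration::from_millis(50);

    // The first tick fires only after the configured delay; afterwards the
    // request is retried every `initialization_retry_period`.
    let mut init_interval = interval_at(
        Instant::now() + initialization_delay,
        initialization_retry_period,
    );
    init_interval.set_missed_tick_behavior(MissedTickBehavior::Skip);

    let init_loop = tokio::spawn(async move {
        loop {
            init_interval.tick().await;
            let (ack_tx, ack_rx) = oneshot::channel();
            if tx.send(Event::InitializeAllRegions(ack_tx)).await.is_err() {
                // The supervisor (receiver) is gone; stop retrying.
                break;
            }
            // Stop only once the supervisor acknowledges. If it skipped the
            // request (e.g. maintenance mode is enabled), the ack sender is
            // dropped and the next tick retries.
            if ack_rx.await.is_ok() {
                println!("all region failure detectors initialized");
                break;
            }
        }
    });

    // Supervisor side, reduced to the acknowledgement: the real handler first
    // checks maintenance mode and scans table routes before answering.
    if let Some(Event::InitializeAllRegions(ack)) = rx.recv().await {
        let _ = ack.send(());
    }

    init_loop.await.unwrap();
}
```

Retrying until an acknowledgement arrives is what lets the supervisor skip the request while maintenance mode is enabled and still have the detectors initialized on a later tick, which is exactly the behavior exercised by `test_initialize_all_regions_with_maintenance_mode` above.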