diff --git a/src/meta-srv/src/bootstrap.rs b/src/meta-srv/src/bootstrap.rs index b14fea51e01c..c9affc41addb 100644 --- a/src/meta-srv/src/bootstrap.rs +++ b/src/meta-srv/src/bootstrap.rs @@ -46,9 +46,7 @@ use snafu::ResultExt; #[cfg(feature = "mysql_kvbackend")] use sqlx::mysql::MySqlConnectOptions; #[cfg(feature = "mysql_kvbackend")] -use sqlx::mysql::{MySqlConnection, MySqlPool}; -#[cfg(feature = "mysql_kvbackend")] -use sqlx::Connection; +use sqlx::mysql::MySqlPool; use tokio::net::TcpListener; use tokio::sync::mpsc::{self, Receiver, Sender}; use tokio::sync::{oneshot, Mutex}; @@ -278,6 +276,7 @@ pub async fn metasrv_builder( let candidate_lease_ttl = Duration::from_secs(CANDIDATE_LEASE_SECS); let execution_timeout = Duration::from_secs(META_LEASE_SECS); let statement_timeout = Duration::from_secs(META_LEASE_SECS); + let idle_session_timeout = Duration::from_secs(META_LEASE_SECS); let meta_lease_ttl = Duration::from_secs(META_LEASE_SECS); let mut cfg = Config::new(); @@ -286,8 +285,12 @@ pub async fn metasrv_builder( // We use a separate pool for election since we need a different session keep-alive idle time. let pool = create_postgres_pool_with(&opts.store_addrs, cfg).await?; - let election_client = - ElectionPgClient::new(pool, execution_timeout, meta_lease_ttl, statement_timeout)?; + let election_client = ElectionPgClient::new( + pool, + execution_timeout, + idle_session_timeout, + statement_timeout, + )?; let election = PgElection::with_pg_client( opts.grpc.server_addr.clone(), election_client, @@ -308,6 +311,10 @@ pub async fn metasrv_builder( } #[cfg(feature = "mysql_kvbackend")] (None, BackendImpl::MysqlStore) => { + use std::time::Duration; + + use crate::election::rds::mysql::ElectionMysqlClient; + let pool = create_mysql_pool(&opts.store_addrs).await?; let kv_backend = MySqlStore::with_mysql_pool(pool, &opts.meta_table_name, opts.max_txn_ops) @@ -315,13 +322,29 @@ pub async fn metasrv_builder( .context(error::KvBackendSnafu)?; // Since election will acquire a lock of the table, we need a separate table for election. let election_table_name = opts.meta_table_name.clone() + "_election"; - let election_client = create_mysql_client(opts).await?; + // We use a separate pool for election since we need a different session keep-alive idle time. + let pool = create_mysql_pool(&opts.store_addrs).await?; + let execution_timeout = Duration::from_secs(META_LEASE_SECS); + let statement_timeout = Duration::from_secs(META_LEASE_SECS); + let idle_session_timeout = Duration::from_secs(META_LEASE_SECS); + let innode_lock_wait_timeout = Duration::from_secs(META_LEASE_SECS / 2); + let meta_lease_ttl = Duration::from_secs(META_LEASE_SECS); + let candidate_lease_ttl = Duration::from_secs(CANDIDATE_LEASE_SECS); + + let election_client = ElectionMysqlClient::new( + pool, + execution_timeout, + statement_timeout, + innode_lock_wait_timeout, + idle_session_timeout, + &election_table_name, + ); let election = MySqlElection::with_mysql_client( opts.grpc.server_addr.clone(), election_client, opts.store_key_prefix.clone(), - CANDIDATE_LEASE_SECS, - META_LEASE_SECS, + candidate_lease_ttl, + meta_lease_ttl, &election_table_name, ) .await?; @@ -438,14 +461,6 @@ pub async fn create_mysql_pool(store_addrs: &[String]) -> Result { let pool = MySqlPool::connect_with(opts) .await .context(error::CreateMySqlPoolSnafu)?; - Ok(pool) -} -#[cfg(feature = "mysql_kvbackend")] -async fn create_mysql_client(opts: &MetasrvOptions) -> Result { - let opts = setup_mysql_options(&opts.store_addrs).await?; - let client = MySqlConnection::connect_with(&opts) - .await - .context(error::ConnectMySqlSnafu)?; - Ok(client) + Ok(pool) } diff --git a/src/meta-srv/src/election/rds/mysql.rs b/src/meta-srv/src/election/rds/mysql.rs index 6971302f4e54..27b348a83a3e 100644 --- a/src/meta-srv/src/election/rds/mysql.rs +++ b/src/meta-srv/src/election/rds/mysql.rs @@ -16,12 +16,13 @@ use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::Arc; use std::time::Duration; -use common_telemetry::warn; +use common_telemetry::{error, warn}; use common_time::Timestamp; use snafu::{ensure, OptionExt, ResultExt}; use sqlx::mysql::{MySqlArguments, MySqlRow}; +use sqlx::pool::PoolConnection; use sqlx::query::Query; -use sqlx::{MySql, MySqlConnection, MySqlTransaction, Row}; +use sqlx::{MySql, MySqlPool, MySqlTransaction, Row}; use tokio::sync::{broadcast, Mutex, MutexGuard}; use tokio::time::MissedTickBehavior; @@ -31,14 +32,14 @@ use crate::election::{ CANDIDATES_ROOT, ELECTION_KEY, }; use crate::error::{ - DeserializeFromJsonSnafu, MySqlExecutionSnafu, NoLeaderSnafu, Result, SerializeToJsonSnafu, - UnexpectedSnafu, + AcquireMySqlClientSnafu, DecodeSqlValueSnafu, DeserializeFromJsonSnafu, + LeaderLeaseChangedSnafu, LeaderLeaseExpiredSnafu, MySqlExecutionSnafu, NoLeaderSnafu, Result, + SerializeToJsonSnafu, SqlExecutionTimeoutSnafu, UnexpectedSnafu, }; use crate::metasrv::{ElectionRef, LeaderValue, MetasrvNodeInfo}; struct ElectionSqlFactory<'a> { table_name: &'a str, - meta_lease_ttl_secs: u64, } struct ElectionSqlSet { @@ -86,11 +87,8 @@ struct ElectionSqlSet { } impl<'a> ElectionSqlFactory<'a> { - fn new(table_name: &'a str, meta_lease_ttl_secs: u64) -> Self { - Self { - table_name, - meta_lease_ttl_secs, - } + fn new(table_name: &'a str) -> Self { + Self { table_name } } fn build(self) -> ElectionSqlSet { @@ -104,42 +102,6 @@ impl<'a> ElectionSqlFactory<'a> { } } - // Currently the session timeout is longer than the leader lease time. - // So the leader will renew the lease twice before the session timeout if everything goes well. - fn set_idle_session_timeout_sql(&self) -> String { - format!( - "SET SESSION wait_timeout = {};", - self.meta_lease_ttl_secs + 1 - ) - } - - fn set_lock_wait_timeout_sql(&self) -> &str { - "SET SESSION innodb_lock_wait_timeout = 1;" - } - - fn create_table_sql(&self) -> String { - format!( - r#" - CREATE TABLE IF NOT EXISTS `{}` ( - k VARBINARY(3072) PRIMARY KEY, - v BLOB - ); - "#, - self.table_name - ) - } - - fn insert_once(&self) -> String { - format!( - "INSERT IGNORE INTO `{}` (k, v) VALUES ('__place_holder_for_lock', '');", - self.table_name - ) - } - - fn check_version(&self) -> &str { - "SELECT @@version;" - } - /// Use `SELECT FOR UPDATE` to lock for compatibility with other MySQL-compatible databases /// instead of directly using `GET_LOCK`. fn campaign_sql(&self) -> String { @@ -192,8 +154,8 @@ impl<'a> ElectionSqlFactory<'a> { } enum Executor<'a> { - Default(MutexGuard<'a, MySqlConnection>), - Txn(MySqlTransaction<'a>), + Default(MutexGuard<'a, ElectionMysqlClient>), + Txn(TransactionWithExecutionTimeout<'a>), } impl Executor<'_> { @@ -203,107 +165,339 @@ impl Executor<'_> { sql: &str, ) -> Result> { match self { - Executor::Default(client) => { - let res = query - .fetch_all(&mut **client) - .await - .context(MySqlExecutionSnafu { sql })?; - Ok(res) - } - Executor::Txn(txn) => { - let res = query - .fetch_all(&mut **txn) - .await - .context(MySqlExecutionSnafu { sql })?; - Ok(res) - } + Executor::Default(client) => client.query(query, sql).await, + Executor::Txn(txn) => txn.query(query, sql).await, } } async fn execute(&mut self, query: Query<'_, MySql, MySqlArguments>, sql: &str) -> Result { match self { - Executor::Default(client) => { - let res = query - .execute(&mut **client) - .await - .context(MySqlExecutionSnafu { sql })?; - Ok(res.rows_affected()) - } - Executor::Txn(txn) => { - let res = query - .execute(&mut **txn) - .await - .context(MySqlExecutionSnafu { sql })?; - Ok(res.rows_affected()) - } + Executor::Default(client) => client.execute(query, sql).await, + Executor::Txn(txn) => txn.execute(query, sql).await, } } async fn commit(self) -> Result<()> { match self { - Executor::Txn(txn) => { - txn.commit() - .await - .context(MySqlExecutionSnafu { sql: "COMMIT" })?; - Ok(()) - } + Executor::Txn(txn) => txn.commit().await, _ => Ok(()), } } } +/// MySQL client for election. +pub struct ElectionMysqlClient { + current: Option>, + pool: MySqlPool, + + /// The client-side timeout for statement execution. + /// + /// This timeout is enforced by the client application and is independent of any server-side timeouts. + /// If a statement takes longer than this duration to execute, the client will abort the operation. + execution_timeout: Duration, + + /// The maximum execution time for the statement. + /// + /// This timeout is enforced by the server and is independent of any client-side timeouts. + /// If a statement takes longer than this duration to execute, the server will abort the operation. + max_execution_time: Duration, + + /// The lock wait timeout for the session. + /// + /// This timeout determines how long the server waits for a lock to be acquired before timing out. + /// If a lock cannot be acquired within this duration, the server will abort the operation. + innode_lock_wait_timeout: Duration, + + /// The wait timeout for the session. + /// + /// This timeout determines how long the server waits for activity on a noninteractive connection + /// before closing it. If a connection is idle for longer than this duration, the server will + /// terminate it. + wait_timeout: Duration, + + /// The table name for election. + table_for_election: String, +} + +impl ElectionMysqlClient { + pub fn new( + pool: MySqlPool, + execution_timeout: Duration, + max_execution_time: Duration, + innode_lock_wait_timeout: Duration, + wait_timeout: Duration, + table_for_election: &str, + ) -> Self { + Self { + current: None, + pool, + execution_timeout, + max_execution_time, + innode_lock_wait_timeout, + wait_timeout, + table_for_election: table_for_election.to_string(), + } + } + + fn create_table_sql(&self) -> String { + format!( + r#" + CREATE TABLE IF NOT EXISTS `{}` ( + k VARBINARY(3072) PRIMARY KEY, + v BLOB + );"#, + self.table_for_election + ) + } + + fn insert_once_sql(&self) -> String { + format!( + "INSERT IGNORE INTO `{}` (k, v) VALUES ('__place_holder_for_lock', '');", + self.table_for_election + ) + } + + fn check_version_sql(&self) -> String { + "SELECT @@version;".to_string() + } + + async fn reset_client(&mut self) -> Result<()> { + self.current = None; + self.maybe_init_client().await + } + + async fn ensure_table_exists(&mut self) -> Result<()> { + let create_table_sql = self.create_table_sql(); + let query = sqlx::query(&create_table_sql); + self.execute(query, &create_table_sql).await?; + // Insert at least one row for `SELECT * FOR UPDATE` to work. + let insert_once_sql = self.insert_once_sql(); + let query = sqlx::query(&insert_once_sql); + self.execute(query, &insert_once_sql).await?; + Ok(()) + } + + async fn maybe_init_client(&mut self) -> Result<()> { + if self.current.is_none() { + let client = self.pool.acquire().await.context(AcquireMySqlClientSnafu)?; + + self.current = Some(client); + let (query, sql) = if !self.wait_timeout.is_zero() { + let sql = "SET SESSION wait_timeout = ?, innodb_lock_wait_timeout = ?, max_execution_time = ?;"; + ( + sqlx::query(sql) + .bind(self.wait_timeout.as_secs()) + .bind(self.innode_lock_wait_timeout.as_secs()) + .bind(self.max_execution_time.as_millis() as u64), + sql, + ) + } else { + let sql = "SET SESSION innodb_lock_wait_timeout = ?, max_execution_time = ?;"; + ( + sqlx::query(sql) + .bind(self.innode_lock_wait_timeout.as_secs()) + .bind(self.max_execution_time.as_millis() as u64), + sql, + ) + }; + self.set_session_isolation_level().await?; + self.execute(query, sql).await?; + self.check_version(&self.check_version_sql()).await?; + } + Ok(()) + } + + /// Set session isolation level to serializable. + /// + /// # Panics + /// if `current` is `None`. + async fn set_session_isolation_level(&mut self) -> Result<()> { + let sql = "SET SESSION TRANSACTION ISOLATION LEVEL SERIALIZABLE"; + let query = sqlx::query(sql); + self.execute(query, sql).await?; + Ok(()) + } + + /// Check if the MySQL version is supported. + /// + /// # Panics + /// if `current` is `None`. + async fn check_version(&mut self, sql: &str) -> Result<()> { + // Check if the MySQL version is supported. + let query = sqlx::query(sql); + // Safety: `maybe_init_client` ensures `current` is not `None`. + let client = self.current.as_mut().unwrap(); + let row = tokio::time::timeout(self.execution_timeout, query.fetch_one(&mut **client)) + .await + .map_err(|_| { + SqlExecutionTimeoutSnafu { + sql, + duration: self.execution_timeout, + } + .build() + })?; + match row { + Ok(row) => { + let version: String = row.try_get(0).context(DecodeSqlValueSnafu {})?; + if !version.starts_with("8.0") && !version.starts_with("5.7") { + warn!( + "Unsupported MySQL version: {}, expected: [5.7, 8.0]", + version + ); + } + } + Err(e) => { + warn!(e; "Failed to check MySQL version through sql: {}", sql); + } + } + Ok(()) + } + + /// Returns the result of the query. + /// + /// # Panics + /// if `current` is `None`. + async fn execute(&mut self, query: Query<'_, MySql, MySqlArguments>, sql: &str) -> Result { + // Safety: `maybe_init_client` ensures `current` is not `None`. + let client = self.current.as_mut().unwrap(); + let future = query.execute(&mut **client); + let rows_affected = tokio::time::timeout(self.execution_timeout, future) + .await + .map_err(|_| { + SqlExecutionTimeoutSnafu { + sql, + duration: self.execution_timeout, + } + .build() + })? + .context(MySqlExecutionSnafu { sql })? + .rows_affected(); + Ok(rows_affected) + } + + /// Returns the result of the query. + /// + /// # Panics + /// if `current` is `None`. + async fn query( + &mut self, + query: Query<'_, MySql, MySqlArguments>, + sql: &str, + ) -> Result> { + // Safety: `maybe_init_client` ensures `current` is not `None`. + let client = self.current.as_mut().unwrap(); + let future = query.fetch_all(&mut **client); + tokio::time::timeout(self.execution_timeout, future) + .await + .map_err(|_| { + SqlExecutionTimeoutSnafu { + sql, + duration: self.execution_timeout, + } + .build() + })? + .context(MySqlExecutionSnafu { sql }) + } + + async fn transaction(&mut self) -> Result> { + use sqlx::Acquire; + let client = self.current.as_mut().unwrap(); + let transaction = client + .begin() + .await + .context(MySqlExecutionSnafu { sql: "BEGIN" })?; + + Ok(TransactionWithExecutionTimeout { + transaction, + execution_timeout: self.execution_timeout, + }) + } +} + +struct TransactionWithExecutionTimeout<'a> { + transaction: MySqlTransaction<'a>, + execution_timeout: Duration, +} + +impl TransactionWithExecutionTimeout<'_> { + async fn query( + &mut self, + query: Query<'_, MySql, MySqlArguments>, + sql: &str, + ) -> Result> { + let res = tokio::time::timeout( + self.execution_timeout, + query.fetch_all(&mut *self.transaction), + ) + .await + .map_err(|_| { + SqlExecutionTimeoutSnafu { + sql, + duration: self.execution_timeout, + } + .build() + })? + .context(MySqlExecutionSnafu { sql })?; + Ok(res) + } + + async fn execute(&mut self, query: Query<'_, MySql, MySqlArguments>, sql: &str) -> Result { + let res = tokio::time::timeout( + self.execution_timeout, + query.execute(&mut *self.transaction), + ) + .await + .map_err(|_| { + SqlExecutionTimeoutSnafu { + sql, + duration: self.execution_timeout, + } + .build() + })? + .context(MySqlExecutionSnafu { sql })?; + Ok(res.rows_affected()) + } + + async fn commit(self) -> Result<()> { + tokio::time::timeout(self.execution_timeout, self.transaction.commit()) + .await + .map_err(|_| { + SqlExecutionTimeoutSnafu { + sql: "COMMIT", + duration: self.execution_timeout, + } + .build() + })? + .context(MySqlExecutionSnafu { sql: "COMMIT" })?; + Ok(()) + } +} + /// MySQL implementation of Election. pub struct MySqlElection { leader_value: String, - client: Mutex, + client: Mutex, is_leader: AtomicBool, leader_infancy: AtomicBool, leader_watcher: broadcast::Sender, store_key_prefix: String, - candidate_lease_ttl_secs: u64, - meta_lease_ttl_secs: u64, + candidate_lease_ttl: Duration, + meta_lease_ttl: Duration, sql_set: ElectionSqlSet, } impl MySqlElection { pub async fn with_mysql_client( leader_value: String, - mut client: sqlx::MySqlConnection, + mut client: ElectionMysqlClient, store_key_prefix: String, - candidate_lease_ttl_secs: u64, - meta_lease_ttl_secs: u64, + candidate_lease_ttl: Duration, + meta_lease_ttl: Duration, table_name: &str, ) -> Result { - let sql_factory = ElectionSqlFactory::new(table_name, meta_lease_ttl_secs); - sqlx::query(&sql_factory.create_table_sql()) - .execute(&mut client) - .await - .context(MySqlExecutionSnafu { - sql: &sql_factory.create_table_sql(), - })?; - // Set idle session timeout to IDLE_SESSION_TIMEOUT to avoid dead lock. - sqlx::query(&sql_factory.set_idle_session_timeout_sql()) - .execute(&mut client) - .await - .context(MySqlExecutionSnafu { - sql: &sql_factory.set_idle_session_timeout_sql(), - })?; - // Set lock wait timeout to LOCK_WAIT_TIMEOUT to avoid waiting too long. - sqlx::query(sql_factory.set_lock_wait_timeout_sql()) - .execute(&mut client) - .await - .context(MySqlExecutionSnafu { - sql: sql_factory.set_lock_wait_timeout_sql(), - })?; - // Insert at least one row for `SELECT * FOR UPDATE` to work. - sqlx::query(&sql_factory.insert_once()) - .execute(&mut client) - .await - .context(MySqlExecutionSnafu { - sql: &sql_factory.insert_once(), - })?; - // Check MySQL version - Self::check_version(&mut client, sql_factory.check_version()).await?; + let sql_factory = ElectionSqlFactory::new(table_name); + client.maybe_init_client().await?; + client.ensure_table_exists().await?; let tx = listen_leader_change(leader_value.clone()); Ok(Arc::new(Self { leader_value, @@ -312,8 +506,8 @@ impl MySqlElection { leader_infancy: AtomicBool::new(false), leader_watcher: tx, store_key_prefix, - candidate_lease_ttl_secs, - meta_lease_ttl_secs, + candidate_lease_ttl, + meta_lease_ttl, sql_set: sql_factory.build(), })) } @@ -329,6 +523,12 @@ impl MySqlElection { fn candidate_key(&self) -> String { format!("{}{}", self.candidate_root(), self.leader_value) } + + async fn maybe_init_client(&self) -> Result<()> { + let mut client = self.client.lock().await; + client.maybe_init_client().await?; + Ok(()) + } } #[async_trait::async_trait] @@ -346,6 +546,7 @@ impl Election for MySqlElection { } async fn register_candidate(&self, node_info: &MetasrvNodeInfo) -> Result<()> { + self.maybe_init_client().await?; let key = self.candidate_key(); let node_info = serde_json::to_string(node_info).with_context(|_| SerializeToJsonSnafu { @@ -359,7 +560,7 @@ impl Election for MySqlElection { .put_value_with_lease( &key, &node_info, - self.candidate_lease_ttl_secs, + self.candidate_lease_ttl.as_secs(), &mut executor, ) .await?; @@ -370,7 +571,7 @@ impl Election for MySqlElection { self.put_value_with_lease( &key, &node_info, - self.candidate_lease_ttl_secs, + self.candidate_lease_ttl.as_secs(), &mut executor, ) .await?; @@ -378,8 +579,7 @@ impl Election for MySqlElection { } // Check if the current lease has expired and renew the lease. - let mut keep_alive_interval = - tokio::time::interval(Duration::from_secs(self.candidate_lease_ttl_secs / 2)); + let mut keep_alive_interval = tokio::time::interval(self.candidate_lease_ttl / 2); loop { let _ = keep_alive_interval.tick().await; let client = self.client.lock().await; @@ -405,7 +605,7 @@ impl Election for MySqlElection { &key, &lease.origin, &node_info, - self.candidate_lease_ttl_secs, + self.candidate_lease_ttl.as_secs(), &mut executor, ) .await?; @@ -414,6 +614,7 @@ impl Election for MySqlElection { } async fn all_candidates(&self) -> Result> { + self.maybe_init_client().await?; let key_prefix = self.candidate_root(); let client = self.client.lock().await; let mut executor = Executor::Default(client); @@ -434,16 +635,23 @@ impl Election for MySqlElection { } async fn campaign(&self) -> Result<()> { - let mut keep_alive_interval = - tokio::time::interval(Duration::from_secs(self.meta_lease_ttl_secs / 2)); + self.maybe_init_client().await?; + let mut keep_alive_interval = tokio::time::interval(self.meta_lease_ttl / 2); keep_alive_interval.set_missed_tick_behavior(MissedTickBehavior::Delay); loop { - let _ = self.do_campaign().await; + self.do_campaign().await?; keep_alive_interval.tick().await; } } + async fn reset_campaign(&self) { + if let Err(err) = self.client.lock().await.reset_client().await { + error!(err; "Failed to reset client"); + } + } + async fn leader(&self) -> Result { + self.maybe_init_client().await?; if self.is_leader.load(Ordering::Relaxed) { Ok(self.leader_value.as_bytes().into()) } else { @@ -602,63 +810,62 @@ impl MySqlElection { /// Attempts to acquire leadership by executing a campaign. This function continuously checks /// if the current lease is still valid. async fn do_campaign(&self) -> Result<()> { - // Need to restrict the scope of the client to avoid ambiguous overloads. - use sqlx::Acquire; + let lease = { + let client = self.client.lock().await; + let mut executor = Executor::Default(client); + self.get_value_with_lease(&self.election_key(), &mut executor) + .await? + }; - let client = self.client.lock().await; - let executor = Executor::Default(client); - let mut lease = Lease::default(); - match ( - self.lease_check(executor, &mut lease).await, - self.is_leader(), - self.leader_value == lease.leader_value, - ) { + let is_leader = self.is_leader(); + // If current leader value is the same as the leader value in the remote lease, + // it means the current leader is still valid. + let is_current_leader = lease + .as_ref() + .map(|lease| lease.leader_value == self.leader_value) + .unwrap_or(false); + match (self.lease_check(&lease), is_leader, is_current_leader) { // If the leader lease is valid and I'm the leader, renew the lease. (Ok(_), true, true) => { let mut client = self.client.lock().await; - let txn = client - .begin() - .await - .context(MySqlExecutionSnafu { sql: "BEGIN" })?; + let txn = client.transaction().await?; let mut executor = Executor::Txn(txn); let query = sqlx::query(&self.sql_set.campaign); executor.query(query, &self.sql_set.campaign).await?; - self.renew_lease(executor, lease).await?; + // Safety: Checked if lease is not None above. + self.renew_lease(executor, lease.unwrap()).await?; } // If the leader lease expires and I'm the leader, notify the leader watcher and step down. // Another instance should be elected as the leader in this case. - (Err(_), true, _) | (Ok(_), true, false) => { + (Err(err), true, _) => { + warn!(err; "Leader lease expired, step down..."); + self.step_down_without_lock().await?; + } + (Ok(_), true, false) => { warn!("Leader lease expired, step down..."); self.step_down_without_lock().await?; } // If the leader lease expires and I'm not the leader, elect myself. - (Err(_), false, _) => { - warn!("Leader lease expired, elected."); + (Err(err), false, _) => { + warn!(err; "Leader lease expired, elect myself."); let mut client = self.client.lock().await; - let txn = client - .begin() - .await - .context(MySqlExecutionSnafu { sql: "BEGIN" })?; + let txn = client.transaction().await?; let mut executor = Executor::Txn(txn); let query = sqlx::query(&self.sql_set.campaign); executor.query(query, &self.sql_set.campaign).await?; - self.elected(&mut executor).await?; - executor.commit().await?; + self.elected(executor, lease).await?; } // If the leader lease is valid and I'm the leader, but I don't think I'm the leader. // Just re-elect myself. (Ok(_), false, true) => { warn!("I should be the leader, but I don't think so. Something went wrong."); let mut client = self.client.lock().await; - let txn = client - .begin() - .await - .context(MySqlExecutionSnafu { sql: "BEGIN" })?; + let txn = client.transaction().await?; let mut executor = Executor::Txn(txn); let query = sqlx::query(&self.sql_set.campaign); executor.query(query, &self.sql_set.campaign).await?; - self.elected(&mut executor).await?; - executor.commit().await?; + // Safety: Checked if lease is not None above. + self.renew_lease(executor, lease.unwrap()).await?; } // If the leader lease is valid and I'm not the leader, do nothing. (Ok(_), false, false) => {} @@ -673,11 +880,27 @@ impl MySqlElection { &key, &lease.origin, &self.leader_value, - self.meta_lease_ttl_secs, + self.meta_lease_ttl.as_secs(), &mut executor, ) .await?; executor.commit().await?; + + if !self.is_leader() { + let key = self.election_key(); + let leader_key = RdsLeaderKey { + name: self.leader_value.clone().into_bytes(), + key: key.clone().into_bytes(), + ..Default::default() + }; + send_leader_change_and_set_flags( + &self.is_leader, + &self.leader_infancy, + &self.leader_watcher, + LeaderChangeMessage::Elected(Arc::new(leader_key)), + ); + } + Ok(()) } @@ -689,17 +912,12 @@ impl MySqlElection { /// to re-initiate the campaign. If the leader failed to renew the lease, its session will expire and the lock /// will be released. /// - **Case 2**: If all checks pass, the function returns without performing any actions. - async fn lease_check(&self, mut executor: Executor<'_>, lease: &mut Lease) -> Result<()> { - let key = self.election_key(); - let check_lease = self - .get_value_with_lease(&key, &mut executor) - .await? - .context(NoLeaderSnafu)?; - *lease = check_lease; + fn lease_check(&self, lease: &Option) -> Result { + let lease = lease.as_ref().context(NoLeaderSnafu)?; // Case 1: Lease expired - ensure!(lease.expire_time > lease.current, NoLeaderSnafu); + ensure!(lease.expire_time > lease.current, LeaderLeaseExpiredSnafu); // Case 2: Everything is fine - Ok(()) + Ok(lease.clone()) } /// Still consider itself as the leader locally but failed to acquire the lock. Step down without deleting the key. @@ -721,16 +939,31 @@ impl MySqlElection { /// Elected as leader. The leader should put the key and notify the leader watcher. /// Caution: Should only elected while holding the lock. - async fn elected(&self, executor: &mut Executor<'_>) -> Result<()> { + async fn elected( + &self, + mut executor: Executor<'_>, + expected_lease: Option, + ) -> Result<()> { let key = self.election_key(); let leader_key = RdsLeaderKey { name: self.leader_value.clone().into_bytes(), key: key.clone().into_bytes(), ..Default::default() }; - self.delete_value(&key, executor).await?; - self.put_value_with_lease(&key, &self.leader_value, self.meta_lease_ttl_secs, executor) - .await?; + let remote_lease = self.get_value_with_lease(&key, &mut executor).await?; + ensure!( + expected_lease.map(|lease| lease.origin) == remote_lease.map(|lease| lease.origin), + LeaderLeaseChangedSnafu + ); + self.delete_value(&key, &mut executor).await?; + self.put_value_with_lease( + &key, + &self.leader_value, + self.meta_lease_ttl.as_secs(), + &mut executor, + ) + .await?; + executor.commit().await?; send_leader_change_and_set_flags( &self.is_leader, @@ -740,40 +973,25 @@ impl MySqlElection { ); Ok(()) } - - /// Check if the MySQL version is supported. - async fn check_version(client: &mut MySqlConnection, sql: &str) -> Result<()> { - let query = sqlx::query(sql); - match query.fetch_one(client).await { - Ok(row) => { - let version: String = row.try_get(0).unwrap(); - if !version.starts_with("8.0") && !version.starts_with("5.7") { - warn!( - "Unsupported MySQL version: {}, expected: [5.7, 8.0]", - version - ); - } - } - Err(e) => { - warn!(e; "Failed to check MySQL version through sql: {}", sql); - } - } - Ok(()) - } } #[cfg(test)] mod tests { + use std::assert_matches::assert_matches; use std::env; use common_meta::maybe_skip_mysql_integration_test; use common_telemetry::init_default_ut_logging; - use sqlx::Connection; use super::*; - use crate::error::MySqlExecutionSnafu; - - async fn create_mysql_client(table_name: Option<&str>) -> Result> { + use crate::bootstrap::create_mysql_pool; + use crate::error; + + async fn create_mysql_client( + table_name: Option<&str>, + execution_timeout: Duration, + wait_timeout: Duration, + ) -> Result> { init_default_ut_logging(); let endpoint = env::var("GT_MYSQL_ENDPOINTS").unwrap_or_default(); if endpoint.is_empty() { @@ -782,30 +1000,26 @@ mod tests { } .fail(); } - let mut client = MySqlConnection::connect(&endpoint).await.unwrap(); - if let Some(table_name) = table_name { - let create_table_sql = format!( - "CREATE TABLE IF NOT EXISTS {}(k VARCHAR(255) PRIMARY KEY, v BLOB);", - table_name - ); - sqlx::query(&create_table_sql) - .execute(&mut client) - .await - .context(MySqlExecutionSnafu { - sql: create_table_sql, - })?; + let pool = create_mysql_pool(&[endpoint]).await.unwrap(); + let mut client = ElectionMysqlClient::new( + pool, + execution_timeout, + execution_timeout, + Duration::from_secs(1), + wait_timeout, + table_name.unwrap_or("default_greptime_metakv_election"), + ); + client.maybe_init_client().await?; + if table_name.is_some() { + client.ensure_table_exists().await?; } Ok(Mutex::new(client)) } - async fn drop_table(client: &Mutex, table_name: &str) { + async fn drop_table(client: &Mutex, table_name: &str) { let mut client = client.lock().await; let sql = format!("DROP TABLE IF EXISTS {};", table_name); - sqlx::query(&sql) - .execute(&mut *client) - .await - .context(MySqlExecutionSnafu { sql }) - .unwrap(); + client.execute(sqlx::query(&sql), &sql).await.unwrap(); } #[tokio::test] @@ -816,11 +1030,18 @@ mod tests { let uuid = uuid::Uuid::new_v4().to_string(); let table_name = "test_mysql_crud_greptime_metakv"; - let client = create_mysql_client(Some(table_name)).await.unwrap(); + let candidate_lease_ttl = Duration::from_secs(10); + let meta_lease_ttl = Duration::from_secs(2); + + let execution_timeout = Duration::from_secs(10); + let idle_session_timeout = Duration::from_secs(0); + let client = create_mysql_client(Some(table_name), execution_timeout, idle_session_timeout) + .await + .unwrap(); { let mut a = client.lock().await; - let txn = a.begin().await.unwrap(); + let txn = a.transaction().await.unwrap(); let mut executor = Executor::Txn(txn); let raw_query = format!("SELECT * FROM {} FOR UPDATE;", table_name); let query = sqlx::query(&raw_query); @@ -835,9 +1056,9 @@ mod tests { leader_infancy: AtomicBool::new(true), leader_watcher: tx, store_key_prefix: uuid, - candidate_lease_ttl_secs: 10, - meta_lease_ttl_secs: 1, - sql_set: ElectionSqlFactory::new(table_name, 1).build(), + candidate_lease_ttl, + meta_lease_ttl, + sql_set: ElectionSqlFactory::new(table_name).build(), }; let client = mysql_election.client.lock().await; let mut executor = Executor::Default(client); @@ -910,11 +1131,17 @@ mod tests { async fn candidate( leader_value: String, - candidate_lease_ttl_secs: u64, + candidate_lease_ttl: Duration, store_key_prefix: String, table_name: String, ) { - let client = create_mysql_client(None).await.unwrap(); + let meta_lease_ttl = Duration::from_secs(2); + let execution_timeout = Duration::from_secs(10); + let idle_session_timeout = Duration::from_secs(0); + let client = + create_mysql_client(Some(&table_name), execution_timeout, idle_session_timeout) + .await + .unwrap(); let (tx, _) = broadcast::channel(100); let mysql_election = MySqlElection { @@ -924,9 +1151,9 @@ mod tests { leader_infancy: AtomicBool::new(true), leader_watcher: tx, store_key_prefix, - candidate_lease_ttl_secs, - meta_lease_ttl_secs: 1, - sql_set: ElectionSqlFactory::new(&table_name, 1).build(), + candidate_lease_ttl, + meta_lease_ttl, + sql_set: ElectionSqlFactory::new(&table_name).build(), }; let node_info = MetasrvNodeInfo { @@ -942,24 +1169,29 @@ mod tests { async fn test_candidate_registration() { maybe_skip_mysql_integration_test!(); let leader_value_prefix = "test_leader".to_string(); - let candidate_lease_ttl_secs = 2; + let candidate_lease_ttl = Duration::from_secs(2); + let execution_timeout = Duration::from_secs(10); + let meta_lease_ttl = Duration::from_secs(2); + let idle_session_timeout = Duration::from_secs(0); let uuid = uuid::Uuid::new_v4().to_string(); let table_name = "test_candidate_registration_greptime_metakv"; let mut handles = vec![]; - let client = create_mysql_client(Some(table_name)).await.unwrap(); + let client = create_mysql_client(Some(table_name), execution_timeout, idle_session_timeout) + .await + .unwrap(); for i in 0..10 { let leader_value = format!("{}{}", leader_value_prefix, i); let handle = tokio::spawn(candidate( leader_value, - candidate_lease_ttl_secs, + candidate_lease_ttl, uuid.clone(), table_name.to_string(), )); handles.push(handle); } // Wait for candidates to registrate themselves and renew their leases at least once. - tokio::time::sleep(Duration::from_secs(candidate_lease_ttl_secs / 2 + 1)).await; + tokio::time::sleep(candidate_lease_ttl / 2 + Duration::from_secs(1)).await; let (tx, _) = broadcast::channel(100); let leader_value = "test_leader".to_string(); @@ -970,9 +1202,9 @@ mod tests { leader_infancy: AtomicBool::new(true), leader_watcher: tx, store_key_prefix: uuid.clone(), - candidate_lease_ttl_secs, - meta_lease_ttl_secs: 1, - sql_set: ElectionSqlFactory::new(table_name, 1).build(), + candidate_lease_ttl, + meta_lease_ttl, + sql_set: ElectionSqlFactory::new(table_name).build(), }; let candidates = mysql_election.all_candidates().await.unwrap(); @@ -983,7 +1215,7 @@ mod tests { } // Wait for the candidate leases to expire. - tokio::time::sleep(Duration::from_secs(candidate_lease_ttl_secs + 1)).await; + tokio::time::sleep(candidate_lease_ttl + Duration::from_secs(1)).await; let candidates = mysql_election.all_candidates().await.unwrap(); assert!(candidates.is_empty()); @@ -1004,15 +1236,18 @@ mod tests { drop_table(&mysql_election.client, table_name).await; } - async fn elected(election: &MySqlElection, table_name: &str) { + async fn elected( + election: &MySqlElection, + table_name: &str, + expected_lease: Option, + ) -> Result<()> { let mut client = election.client.lock().await; - let txn = client.begin().await.unwrap(); + let txn = client.transaction().await.unwrap(); let mut executor = Executor::Txn(txn); let raw_query = format!("SELECT * FROM {} FOR UPDATE;", table_name); let query = sqlx::query(&raw_query); let _ = executor.query(query, &raw_query).await.unwrap(); - election.elected(&mut executor).await.unwrap(); - executor.commit().await.unwrap(); + election.elected(executor, expected_lease).await } async fn get_lease(election: &MySqlElection) -> Option { @@ -1024,14 +1259,120 @@ mod tests { .unwrap() } + #[tokio::test] + async fn test_elected_with_incorrect_lease_fails() { + maybe_skip_mysql_integration_test!(); + let leader_value = "test_leader".to_string(); + let candidate_lease_ttl = Duration::from_secs(5); + let meta_lease_ttl = Duration::from_secs(2); + let execution_timeout = Duration::from_secs(10); + let idle_session_timeout = Duration::from_secs(0); + let uuid = uuid::Uuid::new_v4().to_string(); + let table_name = "test_elected_failed_greptime_metakv"; + let client = create_mysql_client(Some(table_name), execution_timeout, idle_session_timeout) + .await + .unwrap(); + + let (tx, _) = broadcast::channel(100); + let leader_mysql_election = MySqlElection { + leader_value: leader_value.clone(), + client, + is_leader: AtomicBool::new(false), + leader_infancy: AtomicBool::new(true), + leader_watcher: tx, + store_key_prefix: uuid, + candidate_lease_ttl, + meta_lease_ttl, + sql_set: ElectionSqlFactory::new(table_name).build(), + }; + + let incorrect_lease = Lease::default(); + let err = elected(&leader_mysql_election, table_name, Some(incorrect_lease)) + .await + .unwrap_err(); + assert_matches!(err, error::Error::LeaderLeaseChanged { .. }); + let lease = get_lease(&leader_mysql_election).await; + assert!(lease.is_none()); + drop_table(&leader_mysql_election.client, table_name).await; + } + + #[tokio::test] + async fn test_reelection_with_idle_session_timeout() { + maybe_skip_mysql_integration_test!(); + let leader_value = "test_leader".to_string(); + let uuid = uuid::Uuid::new_v4().to_string(); + let table_name = "test_reelection_greptime_metakv"; + let candidate_lease_ttl = Duration::from_secs(5); + let meta_lease_ttl = Duration::from_secs(5); + let execution_timeout = Duration::from_secs(10); + let idle_session_timeout = Duration::from_secs(2); + let client = create_mysql_client(Some(table_name), execution_timeout, idle_session_timeout) + .await + .unwrap(); + + let (tx, _) = broadcast::channel(100); + let leader_mysql_election = MySqlElection { + leader_value: leader_value.clone(), + client, + is_leader: AtomicBool::new(false), + leader_infancy: AtomicBool::new(true), + leader_watcher: tx, + store_key_prefix: uuid, + candidate_lease_ttl, + meta_lease_ttl, + sql_set: ElectionSqlFactory::new(table_name).build(), + }; + + elected(&leader_mysql_election, table_name, None) + .await + .unwrap(); + let lease = get_lease(&leader_mysql_election).await.unwrap(); + assert_eq!(lease.leader_value, leader_value); + assert!(lease.expire_time > lease.current); + assert!(leader_mysql_election.is_leader()); + // Wait for mysql server close the inactive connection. + tokio::time::sleep(Duration::from_millis(2100)).await; + // Should be failed. + leader_mysql_election + .client + .lock() + .await + .query(sqlx::query("SELECT 1"), "SELECT 1") + .await + .unwrap_err(); + // Reset the client. + leader_mysql_election + .client + .lock() + .await + .reset_client() + .await + .unwrap(); + + // Should able to re-elected. + elected(&leader_mysql_election, table_name, Some(lease.clone())) + .await + .unwrap(); + let lease = get_lease(&leader_mysql_election).await.unwrap(); + assert_eq!(lease.leader_value, leader_value); + assert!(lease.expire_time > lease.current); + assert!(leader_mysql_election.is_leader()); + drop_table(&leader_mysql_election.client, table_name).await; + } + #[tokio::test] async fn test_elected_and_step_down() { maybe_skip_mysql_integration_test!(); let leader_value = "test_leader".to_string(); - let candidate_lease_ttl_secs = 1; + let candidate_lease_ttl = Duration::from_secs(5); + let meta_lease_ttl = Duration::from_secs(2); + let execution_timeout = Duration::from_secs(10); + let idle_session_timeout = Duration::from_secs(0); let uuid = uuid::Uuid::new_v4().to_string(); let table_name = "test_elected_and_step_down_greptime_metakv"; - let client = create_mysql_client(Some(table_name)).await.unwrap(); + let client = create_mysql_client(Some(table_name), execution_timeout, idle_session_timeout) + .await + .unwrap(); let (tx, mut rx) = broadcast::channel(100); let leader_mysql_election = MySqlElection { @@ -1041,12 +1382,14 @@ mod tests { leader_infancy: AtomicBool::new(true), leader_watcher: tx, store_key_prefix: uuid, - candidate_lease_ttl_secs, - meta_lease_ttl_secs: 1, - sql_set: ElectionSqlFactory::new(table_name, 1).build(), + candidate_lease_ttl, + meta_lease_ttl, + sql_set: ElectionSqlFactory::new(table_name).build(), }; - elected(&leader_mysql_election, table_name).await; + elected(&leader_mysql_election, table_name, None) + .await + .unwrap(); let lease = get_lease(&leader_mysql_election).await.unwrap(); assert_eq!(lease.leader_value, leader_value); assert!(lease.expire_time > lease.current); @@ -1086,7 +1429,9 @@ mod tests { _ => panic!("Expected LeaderChangeMessage::StepDown"), } - elected(&leader_mysql_election, table_name).await; + elected(&leader_mysql_election, table_name, Some(lease.clone())) + .await + .unwrap(); let lease = get_lease(&leader_mysql_election).await.unwrap(); assert_eq!(lease.leader_value, leader_value); assert!(lease.expire_time > lease.current); @@ -1114,9 +1459,13 @@ mod tests { let leader_value = "test_leader".to_string(); let uuid = uuid::Uuid::new_v4().to_string(); let table_name = "test_leader_action_greptime_metakv"; - let candidate_lease_ttl_secs = 5; - let meta_lease_ttl_secs = 1; - let client = create_mysql_client(Some(table_name)).await.unwrap(); + let candidate_lease_ttl = Duration::from_secs(5); + let meta_lease_ttl = Duration::from_secs(2); + let execution_timeout = Duration::from_secs(10); + let idle_session_timeout = Duration::from_secs(0); + let client = create_mysql_client(Some(table_name), execution_timeout, idle_session_timeout) + .await + .unwrap(); let (tx, mut rx) = broadcast::channel(100); let leader_mysql_election = MySqlElection { @@ -1126,9 +1475,9 @@ mod tests { leader_infancy: AtomicBool::new(true), leader_watcher: tx, store_key_prefix: uuid, - candidate_lease_ttl_secs, - meta_lease_ttl_secs, - sql_set: ElectionSqlFactory::new(table_name, 1).build(), + candidate_lease_ttl, + meta_lease_ttl, + sql_set: ElectionSqlFactory::new(table_name).build(), }; // Step 1: No leader exists, campaign and elected. @@ -1161,7 +1510,7 @@ mod tests { assert!(leader_mysql_election.is_leader()); // Step 3: Something wrong, the leader lease expired. - tokio::time::sleep(Duration::from_secs(meta_lease_ttl_secs + 1)).await; + tokio::time::sleep(meta_lease_ttl + Duration::from_secs(1)).await; leader_mysql_election.do_campaign().await.unwrap(); let lease = get_lease(&leader_mysql_election).await.unwrap(); assert_eq!(lease.leader_value, leader_value); @@ -1294,12 +1643,17 @@ mod tests { async fn test_follower_action() { maybe_skip_mysql_integration_test!(); common_telemetry::init_default_ut_logging(); - let candidate_lease_ttl_secs = 5; - let meta_lease_ttl_secs = 1; + let candidate_lease_ttl = Duration::from_secs(5); + let meta_lease_ttl = Duration::from_secs(1); + let execution_timeout = Duration::from_secs(10); + let idle_session_timeout = Duration::from_secs(0); let uuid = uuid::Uuid::new_v4().to_string(); let table_name = "test_follower_action_greptime_metakv"; - let follower_client = create_mysql_client(Some(table_name)).await.unwrap(); + let follower_client = + create_mysql_client(Some(table_name), execution_timeout, idle_session_timeout) + .await + .unwrap(); let (tx, mut rx) = broadcast::channel(100); let follower_mysql_election = MySqlElection { leader_value: "test_follower".to_string(), @@ -1308,12 +1662,15 @@ mod tests { leader_infancy: AtomicBool::new(true), leader_watcher: tx, store_key_prefix: uuid.clone(), - candidate_lease_ttl_secs, - meta_lease_ttl_secs, - sql_set: ElectionSqlFactory::new(table_name, 1).build(), + candidate_lease_ttl, + meta_lease_ttl, + sql_set: ElectionSqlFactory::new(table_name).build(), }; - let leader_client = create_mysql_client(Some(table_name)).await.unwrap(); + let leader_client = + create_mysql_client(Some(table_name), execution_timeout, idle_session_timeout) + .await + .unwrap(); let (tx, _) = broadcast::channel(100); let leader_mysql_election = MySqlElection { leader_value: "test_leader".to_string(), @@ -1322,9 +1679,9 @@ mod tests { leader_infancy: AtomicBool::new(true), leader_watcher: tx, store_key_prefix: uuid, - candidate_lease_ttl_secs, - meta_lease_ttl_secs, - sql_set: ElectionSqlFactory::new(table_name, 1).build(), + candidate_lease_ttl, + meta_lease_ttl, + sql_set: ElectionSqlFactory::new(table_name).build(), }; leader_mysql_election.do_campaign().await.unwrap(); @@ -1333,7 +1690,7 @@ mod tests { follower_mysql_election.do_campaign().await.unwrap(); // Step 2: As a follower, the leader exists but the lease expired. Re-elect itself. - tokio::time::sleep(Duration::from_secs(meta_lease_ttl_secs + 1)).await; + tokio::time::sleep(meta_lease_ttl + Duration::from_secs(1)).await; follower_mysql_election.do_campaign().await.unwrap(); assert!(follower_mysql_election.is_leader()); @@ -1352,4 +1709,33 @@ mod tests { drop_table(&follower_mysql_election.client, table_name).await; } + + #[tokio::test] + async fn test_wait_timeout() { + maybe_skip_mysql_integration_test!(); + common_telemetry::init_default_ut_logging(); + let execution_timeout = Duration::from_secs(10); + let idle_session_timeout = Duration::from_secs(1); + + let client = create_mysql_client(None, execution_timeout, idle_session_timeout) + .await + .unwrap(); + tokio::time::sleep(Duration::from_millis(1100)).await; + // Wait for the idle session timeout. + let err = client + .lock() + .await + .query(sqlx::query("SELECT 1"), "SELECT 1") + .await + .unwrap_err(); + assert_matches!(err, error::Error::MySqlExecution { .. }); + // Reset the client and try again. + client.lock().await.reset_client().await.unwrap(); + let _ = client + .lock() + .await + .query(sqlx::query("SELECT 1"), "SELECT 1") + .await + .unwrap(); + } } diff --git a/src/meta-srv/src/error.rs b/src/meta-srv/src/error.rs index b840d1b0f0fa..b1ea799e5fce 100644 --- a/src/meta-srv/src/error.rs +++ b/src/meta-srv/src/error.rs @@ -382,6 +382,14 @@ pub enum Error { location: Location, }, + #[snafu(display("Failed to decode sql value"))] + DecodeSqlValue { + #[snafu(source)] + error: sqlx::error::Error, + #[snafu(implicit)] + location: Location, + }, + #[snafu(display("Failed to find table route for {table_id}"))] TableRouteNotFound { table_id: TableId, @@ -417,6 +425,18 @@ pub enum Error { location: Location, }, + #[snafu(display("Leader lease expired"))] + LeaderLeaseExpired { + #[snafu(implicit)] + location: Location, + }, + + #[snafu(display("Leader lease changed during election"))] + LeaderLeaseChanged { + #[snafu(implicit)] + location: Location, + }, + #[snafu(display("Table {} not found", name))] TableNotFound { name: String, @@ -766,7 +786,7 @@ pub enum Error { error: deadpool::managed::PoolError, }, - #[cfg(feature = "pg_kvbackend")] + #[cfg(any(feature = "pg_kvbackend", feature = "mysql_kvbackend"))] #[snafu(display("Sql execution timeout, sql: {}, duration: {:?}", sql, duration))] SqlExecutionTimeout { #[snafu(implicit)] @@ -812,8 +832,8 @@ pub enum Error { }, #[cfg(feature = "mysql_kvbackend")] - #[snafu(display("Failed to connect to mysql"))] - ConnectMySql { + #[snafu(display("Failed to acquire mysql client from pool"))] + AcquireMySqlClient { #[snafu(source)] error: sqlx::Error, #[snafu(implicit)] @@ -911,6 +931,8 @@ impl ErrorExt for Error { | Error::SerializeToJson { .. } | Error::DeserializeFromJson { .. } | Error::NoLeader { .. } + | Error::LeaderLeaseExpired { .. } + | Error::LeaderLeaseChanged { .. } | Error::CreateChannel { .. } | Error::BatchGet { .. } | Error::Range { .. } @@ -1012,19 +1034,21 @@ impl ErrorExt for Error { Error::Other { source, .. } => source.status_code(), Error::LookupPeer { source, .. } => source.status_code(), + Error::NoEnoughAvailableNode { .. } => StatusCode::RuntimeResourcesExhausted, + #[cfg(feature = "pg_kvbackend")] Error::CreatePostgresPool { .. } | Error::GetPostgresClient { .. } | Error::GetPostgresConnection { .. } - | Error::PostgresExecution { .. } - | Error::SqlExecutionTimeout { .. } => StatusCode::Internal, + | Error::PostgresExecution { .. } => StatusCode::Internal, #[cfg(feature = "mysql_kvbackend")] Error::MySqlExecution { .. } | Error::CreateMySqlPool { .. } - | Error::ConnectMySql { .. } - | Error::ParseMySqlUrl { .. } => StatusCode::Internal, - - Error::NoEnoughAvailableNode { .. } => StatusCode::RuntimeResourcesExhausted, + | Error::ParseMySqlUrl { .. } + | Error::DecodeSqlValue { .. } + | Error::AcquireMySqlClient { .. } => StatusCode::Internal, + #[cfg(any(feature = "pg_kvbackend", feature = "mysql_kvbackend"))] + Error::SqlExecutionTimeout { .. } => StatusCode::Internal, } }