Skip to content
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1,946 changes: 987 additions & 959 deletions Cargo.lock

Large diffs are not rendered by default.

20 changes: 10 additions & 10 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,10 @@ description = "A tool to submit NPoS election solutions for substrate based chai

[dependencies]
codec = { package = "parity-scale-codec", version = "3.0.0" }
scale-info = { package = "scale-info", version = "2.10.0" }
scale-info = { package = "scale-info", version = "2.11.1" }
clap = { version = "4.5", features = ["derive", "env"] }
tracing-subscriber = { version = "0.3.18", features = ["env-filter"] }
jsonrpsee = { version = "0.20", features = ["ws-client"] }
jsonrpsee = { version = "0.22.3", features = ["ws-client"] }
log = "0.4"
serde = "1.0"
serde_json = "1.0"
Expand All @@ -24,15 +24,15 @@ tokio = { version = "1.36", features = ["macros", "rt-multi-thread", "sync", "si
pin-project-lite = "0.2"

# subxt
scale-value = "0.13"
subxt = { version = "0.33", features = ["substrate-compat"] }
scale-value = "0.14.1"
subxt = { version = "0.35.1", features = ["substrate-compat"] }

# polkadot-sdk
frame-election-provider-support = "26.0.0"
pallet-election-provider-multi-phase = "25.0.0"
sp-npos-elections = "24.0.0"
frame-support = "26.0.0"
sp-runtime = "29.0.0"
frame-election-provider-support = "31.0.0"
pallet-election-provider-multi-phase = "30.0.0"
sp-npos-elections = "29.0.0"
frame-support = "31.0.0"
sp-runtime = "34.0.0"

# prometheus
prometheus = "0.13"
Expand All @@ -42,5 +42,5 @@ once_cell = "1.19"
[dev-dependencies]
anyhow = "1"
assert_cmd = "2.0"
sp-storage = "17.0.0"
sp-storage = "20.0.0"
regex = "1"
Binary file modified artifacts/metadata.scale
Binary file not shown.
9 changes: 1 addition & 8 deletions src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,6 @@ pub struct Client {
rpc: RpcClient,
/// Access to chain APIs such as storage, events etc.
chain_api: ChainClient,
/// Raw RPC client.
raw_rpc: RawRpcClient,
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not needed anymore

}

impl Client {
Expand Down Expand Up @@ -38,7 +36,7 @@ impl Client {
};

let chain_api = ChainClient::from_rpc_client(rpc.clone()).await?;
Ok(Self { rpc: RpcClient::new(rpc.clone()), raw_rpc: rpc, chain_api })
Ok(Self { rpc: RpcClient::new(rpc), chain_api })
}

/// Get a reference to the RPC interface exposed by subxt.
Expand All @@ -50,9 +48,4 @@ impl Client {
pub fn chain_api(&self) -> &ChainClient {
&self.chain_api
}

/// Get a reference to the raw rpc client API.
pub fn raw_rpc(&self) -> &RawRpcClient {
&self.raw_rpc
}
}
9 changes: 3 additions & 6 deletions src/commands/dry_run.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
//! The dry-run command.

use pallet_election_provider_multi_phase::RawSolution;
use subxt::config::DefaultExtrinsicParamsBuilder;

use crate::{
client::Client, epm, error::Error, helpers::storage_at, opt::Solver, prelude::*,
Expand Down Expand Up @@ -115,12 +116,8 @@ where

let nonce = client.rpc().system_account_next_index(signer.account_id()).await?;
let tx = epm::signed_solution(raw_solution)?;
let xt = client.chain_api().tx().create_signed_with_nonce(
&tx,
&*signer,
nonce,
Default::default(),
)?;
let params = DefaultExtrinsicParamsBuilder::new().nonce(nonce).build();
let xt = client.chain_api().tx().create_signed(&tx, &*signer, params).await?;
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

changed API in subxt

let dry_run_bytes = client.rpc().dry_run(xt.encoded(), config.at).await?;
let dry_run_result = dry_run_bytes.into_dry_run_result(&client.chain_api().metadata())?;

Expand Down
38 changes: 22 additions & 16 deletions src/commands/monitor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ use clap::Parser;
use codec::{Decode, Encode};
use frame_election_provider_support::NposSolution;
use futures::future::TryFutureExt;
use jsonrpsee::core::Error as JsonRpseeError;
use jsonrpsee::core::ClientError as JsonRpseeError;
use pallet_election_provider_multi_phase::{RawSolution, SolutionOf};
use sp_runtime::Perbill;
use std::{str::FromStr, sync::Arc};
Expand Down Expand Up @@ -163,7 +163,11 @@ impl FromStr for SubmissionStrategy {
}
}

pub async fn monitor_cmd<T>(client: Client, config: MonitorConfig) -> Result<(), Error>
pub async fn monitor_cmd<T>(
client: Client,
config: MonitorConfig,
signed_phase_length: u64,
) -> Result<(), Error>
where
T: MinerConfig<AccountId = AccountId, MaxVotesPerVoter = static_types::MaxVotesPerVoter>
+ Send
Expand Down Expand Up @@ -237,8 +241,15 @@ where
let config2 = config.clone();
let submit_lock2 = submit_lock.clone();
tokio::spawn(async move {
if let Err(err) =
mine_and_submit_solution::<T>(at, client2, signer2, config2, submit_lock2).await
if let Err(err) = mine_and_submit_solution::<T>(
at,
client2,
signer2,
config2,
submit_lock2,
signed_phase_length,
)
.await
{
kill_main_task_if_critical_err(&tx2, err)
}
Expand All @@ -264,6 +275,7 @@ async fn mine_and_submit_solution<T>(
signer: Signer,
config: MonitorConfig,
submit_lock: Arc<Mutex<()>>,
signed_phase_len: u64,
) -> Result<(), Error>
where
T: MinerConfig<AccountId = AccountId, MaxVotesPerVoter = static_types::MaxVotesPerVoter>
Expand Down Expand Up @@ -437,6 +449,7 @@ where
config.listen,
config.dry_run,
&at,
signed_phase_len,
)
.timed()
.await
Expand Down Expand Up @@ -530,26 +543,19 @@ async fn submit_and_watch_solution<T: MinerConfig + Send + Sync + 'static>(
listen: Listen,
dry_run: bool,
at: &Header,
signed_phase_length: u64,
) -> Result<(), Error> {
let tx = epm::signed_solution(RawSolution { solution, score, round })?;

// TODO: https://github.com/paritytech/polkadot-staking-miner/issues/730
//
// The extrinsic mortality length is static and doesn't know when the
// signed phase ends.
let signed_phase_len = client
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just to double check: Instead of fetching the signed phase constant from the chain, we define our own signed phase in the static types.rs. And this is because the constant from the chain wasn't what we'd expect?

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

from the PR description: its because the SignedPhase is removed from westend in preparation for the merkalized metadata.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the SignedPhase is removed from the metadata and it will impact the other chains as well when they update the EPM pallet

.chain_api()
.constants()
.at(&runtime::constants().election_provider_multi_phase().signed_phase())?;
// The extrinsic mortality length is static and it doesn't know when the signed phase ends.
let xt_cfg = DefaultExtrinsicParamsBuilder::default()
.mortal(at, signed_phase_len as u64)
.nonce(nonce)
.mortal(at, signed_phase_length)
.build();

let xt =
client
.chain_api()
.tx()
.create_signed_with_nonce(&tx, &*signer, nonce as u64, xt_cfg)?;
let xt = client.chain_api().tx().create_signed(&tx, &*signer, xt_cfg).await?;

if dry_run {
let dry_run_bytes = client.rpc().dry_run(xt.encoded(), None).await?;
Expand Down
5 changes: 3 additions & 2 deletions src/epm.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,12 +37,13 @@ use pallet_election_provider_multi_phase::{
unsigned::TrimmingStatus, RawSolution, ReadySolution, SolutionOf, SolutionOrSnapshotSize,
};
use scale_info::{PortableRegistry, TypeInfo};
use scale_value::scale::{decode_as_type, TypeId};
use scale_value::scale::decode_as_type;
use sp_npos_elections::{ElectionScore, VoteWeight};
use subxt::{dynamic::Value, tx::DynamicPayload};

const EPM_PALLET_NAME: &str = "ElectionProviderMultiPhase";

type TypeId = u32;
type MinerVoterOf =
frame_election_provider_support::Voter<AccountId, crate::static_types::MaxVotesPerVoter>;
type RoundSnapshot = pallet_election_provider_multi_phase::RoundSnapshot<AccountId, MinerVoterOf>;
Expand Down Expand Up @@ -512,7 +513,7 @@ fn to_scale_value<T: scale_info::TypeInfo + 'static + Encode>(val: T) -> Result<

let bytes = val.encode();

decode_as_type(&mut bytes.as_ref(), ty_id, &types)
decode_as_type(&mut bytes.as_ref(), &ty_id, &types)
.map(|v| v.remove_context())
.map_err(|e| {
Error::DynamicTransaction(format!(
Expand Down
2 changes: 1 addition & 1 deletion src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ pub enum Error {
#[error("I/O error: `{0}`")]
Io(#[from] std::io::Error),
#[error("RPC error: `{0}`")]
RpcError(#[from] jsonrpsee::core::Error),
RpcError(#[from] jsonrpsee::core::ClientError),
#[error("subxt error: `{0}`")]
Subxt(#[from] subxt::Error),
#[error("Crypto error: `{0:?}`")]
Expand Down
2 changes: 1 addition & 1 deletion src/helpers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
use crate::{error::Error, prelude::*};
use codec::Decode;
use frame_support::weights::Weight;
use jsonrpsee::core::Error as JsonRpseeError;
use jsonrpsee::core::ClientError as JsonRpseeError;
use pin_project_lite::pin_project;
use serde::Deserialize;
use std::{
Expand Down
115 changes: 39 additions & 76 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,12 +40,10 @@ mod signer;
mod static_types;

use clap::Parser;
use codec::Decode;
use error::Error;
use futures::future::{BoxFuture, FutureExt};
use prelude::*;
use std::str::FromStr;
use subxt::backend::rpc::RpcSubscription;
use tokio::sync::oneshot;
use tracing_subscriber::EnvFilter;

Expand Down Expand Up @@ -93,17 +91,17 @@ macro_rules! any_runtime {
match $chain {
$crate::opt::Chain::Polkadot => {
#[allow(unused)]
use $crate::static_types::polkadot::MinerConfig;
use $crate::static_types::polkadot::{MinerConfig, SIGNED_PHASE_LENGTH};
$($code)*
},
$crate::opt::Chain::Kusama => {
#[allow(unused)]
use $crate::static_types::kusama::MinerConfig;
use $crate::static_types::kusama::{MinerConfig, SIGNED_PHASE_LENGTH};
$($code)*
},
$crate::opt::Chain::Westend => {
#[allow(unused)]
use $crate::static_types::westend::MinerConfig;
use $crate::static_types::westend::{MinerConfig, SIGNED_PHASE_LENGTH};
$($code)*
},
}
Expand All @@ -130,11 +128,12 @@ async fn main() -> Result<(), Error> {
// Start a new tokio task to perform the runtime updates in the background.
// if this fails then the miner will be stopped and has to be re-started.
let (tx_upgrade, rx_upgrade) = oneshot::channel::<Error>();
tokio::spawn(runtime_upgrade_task(client.clone(), tx_upgrade, runtime_version.spec_version));
tokio::spawn(runtime_upgrade_task(client.chain_api().clone(), tx_upgrade));

let res = any_runtime!(chain, {
let fut = match command {
Command::Monitor(cfg) => commands::monitor_cmd::<MinerConfig>(client, cfg).boxed(),
Command::Monitor(cfg) =>
commands::monitor_cmd::<MinerConfig>(client, cfg, SIGNED_PHASE_LENGTH).boxed(),
Command::DryRun(cfg) => commands::dry_run_cmd::<MinerConfig>(client, cfg).boxed(),
Command::EmergencySolution(cfg) =>
commands::emergency_solution_cmd::<MinerConfig>(client, cfg).boxed(),
Expand Down Expand Up @@ -209,85 +208,49 @@ async fn run_command(
}

/// Runs until the RPC connection fails or updating the metadata failed.
async fn runtime_upgrade_task(client: Client, tx: oneshot::Sender<Error>, mut spec_version: u32) {
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this bug has been fixed in subxt and we can rely on that now :)

use sp_core::storage::StorageChangeSet;

async fn new_update_stream(
client: &Client,
) -> Result<RpcSubscription<StorageChangeSet<Hash>>, subxt::Error> {
use sp_core::Bytes;
use subxt::rpc_params;

let storage_key = Bytes(runtime::storage().system().last_runtime_upgrade().to_root_bytes());

client
.raw_rpc()
.subscribe(
"state_subscribeStorage",
rpc_params![vec![storage_key]],
"state_unsubscribeStorage",
)
.await
}
async fn runtime_upgrade_task(client: ChainClient, tx: oneshot::Sender<Error>) {
let updater = client.updater();

let mut update_stream = match new_update_stream(&client).await {
Ok(s) => s,
let mut update_stream = match updater.runtime_updates().await {
Ok(u) => u,
Err(e) => {
_ = tx.send(e.into());
return;
let _ = tx.send(e.into());
return
},
};

let close_err = loop {
let change_set = match update_stream.next().await {
Some(Ok(changes)) => changes,
Some(Err(err)) => break err.into(),
None => {
update_stream = match new_update_stream(&client).await {
Ok(sub) => sub,
Err(err) => break err.into(),
loop {
// if the runtime upgrade subscription fails then try establish a new one and if it fails quit.
let update = match update_stream.next().await {
Some(Ok(update)) => update,
_ => {
log::warn!(target: LOG_TARGET, "Runtime upgrade subscription failed");
update_stream = match updater.runtime_updates().await {
Ok(u) => u,
Err(e) => {
let _ = tx.send(e.into());
return
},
};
continue;
continue
},
};

let at = change_set.block;
assert!(change_set.changes.len() < 2, "Only one storage change per runtime upgrade");
let Some(bytes) = change_set.changes.get(0).and_then(|v| v.1.clone()) else { continue };
let next: runtime::runtime_types::frame_system::LastRuntimeUpgradeInfo =
match Decode::decode(&mut bytes.0.as_ref()) {
Ok(n) => n,
Err(e) => break e.into(),
};

if next.spec_version > spec_version {
let metadata = match client.rpc().state_get_metadata(Some(at)).await {
Ok(m) => m,
Err(err) => break err.into(),
};

let runtime_version = match client.rpc().state_get_runtime_version(Some(at)).await {
Ok(r) => r,
Err(err) => break err.into(),
};

client.chain_api().set_metadata(metadata);
client.chain_api().set_runtime_version(subxt::backend::RuntimeVersion {
spec_version: runtime_version.spec_version,
transaction_version: runtime_version.transaction_version,
});

spec_version = next.spec_version;
prometheus::on_runtime_upgrade();

log::info!(target: LOG_TARGET, "Runtime upgraded to v{spec_version}");
if let Err(e) = epm::update_metadata_constants(client.chain_api()) {
break e;
}
let version = update.runtime_version().spec_version;
match updater.apply_update(update) {
Ok(()) => {
if let Err(e) = epm::update_metadata_constants(&client) {
let _ = tx.send(e);
return
}
prometheus::on_runtime_upgrade();
log::info!(target: LOG_TARGET, "upgrade to version: {} successful", version);
},
Err(e) => {
log::debug!(target: LOG_TARGET, "upgrade to version: {} failed: {:?}", version, e);
},
}
};

let _ = tx.send(close_err.into());
}
}

#[cfg(test)]
Expand Down
Loading