Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
87 changes: 40 additions & 47 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

8 changes: 7 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,13 @@ polkadot-sdk = { git = "https://github.com/paritytech/polkadot-sdk", features =

# prometheus
prometheus = "0.14"
hyper = { version = "0.14.32", features = ["server", "http1", "http2", "tcp"] }
hyper = { version = "1.6.0", features = ["server", "http1", "http2"] }
hyper-util = { version = "0.1.11", features = [
"server-auto",
"server-graceful",
"tokio",
] }
http-body-util = "0.1.0"
once_cell = "1.21"

[dev-dependencies]
Expand Down
5 changes: 3 additions & 2 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -111,8 +111,9 @@ async fn main() -> Result<(), Error> {
let runtime_version: RuntimeVersion =
client.rpc().state_get_runtime_version(None).await?.into();
let chain = opt::Chain::from_str(&runtime_version.spec_name)?;
let _prometheus_handle = prometheus::run(prometheus_port)
.map_err(|e| log::warn!("Failed to start prometheus endpoint: {}", e));
if let Err(e) = prometheus::run(prometheus_port).await {
log::warn!("Failed to start prometheus endpoint: {}", e);
}
log::info!(target: LOG_TARGET, "Connected to chain: {}", chain);

let is_legacy = !matches!(command, Command::ExperimentalMonitorMultiBlock(_));
Expand Down
90 changes: 53 additions & 37 deletions src/prometheus.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,16 +15,20 @@
// along with Polkadot. If not, see <http://www.gnu.org/licenses/>.

use crate::prelude::LOG_TARGET;
use futures::channel::oneshot;
pub use hidden::*;
use hyper::{
header::CONTENT_TYPE,
service::{make_service_fn, service_fn},
Body, Method, Request, Response,
use http_body_util::Full;
use hyper::{body::Bytes, header::CONTENT_TYPE, service::service_fn, Method, Request, Response};
use hyper_util::{
rt::{TokioExecutor, TokioIo},
server::{conn::auto::Builder, graceful::GracefulShutdown},
};
use prometheus::{Encoder, TextEncoder};
use std::net::SocketAddr;
use tokio::net::TcpListener;

async fn serve_req(req: Request<Body>) -> Result<Response<Body>, hyper::Error> {
type Body = Full<Bytes>;

async fn serve_req(req: Request<hyper::body::Incoming>) -> Result<Response<Body>, hyper::Error> {
let response = match (req.method(), req.uri().path()) {
(&Method::GET, "/metrics") => {
let mut buffer = vec![];
Expand All @@ -51,43 +55,55 @@ async fn serve_req(req: Request<Body>) -> Result<Response<Body>, hyper::Error> {
Ok(response)
}

pub struct GracefulShutdown(Option<oneshot::Sender<()>>);
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It makes sense to remove this because we never stop the server ourselves just needless complexity.

I think did this change in polkadot-sdk as well :P


impl Drop for GracefulShutdown {
fn drop(&mut self) {
if let Some(handle) = self.0.take() {
let _ = handle.send(());
}
}
}

pub fn run(port: u16) -> Result<GracefulShutdown, String> {
let (tx, rx) = oneshot::channel();

// For every connection, we must make a `Service` to handle all incoming HTTP requests on said
// connection.
let make_svc = make_service_fn(move |_conn| async move {
Ok::<_, std::convert::Infallible>(service_fn(serve_req))
});
pub async fn run(port: u16) -> Result<(), String> {
// Create the address to bind to
let addr = SocketAddr::from(([0, 0, 0, 0], port));

let addr = ([0, 0, 0, 0], port).into();
let server = hyper::Server::try_bind(&addr)
.map_err(|e| format!("Failed bind socket on port {} {:?}", port, e))?
.serve(make_svc);
// Bind the TCP listener
let listener = TcpListener::bind(&addr)
.await
.map_err(|e| format!("Failed to bind socket on port {}: {:?}", port, e))?;

log::info!(target: LOG_TARGET, "Started prometheus endpoint on http://{}", addr);

let graceful = server.with_graceful_shutdown(async {
rx.await.ok();
});
// Create a graceful shutdown handler
let graceful = GracefulShutdown::new();

// Spawn the server task
tokio::spawn(async move {
if let Err(e) = graceful.await {
log::warn!("Server error: {}", e);
let executor = TokioExecutor::new();
let server = Builder::new(executor);

loop {
match listener.accept().await {
Ok((stream, _)) => {
let io = TokioIo::new(stream);

// Create a service for this connection
let service = service_fn(serve_req);

// Serve the connection with graceful shutdown
let conn = server
.serve_connection_with_upgrades(io, service)
.into_owned();

let conn = graceful.watch(conn);

tokio::spawn(async move {
if let Err(err) = conn.await {
log::debug!(target: LOG_TARGET, "connection error: {:?}", err);
}
});
}
Err(e) => {
log::debug!(target: LOG_TARGET, "Error accepting connection: {:?}", e);
continue;
}
}
}
});

Ok(GracefulShutdown(Some(tx)))
Ok(())
}

mod hidden {
Expand Down Expand Up @@ -135,9 +151,9 @@ mod hidden {
});
static SUBMIT_SOLUTION_AND_WATCH_DURATION: Lazy<Gauge> = Lazy::new(|| {
register_gauge!(
"staking_miner_submit_and_watch_duration_ms",
"The time in milliseconds it took to submit the solution to chain and to be included in block",
)
"staking_miner_submit_and_watch_duration_ms",
"The time in milliseconds it took to submit the solution to chain and to be included in block",
)
.unwrap()
});
static BALANCE: Lazy<Gauge> = Lazy::new(|| {
Expand Down
Loading