1515// along with Polkadot. If not, see <http://www.gnu.org/licenses/>.
1616
1717use crate :: prelude:: LOG_TARGET ;
18- use futures:: channel:: oneshot;
1918pub use hidden:: * ;
20- use hyper:: {
21- header:: CONTENT_TYPE ,
22- service:: { make_service_fn, service_fn} ,
23- Body , Method , Request , Response ,
19+ use http_body_util:: Full ;
20+ use hyper:: { body:: Bytes , header:: CONTENT_TYPE , service:: service_fn, Method , Request , Response } ;
21+ use hyper_util:: {
22+ rt:: { TokioExecutor , TokioIo } ,
23+ server:: { conn:: auto:: Builder , graceful:: GracefulShutdown } ,
2424} ;
2525use prometheus:: { Encoder , TextEncoder } ;
26+ use std:: net:: SocketAddr ;
27+ use tokio:: net:: TcpListener ;
2628
27- async fn serve_req ( req : Request < Body > ) -> Result < Response < Body > , hyper:: Error > {
29+ type Body = Full < Bytes > ;
30+
31+ async fn serve_req ( req : Request < hyper:: body:: Incoming > ) -> Result < Response < Body > , hyper:: Error > {
2832 let response = match ( req. method ( ) , req. uri ( ) . path ( ) ) {
2933 ( & Method :: GET , "/metrics" ) => {
3034 let mut buffer = vec ! [ ] ;
@@ -51,43 +55,55 @@ async fn serve_req(req: Request<Body>) -> Result<Response<Body>, hyper::Error> {
5155 Ok ( response)
5256}
5357
54- pub struct GracefulShutdown ( Option < oneshot:: Sender < ( ) > > ) ;
55-
56- impl Drop for GracefulShutdown {
57- fn drop ( & mut self ) {
58- if let Some ( handle) = self . 0 . take ( ) {
59- let _ = handle. send ( ( ) ) ;
60- }
61- }
62- }
63-
64- pub fn run ( port : u16 ) -> Result < GracefulShutdown , String > {
65- let ( tx, rx) = oneshot:: channel ( ) ;
66-
67- // For every connection, we must make a `Service` to handle all incoming HTTP requests on said
68- // connection.
69- let make_svc = make_service_fn ( move |_conn| async move {
70- Ok :: < _ , std:: convert:: Infallible > ( service_fn ( serve_req) )
71- } ) ;
58+ pub async fn run ( port : u16 ) -> Result < ( ) , String > {
59+ // Create the address to bind to
60+ let addr = SocketAddr :: from ( ( [ 0 , 0 , 0 , 0 ] , port) ) ;
7261
73- let addr = ( [ 0 , 0 , 0 , 0 ] , port ) . into ( ) ;
74- let server = hyper :: Server :: try_bind ( & addr)
75- . map_err ( |e| format ! ( "Failed bind socket on port {} {:?}" , port , e ) ) ?
76- . serve ( make_svc ) ;
62+ // Bind the TCP listener
63+ let listener = TcpListener :: bind ( & addr)
64+ . await
65+ . map_err ( |e| format ! ( "Failed to bind socket on port {}: {:?}" , port , e ) ) ? ;
7766
7867 log:: info!( target: LOG_TARGET , "Started prometheus endpoint on http://{}" , addr) ;
7968
80- let graceful = server. with_graceful_shutdown ( async {
81- rx. await . ok ( ) ;
82- } ) ;
69+ // Create a graceful shutdown handler
70+ let graceful = GracefulShutdown :: new ( ) ;
8371
72+ // Spawn the server task
8473 tokio:: spawn ( async move {
85- if let Err ( e) = graceful. await {
86- log:: warn!( "Server error: {}" , e) ;
74+ let executor = TokioExecutor :: new ( ) ;
75+ let server = Builder :: new ( executor) ;
76+
77+ loop {
78+ match listener. accept ( ) . await {
79+ Ok ( ( stream, _) ) => {
80+ let io = TokioIo :: new ( stream) ;
81+
82+ // Create a service for this connection
83+ let service = service_fn ( serve_req) ;
84+
85+ // Serve the connection with graceful shutdown
86+ let conn = server
87+ . serve_connection_with_upgrades ( io, service)
88+ . into_owned ( ) ;
89+
90+ let conn = graceful. watch ( conn) ;
91+
92+ tokio:: spawn ( async move {
93+ if let Err ( err) = conn. await {
94+ log:: debug!( target: LOG_TARGET , "connection error: {:?}" , err) ;
95+ }
96+ } ) ;
97+ }
98+ Err ( e) => {
99+ log:: debug!( target: LOG_TARGET , "Error accepting connection: {:?}" , e) ;
100+ continue ;
101+ }
102+ }
87103 }
88104 } ) ;
89105
90- Ok ( GracefulShutdown ( Some ( tx ) ) )
106+ Ok ( ( ) )
91107}
92108
93109mod hidden {
@@ -135,9 +151,9 @@ mod hidden {
135151 } ) ;
136152 static SUBMIT_SOLUTION_AND_WATCH_DURATION : Lazy < Gauge > = Lazy :: new ( || {
137153 register_gauge ! (
138- "staking_miner_submit_and_watch_duration_ms" ,
139- "The time in milliseconds it took to submit the solution to chain and to be included in block" ,
140- )
154+ "staking_miner_submit_and_watch_duration_ms" ,
155+ "The time in milliseconds it took to submit the solution to chain and to be included in block" ,
156+ )
141157 . unwrap ( )
142158 } ) ;
143159 static BALANCE : Lazy < Gauge > = Lazy :: new ( || {
0 commit comments