Skip to content

multi layer proxy example. #3628

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 10 additions & 14 deletions Cargo.toml
Original file line number Diff line number Diff line change
@@ -50,16 +50,7 @@ spmc = "0.3"
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
tokio = { version = "1", features = [
"fs",
"macros",
"net",
"io-std",
"io-util",
"rt",
"rt-multi-thread", # so examples can use #[tokio::main]
"sync",
"time",
"test-util",
"full"
] }
tokio-test = "0.4"
tokio-util = "0.7.10"
@@ -70,10 +61,10 @@ default = []

# Easily turn it all on
full = [
"client",
"http1",
"http2",
"server",
"client",
"http1",
"http2",
"server",
]

# HTTP versions
@@ -108,6 +99,11 @@ incremental = false
codegen-units = 1
incremental = false

[[example]]
name = "multi_layer_proxy"
path = "examples/multi_layer_proxy.rs"
required-features = ["full"]

[[example]]
name = "client"
path = "examples/client.rs"
41 changes: 29 additions & 12 deletions examples/README.md
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
# Examples of using hyper

These examples show how to do common tasks using `hyper`. You may also find the [Guides](https://hyper.rs/guides/1/) helpful.
These examples show how to do common tasks using `hyper`. You may also find the [Guides](https://hyper.rs/guides/1/)
helpful.

If you checkout this repository, you can run any of the examples with the command:

`cargo run --example {example_name} --features="full"`
`cargo run --example {example_name} --features="full"`

### Dependencies

@@ -28,9 +29,11 @@ futures-util = { version = "0.3", default-features = false }

### Clients

* [`client`](client.rs) - A simple CLI http client that requests the url passed in parameters and outputs the response content and details to the stdout, reading content chunk-by-chunk.
* [`client`](client.rs) - A simple CLI http client that requests the url passed in parameters and outputs the response
content and details to the stdout, reading content chunk-by-chunk.

* [`client_json`](client_json.rs) - A simple program that GETs some json, reads the body asynchronously, parses it with serde and outputs the result.
* [`client_json`](client_json.rs) - A simple program that GETs some json, reads the body asynchronously, parses it with
serde and outputs the result.

### Servers

@@ -42,22 +45,36 @@ futures-util = { version = "0.3", default-features = false }

* [`gateway`](gateway.rs) - A server gateway (reverse proxy) that proxies to the `hello` service above.

* [`graceful_shutdown`](graceful_shutdown.rs) - A server that has a timeout for incoming connections and does graceful connection shutdown.
* [`graceful_shutdown`](graceful_shutdown.rs) - A server that has a timeout for incoming connections and does graceful
connection shutdown.

* [`http_proxy`](http_proxy.rs) - A simple HTTP(S) proxy that handle and upgrade `CONNECT` requests and then proxy data between client and remote server.
* [`http_proxy`](http_proxy.rs) - A simple HTTP(S) proxy that handle and upgrade `CONNECT` requests and then proxy data
between client and remote server.

* [`multi_server`](multi_server.rs) - A server that listens to two different ports, a different `Service` per port.

* [`params`](params.rs) - A webserver that accept a form, with a name and a number, checks the parameters are presents and validates the input.
* [`params`](params.rs) - A webserver that accept a form, with a name and a number, checks the parameters are presents
and validates the input.

* [`send_file`](send_file.rs) - A server that sends back content of files using tokio-util to read the files asynchronously.
* [`send_file`](send_file.rs) - A server that sends back content of files using tokio-util to read the files
asynchronously.

* [`service_struct_impl`](service_struct_impl.rs) - A struct that manually implements the `Service` trait and uses a shared counter across requests.
* [`service_struct_impl`](service_struct_impl.rs) - A struct that manually implements the `Service` trait and uses a
shared counter across requests.

* [`single_threaded`](single_threaded.rs) - A server only running on 1 thread, so it can make use of `!Send` app state (like an `Rc` counter).
* [`single_threaded`](single_threaded.rs) - A server only running on 1 thread, so it can make use of `!Send` app state (
like an `Rc` counter).

* [`state`](state.rs) - A webserver showing basic state sharing among requests. A counter is shared, incremented for every request, and every response is sent the last count.
* [`state`](state.rs) - A webserver showing basic state sharing among requests. A counter is shared, incremented for
every request, and every response is sent the last count.

* [`upgrades`](upgrades.rs) - A server and client demonstrating how to do HTTP upgrades (such as WebSockets).

* [`web_api`](web_api.rs) - A server consisting in a service that returns incoming POST request's content in the response in uppercase and a service that calls the first service and includes the first service response in its own response.
* [`web_api`](web_api.rs) - A server consisting in a service that returns incoming POST request's content in the
response in uppercase and a service that calls the first service and includes the first service response in its own
response.

* [`multi_layer_proxy`](multi_layer_proxy.rs) - In this configuration we have a `public` `master` server, which accepts
**outgoing** connections from `endpoint` servers.
The reason for using outgoing connections, is to avoid the need to open firewall ports.
The `master` will receive requests and forward them one of the servers connected to it.
340 changes: 340 additions & 0 deletions examples/multi_layer_proxy.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,340 @@
use futures_util::future::join_all;
use std::net::SocketAddr;
use tokio::net::TcpListener;

#[path = "../benches/support/mod.rs"]
mod support;
use support::TokioIo;

pub mod helpers {
use bytes::Bytes;
use http_body_util::combinators::BoxBody;
use http_body_util::{BodyExt, Empty, Full};

pub fn host_addr(uri: &http::Uri) -> Option<String> {
uri.authority().and_then(|auth| Some(auth.to_string()))
}

pub fn empty() -> BoxBody<Bytes, hyper::Error> {
Empty::<Bytes>::new()
.map_err(|never| match never {})
.boxed()
}

pub fn full<T: Into<Bytes>>(chunk: T) -> BoxBody<Bytes, hyper::Error> {
Full::new(chunk.into())
.map_err(|never| match never {})
.boxed()
}
}

pub mod proxy_endpoint {
use super::helpers::{empty, full, host_addr};
use super::TokioIo;
use bytes::Bytes;
use http::header;
use http_body_util::{combinators::BoxBody, BodyExt};
use hyper::client::conn::http1::Builder;
use hyper::server::conn::http1;
use hyper::service::service_fn;
use hyper::upgrade::Upgraded;
use hyper::{http, Method, Request, Response};
use std::net::SocketAddr;
use std::str::FromStr;
use tokio::net::TcpStream;

pub async fn proxy_endpoint_main() -> Result<(), Box<dyn std::error::Error>> {
let addr = SocketAddr::from_str(format!("{}:{}", "127.0.0.1", "5000").as_str())
.expect("Failed to parse address");
while let Ok(stream) = TcpStream::connect(addr).await {
println!("Connected to {}", addr);
let (mut send_request, conn) = Builder::new().handshake(TokioIo::new(stream)).await?;
tokio::spawn(conn.with_upgrades());
let req = Request::builder()
.method(Method::CONNECT)
.uri(addr.to_string())
.header(header::UPGRADE, "")
.header("custom-header", "")
.body(empty())
.unwrap();
let res = send_request.send_request(req).await?;
let stream = hyper::upgrade::on(res).await?;

if let Err(err) = http1::Builder::new()
.preserve_header_case(true)
.title_case_headers(true)
.serve_connection(stream, service_fn(proxy))
.with_upgrades()
.await
{
println!("Failed to serve connection: {:?}", err);
}
}
Ok(())
}

async fn proxy(
req: Request<hyper::body::Incoming>,
) -> Result<Response<BoxBody<Bytes, hyper::Error>>, hyper::Error> {
println!("req: {:?}", req);

if Method::CONNECT == req.method() {
if let Some(addr) = host_addr(req.uri()) {
tokio::task::spawn(async move {
match hyper::upgrade::on(req).await {
Ok(upgraded) => {
if let Err(e) = tunnel(upgraded, addr).await {
println!("server io error: {}", e);
};
}
Err(e) => println!("upgrade error: {}", e),
}
});

Ok(Response::new(empty()))
} else {
println!("CONNECT host is not socket addr: {:?}", req.uri());
let mut resp = Response::new(full("CONNECT must be to a socket address"));
*resp.status_mut() = http::StatusCode::BAD_REQUEST;

Ok(resp)
}
} else {
let host = req.uri().host().expect("uri has no host");
let port = req.uri().port_u16().unwrap_or(80);

let stream = TcpStream::connect((host, port)).await.unwrap();
let io = TokioIo::new(stream);

let (mut sender, conn) = Builder::new()
.preserve_header_case(true)
.title_case_headers(true)
.handshake(io)
.await?;
tokio::task::spawn(async move {
if let Err(err) = conn.await {
println!("Connection failed: {:?}", err);
}
});

let resp = sender.send_request(req).await?;
Ok(resp.map(|b| b.boxed()))
}
}

async fn tunnel(upgraded: Upgraded, addr: String) -> std::io::Result<()> {
let mut server = TcpStream::connect(addr.clone()).await?;
let mut upgraded = TokioIo::new(upgraded);
let (from_client, from_server) =
tokio::io::copy_bidirectional(&mut upgraded, &mut server).await?;
println!(
"proxy_endpoint => from_client = {} | from_server = {}",
from_client, from_server
);
Ok(())
}
}

pub mod proxy_master {
pub mod proxy_pool {
use hyper::upgrade::Upgraded;
use std::sync::Arc;
use tokio::sync::Mutex;

#[derive(Debug, Clone, Default)]
pub struct ProxyPool {
pool: Arc<Mutex<Vec<Upgraded>>>,
}

impl ProxyPool {
pub async fn put(&self, stream: Upgraded) {
self.pool.lock().await.push(stream);
}

pub async fn get(&self) -> Option<Upgraded> {
let mut lock = self.pool.lock().await;

// We have all proxy connection now, so we can pick any of them by arbitrary condition

// Just pop the last one for example
lock.pop()
}
}
}

pub mod proxy_endpoint {
use super::super::helpers::empty;
use super::super::TokioIo;
use super::proxy_pool::ProxyPool;
use bytes::Bytes;
use http_body_util::combinators::BoxBody;
use hyper::server;
use hyper::service::service_fn;
use hyper::{Method, Request, Response};
use tokio::net::TcpListener;

pub async fn listen_for_proxies_connecting(
pool: ProxyPool,
proxy_listener: TcpListener,
) -> () {
while let Ok((stream, addr)) = proxy_listener.accept().await {
let pool = pool.clone();
tokio::spawn(async move {
if let Err(err) = server::conn::http1::Builder::new()
.preserve_header_case(true)
.title_case_headers(true)
.serve_connection(
TokioIo::new(stream),
service_fn(move |req| handle_proxy_request(pool.clone(), req)),
)
.with_upgrades()
.await
{
println!("Failed to serve connection from addr {:?}: {:?}", addr, err);
}
});
}
}

async fn handle_proxy_request(
pool: ProxyPool,
req: Request<hyper::body::Incoming>,
) -> Result<Response<BoxBody<Bytes, hyper::Error>>, hyper::Error> {
if Method::CONNECT == req.method() {
// Received an HTTP request like:
// ```
// CONNECT www.domain.com:443 HTTP/1.1
// Host: www.domain.com:443
// Proxy-Connection: Keep-Alive
// ```
//
// When HTTP method is CONNECT we should return an empty body
// then we can eventually upgrade the connection and talk a new protocol.
//
// Note: only after client received an empty body with STATUS_OK can the
// connection be upgraded, so we can't return a response inside
// `on_upgrade` future.
tokio::spawn(async move {
match hyper::upgrade::on(req).await {
Ok(upgraded) => {
// We can put proxy along with req here
pool.put(upgraded).await;
}
Err(e) => println!("upgrade error: {}", e),
}
});
Ok(Response::new(empty()))
} else {
// TODO : Process request - can register proxy here
println!("NOT CONNECT request");
Ok(Response::new(empty()))
}
}
}

pub mod clients_endpoint {
use super::super::helpers::empty;
use super::super::TokioIo;
use super::proxy_pool::ProxyPool;
use bytes::Bytes;
use http_body_util::combinators::BoxBody;
use hyper::service::service_fn;
use hyper::{client, server, Method, Request, Response};
use tokio::io::copy_bidirectional;
use tokio::net::TcpListener;

pub async fn listen_for_clients_connecting(pool: ProxyPool, client_listener: TcpListener) {
while let Ok((stream, addr)) = client_listener.accept().await {
let pool = pool.clone();
tokio::spawn(async move {
if let Err(err) = server::conn::http1::Builder::new()
.preserve_header_case(true)
.title_case_headers(true)
.serve_connection(
TokioIo::new(stream),
service_fn(move |req| handle_client_request(pool.clone(), req)),
)
.with_upgrades()
.await
{
println!("Failed to serve connection from addr {:?}: {:?}", addr, err);
}
});
}
}

async fn handle_client_request(
pool: ProxyPool,
mut req: Request<hyper::body::Incoming>,
) -> Result<Response<BoxBody<Bytes, hyper::Error>>, hyper::Error> {
if Method::CONNECT == req.method() {
tokio::spawn(async move {
match hyper::upgrade::on(&mut req).await {
Ok(upgraded) => {
let proxy = pool.get().await.unwrap();
let (mut send_request, conn) =
client::conn::http1::Builder::new().handshake(proxy).await?;
tokio::spawn(conn.with_upgrades());
let res = send_request.send_request(req).await?;
let stream = hyper::upgrade::on(res).await?;
let (from_client, from_server) = copy_bidirectional(
&mut TokioIo::new(upgraded),
&mut TokioIo::new(stream),
)
.await
.unwrap();
println!(
"proxy_master => from_client = {} | from_server = {}",
from_client, from_server
);
}
Err(e) => println!("upgrade error = {}", e),
}
Ok::<(), hyper::Error>(())
});
Ok(Response::new(empty()))
} else {
Ok(Response::new(empty()))
}
}
}
}

#[tokio::main]
async fn main() {
let pool = proxy_master::proxy_pool::ProxyPool::default();
let addr_proxies = SocketAddr::from(([127, 0, 0, 1], 5000));
let proxy_listener = TcpListener::bind(addr_proxies).await.unwrap();
println!("Listening on for proxies on: {}", addr_proxies);
let addr_clients = SocketAddr::from(([127, 0, 0, 1], 4000));
let client_listener = TcpListener::bind(addr_clients).await.unwrap();
println!("Listening on for clients on: {}", addr_clients);

let proxy_listener_pool = pool.clone();

let proxy_endpoint_main_task = tokio::task::spawn(async move {
proxy_endpoint::proxy_endpoint_main().await.unwrap();
});

let proxy_listener_task = tokio::task::spawn(async move {
proxy_master::proxy_endpoint::listen_for_proxies_connecting(
proxy_listener_pool,
proxy_listener,
)
.await
});
let proxy_listener_pool = pool.clone();
let clients_listener_task = tokio::task::spawn(async move {
proxy_master::clients_endpoint::listen_for_clients_connecting(
proxy_listener_pool,
client_listener,
)
.await;
});
let _ = join_all(vec![
proxy_listener_task,
clients_listener_task,
proxy_endpoint_main_task,
])
.await;
}