Skip to content
This repository was archived by the owner on Oct 13, 2023. It is now read-only.

Initial TCP socket host implementation. #113

Merged
merged 9 commits into from
Mar 20, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
131 changes: 65 additions & 66 deletions Cargo.lock

Large diffs are not rendered by default.

11 changes: 6 additions & 5 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,9 @@ edition = "2021"
anyhow = "1.0.22"
thiserror = "1.0.15"
tracing = "0.1.26"
cap-std = "1.0.5"
cap-rand = "1.0.5"
cap-fs-ext = "1.0.5"
cap-std = "1.0.6"
cap-rand = "1.0.6"
cap-fs-ext = "1.0.6"
bitflags = "1.2"
windows-sys = "0.45.0"
rustix = "0.36.0"
Expand All @@ -27,15 +27,16 @@ wasi-common = { path = "wasi-common" }
once_cell = "1.12.0"
system-interface = { version = "0.25.1", features = ["cap_std_impls"] }
wit-bindgen = { version = "0.4.0", default-features = false }
ipnet = "2" # TODO: Move to cap_std::ipnet instead, when that's released.

[dependencies]
wasi = { version = "0.11.0", default-features = false }
wit-bindgen = { workspace = true, features = ["macros"] }
byte-array = { path = "byte-array" }

[build-dependencies]
wasm-encoder = "0.24"
object = { version = "0.29.0", default-features = false, features = ["archive"] }
wasm-encoder = "0.25"
object = { version = "0.30.0", default-features = false, features = ["archive"] }

[lib]
crate-type = ["cdylib"]
Expand Down
2 changes: 1 addition & 1 deletion host/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ cap-std = { workspace = true }
cap-rand = { workspace = true }
tokio = { version = "1.22.0", features = [ "rt", "macros" ] }
tracing = { workspace = true }
wasmtime = { git = "https://github.com/bytecodealliance/wasmtime", rev = "8d3a881b524d56498dfc71b65818b2abbeb1da44", features = ["component-model"] }
wasmtime = { git = "https://github.com/bytecodealliance/wasmtime", rev = "5ae8575296d5b524cde42ad10badf8c89945105a", features = ["component-model"] }
wasi-common = { path = "../wasi-common" }
wasi-cap-std-sync = { path = "../wasi-common/cap-std-sync" }
clap = { version = "4.1.9", features = ["derive"] }
Expand Down
2 changes: 2 additions & 0 deletions host/src/command.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,9 @@ pub fn add_to_linker<T: Send>(
wasi::streams::add_to_linker(l, f)?;
wasi::random::add_to_linker(l, f)?;
wasi::tcp::add_to_linker(l, f)?;
wasi::tcp_create_socket::add_to_linker(l, f)?;
wasi::udp::add_to_linker(l, f)?;
wasi::udp_create_socket::add_to_linker(l, f)?;
wasi::ip_name_lookup::add_to_linker(l, f)?;
wasi::instance_network::add_to_linker(l, f)?;
wasi::network::add_to_linker(l, f)?;
Expand Down
6 changes: 5 additions & 1 deletion host/src/io.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,11 @@ async fn read(
.get_input_stream_mut(stream)
.map_err(convert)?;

let mut buffer = vec![0; len.try_into().unwrap()];
// Len could be any `u64` value, but we don't want to
// allocate too much up front, so make a wild guess
// of an upper bound for the buffer size.
let buffer_len = std::cmp::min(len, 0x400000) as _;
let mut buffer = vec![0; buffer_len];

let (bytes_read, end) = s.read(&mut buffer).await.map_err(convert)?;

Expand Down
18 changes: 13 additions & 5 deletions host/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,9 @@ struct Args {
/// Filesystem path of a component
component: String,

/// Command-line arguments
args: Vec<String>,

/// Name of the world to load it in.
#[arg(long, default_value_t = String::from("command"))]
world: String,
Expand All @@ -35,9 +38,9 @@ async fn main() -> Result<()> {
let mut linker = Linker::new(&engine);

if args.world == "command" {
run_command(&mut linker, &engine, &component).await?;
run_command(&mut linker, &engine, &component, &args.args).await?;
} else if args.world == "proxy" {
run_proxy(&mut linker, &engine, &component).await?;
run_proxy(&mut linker, &engine, &component, &args.args).await?;
}

Ok(())
Expand All @@ -47,20 +50,23 @@ async fn run_command(
linker: &mut Linker<WasiCtx>,
engine: &Engine,
component: &Component,
args: &[String],
) -> anyhow::Result<()> {
command::add_to_linker(linker, |x| x)?;

let mut store = Store::new(
engine,
WasiCtxBuilder::new()
.inherit_stdin()
.inherit_stdout()
.inherit_stdio()
.inherit_network()
.build(),
);

let (wasi, _instance) = Command::instantiate_async(&mut store, component, linker).await?;

let result: Result<(), ()> = wasi.call_main(&mut store, 0, 1, 2, &[], &[]).await?;
let mut main_args: Vec<&str> = vec!["wasm"];
main_args.extend(args.iter().map(String::as_str));
let result: Result<(), ()> = wasi.call_main(&mut store, 0, 1, 2, &main_args, &[]).await?;

if result.is_err() {
anyhow::bail!("command returned with failing exit status");
Expand All @@ -73,6 +79,7 @@ async fn run_proxy(
linker: &mut Linker<WasiCtx>,
engine: &Engine,
component: &Component,
args: &[String],
) -> anyhow::Result<()> {
proxy::add_to_linker(linker, |x| x)?;

Expand All @@ -82,6 +89,7 @@ async fn run_proxy(

// TODO: do something
let _ = wasi;
let _ = args;
let result: Result<(), ()> = Ok(());

if result.is_err() {
Expand Down
16 changes: 12 additions & 4 deletions host/src/network.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,23 +3,31 @@ use crate::{
command::wasi::network::{self, Network},
WasiCtx,
};
use std::net::{Ipv4Addr, Ipv6Addr, SocketAddr, SocketAddrV4, SocketAddrV6};
use cap_std::net::{Ipv4Addr, Ipv6Addr, SocketAddr, SocketAddrV4, SocketAddrV6};
use wasi_common::WasiNetwork;

pub(crate) fn convert(_error: wasi_common::Error) -> anyhow::Error {
todo!("convert wasi-common Error to wasi_network::Error")
}

#[async_trait::async_trait]
impl network::Host for WasiCtx {
async fn drop_network(&mut self, _network: Network) -> anyhow::Result<()> {
todo!()
async fn drop_network(&mut self, this: Network) -> anyhow::Result<()> {
let table = self.table_mut();
if !table.delete::<Box<dyn WasiNetwork>>(this).is_ok() {
anyhow::bail!("{this} is not a network");
}
Ok(())
}
}

#[async_trait::async_trait]
impl instance_network::Host for WasiCtx {
async fn instance_network(&mut self) -> anyhow::Result<Network> {
todo!()
let network = (self.network_creator)(self.pool.clone())?;
let table = self.table_mut();
let network = table.push(Box::new(network)).map_err(convert)?;
Ok(network)
}
}

Expand Down
22 changes: 14 additions & 8 deletions host/src/poll.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,12 @@ use crate::{
command::wasi::monotonic_clock::{Instant, MonotonicClock},
command::wasi::poll::Pollable,
command::wasi::streams::{InputStream, OutputStream, StreamError},
command::wasi::tcp::TcpSocket,
proxy, WasiCtx,
};
use wasi_common::clocks::TableMonotonicClockExt;
use wasi_common::stream::TableStreamExt;
use wasi_common::tcp_socket::TableTcpSocketExt;

fn convert(error: wasi_common::Error) -> anyhow::Error {
if let Some(_errno) = error.downcast_ref() {
Expand All @@ -25,6 +27,8 @@ pub(crate) enum PollableEntry {
Write(OutputStream),
/// Poll for a monotonic-clock timer.
MonotonicClock(MonotonicClock, Instant, bool),
/// Poll for a tcp-socket.
TcpSocket(TcpSocket),
}

async fn drop_pollable(ctx: &mut WasiCtx, pollable: Pollable) -> anyhow::Result<()> {
Expand All @@ -41,25 +45,27 @@ async fn poll_oneoff(ctx: &mut WasiCtx, futures: Vec<Pollable>) -> anyhow::Resul
let mut poll = Poll::new();
let len = futures.len();
for (index, future) in futures.into_iter().enumerate() {
let userdata = Userdata::from(index as u64);

match *ctx.table().get(future).map_err(convert)? {
PollableEntry::Read(stream) => {
let wasi_stream: &dyn wasi_common::InputStream =
ctx.table().get_input_stream(stream).map_err(convert)?;
poll.subscribe_read(wasi_stream, Userdata::from(index as u64));
poll.subscribe_read(wasi_stream, userdata);
}
PollableEntry::Write(stream) => {
let wasi_stream: &dyn wasi_common::OutputStream =
ctx.table().get_output_stream(stream).map_err(convert)?;
poll.subscribe_write(wasi_stream, Userdata::from(index as u64));
poll.subscribe_write(wasi_stream, userdata);
}
PollableEntry::MonotonicClock(clock, when, absolute) => {
let wasi_clock = ctx.table().get_monotonic_clock(clock).map_err(convert)?;
poll.subscribe_monotonic_clock(
wasi_clock,
when,
absolute,
Userdata::from(index as u64),
);
poll.subscribe_monotonic_clock(wasi_clock, when, absolute, userdata);
}
PollableEntry::TcpSocket(tcp_socket) => {
let wasi_tcp_socket: &dyn wasi_common::WasiTcpSocket =
ctx.table().get_tcp_socket(tcp_socket).map_err(convert)?;
poll.subscribe_tcp_socket(wasi_tcp_socket, userdata);
}
}
}
Expand Down
Loading