Skip to content

Commit 8318b81

Browse files
committed
Add post-receive trigger for better packet handling
This adds an optional packet post-receive callback to Endpoint that allows Endpoint to signal the application when it is done processing a set of incoming packets -- either because it has processed them all, or because it hit the max-recv-per-loop limit (64). This is designed to allow an application to more efficiently deal with packets, especially datagrams, in batches: by using this an application can use the datagram handler to collect incoming datagrams, then use the post-receive callback to process accumulated datagrams in one go. Because the post-receive callback always fires immediately *before* libquic goes back to potentially blocking waiting to read new packets, this means the application can do batch processing without needing to resort to timers or polling for packet handling. This is particularly important for Lokinet, where we take the packet in the callback transfer it to a job in the Lokinet main loop thread to process it there: doing this one packet at a time is not likely to scale well because of the sheer number of jobs that would have to be put on the logic thread event queue; by batching them into groups of up to 64 packets at a time we ought to be able to do considerably better, and by triggering the processing based on the post-receive trigger we ensure we don't introduce unnecessary delays in terms of when packets get processed.
1 parent 5eb3bac commit 8318b81

File tree

5 files changed

+136
-8
lines changed

5 files changed

+136
-8
lines changed

include/quic/endpoint.hpp

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,13 @@ namespace oxen::quic
4242
// called when a connection closes or times out before the handshake completes
4343
using connection_closed_callback = std::function<void(connection_interface& conn, uint64_t ec)>;
4444

45+
// Called after we are done reading currently-available UDP packets to allow batch processing of
46+
// incoming data (or any other just-before-potentially-blocking needed handling). First we fire
47+
// off any available callbacks triggered for incoming packets, then just before we go back to
48+
// potentially block waiting for more packets, we fire this to let the application know that
49+
// there might not be more callbacks immediately arriving and so it should process what it has.
50+
using post_receive_callback = std::function<void()>;
51+
4552
class Endpoint : std::enable_shared_from_this<Endpoint>
4653
{
4754
private:
@@ -50,6 +57,7 @@ namespace oxen::quic
5057
void handle_ep_opt(opt::inbound_alpns alpns);
5158
void handle_ep_opt(opt::handshake_timeout timeout);
5259
void handle_ep_opt(dgram_data_callback dgram_cb);
60+
void handle_ep_opt(post_receive_callback post_recv_cb);
5361
void handle_ep_opt(connection_established_callback conn_established_cb);
5462
void handle_ep_opt(connection_closed_callback conn_closed_cb);
5563

@@ -238,6 +246,8 @@ namespace oxen::quic
238246
bool _packet_splitting{false};
239247
Splitting _policy{Splitting::NONE};
240248

249+
post_receive_callback _post_receive{nullptr};
250+
241251
std::shared_ptr<IOContext> outbound_ctx;
242252
std::shared_ptr<IOContext> inbound_ctx;
243253

include/quic/udp.hpp

Lines changed: 18 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -38,17 +38,31 @@ namespace oxen::quic
3838
;
3939

4040
using receive_callback_t = std::function<void(const Packet& pkt)>;
41+
using post_receive_callback_t = std::function<void()>;
4142

4243
UDPSocket() = delete;
4344

4445
/// Constructs a UDP socket bound to the given address. Throws if binding fails. If
4546
/// binding to an any address (or any port) you can retrieve the realized address via
4647
/// address() after construction.
4748
///
48-
/// When packets are received they will be fed into the given callback.
49+
/// When packets are received they will be fed into the given `on_receive` callback.
50+
///
51+
/// The optional `post_receive` callback will be invoked after processing available incoming
52+
/// packets but before returning to polling the socket for additional incoming packets.
53+
/// This is meant to allow the caller to bundle incoming packets into batches without
54+
/// introducing delays: each time one or more packets are read from the socket there will be
55+
/// a sequence of `on_receive(...)` calls for each packet, followed by a `post_receive()`
56+
/// call immediately before the socket returns to waiting for additional packets. Thus a
57+
/// caller can use the `on_receive` callback to collect packets and the `post_receive`
58+
/// callback to process the collected packets all at once.
4959
///
5060
/// ev_loop must outlive this object.
51-
UDPSocket(event_base* ev_loop, const Address& addr, receive_callback_t cb);
61+
UDPSocket(
62+
event_base* ev_loop,
63+
const Address& addr,
64+
receive_callback_t on_receive,
65+
post_receive_callback_t post_receive = nullptr);
5266

5367
/// Non-copyable and non-moveable
5468
UDPSocket(const UDPSocket& s) = delete;
@@ -103,6 +117,8 @@ namespace oxen::quic
103117

104118
event_ptr rev_ = nullptr;
105119
receive_callback_t receive_callback_;
120+
post_receive_callback_t post_receive_;
121+
bool have_received_ = false;
106122
event_ptr wev_ = nullptr;
107123
std::vector<std::function<void()>> writeable_callbacks_;
108124
};

src/endpoint.cpp

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,12 @@ namespace oxen::quic
5656
dgram_recv_cb = std::move(func);
5757
}
5858

59+
void Endpoint::handle_ep_opt(post_receive_callback func)
60+
{
61+
log::trace(log_cat, "Endpoint given post-receive callback");
62+
_post_receive = std::move(func);
63+
}
64+
5965
void Endpoint::handle_ep_opt(connection_established_callback conn_established_cb)
6066
{
6167
log::trace(log_cat, "Endpoint given connection established callback");
@@ -71,8 +77,14 @@ namespace oxen::quic
7177
void Endpoint::_init_internals()
7278
{
7379
log::debug(log_cat, "Starting new UDP socket on {}", _local);
74-
socket =
75-
std::make_unique<UDPSocket>(get_loop().get(), _local, [this](const auto& packet) { handle_packet(packet); });
80+
socket = std::make_unique<UDPSocket>(
81+
get_loop().get(),
82+
_local,
83+
[this](const auto& packet) { handle_packet(packet); },
84+
[this] {
85+
if (_post_receive)
86+
_post_receive();
87+
});
7688

7789
_local = socket->address();
7890

src/udp.cpp

Lines changed: 14 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -85,8 +85,9 @@ namespace oxen::quic
8585
}
8686
#endif
8787

88-
UDPSocket::UDPSocket(event_base* ev_loop, const Address& addr, receive_callback_t on_receive) :
89-
ev_{ev_loop}, receive_callback_{std::move(on_receive)}
88+
UDPSocket::UDPSocket(
89+
event_base* ev_loop, const Address& addr, receive_callback_t on_receive, post_receive_callback_t post_receive) :
90+
ev_{ev_loop}, receive_callback_{std::move(on_receive)}, post_receive_{std::move(post_receive)}
9091
{
9192
assert(ev_);
9293

@@ -125,7 +126,16 @@ namespace oxen::quic
125126
ev_,
126127
sock_,
127128
EV_READ | EV_PERSIST,
128-
[](evutil_socket_t, short, void* self) { static_cast<UDPSocket*>(self)->receive(); },
129+
[](evutil_socket_t, short, void* self_) {
130+
auto& self = *static_cast<UDPSocket*>(self_);
131+
self.receive();
132+
if (self.have_received_)
133+
{
134+
self.have_received_ = false;
135+
if (self.post_receive_)
136+
self.post_receive_();
137+
}
138+
},
129139
this));
130140
event_add(rev_.get(), nullptr);
131141

@@ -190,6 +200,7 @@ namespace oxen::quic
190200
return;
191201
}
192202

203+
have_received_ = true;
193204
receive_callback_(Packet{bound_, payload, hdr});
194205
}
195206

tests/007-datagrams.cpp

Lines changed: 80 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -333,7 +333,7 @@ namespace oxen::quic::test
333333
good_msg += v++;
334334

335335
for (int i = 0; i < n; ++i)
336-
conn_interface->send_datagram(std::basic_string_view<uint8_t>{good_msg});
336+
conn_interface->send_datagram(good_msg);
337337

338338
for (auto& f : data_futures)
339339
REQUIRE(f.get());
@@ -638,4 +638,83 @@ namespace oxen::quic::test
638638
};
639639
#endif
640640
};
641+
642+
TEST_CASE("007 - Datagram support: packet post-receive triggers", "[007][datagrams][packet-post-receive]")
643+
{
644+
auto client_established = callback_waiter{[](connection_interface&) {}};
645+
646+
Network test_net{};
647+
648+
std::basic_string<std::byte> big_msg{};
649+
650+
for (int v = 0; big_msg.size() < 1000; v++)
651+
big_msg += static_cast<std::byte>(v % 256);
652+
653+
std::atomic<int> recv_counter{0};
654+
std::atomic<int> data_counter{0};
655+
656+
std::promise<void> got_first, acked_first;
657+
std::promise<int> got_all_n_recvs;
658+
659+
dgram_data_callback recv_dgram_cb = [&](dgram_interface&, bstring value) {
660+
auto count = ++data_counter;
661+
CHECK(value == big_msg);
662+
if (count == 1)
663+
{
664+
// We get one datagram, then stall the quic thread so that the test can fire
665+
// multiple packets that we should then receive in one go.
666+
got_first.set_value();
667+
REQUIRE(acked_first.get_future().wait_for(1s) == std::future_status::ready);
668+
}
669+
else if (count == 31)
670+
{
671+
got_all_n_recvs.set_value(recv_counter);
672+
}
673+
};
674+
675+
auto recv_notifier = [&] { recv_counter++; };
676+
677+
opt::local_addr server_local{};
678+
opt::local_addr client_local{};
679+
680+
auto server_tls = GNUTLSCreds::make("./serverkey.pem"s, "./servercert.pem"s, "./clientcert.pem"s);
681+
auto client_tls = GNUTLSCreds::make("./clientkey.pem"s, "./clientcert.pem"s, "./servercert.pem"s);
682+
683+
auto server_endpoint = test_net.endpoint(server_local, recv_dgram_cb, recv_notifier, opt::enable_datagrams{});
684+
REQUIRE_NOTHROW(server_endpoint->listen(server_tls));
685+
686+
opt::remote_addr client_remote{"127.0.0.1"s, server_endpoint->local().port()};
687+
688+
auto client = test_net.endpoint(client_local, client_established, opt::enable_datagrams{});
689+
auto conn = client->connect(client_remote, client_tls);
690+
691+
REQUIRE(client_established.wait());
692+
693+
// Start off with *one* datagram; the first one the server receives will stall the
694+
// server until we signal it via the acked_first promise, during which we'll send a
695+
// bunch more that ought to be processed in a single batch.
696+
conn->send_datagram(big_msg);
697+
698+
REQUIRE(got_first.get_future().wait_for(1s) == std::future_status::ready);
699+
700+
int batches_before_flood = recv_counter;
701+
702+
for (int i = 0; i < 30; i++)
703+
conn->send_datagram(big_msg);
704+
705+
acked_first.set_value();
706+
707+
auto f = got_all_n_recvs.get_future();
708+
REQUIRE(f.wait_for(1s) == std::future_status::ready);
709+
auto recv_counter_before_final = f.get();
710+
REQUIRE(data_counter == 31);
711+
REQUIRE(recv_counter_before_final > batches_before_flood);
712+
// There should be a recv callback fired *immediately* after the data callback that
713+
// fulfilled the above proimise, so a miniscule wait here should guarantee that it has been
714+
// set.
715+
std::this_thread::sleep_for(1ms);
716+
auto final_recv_counter = recv_counter.load();
717+
REQUIRE(final_recv_counter > recv_counter_before_final);
718+
};
719+
641720
} // namespace oxen::quic::test

0 commit comments

Comments
 (0)