Skip to content

Commit 913bec3

Browse files
committed
Callbacks on remote read/write shutdown
- Applications can call `::set_remote_reset_hooks(...)` to emplace logic to be executed when the remote stream shuts down reading and/or writing - This only happens once per lifetime of the stream; as a result, do NOT set more hooks while inside the body of the hooks themselves!
1 parent 2b080e0 commit 913bec3

File tree

7 files changed

+181
-10
lines changed

7 files changed

+181
-10
lines changed

include/oxen/quic/connection.hpp

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -490,6 +490,7 @@ namespace oxen::quic
490490
int stream_ack(int64_t id, size_t size);
491491
int stream_receive(int64_t id, bstring_view data, bool fin);
492492
void stream_execute_close(Stream& s, uint64_t app_code);
493+
void stream_reset(int64_t id, uint64_t app_code);
493494
void stream_closed(int64_t id, uint64_t app_code);
494495
void close_all_streams();
495496
void check_pending_streams(uint64_t available);

include/oxen/quic/error.hpp

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,12 @@ namespace oxen::quic
2020
// Application error code we close with if the stream data handle throws
2121
inline constexpr uint64_t STREAM_ERROR_EXCEPTION = ERROR_BASE + 100;
2222

23+
// Application error code for signalling a remote shut down stream reading
24+
inline constexpr uint64_t STREAM_REMOTE_READ_SHUTDOWN = ERROR_BASE + 101;
25+
26+
// Application error code for signalling a remote shut down stream writing
27+
inline constexpr uint64_t STREAM_REMOTE_WRITE_SHUTDOWN = ERROR_BASE + 102;
28+
2329
// Application error if a bt request stream handle throws an exception
2430
inline constexpr uint64_t BPARSER_ERROR_EXCEPTION = ERROR_BASE + 105;
2531

include/oxen/quic/opt.hpp

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -202,5 +202,42 @@ namespace oxen::quic
202202
_hook = nullptr;
203203
}
204204
};
205+
206+
// Used to provide callbacks for remote stream reset. Application can pass one or both callbacks to indicate what
207+
// logic should be executed when the remote shuts down stream reading or writing. The signature of `on_reset_hook_t`
208+
// matches that of other hooks, so we wrap it in an opt struct to differentiate and to structure access.
209+
struct remote_stream_reset
210+
{
211+
using on_reset_hook_t = std::function<void(Stream&, uint64_t)>;
212+
213+
private:
214+
on_reset_hook_t _on_read_reset = nullptr;
215+
on_reset_hook_t _on_write_reset = nullptr;
216+
217+
public:
218+
remote_stream_reset() = default;
219+
220+
explicit remote_stream_reset(on_reset_hook_t _on_read, on_reset_hook_t _on_write = nullptr) :
221+
_on_read_reset{std::move(_on_read)}, _on_write_reset{std::move(_on_write)}
222+
{
223+
if (not _on_read_reset and not _on_write_reset)
224+
throw std::invalid_argument{"Must set at least one of `on_read_reset` and `on_write_reset`!"};
225+
}
226+
227+
explicit operator bool() const { return has_read_hook() and has_write_hook(); }
228+
229+
void clear()
230+
{
231+
_on_read_reset = nullptr;
232+
_on_write_reset = nullptr;
233+
}
234+
235+
bool has_read_hook() const { return _on_read_reset != nullptr; }
236+
bool has_write_hook() const { return _on_write_reset != nullptr; }
237+
238+
void read_reset(Stream& s, uint64_t ec) { return _on_read_reset(s, ec); }
239+
void write_reset(Stream& s, uint64_t ec) { return _on_write_reset(s, ec); }
240+
};
241+
205242
} // namespace opt
206243
} // namespace oxen::quic

include/oxen/quic/stream.hpp

Lines changed: 18 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -91,14 +91,26 @@ namespace oxen::quic
9191

9292
bool is_paused() const;
9393

94-
bool is_reading() const;
94+
/** Remote Stream Reset:
95+
- Applications can call `::set_remote_reset_hooks(...)` to emplace logic to be executed when the remote stream
96+
shuts down reading and/or writing
97+
- This only happens once per lifetime of the stream; as a result, do NOT set more hooks while inside the body of
98+
the hooks themselves!
99+
*/
100+
void set_remote_reset_hooks(opt::remote_stream_reset hooks);
95101

96-
bool is_writing() const;
102+
void clear_remote_reset_hooks();
103+
104+
bool has_remote_reset_hooks() const;
97105

98106
void stop_reading();
99107

100108
void stop_writing();
101109

110+
bool is_reading() const;
111+
112+
bool is_writing() const;
113+
102114
// These public methods are synchronized so that they can be safely called from outside the
103115
// libquic main loop thread.
104116
bool available() const;
@@ -168,6 +180,10 @@ namespace oxen::quic
168180
opt::watermark _high_water;
169181
opt::watermark _low_water;
170182

183+
opt::remote_stream_reset _remote_reset;
184+
185+
bool _in_reset{false};
186+
171187
bool _is_reading{true};
172188
bool _is_writing{true};
173189

src/connection.cpp

Lines changed: 46 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -124,7 +124,7 @@ namespace oxen::quic
124124
void* /*stream_user_data*/)
125125
{
126126
log::trace(log_cat, "{} called", __PRETTY_FUNCTION__);
127-
static_cast<Connection*>(user_data)->stream_closed(stream_id, app_error_code);
127+
static_cast<Connection*>(user_data)->stream_reset(stream_id, app_error_code);
128128
return 0;
129129
}
130130

@@ -1171,6 +1171,51 @@ namespace oxen::quic
11711171
}
11721172
}
11731173

1174+
void Connection::stream_reset(int64_t id, uint64_t app_code)
1175+
{
1176+
log::trace(log_cat, "{} called", __PRETTY_FUNCTION__);
1177+
assert(ngtcp2_is_bidi_stream(id));
1178+
auto it = _streams.find(id);
1179+
1180+
if (it == _streams.end())
1181+
return;
1182+
1183+
auto& stream = it->second;
1184+
1185+
switch (app_code)
1186+
{
1187+
case STREAM_REMOTE_READ_SHUTDOWN:
1188+
log::debug(log_cat, "Stream (ID:{}) received remote read shutdown signal!", id);
1189+
1190+
if (stream->_remote_reset.has_read_hook())
1191+
{
1192+
log::debug(log_cat, "Invoking remote_read_reset hook...");
1193+
stream->_in_reset = true;
1194+
stream->_remote_reset.read_reset(*stream.get(), app_code);
1195+
stream->_in_reset = false;
1196+
}
1197+
1198+
break;
1199+
1200+
case STREAM_REMOTE_WRITE_SHUTDOWN:
1201+
log::debug(log_cat, "Stream (ID:{}) received remote write shutdown signal!", id);
1202+
1203+
if (stream->_remote_reset.has_write_hook())
1204+
{
1205+
log::debug(log_cat, "Invoking remote_write_reset hook...");
1206+
stream->_in_reset = true;
1207+
stream->_remote_reset.write_reset(*stream.get(), app_code);
1208+
stream->_in_reset = false;
1209+
}
1210+
1211+
break;
1212+
1213+
default:
1214+
log::critical(
1215+
log_cat, "Stream (ID:{}) received unrecognized app code (ec:{}) for stream reset!", id, app_code);
1216+
}
1217+
}
1218+
11741219
void Connection::stream_closed(int64_t id, uint64_t app_code)
11751220
{
11761221
log::trace(log_cat, "{} called", __PRETTY_FUNCTION__);

src/stream.cpp

Lines changed: 55 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -127,6 +127,33 @@ namespace oxen::quic
127127
});
128128
}
129129

130+
void Stream::set_remote_reset_hooks(opt::remote_stream_reset sr)
131+
{
132+
// we can use ::call(...) instead of ::call_soon(...) because stream read/write shutdown only happens once per stream
133+
// lifetime, and the application would be beyond incorrect to invoke this function in the callbacks themselves
134+
endpoint.call([this, hooks = std::move(sr)]() {
135+
if (_in_reset)
136+
throw std::runtime_error{"Cannot set `remote_stream_reset` while executing currently set hooks!!"};
137+
138+
log::debug(log_cat, "Stream (ID:{}) provided `remote_stream_reset` hooks!", _stream_id);
139+
_remote_reset = std::move(hooks);
140+
});
141+
}
142+
143+
void Stream::clear_remote_reset_hooks()
144+
{
145+
// we can use ::call(...) instead of ::call_soon(...) because stream read/write shutdown only happens once per stream
146+
// lifetime, and the application would be beyond incorrect to invoke this function in the callbacks themselves
147+
endpoint.call([this]() {
148+
if (_in_reset)
149+
throw std::runtime_error{"Cannot set `remote_stream_reset` while executing currently set hooks!!"};
150+
151+
log::debug(log_cat, "Stream (ID:{}) cleared `remote_stream_reset` hooks!", _stream_id);
152+
_remote_reset.clear();
153+
assert(not _remote_reset);
154+
});
155+
}
156+
130157
void Stream::stop_reading()
131158
{
132159
endpoint.call([this]() {
@@ -139,7 +166,7 @@ namespace oxen::quic
139166
_is_reading = false;
140167

141168
log::warning(log_cat, "Halting all read operations on stream ID:{}!", _stream_id);
142-
ngtcp2_conn_shutdown_stream_read(*_conn, 0, _stream_id, 0);
169+
ngtcp2_conn_shutdown_stream_read(*_conn, 0, _stream_id, STREAM_REMOTE_READ_SHUTDOWN);
143170
});
144171
}
145172

@@ -152,6 +179,18 @@ namespace oxen::quic
152179
return;
153180
}
154181

182+
if (user_buffers.empty())
183+
{
184+
log::warning(
185+
log_cat,
186+
"All transmitted data dispatched and acked; halting all write operations on stream ID:{}",
187+
_stream_id);
188+
ngtcp2_conn_shutdown_stream_write(*_conn, 0, _stream_id, STREAM_REMOTE_WRITE_SHUTDOWN);
189+
return clear_watermarks();
190+
}
191+
192+
// if buffers are empty and we call shutdown_stream_write now, we do not need to flip this boolean; it is used to
193+
// signal for the same call in ::acknowledge()
155194
_is_writing = false;
156195
});
157196
}
@@ -248,6 +287,13 @@ namespace oxen::quic
248287
void Stream::append_buffer(bstring_view buffer, std::shared_ptr<void> keep_alive)
249288
{
250289
log::trace(log_cat, "{} called", __PRETTY_FUNCTION__);
290+
291+
if (not _is_writing)
292+
{
293+
log::warning(log_cat, "Stream (ID:{}) has halted writing; payload NOT appended to buffer!", _stream_id);
294+
return;
295+
}
296+
251297
user_buffers.emplace_back(buffer, std::move(keep_alive));
252298
assert(endpoint.in_event_loop());
253299
assert(_conn);
@@ -299,15 +345,18 @@ namespace oxen::quic
299345
if (bytes)
300346
user_buffers.front().first.remove_prefix(bytes);
301347

302-
auto sz = size();
303-
304-
if (not _is_writing and _unacked_size == 0)
348+
if (not _is_writing and user_buffers.empty())
305349
{
306-
log::warning(log_cat, "All transmitted data acked; halting all write operations on stream ID:{}", _stream_id);
307-
ngtcp2_conn_shutdown_stream_write(*_conn, 0, _stream_id, 0);
350+
log::warning(
351+
log_cat,
352+
"All transmitted data dispatched and acked; halting all write operations on stream ID:{}",
353+
_stream_id);
354+
ngtcp2_conn_shutdown_stream_write(*_conn, 0, _stream_id, STREAM_REMOTE_WRITE_SHUTDOWN);
308355
return clear_watermarks();
309356
}
310357

358+
auto sz = size();
359+
311360
// Do not bother with this block of logic if no watermarks are set
312361
if (_is_watermarked)
313362
{

tests/012-stream_signalling.cpp

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -180,6 +180,21 @@ namespace oxen::quic::test
180180

181181
auto client_stream = conn_interface->open_stream<Stream>([&](Stream&, bstring_view) { p.set_value(true); });
182182

183+
client_stream->set_remote_reset_hooks(opt::remote_stream_reset{
184+
[](Stream& s, uint64_t ec) {
185+
REQUIRE(ec == STREAM_REMOTE_READ_SHUTDOWN);
186+
187+
// Cannot set or clear callbacks while executing the callbacks!
188+
REQUIRE_THROWS(s.set_remote_reset_hooks(opt::remote_stream_reset{}));
189+
REQUIRE_THROWS(s.clear_remote_reset_hooks());
190+
191+
s.stop_writing();
192+
},
193+
[](Stream& s, uint64_t ec) {
194+
REQUIRE(ec == STREAM_REMOTE_WRITE_SHUTDOWN);
195+
s.stop_reading();
196+
}});
197+
183198
REQUIRE(client_stream->is_reading());
184199
REQUIRE(client_stream->is_writing());
185200

@@ -189,11 +204,13 @@ namespace oxen::quic::test
189204
REQUIRE_FALSE(server_stream->is_writing());
190205

191206
client_stream->send(bstring_view{req_msg});
192-
REQUIRE(f.get());
207+
require_future(f);
193208

194209
// allow the acks to get back to the client; extra time for slow CI archs
195210
std::this_thread::sleep_for(250ms);
196211

212+
REQUIRE_FALSE(client_stream->is_reading());
213+
197214
REQUIRE(TestHelper::stream_unacked(*server_stream.get()) == 0);
198215
}
199216

0 commit comments

Comments
 (0)