Skip to content

Commit 2b080e0

Browse files
committed
stream stop read/write
1 parent d1af022 commit 2b080e0

File tree

6 files changed

+152
-20
lines changed

6 files changed

+152
-20
lines changed

include/oxen/quic/stream.hpp

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -91,6 +91,14 @@ namespace oxen::quic
9191

9292
bool is_paused() const;
9393

94+
bool is_reading() const;
95+
96+
bool is_writing() const;
97+
98+
void stop_reading();
99+
100+
void stop_writing();
101+
94102
// These public methods are synchronized so that they can be safely called from outside the
95103
// libquic main loop thread.
96104
bool available() const;
@@ -160,6 +168,9 @@ namespace oxen::quic
160168
opt::watermark _high_water;
161169
opt::watermark _low_water;
162170

171+
bool _is_reading{true};
172+
bool _is_writing{true};
173+
163174
void wrote(size_t bytes) override;
164175

165176
void append_buffer(bstring_view buffer, std::shared_ptr<void> keep_alive);

src/stream.cpp

Lines changed: 70 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -127,11 +127,50 @@ namespace oxen::quic
127127
});
128128
}
129129

130+
void Stream::stop_reading()
131+
{
132+
endpoint.call([this]() {
133+
if (not _is_reading)
134+
{
135+
log::warning(log_cat, "Stream has already halted read operations!");
136+
return;
137+
}
138+
139+
_is_reading = false;
140+
141+
log::warning(log_cat, "Halting all read operations on stream ID:{}!", _stream_id);
142+
ngtcp2_conn_shutdown_stream_read(*_conn, 0, _stream_id, 0);
143+
});
144+
}
145+
146+
void Stream::stop_writing()
147+
{
148+
endpoint.call([this]() {
149+
if (not _is_writing)
150+
{
151+
log::warning(log_cat, "Stream has already halted write operations!");
152+
return;
153+
}
154+
155+
_is_writing = false;
156+
});
157+
}
158+
130159
bool Stream::is_paused() const
131160
{
132161
return endpoint.call_get([this]() { return _paused; });
133162
}
134163

164+
bool Stream::is_reading() const
165+
{
166+
return endpoint.call_get([this]() { return _is_reading; });
167+
}
168+
169+
bool Stream::is_writing() const
170+
{
171+
return endpoint.call_get([this]() { return _is_writing; });
172+
}
173+
135174
bool Stream::available() const
136175
{
137176
return endpoint.call_get([this] { return !(_is_closing || _is_shutdown || _sent_fin); });
@@ -216,6 +255,28 @@ namespace oxen::quic
216255
_conn->packet_io_ready();
217256
else
218257
log::info(log_cat, "Stream not ready for broadcast yet, data appended to buffer and on deck");
258+
259+
if (_is_watermarked)
260+
{
261+
// We are above the high watermark. We prime the low water hook to be fired the next time we drop below the low
262+
// watermark. If the high water hook exists and is primed, execute it
263+
if (auto unsent = size() - _unacked_size; unsent >= _high_mark)
264+
{
265+
_low_primed = true;
266+
log::info(log_cat, "Low water hook primed!");
267+
268+
if (_high_water and _high_primed)
269+
{
270+
log::info(log_cat, "Executing high watermark hook!");
271+
_high_primed = false;
272+
_high_water(*this);
273+
}
274+
}
275+
276+
// Low/high watermarks were executed and self-cleared, so clean up
277+
if (not _high_water and not _low_water)
278+
return clear_watermarks();
279+
}
219280
}
220281

221282
void Stream::acknowledge(size_t bytes)
@@ -240,28 +301,19 @@ namespace oxen::quic
240301

241302
auto sz = size();
242303

304+
if (not _is_writing and _unacked_size == 0)
305+
{
306+
log::warning(log_cat, "All transmitted data acked; halting all write operations on stream ID:{}", _stream_id);
307+
ngtcp2_conn_shutdown_stream_write(*_conn, 0, _stream_id, 0);
308+
return clear_watermarks();
309+
}
310+
243311
// Do not bother with this block of logic if no watermarks are set
244312
if (_is_watermarked)
245313
{
246-
auto unsent = sz - _unacked_size;
247-
248-
// We are above the high watermark. We prime the low water hook to be fired the next time we drop below the low
249-
// watermark. If the high water hook exists and is primed, execute it
250-
if (unsent >= _high_mark)
251-
{
252-
_low_primed = true;
253-
log::info(log_cat, "Low water hook primed!");
254-
255-
if (_high_water and _high_primed)
256-
{
257-
log::info(log_cat, "Executing high watermark hook!");
258-
_high_primed = false;
259-
return _high_water(*this);
260-
}
261-
}
262314
// We are below the low watermark. We prime the high water hook to be fired the next time we rise above the high
263315
// watermark. If the low water hook exists and is primed, execute it
264-
else if (unsent <= _low_mark)
316+
if (auto unsent = sz - _unacked_size; unsent <= _low_mark)
265317
{
266318
_high_primed = true;
267319
log::info(log_cat, "High water hook primed!");
@@ -270,7 +322,7 @@ namespace oxen::quic
270322
{
271323
log::info(log_cat, "Executing low watermark hook!");
272324
_low_primed = false;
273-
return _low_water(*this);
325+
_low_water(*this);
274326
}
275327
}
276328

tests/012-watermarks.cpp renamed to tests/012-stream_signalling.cpp

Lines changed: 63 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@
77

88
namespace oxen::quic::test
99
{
10-
TEST_CASE("012 - Stream Buffer Watermarking", "[012][watermark][streams]")
10+
TEST_CASE("012 - Stream Signalling: Buffer Watermarking", "[012][signalling][watermark][streams]")
1111
{
1212
Network test_net{};
1313
bstring req_msg(100'000, std::byte{'a'});
@@ -145,4 +145,66 @@ namespace oxen::quic::test
145145
REQUIRE_FALSE(client_stream->has_watermarks());
146146
}
147147
}
148+
149+
TEST_CASE("012 - Stream Signalling: Stop Read/Write", "[012][signalling][readwrite][streams]")
150+
{
151+
Network test_net{};
152+
bstring req_msg(1'000, std::byte{'a'});
153+
154+
auto [client_tls, server_tls] = defaults::tls_creds_from_ed_keys();
155+
156+
Address server_local{};
157+
Address client_local{};
158+
159+
std::shared_ptr<Stream> server_stream;
160+
161+
auto client_established = callback_waiter{[](connection_interface&) {}};
162+
auto server_established = callback_waiter{[&](connection_interface& ci) {
163+
server_stream = ci.queue_incoming_stream();
164+
server_stream->send(bstring_view{req_msg});
165+
}};
166+
167+
auto server_endpoint = test_net.endpoint(server_local, server_established);
168+
REQUIRE_NOTHROW(server_endpoint->listen(server_tls));
169+
170+
RemoteAddress client_remote{defaults::SERVER_PUBKEY, "127.0.0.1"s, server_endpoint->local().port()};
171+
172+
auto client_endpoint = test_net.endpoint(client_local, client_established);
173+
auto conn_interface = client_endpoint->connect(client_remote, client_tls);
174+
175+
CHECK(client_established.wait());
176+
CHECK(server_established.wait());
177+
178+
auto p = std::promise<bool>();
179+
auto f = p.get_future();
180+
181+
auto client_stream = conn_interface->open_stream<Stream>([&](Stream&, bstring_view) { p.set_value(true); });
182+
183+
REQUIRE(client_stream->is_reading());
184+
REQUIRE(client_stream->is_writing());
185+
186+
SECTION("Stop Writing")
187+
{
188+
server_stream->stop_writing();
189+
REQUIRE_FALSE(server_stream->is_writing());
190+
191+
client_stream->send(bstring_view{req_msg});
192+
REQUIRE(f.get());
193+
194+
// allow the acks to get back to the client; extra time for slow CI archs
195+
std::this_thread::sleep_for(250ms);
196+
197+
REQUIRE(TestHelper::stream_unacked(*server_stream.get()) == 0);
198+
}
199+
200+
SECTION("Stop Reading")
201+
{
202+
client_stream->stop_reading();
203+
REQUIRE_FALSE(client_stream->is_reading());
204+
205+
client_stream->send(bstring_view{req_msg});
206+
207+
REQUIRE(f.wait_for(1s) == std::future_status::timeout);
208+
}
209+
}
148210
} // namespace oxen::quic::test

tests/CMakeLists.txt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,7 @@ if(LIBQUIC_BUILD_TESTS)
3838
009-alpns.cpp
3939
010-migration.cpp
4040
011-manual_transmission.cpp
41-
012-watermarks.cpp
41+
012-stream_signalling.cpp
4242

4343
main.cpp
4444
case_logger.cpp

tests/utils.cpp

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -112,6 +112,11 @@ namespace oxen::quic
112112
ep._next_rid += by;
113113
}
114114

115+
size_t TestHelper::stream_unacked(Stream& s)
116+
{
117+
return s._unacked_size;
118+
}
119+
115120
std::pair<std::shared_ptr<GNUTLSCreds>, std::shared_ptr<GNUTLSCreds>> test::defaults::tls_creds_from_ed_keys()
116121
{
117122
auto client = GNUTLSCreds::make_from_ed_keys(CLIENT_SEED, CLIENT_PUBKEY);

tests/utils.hpp

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,8 @@ namespace oxen::quic
4949
static int disable_dgram_flip_flop(connection_interface& conn);
5050
static int get_dgram_debug_counter(connection_interface& conn);
5151

52+
static size_t stream_unacked(Stream& s);
53+
5254
// Bumps the connection's next reference id to make it easier to tell which connection is
5355
// which in log output.
5456
static void increment_ref_id(Endpoint& ep, uint64_t by = 1);

0 commit comments

Comments
 (0)