Skip to content

Commit f0c6828

Browse files
authored
Merge pull request #126 from dr7ana/watermarks
Stream buffer watermarking
2 parents 826c6db + 5afdf75 commit f0c6828

File tree

6 files changed

+373
-2
lines changed

6 files changed

+373
-2
lines changed

include/oxen/quic/opt.hpp

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
namespace oxen::quic
1010
{
1111
class Endpoint;
12+
class Stream;
1213

1314
namespace opt
1415
{
@@ -171,5 +172,35 @@ namespace oxen::quic
171172

172173
explicit operator bool() const { return send_hook != nullptr; }
173174
};
175+
176+
// Used to provide callbacks for stream buffer watermarking. Application can pass an optional second parameter to
177+
// indicate that the logic should be executed once before the callback is cleared. The default behavior is for the
178+
// callback to persist and execute repeatedly
179+
struct watermark
180+
{
181+
using buffer_hook_t = std::function<void(Stream&)>;
182+
183+
private:
184+
buffer_hook_t _hook = nullptr;
185+
bool _persist = true;
186+
187+
public:
188+
watermark() = default;
189+
explicit watermark(buffer_hook_t hook, bool persist = true) : _hook{std::move(hook)}, _persist{persist} {}
190+
191+
bool persist() const { return _persist; }
192+
193+
void clear() { _hook = nullptr; }
194+
195+
explicit operator bool() const { return _hook != nullptr; }
196+
197+
void operator()(Stream& s)
198+
{
199+
_hook(s);
200+
201+
if (not _persist)
202+
_hook = nullptr;
203+
}
204+
};
174205
} // namespace opt
175206
} // namespace oxen::quic

include/oxen/quic/stream.hpp

Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
#include "connection_ids.hpp"
1414
#include "error.hpp"
1515
#include "iochannel.hpp"
16+
#include "opt.hpp"
1617
#include "types.hpp"
1718
#include "utils.hpp"
1819

@@ -56,6 +57,40 @@ namespace oxen::quic
5657

5758
const ConnectionID reference_id;
5859

60+
/** Buffer Watermarking:
61+
- Applications can call `::set_watermark(...)` to implement logic to be executed at states dictated by the number
62+
of bytes currently unsent.
63+
- Application must pass `low` and `high` watermark amounts; an execute-on-low callback can be passed with or
64+
without an execute-on-high callback (and vice versa)
65+
- The execute-on-low callback will not be executed until the buffer state rises above the `high` value; it
66+
will not be executed again until the buffer state rises once more above the `high` value
67+
- The execute-on-high callback will not be executed until the buffer state drops below the `low` value; it
68+
will not be executed again until the buffer state drops once more below the `low` value
69+
- Callbacks can be passed with an optional boolean in their opt:: wrapper, indicating "clear after execution";
70+
this will ensure the callback is only executed ONCE before being cleared. The default behavior is repeated
71+
callback execution
72+
- Invoking this function repeatedly will overwrite any currently set thresholds and callbacks
73+
*/
74+
void set_watermark(
75+
size_t low, size_t high, std::optional<opt::watermark> low_hook, std::optional<opt::watermark> high_hook);
76+
77+
// Clears any currently set watermarks on this stream object
78+
void clear_watermarks();
79+
80+
// Do not call this function from within a watermark callback!
81+
bool has_watermarks() const;
82+
83+
/** Stream Pause:
84+
- Applications can call `::pause()` to stop extending the max stream data offset. This has the effect of limiting
85+
the inflow by signalling to the sender that they should pause
86+
- This is reverted by invoking `::resume()`
87+
*/
88+
void pause();
89+
90+
void resume();
91+
92+
bool is_paused() const;
93+
5994
// These public methods are synchronized so that they can be safely called from outside the
6095
// libquic main loop thread.
6196
bool available() const;
@@ -109,8 +144,22 @@ namespace oxen::quic
109144
bool _is_shutdown{false};
110145
bool _sent_fin{false};
111146
bool _ready{false};
147+
bool _paused{false};
112148
int64_t _stream_id;
113149

150+
size_t _paused_offset{0};
151+
152+
bool _is_watermarked{false};
153+
154+
size_t _high_mark{0};
155+
size_t _low_mark{0};
156+
157+
bool _high_primed{false};
158+
bool _low_primed{true};
159+
160+
opt::watermark _high_water;
161+
opt::watermark _low_water;
162+
114163
void wrote(size_t bytes) override;
115164

116165
void append_buffer(bstring_view buffer, std::shared_ptr<void> keep_alive);

src/connection.cpp

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1161,6 +1161,9 @@ namespace oxen::quic
11611161
const bool was_closing = stream._is_closing;
11621162
stream._is_closing = stream._is_shutdown = true;
11631163

1164+
if (stream._is_watermarked)
1165+
stream.clear_watermarks();
1166+
11641167
if (!was_closing)
11651168
{
11661169
log::trace(log_cat, "Invoking stream close callback");
@@ -1302,7 +1305,10 @@ namespace oxen::quic
13021305
}
13031306
else
13041307
{
1305-
ngtcp2_conn_extend_max_stream_offset(conn.get(), id, data.size());
1308+
if (str->_paused)
1309+
str->_paused_offset += data.size();
1310+
else
1311+
ngtcp2_conn_extend_max_stream_offset(conn.get(), id, data.size());
13061312
ngtcp2_conn_extend_max_offset(conn.get(), data.size());
13071313
}
13081314

src/stream.cpp

Lines changed: 137 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,96 @@ namespace oxen::quic
4242
log::trace(log_cat, "Destroying stream {}", _stream_id);
4343
}
4444

45+
void Stream::set_watermark(
46+
size_t low, size_t high, std::optional<opt::watermark> low_cb, std::optional<opt::watermark> high_cb)
47+
{
48+
if (not low_cb and not high_cb)
49+
throw std::invalid_argument{"Must pass at least one callback in call to ::set_watermark()!"};
50+
51+
endpoint.call_soon([this, low, high, low_hook = std::move(low_cb), high_hook = std::move(high_cb)]() {
52+
if (_is_closing || _is_shutdown || _sent_fin)
53+
{
54+
log::warning(log_cat, "Failed to set watermarks; stream is not active!");
55+
return;
56+
}
57+
58+
_low_mark = low;
59+
_high_mark = high;
60+
61+
if (low_hook.has_value())
62+
_low_water = std::move(*low_hook);
63+
else
64+
_low_water.clear();
65+
66+
if (high_hook.has_value())
67+
_high_water = std::move(*high_hook);
68+
else
69+
_high_water.clear();
70+
71+
_is_watermarked = true;
72+
73+
log::info(log_cat, "Stream set watermarks!");
74+
});
75+
}
76+
77+
void Stream::clear_watermarks()
78+
{
79+
endpoint.call_soon([this]() {
80+
if (not _is_watermarked and not _low_water and not _high_water)
81+
{
82+
log::warning(log_cat, "Failed to clear watermarks; stream has none set!");
83+
return;
84+
}
85+
86+
_low_mark = 0;
87+
_high_mark = 0;
88+
if (_low_water)
89+
_low_water.clear();
90+
if (_high_water)
91+
_high_water.clear();
92+
_is_watermarked = false;
93+
log::info(log_cat, "Stream cleared currently set watermarks!");
94+
});
95+
}
96+
97+
void Stream::pause()
98+
{
99+
endpoint.call([this]() {
100+
if (not _paused)
101+
{
102+
log::debug(log_cat, "Pausing stream ID:{}", _stream_id);
103+
assert(_paused_offset == 0);
104+
_paused = true;
105+
}
106+
else
107+
log::debug(log_cat, "Stream ID:{} already paused!", _stream_id);
108+
});
109+
}
110+
111+
void Stream::resume()
112+
{
113+
endpoint.call([this]() {
114+
if (_paused)
115+
{
116+
log::debug(log_cat, "Resuming stream ID:{}", _stream_id);
117+
if (_paused_offset)
118+
{
119+
ngtcp2_conn_extend_max_stream_offset(*_conn, _stream_id, _paused_offset);
120+
_paused_offset = 0;
121+
}
122+
123+
_paused = false;
124+
}
125+
else
126+
log::debug(log_cat, "Stream ID:{} is not paused!", _stream_id);
127+
});
128+
}
129+
130+
bool Stream::is_paused() const
131+
{
132+
return endpoint.call_get([this]() { return _paused; });
133+
}
134+
45135
bool Stream::available() const
46136
{
47137
return endpoint.call_get([this] { return !(_is_closing || _is_shutdown || _sent_fin); });
@@ -52,6 +142,11 @@ namespace oxen::quic
52142
return endpoint.call_get([this] { return _ready; });
53143
}
54144

145+
bool Stream::has_watermarks() const
146+
{
147+
return endpoint.call_get([this]() { return _is_watermarked and _low_water and _high_water; });
148+
}
149+
55150
std::shared_ptr<Stream> Stream::get_stream()
56151
{
57152
return shared_from_this();
@@ -143,7 +238,48 @@ namespace oxen::quic
143238
if (bytes)
144239
user_buffers.front().first.remove_prefix(bytes);
145240

146-
log::trace(log_cat, "{} bytes acked, {} unacked remaining", bytes, size());
241+
auto sz = size();
242+
243+
// Do not bother with this block of logic if no watermarks are set
244+
if (_is_watermarked)
245+
{
246+
auto unsent = sz - _unacked_size;
247+
248+
// We are above the high watermark. We prime the low water hook to be fired the next time we drop below the low
249+
// watermark. If the high water hook exists and is primed, execute it
250+
if (unsent >= _high_mark)
251+
{
252+
_low_primed = true;
253+
log::info(log_cat, "Low water hook primed!");
254+
255+
if (_high_water and _high_primed)
256+
{
257+
log::info(log_cat, "Executing high watermark hook!");
258+
_high_primed = false;
259+
return _high_water(*this);
260+
}
261+
}
262+
// We are below the low watermark. We prime the high water hook to be fired the next time we rise above the high
263+
// watermark. If the low water hook exists and is primed, execute it
264+
else if (unsent <= _low_mark)
265+
{
266+
_high_primed = true;
267+
log::info(log_cat, "High water hook primed!");
268+
269+
if (_low_water and _low_primed)
270+
{
271+
log::info(log_cat, "Executing low watermark hook!");
272+
_low_primed = false;
273+
return _low_water(*this);
274+
}
275+
}
276+
277+
// Low/high watermarks were executed and self-cleared, so clean up
278+
if (not _high_water and not _low_water)
279+
return clear_watermarks();
280+
}
281+
282+
log::trace(log_cat, "{} bytes acked, {} unacked remaining", bytes, sz);
147283
}
148284

149285
void Stream::wrote(size_t bytes)

0 commit comments

Comments
 (0)