Skip to content

Commit 59b4c2e

Browse files
authored
walredo: add a ping method (#8952)
Not used in production, but in benchmarks, to demonstrate minimal RTT. (It would be nice to not have to copy the 8KiB of zeroes, but, that would require larger protocol changes). Found this useful in investigation #8952.
1 parent 5432155 commit 59b4c2e

File tree

5 files changed

+176
-52
lines changed

5 files changed

+176
-52
lines changed

pageserver/benches/bench_walredo.rs

Lines changed: 84 additions & 52 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
//! Quantify a single walredo manager's throughput under N concurrent callers.
22
//!
33
//! The benchmark implementation ([`bench_impl`]) is parametrized by
4-
//! - `redo_work` => [`Request::short_request`] or [`Request::medium_request`]
4+
//! - `redo_work` => an async closure that takes a `PostgresRedoManager` and performs one redo
55
//! - `n_redos` => number of times the benchmark shell execute the `redo_work`
66
//! - `nclients` => number of clients (more on this shortly).
77
//!
@@ -10,7 +10,7 @@
1010
//! Each task executes the `redo_work` `n_redos/nclients` times.
1111
//!
1212
//! We exercise the following combinations:
13-
//! - `redo_work = short / medium``
13+
//! - `redo_work = ping / short / medium``
1414
//! - `nclients = [1, 2, 4, 8, 16, 32, 64, 128]`
1515
//!
1616
//! We let `criterion` determine the `n_redos` using `iter_custom`.
@@ -27,74 +27,103 @@
2727
//!
2828
//! # Reference Numbers
2929
//!
30-
//! 2024-04-15 on i3en.3xlarge
30+
//! 2024-09-18 on im4gn.2xlarge
3131
//!
3232
//! ```text
33-
//! short/1 time: [24.584 µs 24.737 µs 24.922 µs]
34-
//! short/2 time: [33.479 µs 33.660 µs 33.888 µs]
35-
//! short/4 time: [42.713 µs 43.046 µs 43.440 µs]
36-
//! short/8 time: [71.814 µs 72.478 µs 73.240 µs]
37-
//! short/16 time: [132.73 µs 134.45 µs 136.22 µs]
38-
//! short/32 time: [258.31 µs 260.73 µs 263.27 µs]
39-
//! short/64 time: [511.61 µs 514.44 µs 517.51 µs]
40-
//! short/128 time: [992.64 µs 998.23 µs 1.0042 ms]
41-
//! medium/1 time: [110.11 µs 110.50 µs 110.96 µs]
42-
//! medium/2 time: [153.06 µs 153.85 µs 154.99 µs]
43-
//! medium/4 time: [317.51 µs 319.92 µs 322.85 µs]
44-
//! medium/8 time: [638.30 µs 644.68 µs 652.12 µs]
45-
//! medium/16 time: [1.2651 ms 1.2773 ms 1.2914 ms]
46-
//! medium/32 time: [2.5117 ms 2.5410 ms 2.5720 ms]
47-
//! medium/64 time: [4.8088 ms 4.8555 ms 4.9047 ms]
48-
//! medium/128 time: [8.8311 ms 8.9849 ms 9.1263 ms]
33+
//! ping/1 time: [21.789 µs 21.918 µs 22.078 µs]
34+
//! ping/2 time: [27.686 µs 27.812 µs 27.970 µs]
35+
//! ping/4 time: [35.468 µs 35.671 µs 35.926 µs]
36+
//! ping/8 time: [59.682 µs 59.987 µs 60.363 µs]
37+
//! ping/16 time: [101.79 µs 102.37 µs 103.08 µs]
38+
//! ping/32 time: [184.18 µs 185.15 µs 186.36 µs]
39+
//! ping/64 time: [349.86 µs 351.45 µs 353.47 µs]
40+
//! ping/128 time: [684.53 µs 687.98 µs 692.17 µs]
41+
//! short/1 time: [31.833 µs 32.126 µs 32.428 µs]
42+
//! short/2 time: [35.558 µs 35.756 µs 35.992 µs]
43+
//! short/4 time: [44.850 µs 45.138 µs 45.484 µs]
44+
//! short/8 time: [65.985 µs 66.379 µs 66.853 µs]
45+
//! short/16 time: [127.06 µs 127.90 µs 128.87 µs]
46+
//! short/32 time: [252.98 µs 254.70 µs 256.73 µs]
47+
//! short/64 time: [497.13 µs 499.86 µs 503.26 µs]
48+
//! short/128 time: [987.46 µs 993.45 µs 1.0004 ms]
49+
//! medium/1 time: [137.91 µs 138.55 µs 139.35 µs]
50+
//! medium/2 time: [192.00 µs 192.91 µs 194.07 µs]
51+
//! medium/4 time: [389.62 µs 391.55 µs 394.01 µs]
52+
//! medium/8 time: [776.80 µs 780.33 µs 784.77 µs]
53+
//! medium/16 time: [1.5323 ms 1.5383 ms 1.5459 ms]
54+
//! medium/32 time: [3.0120 ms 3.0226 ms 3.0350 ms]
55+
//! medium/64 time: [5.7405 ms 5.7787 ms 5.8166 ms]
56+
//! medium/128 time: [10.412 ms 10.574 ms 10.718 ms]
4957
//! ```
5058
5159
use anyhow::Context;
5260
use bytes::{Buf, Bytes};
5361
use criterion::{BenchmarkId, Criterion};
62+
use once_cell::sync::Lazy;
5463
use pageserver::{config::PageServerConf, walrecord::NeonWalRecord, walredo::PostgresRedoManager};
5564
use pageserver_api::{key::Key, shard::TenantShardId};
5665
use std::{
66+
future::Future,
5767
sync::Arc,
5868
time::{Duration, Instant},
5969
};
6070
use tokio::{sync::Barrier, task::JoinSet};
6171
use utils::{id::TenantId, lsn::Lsn};
6272

6373
fn bench(c: &mut Criterion) {
64-
{
65-
let nclients = [1, 2, 4, 8, 16, 32, 64, 128];
66-
for nclients in nclients {
67-
let mut group = c.benchmark_group("short");
68-
group.bench_with_input(
69-
BenchmarkId::from_parameter(nclients),
70-
&nclients,
71-
|b, nclients| {
72-
let redo_work = Arc::new(Request::short_input());
73-
b.iter_custom(|iters| bench_impl(Arc::clone(&redo_work), iters, *nclients));
74-
},
75-
);
76-
}
77-
}
78-
{
79-
let nclients = [1, 2, 4, 8, 16, 32, 64, 128];
80-
for nclients in nclients {
81-
let mut group = c.benchmark_group("medium");
82-
group.bench_with_input(
83-
BenchmarkId::from_parameter(nclients),
84-
&nclients,
85-
|b, nclients| {
86-
let redo_work = Arc::new(Request::medium_input());
87-
b.iter_custom(|iters| bench_impl(Arc::clone(&redo_work), iters, *nclients));
88-
},
89-
);
90-
}
74+
macro_rules! bench_group {
75+
($name:expr, $redo_work:expr) => {{
76+
let name: &str = $name;
77+
let nclients = [1, 2, 4, 8, 16, 32, 64, 128];
78+
for nclients in nclients {
79+
let mut group = c.benchmark_group(name);
80+
group.bench_with_input(
81+
BenchmarkId::from_parameter(nclients),
82+
&nclients,
83+
|b, nclients| {
84+
b.iter_custom(|iters| bench_impl($redo_work, iters, *nclients));
85+
},
86+
);
87+
}
88+
}};
9189
}
90+
//
91+
// benchmark the protocol implementation
92+
//
93+
let pg_version = 14;
94+
bench_group!(
95+
"ping",
96+
Arc::new(move |mgr: Arc<PostgresRedoManager>| async move {
97+
let _: () = mgr.ping(pg_version).await.unwrap();
98+
})
99+
);
100+
//
101+
// benchmarks with actual record redo
102+
//
103+
let make_redo_work = |req: &'static Request| {
104+
Arc::new(move |mgr: Arc<PostgresRedoManager>| async move {
105+
let page = req.execute(&mgr).await.unwrap();
106+
assert_eq!(page.remaining(), 8192);
107+
})
108+
};
109+
bench_group!("short", {
110+
static REQUEST: Lazy<Request> = Lazy::new(Request::short_input);
111+
make_redo_work(&REQUEST)
112+
});
113+
bench_group!("medium", {
114+
static REQUEST: Lazy<Request> = Lazy::new(Request::medium_input);
115+
make_redo_work(&REQUEST)
116+
});
92117
}
93118
criterion::criterion_group!(benches, bench);
94119
criterion::criterion_main!(benches);
95120

96121
// Returns the sum of each client's wall-clock time spent executing their share of the n_redos.
97-
fn bench_impl(redo_work: Arc<Request>, n_redos: u64, nclients: u64) -> Duration {
122+
fn bench_impl<F, Fut>(redo_work: Arc<F>, n_redos: u64, nclients: u64) -> Duration
123+
where
124+
F: Fn(Arc<PostgresRedoManager>) -> Fut + Send + Sync + 'static,
125+
Fut: Future<Output = ()> + Send + 'static,
126+
{
98127
let repo_dir = camino_tempfile::tempdir_in(env!("CARGO_TARGET_TMPDIR")).unwrap();
99128

100129
let conf = PageServerConf::dummy_conf(repo_dir.path().to_path_buf());
@@ -135,17 +164,20 @@ fn bench_impl(redo_work: Arc<Request>, n_redos: u64, nclients: u64) -> Duration
135164
})
136165
}
137166

138-
async fn client(
167+
async fn client<F, Fut>(
139168
mgr: Arc<PostgresRedoManager>,
140169
start: Arc<Barrier>,
141-
redo_work: Arc<Request>,
170+
redo_work: Arc<F>,
142171
n_redos: u64,
143-
) -> Duration {
172+
) -> Duration
173+
where
174+
F: Fn(Arc<PostgresRedoManager>) -> Fut + Send + Sync + 'static,
175+
Fut: Future<Output = ()> + Send + 'static,
176+
{
144177
start.wait().await;
145178
let start = Instant::now();
146179
for _ in 0..n_redos {
147-
let page = redo_work.execute(&mgr).await.unwrap();
148-
assert_eq!(page.remaining(), 8192);
180+
redo_work(Arc::clone(&mgr)).await;
149181
// The real pageserver will rarely if ever do 2 walredos in a row without
150182
// yielding to the executor.
151183
tokio::task::yield_now().await;

pageserver/src/walredo.rs

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -205,6 +205,22 @@ impl PostgresRedoManager {
205205
}
206206
}
207207

208+
/// Do a ping request-response roundtrip.
209+
///
210+
/// Not used in production, but by Rust benchmarks.
211+
///
212+
/// # Cancel-Safety
213+
///
214+
/// This method is cancellation-safe.
215+
pub async fn ping(&self, pg_version: u32) -> Result<(), Error> {
216+
self.do_with_walredo_process(pg_version, |proc| async move {
217+
proc.ping(Duration::from_secs(1))
218+
.await
219+
.map_err(Error::Other)
220+
})
221+
.await
222+
}
223+
208224
pub fn status(&self) -> WalRedoManagerStatus {
209225
WalRedoManagerStatus {
210226
last_redo_at: {
@@ -297,6 +313,9 @@ impl PostgresRedoManager {
297313
}
298314
}
299315

316+
/// # Cancel-Safety
317+
///
318+
/// This method is cancel-safe iff `closure` is cancel-safe.
300319
async fn do_with_walredo_process<
301320
F: FnOnce(Arc<Process>) -> Fut,
302321
Fut: Future<Output = Result<O, Error>>,
@@ -537,6 +556,17 @@ mod tests {
537556
use tracing::Instrument;
538557
use utils::{id::TenantId, lsn::Lsn};
539558

559+
#[tokio::test]
560+
async fn test_ping() {
561+
let h = RedoHarness::new().unwrap();
562+
563+
h.manager
564+
.ping(14)
565+
.instrument(h.span())
566+
.await
567+
.expect("ping should work");
568+
}
569+
540570
#[tokio::test]
541571
async fn short_v14_redo() {
542572
let expected = std::fs::read("test_data/short_v14_redo.page").unwrap();

pageserver/src/walredo/process.rs

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ use self::no_leak_child::NoLeakChild;
66
use crate::{
77
config::PageServerConf,
88
metrics::{WalRedoKillCause, WAL_REDO_PROCESS_COUNTERS, WAL_REDO_RECORD_COUNTER},
9+
page_cache::PAGE_SZ,
910
span::debug_assert_current_span_has_tenant_id,
1011
walrecord::NeonWalRecord,
1112
};
@@ -237,6 +238,26 @@ impl WalRedoProcess {
237238
res
238239
}
239240

241+
/// Do a ping request-response roundtrip.
242+
///
243+
/// Not used in production, but by Rust benchmarks.
244+
pub(crate) async fn ping(&self, timeout: Duration) -> anyhow::Result<()> {
245+
let mut writebuf: Vec<u8> = Vec::with_capacity(4);
246+
protocol::build_ping_msg(&mut writebuf);
247+
let Ok(res) = tokio::time::timeout(timeout, self.apply_wal_records0(&writebuf)).await
248+
else {
249+
anyhow::bail!("WAL redo ping timed out");
250+
};
251+
let response = res?;
252+
if response.len() != PAGE_SZ {
253+
anyhow::bail!(
254+
"WAL redo ping response should respond with page-sized response: {}",
255+
response.len()
256+
);
257+
}
258+
Ok(())
259+
}
260+
240261
/// # Cancel-Safety
241262
///
242263
/// When not polled to completion (e.g. because in `tokio::select!` another

pageserver/src/walredo/process/protocol.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -55,3 +55,8 @@ pub(crate) fn build_get_page_msg(tag: BufferTag, buf: &mut Vec<u8>) {
5555
tag.ser_into(buf)
5656
.expect("serialize BufferTag should always succeed");
5757
}
58+
59+
pub(crate) fn build_ping_msg(buf: &mut Vec<u8>) {
60+
buf.put_u8(b'H');
61+
buf.put_u32(4);
62+
}

pgxn/neon_walredo/walredoproc.c

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
* PushPage ('P'): Copy a page image (in the payload) to buffer cache
2525
* ApplyRecord ('A'): Apply a WAL record (in the payload)
2626
* GetPage ('G'): Return a page image from buffer cache.
27+
* Ping ('H'): Return the input message.
2728
*
2829
* Currently, you only get a response to GetPage requests; the response is
2930
* simply a 8k page, without any headers. Errors are logged to stderr.
@@ -133,6 +134,7 @@ static void ApplyRecord(StringInfo input_message);
133134
static void apply_error_callback(void *arg);
134135
static bool redo_block_filter(XLogReaderState *record, uint8 block_id);
135136
static void GetPage(StringInfo input_message);
137+
static void Ping(StringInfo input_message);
136138
static ssize_t buffered_read(void *buf, size_t count);
137139
static void CreateFakeSharedMemoryAndSemaphores();
138140

@@ -394,6 +396,10 @@ WalRedoMain(int argc, char *argv[])
394396
GetPage(&input_message);
395397
break;
396398

399+
case 'H': /* Ping */
400+
Ping(&input_message);
401+
break;
402+
397403
/*
398404
* EOF means we're done. Perform normal shutdown.
399405
*/
@@ -1057,6 +1063,36 @@ GetPage(StringInfo input_message)
10571063
}
10581064

10591065

1066+
static void
1067+
Ping(StringInfo input_message)
1068+
{
1069+
int tot_written;
1070+
/* Response: the input message */
1071+
tot_written = 0;
1072+
do {
1073+
ssize_t rc;
1074+
/* We don't need alignment, but it's bad practice to use char[BLCKSZ] */
1075+
#if PG_VERSION_NUM >= 160000
1076+
static const PGIOAlignedBlock response;
1077+
#else
1078+
static const PGAlignedBlock response;
1079+
#endif
1080+
rc = write(STDOUT_FILENO, &response.data[tot_written], BLCKSZ - tot_written);
1081+
if (rc < 0) {
1082+
/* If interrupted by signal, just retry */
1083+
if (errno == EINTR)
1084+
continue;
1085+
ereport(ERROR,
1086+
(errcode_for_file_access(),
1087+
errmsg("could not write to stdout: %m")));
1088+
}
1089+
tot_written += rc;
1090+
} while (tot_written < BLCKSZ);
1091+
1092+
elog(TRACE, "Page sent back for ping");
1093+
}
1094+
1095+
10601096
/* Buffer used by buffered_read() */
10611097
static char stdin_buf[16 * 1024];
10621098
static size_t stdin_len = 0; /* # of bytes in buffer */

0 commit comments

Comments
 (0)