Skip to content
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.

Commit 6426c38

Browse files
authoredApr 9, 2025··
fix(http/prom): record bodies when eos reached (#3856)
* chore(app/outbound): `linkerd-mock-http-body` test dependency this adds a development dependency, so we can use this mock body type in the outbound proxy's unit tests. Signed-off-by: katelyn martin <[email protected]> * chore(app/outbound): additional http route metrics tests Signed-off-by: katelyn martin <[email protected]> * chore(app/outbound): additional grpc route metrics tests Signed-off-by: katelyn martin <[email protected]> * fix(http/prom): record bodies when eos reached this commit fixes a bug discovered by @alpeb, which was introduced in proxy v2.288.0. > The associated metric is `outbound_http_route_request_statuses_total`: > > ``` > $ linkerd dg proxy-metrics -n booksapp deploy/webapp|rg outbound_http_route_request_statuses_total.*authors > outbound_http_route_request_statuses_total{parent_group="core",parent_kind="Service",parent_namespace="booksapp",parent_name="authors",parent_port="7001",parent_section_name="",route_group="",route_kind="default",route_namespace="",route_name="http",hostname="",http_status="204",error=""} 5 > outbound_http_route_request_statuses_total{parent_group="core",parent_kind="Service",parent_namespace="booksapp",parent_name="authors",parent_port="7001",parent_section_name="",route_group="",route_kind="default",route_namespace="",route_name="http",hostname="",http_status="201",error="UNKNOWN"} 5 > outbound_http_route_request_statuses_total{parent_group="core",parent_kind="Service",parent_namespace="booksapp",parent_name="authors",parent_port="7001",parent_section_name="",route_group="",route_kind="default",route_namespace="",route_name="http",hostname="",http_status="200",error="UNKNOWN"} 10 > ``` > > The problem was introduced in `edge-25.3.4`, with the proxy `v2.288.0`. > Before that the metrics looked like: > > ``` > $ linkerd dg proxy-metrics -n booksapp deploy/webapp|rg outbound_http_route_request_statuses_total.*authors > outbound_http_route_request_statuses_total{parent_group="core",parent_kind="Service",parent_namespace="booksapp",parent_name="authors",parent_port="7001",parent_section_name="",route_group="",route_kind="default",route_namespace="",route_name="http",hostname="",http_status="200",error=""} 193 > outbound_http_route_request_statuses_total{parent_group="core",parent_kind="Service",parent_namespace="booksapp",parent_name="authors",parent_port="7001",parent_section_name="",route_group="",route_kind="default",route_namespace="",route_name="http",hostname="",http_status="204",error=""} 96 > outbound_http_route_request_statuses_total{parent_group="core",parent_kind="Service",parent_namespace="booksapp",parent_name="authors",parent_port="7001",parent_section_name="",route_group="",route_kind="default",route_namespace="",route_name="http",hostname="",http_status="201",error=""} 96 > ``` > > So the difference is the non-empty value for `error=UNKNOWN` even > when `https_status` is 2xx, which `linkerd viz stat-outbound` > interprets as failed requests. in #3086 we introduced a suite of route- and backend-level metrics. that subsystem contains a body middleware that will report itself as having reached the end-of-stream by delegating directly down to its inner body's `is_end_stream()` hint. this is roughly correct, but is slightly distinct from the actual invariant: a `linkerd_http_prom::record_response::ResponseBody<B>` must call its `end_stream` helper to classify the outcome and increment the corresponding time series in the `outbound_http_route_request_statuses_total` metric family. in #3504 we upgraded our hyper dependency. while doing so, we neglected to include a call to `end_stream` if a data frame is yielded and the inner body reports itself as having reached the end-of-stream. this meant that instrumented bodies would be polled until the end is reached, but were being dropped before a `None` was encountered. this commit fixes this issue in two ways, to be defensive: * invoke `end_stream()` if a non-trailers frame is yielded, and the inner body now reports itself as having ended. this restores the behavior in place prior to #3504. see the relevant component of that diff, here: <https://github.com/linkerd/linkerd2-proxy/pull/3504/files#diff-45d0bc344f76c111551a8eaf5d3f0e0c22ee6e6836a626e46402a6ae3cbc0035L262-R274> * rather than delegating to the inner `<B as Body>::is_end_stream()` method, report the end-of-stream being reached by inspecting whether or not the inner response state has been taken. this is the state that directly indicates whether or not the `ResponseBody<B>` middleware is finished. X-ref: #3504 X-ref: #3086 X-ref: linkerd/linkerd2#8733 Signed-off-by: katelyn martin <[email protected]> --------- Signed-off-by: katelyn martin <[email protected]>
1 parent 985580f commit 6426c38

File tree

4 files changed

+372
-1
lines changed

4 files changed

+372
-1
lines changed
 

‎Cargo.lock

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1675,6 +1675,7 @@ dependencies = [
16751675
"linkerd-io",
16761676
"linkerd-meshtls",
16771677
"linkerd-meshtls-rustls",
1678+
"linkerd-mock-http-body",
16781679
"linkerd-opaq-route",
16791680
"linkerd-proxy-client-policy",
16801681
"linkerd-retry",

‎linkerd/app/outbound/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -71,6 +71,7 @@ linkerd-meshtls = { path = "../../meshtls", features = ["rustls"] }
7171
linkerd-meshtls-rustls = { path = "../../meshtls/rustls", features = [
7272
"test-util",
7373
] }
74+
linkerd-mock-http-body = { path = "../../mock/http-body" }
7475
linkerd-stack = { path = "../../stack", features = ["test-util"] }
7576
linkerd-tracing = { path = "../../tracing", features = ["ansi"] }
7677

‎linkerd/app/outbound/src/http/logical/policy/route/metrics/tests.rs

Lines changed: 365 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,9 @@ use super::{
44
test_util::*,
55
LabelGrpcRouteRsp, LabelHttpRouteRsp, RequestMetrics,
66
};
7+
use bytes::{Buf, Bytes};
8+
use http_body::Body;
9+
use http_body_util::BodyExt;
710
use linkerd_app_core::{
811
dns,
912
svc::{
@@ -14,6 +17,10 @@ use linkerd_app_core::{
1417
};
1518
use linkerd_http_prom::body_data::request::RequestBodyFamilies;
1619
use linkerd_proxy_client_policy as policy;
20+
use std::task::Poll;
21+
22+
static GRPC_STATUS: http::HeaderName = http::HeaderName::from_static("grpc-status");
23+
static GRPC_STATUS_OK: http::HeaderValue = http::HeaderValue::from_static("0");
1724

1825
#[tokio::test(flavor = "current_thread", start_paused = true)]
1926
async fn http_request_statuses() {
@@ -520,6 +527,160 @@ async fn http_route_request_body_frames() {
520527
tracing::info!("passed");
521528
}
522529

530+
#[tokio::test(flavor = "current_thread", start_paused = true)]
531+
async fn http_response_body_drop_on_eos() {
532+
use linkerd_app_core::svc::{Service, ServiceExt};
533+
534+
const EXPORT_HOSTNAME_LABELS: bool = false;
535+
let _trace = linkerd_tracing::test::trace_init();
536+
537+
let super::HttpRouteMetrics {
538+
requests,
539+
body_data,
540+
..
541+
} = super::HttpRouteMetrics::default();
542+
let parent_ref = crate::ParentRef(policy::Meta::new_default("parent"));
543+
let route_ref = crate::RouteRef(policy::Meta::new_default("route"));
544+
let (mut svc, mut handle) = mock_http_route_metrics(
545+
&requests,
546+
&body_data,
547+
&parent_ref,
548+
&route_ref,
549+
EXPORT_HOSTNAME_LABELS,
550+
);
551+
552+
// Define a request and a response.
553+
let req = http::Request::default();
554+
let rsp = http::Response::builder()
555+
.status(200)
556+
.body(BoxBody::from_static("contents"))
557+
.unwrap();
558+
559+
// Two counters for 200 responses that do/don't have an error.
560+
let ok = requests.get_statuses(&labels::Rsp(
561+
labels::Route::new(parent_ref.clone(), route_ref.clone(), None),
562+
labels::HttpRsp {
563+
status: Some(http::StatusCode::OK),
564+
error: None,
565+
},
566+
));
567+
let err = requests.get_statuses(&labels::Rsp(
568+
labels::Route::new(parent_ref.clone(), route_ref.clone(), None),
569+
labels::HttpRsp {
570+
status: Some(http::StatusCode::OK),
571+
error: Some(labels::Error::Unknown),
572+
},
573+
));
574+
debug_assert_eq!(ok.get(), 0);
575+
debug_assert_eq!(err.get(), 0);
576+
577+
// Send the request, and obtain the response.
578+
let mut body = {
579+
handle.allow(1);
580+
svc.ready().await.expect("ready");
581+
let mut call = svc.call(req);
582+
let (_req, tx) = tokio::select! {
583+
_ = (&mut call) => unreachable!(),
584+
res = handle.next_request() => res.unwrap(),
585+
};
586+
assert_eq!(ok.get(), 0);
587+
tx.send_response(rsp);
588+
call.await.unwrap().into_body()
589+
};
590+
591+
// The counters are not incremented yet.
592+
assert_eq!(ok.get(), 0);
593+
assert_eq!(err.get(), 0);
594+
595+
// Poll a frame out of the body.
596+
let data = body
597+
.frame()
598+
.await
599+
.expect("yields a result")
600+
.expect("yields a frame")
601+
.into_data()
602+
.ok()
603+
.expect("yields data");
604+
assert_eq!(data.chunk(), "contents".as_bytes());
605+
assert_eq!(data.remaining(), "contents".len());
606+
607+
// Show that the body reports itself as being complete.
608+
debug_assert!(body.is_end_stream());
609+
assert_eq!(ok.get(), 1);
610+
assert_eq!(err.get(), 0);
611+
}
612+
613+
#[tokio::test(flavor = "current_thread", start_paused = true)]
614+
async fn http_response_body_drop_early() {
615+
use linkerd_app_core::svc::{Service, ServiceExt};
616+
617+
const EXPORT_HOSTNAME_LABELS: bool = false;
618+
let _trace = linkerd_tracing::test::trace_init();
619+
620+
let super::HttpRouteMetrics {
621+
requests,
622+
body_data,
623+
..
624+
} = super::HttpRouteMetrics::default();
625+
let parent_ref = crate::ParentRef(policy::Meta::new_default("parent"));
626+
let route_ref = crate::RouteRef(policy::Meta::new_default("route"));
627+
let (mut svc, mut handle) = mock_http_route_metrics(
628+
&requests,
629+
&body_data,
630+
&parent_ref,
631+
&route_ref,
632+
EXPORT_HOSTNAME_LABELS,
633+
);
634+
635+
// Define a request and a response.
636+
let req = http::Request::default();
637+
let rsp = http::Response::builder()
638+
.status(200)
639+
.body(BoxBody::from_static("contents"))
640+
.unwrap();
641+
642+
// Two counters for 200 responses that do/don't have an error.
643+
let ok = requests.get_statuses(&labels::Rsp(
644+
labels::Route::new(parent_ref.clone(), route_ref.clone(), None),
645+
labels::HttpRsp {
646+
status: Some(http::StatusCode::OK),
647+
error: None,
648+
},
649+
));
650+
let err = requests.get_statuses(&labels::Rsp(
651+
labels::Route::new(parent_ref.clone(), route_ref.clone(), None),
652+
labels::HttpRsp {
653+
status: Some(http::StatusCode::OK),
654+
error: Some(labels::Error::Unknown),
655+
},
656+
));
657+
debug_assert_eq!(ok.get(), 0);
658+
debug_assert_eq!(err.get(), 0);
659+
660+
// Send the request, and obtain the response.
661+
let body = {
662+
handle.allow(1);
663+
svc.ready().await.expect("ready");
664+
let mut call = svc.call(req);
665+
let (_req, tx) = tokio::select! {
666+
_ = (&mut call) => unreachable!(),
667+
res = handle.next_request() => res.unwrap(),
668+
};
669+
assert_eq!(ok.get(), 0);
670+
tx.send_response(rsp);
671+
call.await.unwrap().into_body()
672+
};
673+
674+
// The counters are not incremented yet.
675+
assert_eq!(ok.get(), 0);
676+
assert_eq!(err.get(), 0);
677+
678+
// The body reports an error if it was not completed.
679+
drop(body);
680+
assert_eq!(ok.get(), 0);
681+
assert_eq!(err.get(), 1);
682+
}
683+
523684
#[tokio::test(flavor = "current_thread", start_paused = true)]
524685
async fn grpc_request_statuses_ok() {
525686
const EXPORT_HOSTNAME_LABELS: bool = true;
@@ -723,6 +884,210 @@ async fn grpc_request_statuses_error_body() {
723884
.await;
724885
}
725886

887+
#[tokio::test(flavor = "current_thread", start_paused = true)]
888+
async fn grpc_response_body_drop_on_eos() {
889+
use linkerd_app_core::svc::{Service, ServiceExt};
890+
891+
const EXPORT_HOSTNAME_LABELS: bool = false;
892+
let _trace = linkerd_tracing::test::trace_init();
893+
894+
let super::GrpcRouteMetrics {
895+
requests,
896+
body_data,
897+
..
898+
} = super::GrpcRouteMetrics::default();
899+
let parent_ref = crate::ParentRef(policy::Meta::new_default("parent"));
900+
let route_ref = crate::RouteRef(policy::Meta::new_default("route"));
901+
let (mut svc, mut handle) = mock_grpc_route_metrics(
902+
&requests,
903+
&body_data,
904+
&parent_ref,
905+
&route_ref,
906+
EXPORT_HOSTNAME_LABELS,
907+
);
908+
909+
// Define a request and a response.
910+
let req = http::Request::default();
911+
let rsp = http::Response::builder()
912+
.status(200)
913+
.body({
914+
let data = Poll::Ready(Some(Ok(Bytes::from_static(b"contents"))));
915+
let trailers = {
916+
let mut trailers = http::HeaderMap::with_capacity(1);
917+
trailers.insert(GRPC_STATUS.clone(), GRPC_STATUS_OK.clone());
918+
Poll::Ready(Some(Ok(trailers)))
919+
};
920+
let body = linkerd_mock_http_body::MockBody::default()
921+
.then_yield_data(data)
922+
.then_yield_trailer(trailers);
923+
BoxBody::new(body)
924+
})
925+
.unwrap();
926+
927+
// Two counters for 200 responses that do/don't have an error.
928+
let ok = requests.get_statuses(&labels::Rsp(
929+
labels::Route::new(parent_ref.clone(), route_ref.clone(), None),
930+
labels::GrpcRsp {
931+
status: Some(tonic::Code::Ok),
932+
error: None,
933+
},
934+
));
935+
let err = requests.get_statuses(&labels::Rsp(
936+
labels::Route::new(parent_ref.clone(), route_ref.clone(), None),
937+
labels::GrpcRsp {
938+
status: Some(tonic::Code::Ok),
939+
error: Some(labels::Error::Unknown),
940+
},
941+
));
942+
debug_assert_eq!(ok.get(), 0);
943+
debug_assert_eq!(err.get(), 0);
944+
945+
// Send the request, and obtain the response.
946+
let mut body = {
947+
handle.allow(1);
948+
svc.ready().await.expect("ready");
949+
let mut call = svc.call(req);
950+
let (_req, tx) = tokio::select! {
951+
_ = (&mut call) => unreachable!(),
952+
res = handle.next_request() => res.unwrap(),
953+
};
954+
assert_eq!(ok.get(), 0);
955+
tx.send_response(rsp);
956+
call.await.unwrap().into_body()
957+
};
958+
959+
// The counters are not incremented yet.
960+
assert_eq!(ok.get(), 0);
961+
assert_eq!(err.get(), 0);
962+
963+
// Poll a frame out of the body.
964+
let data = body
965+
.frame()
966+
.await
967+
.expect("yields a result")
968+
.expect("yields a frame")
969+
.into_data()
970+
.ok()
971+
.expect("yields data");
972+
assert_eq!(data.chunk(), "contents".as_bytes());
973+
assert_eq!(data.remaining(), "contents".len());
974+
975+
// Poll the trailers out of the body.
976+
let trls = body
977+
.frame()
978+
.await
979+
.expect("yields a result")
980+
.expect("yields a frame")
981+
.into_trailers()
982+
.ok()
983+
.expect("yields trailers");
984+
assert_eq!(trls.get(&GRPC_STATUS).unwrap(), GRPC_STATUS_OK);
985+
986+
// Show that the body reports itself as being complete.
987+
debug_assert!(body.is_end_stream());
988+
assert_eq!(ok.get(), 1);
989+
assert_eq!(err.get(), 0);
990+
}
991+
992+
#[tokio::test(flavor = "current_thread", start_paused = true)]
993+
async fn grpc_response_body_drop_early() {
994+
use linkerd_app_core::svc::{Service, ServiceExt};
995+
996+
const EXPORT_HOSTNAME_LABELS: bool = false;
997+
let _trace = linkerd_tracing::test::trace_init();
998+
999+
let super::GrpcRouteMetrics {
1000+
requests,
1001+
body_data,
1002+
..
1003+
} = super::GrpcRouteMetrics::default();
1004+
let parent_ref = crate::ParentRef(policy::Meta::new_default("parent"));
1005+
let route_ref = crate::RouteRef(policy::Meta::new_default("route"));
1006+
let (mut svc, mut handle) = mock_grpc_route_metrics(
1007+
&requests,
1008+
&body_data,
1009+
&parent_ref,
1010+
&route_ref,
1011+
EXPORT_HOSTNAME_LABELS,
1012+
);
1013+
1014+
// Define a request and a response.
1015+
let req = http::Request::default();
1016+
let rsp = http::Response::builder()
1017+
.status(200)
1018+
.body({
1019+
let data = Poll::Ready(Some(Ok(Bytes::from_static(b"contents"))));
1020+
let trailers = {
1021+
let mut trailers = http::HeaderMap::with_capacity(1);
1022+
trailers.insert(GRPC_STATUS.clone(), GRPC_STATUS_OK.clone());
1023+
Poll::Ready(Some(Ok(trailers)))
1024+
};
1025+
let body = linkerd_mock_http_body::MockBody::default()
1026+
.then_yield_data(data)
1027+
.then_yield_trailer(trailers);
1028+
BoxBody::new(body)
1029+
})
1030+
.unwrap();
1031+
1032+
// Two counters for 200 responses that do/don't have an error.
1033+
let ok = requests.get_statuses(&labels::Rsp(
1034+
labels::Route::new(parent_ref.clone(), route_ref.clone(), None),
1035+
labels::GrpcRsp {
1036+
status: Some(tonic::Code::Ok),
1037+
error: None,
1038+
},
1039+
));
1040+
let err = requests.get_statuses(&labels::Rsp(
1041+
labels::Route::new(parent_ref.clone(), route_ref.clone(), None),
1042+
labels::GrpcRsp {
1043+
status: None,
1044+
error: Some(labels::Error::Unknown),
1045+
},
1046+
));
1047+
debug_assert_eq!(ok.get(), 0);
1048+
debug_assert_eq!(err.get(), 0);
1049+
1050+
// Send the request, and obtain the response.
1051+
let mut body = {
1052+
handle.allow(1);
1053+
svc.ready().await.expect("ready");
1054+
let mut call = svc.call(req);
1055+
let (_req, tx) = tokio::select! {
1056+
_ = (&mut call) => unreachable!(),
1057+
res = handle.next_request() => res.unwrap(),
1058+
};
1059+
assert_eq!(ok.get(), 0);
1060+
tx.send_response(rsp);
1061+
call.await.unwrap().into_body()
1062+
};
1063+
1064+
// The counters are not incremented yet.
1065+
assert_eq!(ok.get(), 0);
1066+
assert_eq!(err.get(), 0);
1067+
1068+
// Poll a frame out of the body.
1069+
let data = body
1070+
.frame()
1071+
.await
1072+
.expect("yields a result")
1073+
.expect("yields a frame")
1074+
.into_data()
1075+
.ok()
1076+
.expect("yields data");
1077+
assert_eq!(data.chunk(), "contents".as_bytes());
1078+
assert_eq!(data.remaining(), "contents".len());
1079+
1080+
// The counters are not incremented yet.
1081+
debug_assert!(!body.is_end_stream());
1082+
assert_eq!(ok.get(), 0);
1083+
assert_eq!(err.get(), 0);
1084+
1085+
// Then, drop the body without polling the trailers.
1086+
drop(body);
1087+
assert_eq!(ok.get(), 0);
1088+
assert_eq!(err.get(), 1);
1089+
}
1090+
7261091
// === Utils ===
7271092

7281093
const MOCK_GRPC_REQ_URI: &str = "http://host/svc/method";

‎linkerd/http/prom/src/record_response.rs

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -268,6 +268,8 @@ where
268268
Some(Ok(frame)) => {
269269
if let trls @ Some(_) = frame.trailers_ref() {
270270
end_stream(this.state, Ok(trls));
271+
} else if this.inner.is_end_stream() {
272+
end_stream(this.state, Ok(None));
271273
}
272274
}
273275
Some(Err(error)) => end_stream(this.state, Err(error)),
@@ -278,7 +280,9 @@ where
278280
}
279281

280282
fn is_end_stream(&self) -> bool {
281-
self.inner.is_end_stream()
283+
// If the inner response state is still in place, the end of the stream has not been
284+
// classified and recorded yet.
285+
self.state.is_none()
282286
}
283287
}
284288

0 commit comments

Comments
 (0)
Please sign in to comment.