Skip to content

Commit 0bc7417

Browse files
authored
xds: report drops by circuit breaking (#4171)
1 parent e526a29 commit 0bc7417

File tree

6 files changed

+71
-12
lines changed

6 files changed

+71
-12
lines changed

xds/internal/balancer/edsbalancer/eds_impl.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -515,6 +515,11 @@ func (d *dropPicker) Pick(info balancer.PickInfo) (balancer.PickResult, error) {
515515
}
516516
if d.counter != nil {
517517
if err := d.counter.StartRequest(); err != nil {
518+
// Drops by circuit breaking are reported with empty category. They
519+
// will be reported only in total drops, but not in per category.
520+
if d.loadStore != nil {
521+
d.loadStore.CallDropped("")
522+
}
518523
return balancer.PickResult{}, status.Errorf(codes.Unavailable, err.Error())
519524
}
520525
pr, err := d.p.Pick(info)

xds/internal/balancer/edsbalancer/eds_impl_test.go

Lines changed: 42 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -142,7 +142,7 @@ func (s) TestEDS_OneLocality(t *testing.T) {
142142
}
143143

144144
// The same locality, different drop rate, dropping 50%.
145-
clab5 := testutils.NewClusterLoadAssignmentBuilder(testClusterNames[0], []uint32{50})
145+
clab5 := testutils.NewClusterLoadAssignmentBuilder(testClusterNames[0], map[string]uint32{"test-drop": 50})
146146
clab5.AddLocality(testSubZones[0], 1, 0, testEndpointAddrs[2:3], nil)
147147
edsb.handleEDSResponse(parseEDSRespProtoForTesting(clab5.Build()))
148148

@@ -746,6 +746,10 @@ func (s) TestDropPicker(t *testing.T) {
746746
}
747747

748748
func (s) TestEDS_LoadReport(t *testing.T) {
749+
origCircuitBreakingSupport := env.CircuitBreakingSupport
750+
env.CircuitBreakingSupport = true
751+
defer func() { env.CircuitBreakingSupport = origCircuitBreakingSupport }()
752+
749753
// We create an xdsClientWrapper with a dummy xdsClientInterface which only
750754
// implements the LoadStore() method to return the underlying load.Store to
751755
// be used.
@@ -758,10 +762,20 @@ func (s) TestEDS_LoadReport(t *testing.T) {
758762
edsb := newEDSBalancerImpl(cc, nil, lsWrapper, nil)
759763
edsb.enqueueChildBalancerStateUpdate = edsb.updateState
760764

765+
const (
766+
testServiceName = "test-service"
767+
cbMaxRequests = 20
768+
)
769+
var maxRequestsTemp uint32 = cbMaxRequests
770+
client.SetMaxRequests(testServiceName, &maxRequestsTemp)
771+
defer client.ClearCounterForTesting(testServiceName)
772+
edsb.updateServiceRequestsCounter(testServiceName)
773+
761774
backendToBalancerID := make(map[balancer.SubConn]internal.LocalityID)
762775

776+
const testDropCategory = "test-drop"
763777
// Two localities, each with one backend.
764-
clab1 := testutils.NewClusterLoadAssignmentBuilder(testClusterNames[0], nil)
778+
clab1 := testutils.NewClusterLoadAssignmentBuilder(testClusterNames[0], map[string]uint32{testDropCategory: 50})
765779
clab1.AddLocality(testSubZones[0], 1, 0, testEndpointAddrs[:1], nil)
766780
edsb.handleEDSResponse(parseEDSRespProtoForTesting(clab1.Build()))
767781
sc1 := <-cc.NewSubConnCh
@@ -788,20 +802,42 @@ func (s) TestEDS_LoadReport(t *testing.T) {
788802
// the picks on sc1 should show up as inProgress.
789803
locality1JSON, _ := locality1.ToString()
790804
locality2JSON, _ := locality2.ToString()
805+
const (
806+
rpcCount = 100
807+
// 50% will be dropped with category testDropCategory.
808+
dropWithCategory = rpcCount / 2
809+
// In the remaining RPCs, only cbMaxRequests are allowed by circuit
810+
// breaking. Others will be dropped by CB.
811+
dropWithCB = rpcCount - dropWithCategory - cbMaxRequests
812+
813+
rpcInProgress = cbMaxRequests / 2 // 50% of RPCs will be never done.
814+
rpcSucceeded = cbMaxRequests / 2 // 50% of RPCs will succeed.
815+
)
791816
wantStoreData := []*load.Data{{
792817
Cluster: testClusterNames[0],
793818
Service: "",
794819
LocalityStats: map[string]load.LocalityData{
795-
locality1JSON: {RequestStats: load.RequestData{InProgress: 5}},
796-
locality2JSON: {RequestStats: load.RequestData{Succeeded: 5}},
820+
locality1JSON: {RequestStats: load.RequestData{InProgress: rpcInProgress}},
821+
locality2JSON: {RequestStats: load.RequestData{Succeeded: rpcSucceeded}},
822+
},
823+
TotalDrops: dropWithCategory + dropWithCB,
824+
Drops: map[string]uint64{
825+
testDropCategory: dropWithCategory,
797826
},
798827
}}
799-
for i := 0; i < 10; i++ {
828+
829+
var rpcsToBeDone []balancer.PickResult
830+
// Run the picks, but only pick with sc1 will be done later.
831+
for i := 0; i < rpcCount; i++ {
800832
scst, _ := p1.Pick(balancer.PickInfo{})
801833
if scst.Done != nil && scst.SubConn != sc1 {
802-
scst.Done(balancer.DoneInfo{})
834+
rpcsToBeDone = append(rpcsToBeDone, scst)
803835
}
804836
}
837+
// Call done on those sc1 picks.
838+
for _, scst := range rpcsToBeDone {
839+
scst.Done(balancer.DoneInfo{})
840+
}
805841

806842
gotStoreData := loadStore.Stats(testClusterNames[0:1])
807843
if diff := cmp.Diff(wantStoreData, gotStoreData, cmpopts.EquateEmpty(), cmpopts.IgnoreFields(load.Data{}, "ReportInterval")); diff != "" {

xds/internal/client/client_requests_counter.go

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -87,3 +87,16 @@ func (c *ServiceRequestsCounter) StartRequest() error {
8787
func (c *ServiceRequestsCounter) EndRequest() {
8888
atomic.AddUint32(&c.numRequests, ^uint32(0))
8989
}
90+
91+
// ClearCounterForTesting clears the counter for the service. Should be only
92+
// used in tests.
93+
func ClearCounterForTesting(serviceName string) {
94+
src.mu.Lock()
95+
defer src.mu.Unlock()
96+
c, ok := src.services[serviceName]
97+
if !ok {
98+
return
99+
}
100+
c.maxRequests = defaultMaxRequests
101+
c.numRequests = 0
102+
}

xds/internal/client/load/store.go

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -283,7 +283,12 @@ func (ls *perClusterStore) stats() *Data {
283283
return true
284284
}
285285
sd.TotalDrops += d
286-
sd.Drops[key.(string)] = d
286+
keyStr := key.(string)
287+
if keyStr != "" {
288+
// Skip drops without category. They are counted in total_drops, but
289+
// not in per category. One example is drops by circuit breaking.
290+
sd.Drops[keyStr] = d
291+
}
287292
return true
288293
})
289294
ls.localityRPCCount.Range(func(key, val interface{}) bool {

xds/internal/client/load/store_test.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -47,9 +47,10 @@ func TestDrops(t *testing.T) {
4747
drops = map[string]int{
4848
dropCategories[0]: 30,
4949
dropCategories[1]: 40,
50+
"": 10,
5051
}
5152
wantStoreData = &Data{
52-
TotalDrops: 70,
53+
TotalDrops: 80,
5354
Drops: map[string]uint64{
5455
dropCategories[0]: 30,
5556
dropCategories[1]: 40,

xds/internal/testutils/protos.go

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,6 @@
1818
package testutils
1919

2020
import (
21-
"fmt"
2221
"net"
2322
"strconv"
2423

@@ -59,11 +58,11 @@ type ClusterLoadAssignmentBuilder struct {
5958
}
6059

6160
// NewClusterLoadAssignmentBuilder creates a ClusterLoadAssignmentBuilder.
62-
func NewClusterLoadAssignmentBuilder(clusterName string, dropPercents []uint32) *ClusterLoadAssignmentBuilder {
61+
func NewClusterLoadAssignmentBuilder(clusterName string, dropPercents map[string]uint32) *ClusterLoadAssignmentBuilder {
6362
var drops []*v2xdspb.ClusterLoadAssignment_Policy_DropOverload
64-
for i, d := range dropPercents {
63+
for n, d := range dropPercents {
6564
drops = append(drops, &v2xdspb.ClusterLoadAssignment_Policy_DropOverload{
66-
Category: fmt.Sprintf("test-drop-%d", i),
65+
Category: n,
6766
DropPercentage: &v2typepb.FractionalPercent{
6867
Numerator: d,
6968
Denominator: v2typepb.FractionalPercent_HUNDRED,

0 commit comments

Comments
 (0)