Skip to content

Commit b4b973c

Browse files
authored
Merge pull request kubernetes#133321 from yliaog/assumecache
Fix dynamicresources_test flakiness
2 parents 4d6b49b + 2a026f6 commit b4b973c

File tree

2 files changed

+74
-11
lines changed

2 files changed

+74
-11
lines changed

pkg/scheduler/framework/plugins/dynamicresources/dynamicresources.go

Lines changed: 29 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,7 @@ import (
5555
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/feature"
5656
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/names"
5757
schedutil "k8s.io/kubernetes/pkg/scheduler/util"
58+
"k8s.io/kubernetes/pkg/scheduler/util/assumecache"
5859
"k8s.io/kubernetes/pkg/util/slice"
5960
"k8s.io/utils/ptr"
6061
)
@@ -76,6 +77,10 @@ const (
7677
// BindingTimeoutDefaultSeconds is the default timeout for waiting for
7778
// BindingConditions to be ready.
7879
BindingTimeoutDefaultSeconds = 600
80+
81+
// AssumeExtendedResourceTimeoutDefaultSeconds is the default timeout for waiting
82+
// for the extended resource claim to be updated in assumed cache.
83+
AssumeExtendedResourceTimeoutDefaultSeconds = 120
7984
)
8085

8186
// The state is initialized in PreFilter phase. Because we save the pointer in
@@ -1446,8 +1451,28 @@ func (pl *DynamicResources) bindClaim(ctx context.Context, state *stateData, ind
14461451
if finalErr == nil {
14471452
// This can fail, but only for reasons that are okay (concurrent delete or update).
14481453
// Shouldn't happen in this case.
1454+
if isExtendedResourceClaim {
1455+
// Unlike other claims, extended resource claim is created in API server below.
1456+
// AssumeClaimAfterAPICall returns ErrNotFound when the informer update has not reached assumed cache yet.
1457+
// Hence we must poll and wait for it.
1458+
pollErr := wait.PollUntilContextTimeout(ctx, 1*time.Second, time.Duration(AssumeExtendedResourceTimeoutDefaultSeconds)*time.Second, true,
1459+
func(ctx context.Context) (bool, error) {
1460+
if err := pl.draManager.ResourceClaims().AssumeClaimAfterAPICall(claim); err != nil {
1461+
if errors.Is(err, assumecache.ErrNotFound) {
1462+
return false, nil
1463+
}
1464+
logger.V(5).Info("Claim not stored in assume cache", "claim", klog.KObj(claim), "err", err)
1465+
return false, err
1466+
}
1467+
return true, nil
1468+
})
1469+
if pollErr != nil {
1470+
logger.V(5).Info("Claim not stored in assume cache after retries", "claim", klog.KObj(claim), "err", pollErr)
1471+
}
1472+
}
1473+
} else {
14491474
if err := pl.draManager.ResourceClaims().AssumeClaimAfterAPICall(claim); err != nil {
1450-
logger.V(5).Info("Claim not stored in assume cache", "err", finalErr)
1475+
logger.V(5).Info("Claim not stored in assume cache", "err", err)
14511476
}
14521477
}
14531478
for _, claimUID := range claimUIDs {
@@ -1576,6 +1601,9 @@ func (pl *DynamicResources) bindClaim(ctx context.Context, state *stateData, ind
15761601
// and no binding failure conditions are true,
15771602
// which includes the case that there are no binding conditions.
15781603
func (pl *DynamicResources) isClaimReadyForBinding(claim *resourceapi.ResourceClaim) (bool, error) {
1604+
if claim.Status.Allocation == nil {
1605+
return false, nil
1606+
}
15791607
for _, deviceRequest := range claim.Status.Allocation.Devices.Results {
15801608
if len(deviceRequest.BindingConditions) == 0 {
15811609
continue

pkg/scheduler/framework/plugins/dynamicresources/dynamicresources_test.go

Lines changed: 45 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@ import (
3737
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
3838
apiruntime "k8s.io/apimachinery/pkg/runtime"
3939
"k8s.io/apimachinery/pkg/types"
40+
"k8s.io/apimachinery/pkg/util/wait"
4041
"k8s.io/client-go/informers"
4142
"k8s.io/client-go/kubernetes/fake"
4243
cgotesting "k8s.io/client-go/testing"
@@ -1317,7 +1318,7 @@ func TestPlugin(t *testing.T) {
13171318
},
13181319
postfilter: result{
13191320
status: fwk.NewStatus(fwk.Unschedulable, `deletion of ResourceClaim completed`),
1320-
removed: []metav1.Object{extendedResourceClaimNode2},
1321+
removed: []metav1.Object{extendedResourceClaim},
13211322
},
13221323
},
13231324
},
@@ -1326,6 +1327,7 @@ func TestPlugin(t *testing.T) {
13261327
pod: podWithExtendedResourceName,
13271328
claims: []*resourceapi.ResourceClaim{extendedResourceClaimNode2},
13281329
classes: []*resourceapi.DeviceClass{deviceClassWithExtendResourceName},
1330+
objs: []apiruntime.Object{workerNodeSlice, podWithExtendedResourceName},
13291331
want: want{
13301332
filter: perNodeResult{
13311333
workerNode.Name: {
@@ -1827,10 +1829,6 @@ func TestPlugin(t *testing.T) {
18271829
initialObjects = testCtx.listAll(t)
18281830
testCtx.p.Unreserve(testCtx.ctx, testCtx.state, tc.pod, selectedNode.Node().Name)
18291831
t.Run("unreserverAfterBindFailure", func(t *testing.T) {
1830-
// in case we delete the claim API object
1831-
// wait for assumed cache to sync with informer
1832-
// then assumed cache should be empty
1833-
time.Sleep(800 * time.Millisecond)
18341832
testCtx.verify(t, *tc.want.unreserveAfterBindFailure, initialObjects, nil, status)
18351833
})
18361834
} else if status.IsSuccess() {
@@ -1905,9 +1903,42 @@ func (tc *testContext) verify(t *testing.T, expected result, initialObjects []me
19051903
if expected.assumedClaim != nil {
19061904
expectAssumedClaims = append(expectAssumedClaims, expected.assumedClaim)
19071905
}
1908-
actualAssumedClaims := tc.listAssumedClaims()
1909-
if diff := cmp.Diff(expectAssumedClaims, actualAssumedClaims, ignoreFieldsInResourceClaims...); diff != "" {
1910-
t.Errorf("Assumed claims are different (- expected, + actual):\n%s", diff)
1906+
// actualAssumedClaims are claims in assumed cache with different latest and api object
1907+
// sameAssumedClaims are claims in assumed cache with same latest and api object
1908+
actualAssumedClaims, sameAssumedClaims := tc.listAssumedClaims()
1909+
1910+
// error when expecting no claims in assumed cache with different latest and api object
1911+
if len(expectAssumedClaims) == 0 && len(actualAssumedClaims) != 0 {
1912+
// In case we delete the claim API object, wait for assumed cache to sync with informer,
1913+
// then assumed cache should be empty.
1914+
err := wait.PollUntilContextTimeout(tc.ctx, 200*time.Millisecond, time.Minute, true,
1915+
func(ctx context.Context) (bool, error) {
1916+
actualAssumedClaims, sameAssumedClaims = tc.listAssumedClaims()
1917+
return len(actualAssumedClaims) == 0, nil
1918+
})
1919+
if err != nil || len(actualAssumedClaims) != 0 {
1920+
t.Errorf("Assumed claims are different, err=%v, expected: nil, actual:\n%v", err, actualAssumedClaims)
1921+
}
1922+
}
1923+
if len(expectAssumedClaims) > 0 {
1924+
// it is not an error as long as the expected claim is present in the assumed cache, no
1925+
// matter its latest and api object are different or not.
1926+
for _, expected := range expectAssumedClaims {
1927+
seen := false
1928+
for _, actual := range actualAssumedClaims {
1929+
if cmp.Equal(expected, actual, ignoreFieldsInResourceClaims...) {
1930+
seen = true
1931+
}
1932+
}
1933+
for _, same := range sameAssumedClaims {
1934+
if cmp.Equal(expected, same, ignoreFieldsInResourceClaims...) {
1935+
seen = true
1936+
}
1937+
}
1938+
if !seen {
1939+
t.Errorf("Assumed claims are different, expected: %v not found", expected)
1940+
}
1941+
}
19111942
}
19121943

19131944
var expectInFlightClaims []metav1.Object
@@ -1932,18 +1963,22 @@ func (tc *testContext) listAll(t *testing.T) (objects []metav1.Object) {
19321963
return
19331964
}
19341965

1935-
func (tc *testContext) listAssumedClaims() []metav1.Object {
1966+
func (tc *testContext) listAssumedClaims() ([]metav1.Object, []metav1.Object) {
19361967
var assumedClaims []metav1.Object
1968+
var sameClaims []metav1.Object
19371969
for _, obj := range tc.draManager.resourceClaimTracker.cache.List(nil) {
19381970
claim := obj.(*resourceapi.ResourceClaim)
19391971
obj, _ := tc.draManager.resourceClaimTracker.cache.Get(claim.Namespace + "/" + claim.Name)
19401972
apiObj, _ := tc.draManager.resourceClaimTracker.cache.GetAPIObj(claim.Namespace + "/" + claim.Name)
19411973
if obj != apiObj {
19421974
assumedClaims = append(assumedClaims, claim)
1975+
} else {
1976+
sameClaims = append(sameClaims, claim)
19431977
}
19441978
}
19451979
sortObjects(assumedClaims)
1946-
return assumedClaims
1980+
sortObjects(sameClaims)
1981+
return assumedClaims, sameClaims
19471982
}
19481983

19491984
func (tc *testContext) listInFlightClaims() []metav1.Object {

0 commit comments

Comments
 (0)