@@ -16,12 +16,16 @@ package e2e
 import (
     "context"
     "fmt"
+    "sort"
     "strings"
+    "sync"
+    "time"
 
     . "github.com/onsi/ginkgo/v2"
     v1 "k8s.io/api/core/v1"
     metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
     "k8s.io/apimachinery/pkg/util/intstr"
+    "k8s.io/apimachinery/pkg/util/wait"
     clientset "k8s.io/client-go/kubernetes"
     "k8s.io/kubernetes/test/e2e/framework"
     e2eservice "k8s.io/kubernetes/test/e2e/framework/service"
@@ -43,6 +47,10 @@
     clusterNodesSelector string
     clusterNodesCount    int = 0
 
+    // clusterNodeSingleWorker is a single worker node used by test cases.
+    clusterNodeSingleWorker string
+    clusterNodesMutex       sync.Mutex
+
     // lookupNodeSelectors are valid compute/node/worker selectors commonly used in different kubernetes
     // distributions.
     lookupNodeSelectors = []string{
@@ -71,11 +79,18 @@ var _ = Describe("[cloud-provider-aws-e2e] loadbalancer", func() {
     })
 
     type loadBalancerTestCases struct {
-        Name              string
-        ResourceSuffix    string
-        Annotations       map[string]string
-        PostConfigService func(cfg *configServiceLB, svc *v1.Service)
-        PostRunValidation func(cfg *configServiceLB, svc *v1.Service)
+        Name                    string
+        ResourceSuffix          string
+        Annotations             map[string]string
+        PostConfigService       func(cfg *configServiceLB, svc *v1.Service)
+        PostRunValidation       func(cfg *configServiceLB, svc *v1.Service)
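+        // RemoteTestReachableHTTP runs the HTTP reachability check from a pod
+        // inside the cluster instead of from the test host.
+        // RequireAffinity pins the backend pod to clusterNodeSingleWorker.
+        // ExpectTestFail tolerates a failing connectivity check (known issues).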
+        RemoteTestReachableHTTP bool
+        RequireAffinity         bool
+        ExpectTestFail          bool
     }
     cases := []loadBalancerTestCases{
         {
@@ -93,29 +104,10 @@ var _ = Describe("[cloud-provider-aws-e2e] loadbalancer", func() {
         {
             Name:           "NLB should configure the loadbalancer with target-node-labels",
             ResourceSuffix: "sg-nd",
-            Annotations: map[string]string{
-                annotationLBType: "nlb",
-            },
+            Annotations:    map[string]string{annotationLBType: "nlb"},
             PostConfigService: func(cfg *configServiceLB, svc *v1.Service) {
                 // discover clusterNodeSelector and patch service
-                // TODO: move to external function if there are more scenarios to discover nodes.
-                By("discovering node label used in the kubernetes distributions")
-                for _, selector := range lookupNodeSelectors {
-                    nodeList, err := cs.CoreV1().Nodes().List(context.TODO(), metav1.ListOptions{
-                        LabelSelector: selector,
-                    })
-                    framework.ExpectNoError(err, "failed to list worker nodes")
-                    if len(nodeList.Items) > 0 {
-                        clusterNodesCount = len(nodeList.Items)
-                        clusterNodesSelector = selector
-                        break
-                    }
-                }
-
-                if clusterNodesCount == 0 {
-                    framework.ExpectNoError(fmt.Errorf("unable to find node selector for %v", lookupNodeSelectors))
-                }
-
+                discoverClusterWorkerNode(cs)
                 By(fmt.Sprintf("found %d nodes with selector %q\n", clusterNodesCount, clusterNodesSelector))
                 if svc.Annotations == nil {
                     svc.Annotations = map[string]string{}
@@ -132,6 +124,77 @@ var _ = Describe("[cloud-provider-aws-e2e] loadbalancer", func() {
                 framework.ExpectNoError(getLBTargetCount(context.TODO(), lbDNS, clusterNodesCount), "AWS LB target count validation failed")
             },
         },
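+        // Hairpin cases: the HTTP check runs from a pod on the same node that backs
+        // the load balancer, so traffic loops through the LB back to its own node.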
+        {
+            Name:           "internet-facing should support hairpin connection",
+            ResourceSuffix: "hairpin-clb",
+            Annotations:    map[string]string{},
+            PostConfigService: func(cfg *configServiceLB, svc *v1.Service) {
+                discoverClusterWorkerNode(cs)
+                if svc.Annotations == nil {
+                    svc.Annotations = map[string]string{}
+                }
+                svc.Annotations[annotationLBTargetNodeLabels] = fmt.Sprintf("kubernetes.io/hostname=%s", clusterNodeSingleWorker)
+                framework.Logf("Using service annotations: %v", svc.Annotations)
+            },
+            RemoteTestReachableHTTP: true,
+        },
+        {
+            Name:           "NLB internet-facing should support hairpin connection",
+            ResourceSuffix: "hairpin-nlb",
+            Annotations:    map[string]string{annotationLBType: "nlb"},
+            PostConfigService: func(cfg *configServiceLB, svc *v1.Service) {
+                discoverClusterWorkerNode(cs)
+                if svc.Annotations == nil {
+                    svc.Annotations = map[string]string{}
+                }
+                svc.Annotations[annotationLBTargetNodeLabels] = fmt.Sprintf("kubernetes.io/hostname=%s", clusterNodeSingleWorker)
+                framework.Logf("Using service annotations: %v", svc.Annotations)
+            },
+            RemoteTestReachableHTTP: true,
+            RequireAffinity:         true,
+        },
+        {
+            Name:           "internal should support hairpin connection",
+            ResourceSuffix: "hp-clb-int",
+            Annotations: map[string]string{
+                "service.beta.kubernetes.io/aws-load-balancer-internal": "true",
+            },
+            PostConfigService: func(cfg *configServiceLB, svc *v1.Service) {
+                discoverClusterWorkerNode(cs)
+                if svc.Annotations == nil {
+                    svc.Annotations = map[string]string{}
+                }
+                svc.Annotations[annotationLBTargetNodeLabels] = fmt.Sprintf("kubernetes.io/hostname=%s", clusterNodeSingleWorker)
+                framework.Logf("Using service annotations: %v", svc.Annotations)
+            },
+            RemoteTestReachableHTTP: true,
+            RequireAffinity:         true,
+        },
+        // FIXME: https://github.com/kubernetes/cloud-provider-aws/issues/1160
+        // Hairpin connections work with target type instance only when client IP
+        // preservation is disabled. The CCM currently provides no interface to
+        // create a service with that setup, so the internal NLB service fails.
+        {
+            Name:           "NLB internal should support hairpin connection",
+            ResourceSuffix: "hp-nlb-int",
+            Annotations: map[string]string{
+                annotationLBType: "nlb",
+                "service.beta.kubernetes.io/aws-load-balancer-internal": "true",
+            },
+            PostConfigService: func(cfg *configServiceLB, svc *v1.Service) {
+                discoverClusterWorkerNode(cs)
+                if svc.Annotations == nil {
+                    svc.Annotations = map[string]string{}
+                }
+                svc.Annotations[annotationLBTargetNodeLabels] = fmt.Sprintf("kubernetes.io/hostname=%s", clusterNodeSingleWorker)
+                framework.Logf("Using service annotations: %v", svc.Annotations)
+            },
+            RemoteTestReachableHTTP: true,
+            RequireAffinity:         true,
+            ExpectTestFail:          true,
+        },
     }
 
     serviceNameBase := "lbconfig-test"
@@ -150,6 +211,8 @@ var _ = Describe("[cloud-provider-aws-e2e] loadbalancer", func() {
             By("creating a TCP service " + serviceName + " with type=LoadBalancerType in namespace " + ns.Name)
             lbConfig := newConfigServiceLB()
             lbConfig.LBJig = e2eservice.NewTestJig(cs, ns.Name, serviceName)
+
+            // Hook: apply the test case annotations to support dynamic service config.
             lbServiceConfig := lbConfig.buildService(tc.Annotations)
 
             // Hook: PostConfigService patches the service configuration.
@@ -169,7 +232,7 @@ var _ = Describe("[cloud-provider-aws-e2e] loadbalancer", func() {
 
             // Run Workloads
             By("creating a pod to be part of the TCP service " + serviceName)
-            _, err = lbConfig.LBJig.Run(lbConfig.buildReplicationController())
+            _, err = lbConfig.LBJig.Run(lbConfig.buildReplicationController(tc.RequireAffinity))
             framework.ExpectNoError(err)
 
             // Hook: PostRunValidation performs LB validations after it is created (before test).
@@ -190,7 +253,11 @@ var _ = Describe("[cloud-provider-aws-e2e] loadbalancer", func() {
             ingressIP := e2eservice.GetIngressPoint(&lbService.Status.LoadBalancer.Ingress[0])
             framework.Logf("Load balancer's ingress IP: %s", ingressIP)
 
-            e2eservice.TestReachableHTTP(ingressIP, svcPort, e2eservice.LoadBalancerLagTimeoutAWS)
+            if tc.RemoteTestReachableHTTP {
+                framework.ExpectNoError(inClusterTestReachableHTTP(cs, ns.Name, clusterNodeSingleWorker, ingressIP, svcPort, tc.ExpectTestFail))
+            } else {
+                e2eservice.TestReachableHTTP(ingressIP, svcPort, e2eservice.LoadBalancerLagTimeoutAWS)
+            }
 
             // Update the service to cluster IP
             By("changing TCP service back to type=ClusterIP")
@@ -281,10 +348,10 @@ func (s *configServiceLB) buildService(extraAnnotations map[string]string) *v1.S
 // when the test framework is updated.
 // [1] https://github.com/kubernetes/kubernetes/blob/89d95c9713a8fd189e8ad555120838b3c4f888d1/test/e2e/framework/service/jig.go#L636
 // [2] https://github.com/kubernetes/kubernetes/issues/119021
-func (s *configServiceLB) buildReplicationController() func(rc *v1.ReplicationController) {
+func (s *configServiceLB) buildReplicationController(affinity bool) func(rc *v1.ReplicationController) {
     return func(rc *v1.ReplicationController) {
         var replicas int32 = 1
         var grace int64 = 3 // so we don't race with kube-proxy when scaling up/down
         rc.ObjectMeta = metav1.ObjectMeta{
             Namespace: s.LBJig.Namespace,
             Name:      s.LBJig.Name,
@@ -322,6 +390,27 @@ func (s *configServiceLB) buildReplicationController() func(rc *v1.ReplicationCo
                 },
             },
         }
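+        // Pin the backend pod to the discovered worker node so the in-cluster
+        // HTTP client and the load balancer target share the same instance.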
+        if affinity {
+            rc.Spec.Template.Spec.Affinity = &v1.Affinity{
+                NodeAffinity: &v1.NodeAffinity{
+                    RequiredDuringSchedulingIgnoredDuringExecution: &v1.NodeSelector{
+                        NodeSelectorTerms: []v1.NodeSelectorTerm{
+                            {
+                                MatchExpressions: []v1.NodeSelectorRequirement{
+                                    {
+                                        Key:      "kubernetes.io/hostname",
+                                        Operator: v1.NodeSelectorOpIn,
+                                        Values:   []string{clusterNodeSingleWorker},
+                                    },
+                                },
+                            },
+                        },
+                    },
+                },
+            }
+        }
     }
 }
 
@@ -400,3 +487,175 @@ func getLBTargetCount(ctx context.Context, lbDNSName string, expectedTargets int
     }
     return nil
 }
+
+// discoverClusterWorkerNode looks up the worker node selector used by the current kubernetes distribution.
+func discoverClusterWorkerNode(cs clientset.Interface) {
+    clusterNodesMutex.Lock()
+    defer clusterNodesMutex.Unlock()
+    // Skip when already discovered.
+    if len(clusterNodesSelector) > 0 {
+        return
+    }
+
+    var workerNodeList []string
+    By("discovering node label used in the kubernetes distributions")
+    for _, selector := range lookupNodeSelectors {
+        nodeList, err := cs.CoreV1().Nodes().List(context.TODO(), metav1.ListOptions{
+            LabelSelector: selector,
+        })
+        framework.ExpectNoError(err, "failed to list worker nodes")
+        if len(nodeList.Items) > 0 {
+            clusterNodesCount = len(nodeList.Items)
+            clusterNodesSelector = selector
+            for _, node := range nodeList.Items {
+                workerNodeList = append(workerNodeList, node.Name)
+            }
+            break
+        }
+    }
+
+    // Fail when no worker node is found, which is the purpose of this function.
+    if clusterNodesCount == 0 {
+        framework.ExpectNoError(fmt.Errorf("unable to find node selector for %v", lookupNodeSelectors))
+    }
+
+    // Save the first worker node (sorted by name) to be used by test cases.
+    if len(workerNodeList) > 0 {
+        sort.Strings(workerNodeList)
+        clusterNodeSingleWorker = workerNodeList[0]
+    }
+}
+
+// inClusterTestReachableHTTP creates a pod to test HTTP connectivity inside the cluster.
+// It schedules the pod on the same node as the backend using node affinity.
+func inClusterTestReachableHTTP(cs clientset.Interface, namespace, nodeName, targetIP string, targetPort int, expectFailTest bool) error {
+    podName := "http-test-pod"
+
+    // client http test (curl) pod spec.
+    pod := &v1.Pod{
+        ObjectMeta: metav1.ObjectMeta{
+            Name:      podName,
+            Namespace: namespace,
+        },
+        Spec: v1.PodSpec{
+            Containers: []v1.Container{
+                {
+                    Name:    "curl",
+                    Image:   imageutils.GetE2EImage(imageutils.Agnhost),
+                    Command: []string{"curl"},
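+                    // Retry budget: up to 20 attempts, 15s apart, capped at 300s
+                    // total, to ride out load balancer provisioning delays.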
+                    Args: []string{
+                        "--retry", "20",
+                        "--retry-delay", "15",
+                        "--retry-max-time", "300",
+                        "--retry-all-errors",
+                        "--trace-time",
+                        "-w", "\\\"\\n---> HTTPCode=%{http_code} Time=%{time_total}ms <---\\n\\\"",
+                        fmt.Sprintf("http://%s:%d/echo?msg=hello", targetIP, targetPort),
+                    },
+                },
+            },
+            SecurityContext: &v1.PodSecurityContext{
+                RunAsNonRoot: aws.Bool(true),
+                SeccompProfile: &v1.SeccompProfile{
+                    Type: v1.SeccompProfileTypeRuntimeDefault,
+                },
+            },
+            RestartPolicy: v1.RestartPolicyNever,
+            Affinity: &v1.Affinity{
+                NodeAffinity: &v1.NodeAffinity{
+                    RequiredDuringSchedulingIgnoredDuringExecution: &v1.NodeSelector{
+                        NodeSelectorTerms: []v1.NodeSelectorTerm{
+                            {
+                                MatchExpressions: []v1.NodeSelectorRequirement{
+                                    {
+                                        Key:      "kubernetes.io/hostname",
+                                        Operator: v1.NodeSelectorOpIn,
+                                        Values:   []string{nodeName},
+                                    },
+                                },
+                            },
+                        },
+                    },
+                },
+            },
+        },
+    }
+    ct := pod.Spec.Containers[0]
+    framework.Logf("PodSpec Image=%v Command=%v Args=%v", ct.Image, ct.Command, ct.Args)
+
+    // Create the pod
+    _, err := cs.CoreV1().Pods(namespace).Create(context.TODO(), pod, metav1.CreateOptions{})
+    if err != nil {
+        return fmt.Errorf("failed to create HTTP test pod: %v", err)
+    }
+    // Clean up the pod when the test finishes.
+    defer func() {
+        err = cs.CoreV1().Pods(namespace).Delete(context.TODO(), podName, metav1.DeleteOptions{})
+        if err != nil {
+            framework.Logf("Failed to delete pod %s: %v", podName, err)
+        }
+    }()
+
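+    // gatherLogs dumps the curl pod's logs; tail > 0 limits output to the last N lines.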
+    gatherLogs := func(tail int) {
+        opts := &v1.PodLogOptions{}
+        if tail > 0 {
+            opts.TailLines = aws.Int64(int64(tail))
+        }
+        logs, errL := cs.CoreV1().Pods(namespace).GetLogs(podName, opts).DoRaw(context.TODO())
+        if errL != nil {
+            framework.Logf("Failed to retrieve pod logs when finished: %v", errL)
+        }
+        framework.Logf("HTTP test pod logs:\n%s", string(logs))
+    }
+
+    // Wait for the test pod to complete. The poll timeout must be longer than curl's retry window.
+    waitCount := 0
+    err = wait.PollImmediate(15*time.Second, 10*time.Minute, func() (bool, error) {
+        p, err := cs.CoreV1().Pods(namespace).Get(context.TODO(), podName, metav1.GetOptions{})
+        if err != nil {
+            framework.Logf("Error getting pod %s: %v", podName, err)
+            return false, err
+        }
+        framework.Logf("Pod %s status: Phase=%s", podName, p.Status.Phase)
+        podFinished := p.Status.Phase == v1.PodSucceeded || p.Status.Phase == v1.PodFailed
+
+        // Periodically collect logs while waiting.
+        if waitCount > 0 && waitCount%4 == 0 {
+            gatherLogs(5)
+        }
+        if podFinished {
+            gatherLogs(0)
+        }
+        waitCount++
+        return podFinished, nil
+    })
+    // Check overall error
+    if err != nil {
+        return fmt.Errorf("error waiting for pod %s to complete: %v", podName, err)
+    }
+
+    // Inspect the pod's container status for the exit code
+    pod, errS := cs.CoreV1().Pods(namespace).Get(context.TODO(), podName, metav1.GetOptions{})
+    if errS != nil {
+        return fmt.Errorf("failed to get pod %s: %v", podName, errS)
+    }
+    if len(pod.Status.ContainerStatuses) == 0 {
+        return fmt.Errorf("no container statuses found for pod %s", podName)
+    }
+    containerStatus := pod.Status.ContainerStatuses[0]
+
+    if containerStatus.State.Terminated != nil {
+        exitCode := containerStatus.State.Terminated.ExitCode
+        if exitCode != 0 {
+            if expectFailTest {
+                framework.Logf("WARNING: Test failed, but failure is explicitly allowed due to the 'ExpectTestFail' flag.")
+            } else {
+                return fmt.Errorf("pod %s exited with code %d", podName, exitCode)
+            }
+        }
+    }
+    return nil
+}