
Commit a623f3d

e2e/loadbalancer: implement hairpin connection cases
Implement the hairpin connection test cases and expose an issue with internal-scheme NLBs, which fail when a client tries to reach a service load balancer backed by the same node the client is running on. The hairpin problem occurs because the client IP preservation attribute is set to true (the default), and the Service API does not provide an interface to disable it. The e2e test is expected to pass so it does not cause permanent CI failures; the underlying problem is tracked in issue #1160.
1 parent 31a4ec4 commit a623f3d
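
For context on the fix that issue #1160 tracks: the internal NLB hairpin case would work if client IP preservation were disabled on the target group. The Service API does not expose that knob today, but it exists as an NLB target group attribute. A minimal sketch (not part of this commit, assuming aws-sdk-go-v2 and a placeholder target group ARN) of flipping the attribute directly through the AWS API:

package main

import (
	"context"
	"log"

	"github.com/aws/aws-sdk-go-v2/aws"
	"github.com/aws/aws-sdk-go-v2/config"
	elbv2 "github.com/aws/aws-sdk-go-v2/service/elasticloadbalancingv2"
	"github.com/aws/aws-sdk-go-v2/service/elasticloadbalancingv2/types"
)

func main() {
	cfg, err := config.LoadDefaultConfig(context.TODO())
	if err != nil {
		log.Fatal(err)
	}
	client := elbv2.NewFromConfig(cfg)

	// Placeholder ARN; a real test would resolve it from the load balancer
	// DNS name reported in the Service status.
	tgARN := "arn:aws:elasticloadbalancing:region:account:targetgroup/example/abc123"

	// Disable client IP preservation so hairpin (same-node) connections
	// to an instance-type target group can succeed.
	_, err = client.ModifyTargetGroupAttributes(context.TODO(), &elbv2.ModifyTargetGroupAttributesInput{
		TargetGroupArn: aws.String(tgARN),
		Attributes: []types.TargetGroupAttribute{
			{Key: aws.String("preserve_client_ip.enabled"), Value: aws.String("false")},
		},
	})
	if err != nil {
		log.Fatal(err)
	}
}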

1 file changed: tests/e2e/loadbalancer.go (286 additions, 29 deletions)
@@ -16,12 +16,16 @@ package e2e
 import (
 	"context"
 	"fmt"
+	"sort"
 	"strings"
+	"sync"
+	"time"

 	. "github.com/onsi/ginkgo/v2"
 	v1 "k8s.io/api/core/v1"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/apimachinery/pkg/util/intstr"
+	"k8s.io/apimachinery/pkg/util/wait"
 	clientset "k8s.io/client-go/kubernetes"
 	"k8s.io/kubernetes/test/e2e/framework"
 	e2eservice "k8s.io/kubernetes/test/e2e/framework/service"
@@ -43,6 +47,10 @@ var (
 	clusterNodesSelector string
 	clusterNodesCount    int = 0

+	// clusterNodeSingleWorker is a single worker node used in test cases.
+	clusterNodeSingleWorker string
+	clusterNodesMutex       sync.Mutex
+
 	// lookupNodeSelectors are valid compute/node/worker selectors commonly used in different kubernetes
 	// distributions.
 	lookupNodeSelectors = []string{
@@ -71,11 +79,14 @@ var _ = Describe("[cloud-provider-aws-e2e] loadbalancer", func() {
 	})

 	type loadBalancerTestCases struct {
-		Name              string
-		ResourceSuffix    string
-		Annotations       map[string]string
-		PostConfigService func(cfg *configServiceLB, svc *v1.Service)
-		PostRunValidation func(cfg *configServiceLB, svc *v1.Service)
+		Name                    string
+		ResourceSuffix          string
+		Annotations             map[string]string
+		PostConfigService       func(cfg *configServiceLB, svc *v1.Service)
+		PostRunValidation       func(cfg *configServiceLB, svc *v1.Service)
+		RemoteTestReachableHTTP bool
+		RequireAffinity         bool
+		ExpectTestFail          bool
 	}
 	cases := []loadBalancerTestCases{
 		{
@@ -93,29 +104,10 @@ var _ = Describe("[cloud-provider-aws-e2e] loadbalancer", func() {
 		{
 			Name:           "NLB should configure the loadbalancer with target-node-labels",
 			ResourceSuffix: "sg-nd",
-			Annotations: map[string]string{
-				annotationLBType: "nlb",
-			},
+			Annotations:    map[string]string{annotationLBType: "nlb"},
 			PostConfigService: func(cfg *configServiceLB, svc *v1.Service) {
 				// discover clusterNodeSelector and patch service
-				// TODO: move to external function if there are more scenarios to discover nodes.
-				By("discovering node label used in the kubernetes distributions")
-				for _, selector := range lookupNodeSelectors {
-					nodeList, err := cs.CoreV1().Nodes().List(context.TODO(), metav1.ListOptions{
-						LabelSelector: selector,
-					})
-					framework.ExpectNoError(err, "failed to list worker nodes")
-					if len(nodeList.Items) > 0 {
-						clusterNodesCount = len(nodeList.Items)
-						clusterNodesSelector = selector
-						break
-					}
-				}
-
-				if clusterNodesCount == 0 {
-					framework.ExpectNoError(fmt.Errorf("unable to find node selector for %v", lookupNodeSelectors))
-				}
-
+				discoverClusterWorkerNode(cs)
 				By(fmt.Sprintf("found %d nodes with selector %q\n", clusterNodesCount, clusterNodesSelector))
 				if svc.Annotations == nil {
 					svc.Annotations = map[string]string{}
@@ -132,6 +124,75 @@ var _ = Describe("[cloud-provider-aws-e2e] loadbalancer", func() {
 				framework.ExpectNoError(getLBTargetCount(context.TODO(), lbDNS, clusterNodesCount), "AWS LB target count validation failed")
 			},
 		},
+		{
+			Name:           "internet-facing should support hairpin connection",
+			ResourceSuffix: "hairpin-clb",
+			Annotations:    map[string]string{},
+			PostConfigService: func(cfg *configServiceLB, svc *v1.Service) {
+				discoverClusterWorkerNode(cs)
+				if svc.Annotations == nil {
+					svc.Annotations = map[string]string{}
+				}
+				svc.Annotations[annotationLBTargetNodeLabels] = fmt.Sprintf("kubernetes.io/hostname=%s", clusterNodeSingleWorker)
+				framework.Logf("Using service annotations: %v", svc.Annotations)
+			},
+			RemoteTestReachableHTTP: true,
+		},
+		{
+			Name:           "NLB internet-facing should support hairpin connection",
+			ResourceSuffix: "hairpin-nlb",
+			Annotations:    map[string]string{annotationLBType: "nlb"},
+			PostConfigService: func(cfg *configServiceLB, svc *v1.Service) {
+				discoverClusterWorkerNode(cs)
+				if svc.Annotations == nil {
+					svc.Annotations = map[string]string{}
+				}
+				svc.Annotations[annotationLBTargetNodeLabels] = fmt.Sprintf("kubernetes.io/hostname=%s", clusterNodeSingleWorker)
+				framework.Logf("Using service annotations: %v", svc.Annotations)
+			},
+			RemoteTestReachableHTTP: true,
+			RequireAffinity:         true,
+		},
+		{
+			Name:           "internal should support hairpin connection",
+			ResourceSuffix: "hp-clb-int",
+			Annotations: map[string]string{
+				"service.beta.kubernetes.io/aws-load-balancer-internal": "true",
+			},
+			PostConfigService: func(cfg *configServiceLB, svc *v1.Service) {
+				discoverClusterWorkerNode(cs)
+				if svc.Annotations == nil {
+					svc.Annotations = map[string]string{}
+				}
+				svc.Annotations[annotationLBTargetNodeLabels] = fmt.Sprintf("kubernetes.io/hostname=%s", clusterNodeSingleWorker)
+				framework.Logf("Using service annotations: %v", svc.Annotations)
+			},
+			RemoteTestReachableHTTP: true,
+			RequireAffinity:         true,
+		},
+		// FIXME: https://github.com/kubernetes/cloud-provider-aws/issues/1160
+		// Hairpin connections work with target type "instance" only when client IP preservation is disabled.
+		// Currently the CCM does not provide an interface to create a service with that setup, so the
+		// internal NLB case fails.
+		{
+			Name:           "NLB internal should support hairpin connection",
+			ResourceSuffix: "hp-nlb-int",
+			Annotations: map[string]string{
+				annotationLBType: "nlb",
+				"service.beta.kubernetes.io/aws-load-balancer-internal": "true",
+			},
+			PostConfigService: func(cfg *configServiceLB, svc *v1.Service) {
+				discoverClusterWorkerNode(cs)
+				if svc.Annotations == nil {
+					svc.Annotations = map[string]string{}
+				}
+				svc.Annotations[annotationLBTargetNodeLabels] = fmt.Sprintf("kubernetes.io/hostname=%s", clusterNodeSingleWorker)
+				framework.Logf("Using service annotations: %v", svc.Annotations)
+			},
+			RemoteTestReachableHTTP: true,
+			RequireAffinity:         true,
+			ExpectTestFail:          true,
+		},
 	}

 	serviceNameBase := "lbconfig-test"
@@ -150,6 +211,8 @@ var _ = Describe("[cloud-provider-aws-e2e] loadbalancer", func() {
 			By("creating a TCP service " + serviceName + " with type=LoadBalancerType in namespace " + ns.Name)
 			lbConfig := newConfigServiceLB()
 			lbConfig.LBJig = e2eservice.NewTestJig(cs, ns.Name, serviceName)
+
+			// Hook annotations to support dynamic config
 			lbServiceConfig := lbConfig.buildService(tc.Annotations)

 			// Hook: PostConfigService patches service configuration.
@@ -169,7 +232,7 @@ var _ = Describe("[cloud-provider-aws-e2e] loadbalancer", func() {

 			// Run Workloads
 			By("creating a pod to be part of the TCP service " + serviceName)
-			_, err = lbConfig.LBJig.Run(lbConfig.buildReplicationController())
+			_, err = lbConfig.LBJig.Run(lbConfig.buildReplicationController(tc.RequireAffinity))
 			framework.ExpectNoError(err)

 			// Hook: PostRunValidation performs LB validations after it is created (before test).
@@ -190,7 +253,11 @@ var _ = Describe("[cloud-provider-aws-e2e] loadbalancer", func() {
 			ingressIP := e2eservice.GetIngressPoint(&lbService.Status.LoadBalancer.Ingress[0])
 			framework.Logf("Load balancer's ingress IP: %s", ingressIP)

-			e2eservice.TestReachableHTTP(ingressIP, svcPort, e2eservice.LoadBalancerLagTimeoutAWS)
+			if tc.RemoteTestReachableHTTP {
+				framework.ExpectNoError(inClusterTestReachableHTTP(cs, ns.Name, clusterNodeSingleWorker, ingressIP, svcPort, tc.ExpectTestFail))
+			} else {
+				e2eservice.TestReachableHTTP(ingressIP, svcPort, e2eservice.LoadBalancerLagTimeoutAWS)
+			}

 			// Update the service to cluster IP
 			By("changing TCP service back to type=ClusterIP")
@@ -281,10 +348,11 @@ func (s *configServiceLB) buildService(extraAnnotations map[string]string) *v1.S
 // when the test framework is updated.
 // [1] https://github.com/kubernetes/kubernetes/blob/89d95c9713a8fd189e8ad555120838b3c4f888d1/test/e2e/framework/service/jig.go#L636
 // [2] https://github.com/kubernetes/kubernetes/issues/119021
-func (s *configServiceLB) buildReplicationController() func(rc *v1.ReplicationController) {
+func (s *configServiceLB) buildReplicationController(affinity bool) func(rc *v1.ReplicationController) {
 	return func(rc *v1.ReplicationController) {
 		var replicas int32 = 1
 		var grace int64 = 3 // so we don't race with kube-proxy when scaling up/down
+		var affinity = affinity
 		rc.ObjectMeta = metav1.ObjectMeta{
 			Namespace: s.LBJig.Namespace,
 			Name:      s.LBJig.Name,
@@ -322,6 +390,25 @@ func (s *configServiceLB) buildReplicationController() func(rc *v1.ReplicationCo
 				},
 			},
 		}
+		if affinity {
+			rc.Spec.Template.Spec.Affinity = &v1.Affinity{
+				NodeAffinity: &v1.NodeAffinity{
+					RequiredDuringSchedulingIgnoredDuringExecution: &v1.NodeSelector{
+						NodeSelectorTerms: []v1.NodeSelectorTerm{
+							{
+								MatchExpressions: []v1.NodeSelectorRequirement{
+									{
+										Key:      "kubernetes.io/hostname",
+										Operator: v1.NodeSelectorOpIn,
+										Values:   []string{clusterNodeSingleWorker},
+									},
+								},
+							},
+						},
+					},
+				},
+			}
+		}
 	}
 }

@@ -400,3 +487,173 @@ func getLBTargetCount(ctx context.Context, lbDNSName string, expectedTargets int
 	}
 	return nil
 }
+
+// discoverClusterWorkerNode looks up the worker node selector of the current kubernetes distribution.
+func discoverClusterWorkerNode(cs clientset.Interface) {
+	// Skip when already discovered
+	if len(clusterNodesSelector) > 0 {
+		return
+	}
+	clusterNodesMutex.Lock()
+	defer clusterNodesMutex.Unlock()
+
+	var workerNodeList []string
+	By("discovering node label used in the kubernetes distributions")
+	for _, selector := range lookupNodeSelectors {
+		nodeList, err := cs.CoreV1().Nodes().List(context.TODO(), metav1.ListOptions{
+			LabelSelector: selector,
+		})
+		framework.ExpectNoError(err, "failed to list worker nodes")
+		if len(nodeList.Items) > 0 {
+			clusterNodesCount = len(nodeList.Items)
+			clusterNodesSelector = selector
+			for _, node := range nodeList.Items {
+				workerNodeList = append(workerNodeList, node.Name)
+			}
+			break
+		}
+	}
+
+	// Fail when no worker node is found, which is the purpose of this function.
+	if clusterNodesCount == 0 {
+		framework.ExpectNoError(fmt.Errorf("unable to find node selector for %v", lookupNodeSelectors))
+	}
+
+	// Save the first worker node in the list to be used in test cases.
+	if len(workerNodeList) > 0 {
+		sort.Strings(workerNodeList)
+		clusterNodeSingleWorker = workerNodeList[0]
+	}
+}
+
+// inClusterTestReachableHTTP creates a pod to test HTTP connectivity
+// inside the cluster.
+// It schedules the pod on the same node as the backend using node affinity.
+func inClusterTestReachableHTTP(cs clientset.Interface, namespace, nodeName, targetIP string, targetPort int, expectFailTest bool) error {
+	podName := "http-test-pod"
+
+	// client http test (curl) pod spec.
+	pod := &v1.Pod{
+		ObjectMeta: metav1.ObjectMeta{
+			Name:      podName,
+			Namespace: namespace,
+		},
+		Spec: v1.PodSpec{
+			Containers: []v1.Container{
+				{
+					Name:    "curl",
+					Image:   imageutils.GetE2EImage(imageutils.Agnhost),
+					Command: []string{"curl"},
+					Args: []string{
+						"--retry", "20",
+						"--retry-delay", "15",
+						"--retry-max-time", "540",
+						"--retry-all-errors",
+						"--trace-time",
+						"-w", "\\\"\\n---> HTTPCode=%{http_code} Time=%{time_total}ms <---\\n\\\"",
+						fmt.Sprintf("http://%s:%d/echo?msg=hello", targetIP, targetPort),
+					},
+				},
+			},
+			SecurityContext: &v1.PodSecurityContext{
+				RunAsNonRoot: aws.Bool(true),
+				SeccompProfile: &v1.SeccompProfile{
+					Type: v1.SeccompProfileTypeRuntimeDefault,
+				},
+			},
+			RestartPolicy: v1.RestartPolicyNever,
+			Affinity: &v1.Affinity{
+				NodeAffinity: &v1.NodeAffinity{
+					RequiredDuringSchedulingIgnoredDuringExecution: &v1.NodeSelector{
+						NodeSelectorTerms: []v1.NodeSelectorTerm{
+							{
+								MatchExpressions: []v1.NodeSelectorRequirement{
+									{
+										Key:      "kubernetes.io/hostname",
+										Operator: v1.NodeSelectorOpIn,
+										Values:   []string{nodeName},
+									},
+								},
+							},
+						},
+					},
+				},
+			},
+		},
+	}
+	ct := pod.Spec.Containers[0]
+	framework.Logf("PodSpec Image=%v Command=%v Args=%v", ct.Image, ct.Command, ct.Args)
+
+	// Create the pod
+	_, err := cs.CoreV1().Pods(namespace).Create(context.TODO(), pod, metav1.CreateOptions{})
+	if err != nil {
+		return fmt.Errorf("failed to create HTTP test pod: %v", err)
+	}
+	// Clean up the pod
+	defer func() {
+		err = cs.CoreV1().Pods(namespace).Delete(context.TODO(), podName, metav1.DeleteOptions{})
+		if err != nil {
+			framework.Logf("Failed to delete pod %s: %v", podName, err)
+		}
+	}()
+
+	gatherLogs := func(tail int) {
+		opts := &v1.PodLogOptions{}
+		if tail > 0 {
+			opts.TailLines = aws.Int64(int64(tail))
+		}
+		logs, errL := cs.CoreV1().Pods(namespace).GetLogs(podName, opts).DoRaw(context.TODO())
+		if errL != nil {
+			framework.Logf("Failed to retrieve pod logs when finished: %v", errL)
+		}
+		framework.Logf("HTTP test pod logs:\n%s", string(logs))
+	}
+
+	// Wait for the test pod to complete. The limit must be higher than the curl retry window.
+	waitCount := 0
+	err = wait.PollImmediate(15*time.Second, 10*time.Minute, func() (bool, error) {
+		p, err := cs.CoreV1().Pods(namespace).Get(context.TODO(), podName, metav1.GetOptions{})
+		if err != nil {
+			framework.Logf("Error getting pod %s: %v", podName, err)
+			return false, err
+		}
+		framework.Logf("Pod %s status: Phase=%s", podName, p.Status.Phase)
+		podFinished := p.Status.Phase == v1.PodSucceeded || p.Status.Phase == v1.PodFailed

+		// Periodically collect logs.
+		if waitCount > 0 && waitCount%4 == 0 {
+			gatherLogs(5)
+		}
+		if podFinished {
+			gatherLogs(0)
+		}
+		waitCount++
+		return podFinished, nil
+	})
+	// Check overall error
+	if err != nil {
+		return fmt.Errorf("error waiting for pod %s to complete: %v", podName, err)
+	}
+
+	// Inspect the pod's container status for exit code
+	pod, errS := cs.CoreV1().Pods(namespace).Get(context.TODO(), podName, metav1.GetOptions{})
+	if errS != nil {
+		return fmt.Errorf("failed to get pod %s: %v", podName, errS)
+	}
+	if len(pod.Status.ContainerStatuses) == 0 {
+		return fmt.Errorf("no container statuses found for pod %s", podName)
+	}
+	containerStatus := pod.Status.ContainerStatuses[0]
+
+	if containerStatus.State.Terminated != nil {
+		exitCode := containerStatus.State.Terminated.ExitCode
+		if exitCode != 0 {
+			if expectFailTest {
+				framework.Logf("WARNING: Test failed, but failure is explicitly allowed due to the 'ExpectTestFail' flag.")
+			} else {
+				return fmt.Errorf("pod %s exited with code %d", podName, exitCode)
+			}
+		}
+	}
+	return nil
+}
