Skip to content
Merged
Show file tree
Hide file tree
Changes from 9 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 35 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,46 @@ The last option is the simplest and most flexible, but it has a limitation: Kube
but only traffic on specific ports (see: [Kubernetes Issue #23864](https://github.com/kubernetes/kubernetes/issues/23864)).
Additionally, kube-proxy does not perform SNAT, which causes outgoing traffic from the pod to use the default gateway of the host where it is running.

To address these issues, we have added an additional controller that performs 1:1 NAT for services annotated with `networking.cozystack.io/wholeIP=true`.
To address these issues, we have added an additional controller that performs 1:1 NAT for services annotated with `networking.cozystack.io/wholeIP`.

## How It Works

cozy-proxy is a simple Kubernetes controller that watches for services with the `networking.cozystack.io/wholeIP=true` annotation. When it finds such a service, it creates an NFT rule that forwards all traffic from the service's external IP to the pod's IP and vice versa. It also disables connection tracking (conntrack) for traffic between the service and the pod, offloading that work to NFTables.
cozy-proxy is a simple Kubernetes controller that watches for services with the `networking.cozystack.io/wholeIP` annotation. When it finds such a service, it creates an NFT rule that forwards traffic from the service's external IP to the pod's IP and vice versa, performing source-IP preservation for egress traffic.

This controller can be used together with kube-proxy and Cilium in kube-proxy replacement mode.

## Service annotations

cozy-proxy reacts to Services that carry the `networking.cozystack.io/wholeIP`
annotation. The annotation value selects the ingress mode:

| Value | Behavior |
|-----------|-----------------------------------------------------------------------------------------------------------------|
| `"true"` | **Whole-IP passthrough.** All TCP/UDP traffic to the LoadBalancer IP is forwarded to the backend pod. |
| `"false"` | **Per-port filtering.** Only TCP/UDP traffic to ports listed in `Service.spec.ports` is forwarded; rest dropped.|
| absent | Service is not managed by cozy-proxy. |

In both managed modes, egress traffic from the backend pod is SNATed to the
LoadBalancer IP for source-IP preservation.

## Datapath

The nftables ruleset placed in table `ip cozy_proxy` consists of:

- Chain `egress_snat` at priority `raw` (-300): rewrites packet source IP via
the `pod_svc` map for outbound traffic from managed pods. Runs before
conntrack so the recorded tuple has `saddr=LB_IP`.
- Chain `ingress_dnat` at priority `mangle` (-150): rewrites packet
destination IP via the `svc_pod` map for inbound traffic to a LoadBalancer
IP. Runs after conntrack so reply packets of egress flows are matched
correctly.
- Chain `port_filter` at priority `filter` (0): for Services in port-filter
mode (`wholeIP: "false"`), drops ingress packets whose
`(daddr, l4proto, dport)` is not in `allowed_ports`. The chain accepts
packets in conntrack states `established` or `related` first, so reply
packets of egress flows bypass the filter even when their dport is the VM's
ephemeral source port.

## Installation

Install controller using Helm-chart:
Expand All @@ -35,7 +67,7 @@ helm install cozy-proxy charts/cozy-proxy -n kube-system

## Usage

Create a LoadBalancer service with `networking.cozystack.io/wholeIP=true` annotation:
Create a LoadBalancer service with `networking.cozystack.io/wholeIP: "true"` annotation:

```yaml
apiVersion: v1
Expand Down
147 changes: 123 additions & 24 deletions pkg/controllers/services_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -231,8 +231,20 @@ func (c *ServicesController) addServiceFunc(obj interface{}) {
if err == nil && ep != nil && hasValidEndpointIP(ep) && hasValidServiceIP(svc) {
se.Endpoint = ep
c.Services.Set(svc.Namespace, svc.Name, se)
svcIP := svc.Status.LoadBalancer.Ingress[0].IP
podIP := ep.Subsets[0].Addresses[0].IP
// Ensure NAT mapping rules are set.
c.Proxy.EnsureRules(svc.Status.LoadBalancer.Ingress[0].IP, ep.Subsets[0].Addresses[0].IP)
c.Proxy.EnsureRules(svcIP, podIP)
// Apply (or remove) port filtering depending on annotation value.
if wholeIPPassthrough(svc) {
if err := c.Proxy.DeletePortFilter(svcIP, podIP); err != nil {
log.Error(err, "failed to delete port filter", "svcIP", svcIP, "podIP", podIP)
}
} else {
if err := c.Proxy.EnsurePortFilter(svcIP, podIP, svc.Spec.Ports); err != nil {
log.Error(err, "failed to ensure port filter", "svcIP", svcIP, "podIP", podIP)
}
}
}
Comment on lines 231 to 248
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

high

The port filtering logic introduced in this PR is currently only applied during Service events. However, changes to Endpoints also affect the NAT and filtering state. The addEndpointFunc, deleteEndpointFunc, and updateEndpointFunc handlers (which are not modified in this diff but are part of the controller) should be updated to manage port filters similarly to how they manage NAT rules. Without these updates, the firewall state may become inconsistent when pods are added or removed until a Service update event is triggered.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good catch — fixed in 0b1a052. Mirrored the EnsureRules / DeleteRules calls in addEndpointFunc, updateEndpointFunc, and deleteEndpointFunc with EnsurePortFilter / DeletePortFilter so the firewall tracks endpoint changes in real time, not only on Service events.

}

Expand All @@ -253,7 +265,12 @@ func (c *ServicesController) deleteServiceFunc(obj interface{}) {
return
}

c.Proxy.DeleteRules(se.Service.Status.LoadBalancer.Ingress[0].IP, se.Endpoint.Subsets[0].Addresses[0].IP)
svcIP := se.Service.Status.LoadBalancer.Ingress[0].IP
podIP := se.Endpoint.Subsets[0].Addresses[0].IP
if err := c.Proxy.DeletePortFilter(svcIP, podIP); err != nil {
log.Error(err, "failed to delete port filter on svc deletion", "svcIP", svcIP, "podIP", podIP)
}
c.Proxy.DeleteRules(svcIP, podIP)
c.Services.Delete(svc.Namespace, svc.Name)
}

Expand All @@ -270,10 +287,12 @@ func (c *ServicesController) updateServiceFunc(oldObj, newObj interface{}) {
if !hasWholeIPAnnotation(svc) {
if se, exists := c.Services.Get(svc.Namespace, svc.Name); exists {
if hasValidServiceIP(se.Service) && hasValidEndpointIP(se.Endpoint) {
c.Proxy.DeleteRules(
se.Service.Status.LoadBalancer.Ingress[0].IP,
se.Endpoint.Subsets[0].Addresses[0].IP,
)
svcIP := se.Service.Status.LoadBalancer.Ingress[0].IP
podIP := se.Endpoint.Subsets[0].Addresses[0].IP
if err := c.Proxy.DeletePortFilter(svcIP, podIP); err != nil {
log.Error(err, "failed to delete port filter", "svcIP", svcIP, "podIP", podIP)
}
c.Proxy.DeleteRules(svcIP, podIP)
}
c.Services.Delete(svc.Namespace, svc.Name)
}
Expand All @@ -284,10 +303,12 @@ func (c *ServicesController) updateServiceFunc(oldObj, newObj interface{}) {
if !hasValidServiceIP(svc) {
if se, exists := c.Services.Get(svc.Namespace, svc.Name); exists {
if hasValidServiceIP(se.Service) && hasValidEndpointIP(se.Endpoint) {
c.Proxy.DeleteRules(
se.Service.Status.LoadBalancer.Ingress[0].IP,
se.Endpoint.Subsets[0].Addresses[0].IP,
)
svcIP := se.Service.Status.LoadBalancer.Ingress[0].IP
podIP := se.Endpoint.Subsets[0].Addresses[0].IP
if err := c.Proxy.DeletePortFilter(svcIP, podIP); err != nil {
log.Error(err, "failed to delete port filter", "svcIP", svcIP, "podIP", podIP)
}
c.Proxy.DeleteRules(svcIP, podIP)
}
c.Services.Delete(svc.Namespace, svc.Name)
}
Expand Down Expand Up @@ -324,10 +345,18 @@ func (c *ServicesController) updateServiceFunc(oldObj, newObj interface{}) {

// At this point, both the Service and Endpoint have valid IPs.
// Ensure NAT mapping is up-to-date.
c.Proxy.EnsureRules(
svc.Status.LoadBalancer.Ingress[0].IP,
ep.Subsets[0].Addresses[0].IP,
)
svcIP := svc.Status.LoadBalancer.Ingress[0].IP
podIP := ep.Subsets[0].Addresses[0].IP
c.Proxy.EnsureRules(svcIP, podIP)
if wholeIPPassthrough(svc) {
if err := c.Proxy.DeletePortFilter(svcIP, podIP); err != nil {
log.Error(err, "failed to delete port filter", "svcIP", svcIP, "podIP", podIP)
}
} else {
if err := c.Proxy.EnsurePortFilter(svcIP, podIP, svc.Spec.Ports); err != nil {
log.Error(err, "failed to ensure port filter", "svcIP", svcIP, "podIP", podIP)
}
}

// Update or add the service mapping with the new endpoint.
c.Services.Set(svc.Namespace, svc.Name, &ServiceEndpoints{Service: svc, Endpoint: ep})
Expand All @@ -354,10 +383,19 @@ func (c *ServicesController) addEndpointFunc(obj interface{}) {

// If both the Service and the Endpoint have valid IPs, ensure NAT mapping rules.
if hasValidServiceIP(se.Service) && hasValidEndpointIP(ep) {
c.Proxy.EnsureRules(
se.Service.Status.LoadBalancer.Ingress[0].IP,
ep.Subsets[0].Addresses[0].IP,
)
svcIP := se.Service.Status.LoadBalancer.Ingress[0].IP
podIP := ep.Subsets[0].Addresses[0].IP
c.Proxy.EnsureRules(svcIP, podIP)
// Mirror the port-filter state to the new endpoint.
if wholeIPPassthrough(se.Service) {
if err := c.Proxy.DeletePortFilter(svcIP, podIP); err != nil {
log.Error(err, "failed to delete port filter on endpoint add", "svcIP", svcIP, "podIP", podIP)
}
} else {
if err := c.Proxy.EnsurePortFilter(svcIP, podIP, se.Service.Spec.Ports); err != nil {
log.Error(err, "failed to ensure port filter on endpoint add", "svcIP", svcIP, "podIP", podIP)
}
}
}
}

Expand All @@ -377,7 +415,12 @@ func (c *ServicesController) deleteEndpointFunc(obj interface{}) {
if !hasValidServiceIP(se.Service) || !hasValidEndpointIP(se.Endpoint) {
return
}
c.Proxy.DeleteRules(se.Service.Status.LoadBalancer.Ingress[0].IP, se.Endpoint.Subsets[0].Addresses[0].IP)
svcIP := se.Service.Status.LoadBalancer.Ingress[0].IP
podIP := se.Endpoint.Subsets[0].Addresses[0].IP
if err := c.Proxy.DeletePortFilter(svcIP, podIP); err != nil {
log.Error(err, "failed to delete port filter on endpoint delete", "svcIP", svcIP, "podIP", podIP)
}
c.Proxy.DeleteRules(svcIP, podIP)
// Set the endpoint to nil.
c.Services.SetEndpoint(ep.Namespace, ep.Name, nil)
}
Expand All @@ -397,7 +440,12 @@ func (c *ServicesController) updateEndpointFunc(oldObj, newObj interface{}) {
}
if !hasValidEndpointIP(ep) {
if hasValidServiceIP(se.Service) && hasValidEndpointIP(se.Endpoint) {
c.Proxy.DeleteRules(se.Service.Status.LoadBalancer.Ingress[0].IP, se.Endpoint.Subsets[0].Addresses[0].IP)
svcIP := se.Service.Status.LoadBalancer.Ingress[0].IP
oldPodIP := se.Endpoint.Subsets[0].Addresses[0].IP
if err := c.Proxy.DeletePortFilter(svcIP, oldPodIP); err != nil {
log.Error(err, "failed to delete port filter on endpoint invalidation", "svcIP", svcIP, "podIP", oldPodIP)
}
c.Proxy.DeleteRules(svcIP, oldPodIP)
}
c.Services.SetEndpoint(ep.Namespace, ep.Name, ep)
return
Expand All @@ -408,7 +456,18 @@ func (c *ServicesController) updateEndpointFunc(oldObj, newObj interface{}) {
if !hasValidEndpointIP(ep) {
return
}
c.Proxy.EnsureRules(se.Service.Status.LoadBalancer.Ingress[0].IP, ep.Subsets[0].Addresses[0].IP)
svcIP := se.Service.Status.LoadBalancer.Ingress[0].IP
podIP := ep.Subsets[0].Addresses[0].IP
c.Proxy.EnsureRules(svcIP, podIP)
if wholeIPPassthrough(se.Service) {
if err := c.Proxy.DeletePortFilter(svcIP, podIP); err != nil {
log.Error(err, "failed to delete port filter on endpoint update", "svcIP", svcIP, "podIP", podIP)
}
} else {
if err := c.Proxy.EnsurePortFilter(svcIP, podIP, se.Service.Spec.Ports); err != nil {
log.Error(err, "failed to ensure port filter on endpoint update", "svcIP", svcIP, "podIP", podIP)
}
}
c.Services.SetEndpoint(ep.Namespace, ep.Name, ep)
}

Expand Down Expand Up @@ -447,10 +506,28 @@ func hasValidEndpointIP(ep *v1.Endpoints) bool {
return ep.Subsets[0].Addresses[0].IP != ""
}

// hasWholeIPAnnotation checks if the service has the wholeIP annotation set to true.
const wholeIPAnnotation = "networking.cozystack.io/wholeIP"

// hasWholeIPAnnotation reports whether the service is opted into cozy-proxy
// management via the wholeIP annotation. Any value triggers management — the
// value's meaning (passthrough vs port-filter) is determined by wholeIPPassthrough.
func hasWholeIPAnnotation(svc *v1.Service) bool {
val, ok := svc.Annotations["networking.cozystack.io/wholeIP"]
return ok && val == "true"
if svc == nil {
return false
}
_, ok := svc.Annotations[wholeIPAnnotation]
return ok
}

// wholeIPPassthrough reports whether ingress traffic should bypass port
// filtering. Defaults to true (current behavior) for any value other than
// the explicit string "false", preserving backward compatibility for services
// rendered by older charts (annotation: "true") and services with no value.
func wholeIPPassthrough(svc *v1.Service) bool {
if svc == nil || svc.Annotations == nil {
return true
}
return svc.Annotations[wholeIPAnnotation] != "false"
}

// cleanupRemovedServices performs an initial cleanup for removed services.
Expand Down Expand Up @@ -478,5 +555,27 @@ func (c *ServicesController) cleanupRemovedServices() error {
if err := c.Proxy.CleanupRules(keepMap); err != nil {
return fmt.Errorf("failed to perform initial cleanup: %w", err)
}
// Build per-svc port filter snapshot for services in non-passthrough mode.
// Keyed by svcIP (caller convenience); the entry carries podIP and ports
// because the actual nft set keys are pod IPs (post-DNAT match).
keepFilters := make(map[string]nat.PortFilterEntry)
for _, se := range allServices {
if se.Service == nil || se.Endpoint == nil {
continue
}
if !hasValidServiceIP(se.Service) || !hasValidEndpointIP(se.Endpoint) {
continue
}
if wholeIPPassthrough(se.Service) {
continue
}
keepFilters[se.Service.Status.LoadBalancer.Ingress[0].IP] = nat.PortFilterEntry{
PodIP: se.Endpoint.Subsets[0].Addresses[0].IP,
Ports: se.Service.Spec.Ports,
}
}
if err := c.Proxy.CleanupPortFilters(keepFilters); err != nil {
return fmt.Errorf("failed to perform port-filter cleanup: %w", err)
}
return nil
}
53 changes: 53 additions & 0 deletions pkg/controllers/services_controller_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
package controllers

import (
"testing"

v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

func svcWith(annot map[string]string) *v1.Service {
return &v1.Service{ObjectMeta: metav1.ObjectMeta{Annotations: annot}}
}

func TestHasWholeIPAnnotation(t *testing.T) {
cases := []struct {
name string
svc *v1.Service
expect bool
}{
{"true value", svcWith(map[string]string{"networking.cozystack.io/wholeIP": "true"}), true},
{"false value", svcWith(map[string]string{"networking.cozystack.io/wholeIP": "false"}), true},
{"absent annotation", svcWith(map[string]string{}), false},
{"nil annotations", &v1.Service{}, false},
{"unrelated annotation", svcWith(map[string]string{"foo": "bar"}), false},
}
for _, c := range cases {
t.Run(c.name, func(t *testing.T) {
if got := hasWholeIPAnnotation(c.svc); got != c.expect {
t.Errorf("hasWholeIPAnnotation = %v, want %v", got, c.expect)
}
})
}
}

func TestWholeIPPassthrough(t *testing.T) {
cases := []struct {
name string
svc *v1.Service
expect bool
}{
{"true value", svcWith(map[string]string{"networking.cozystack.io/wholeIP": "true"}), true},
{"false value", svcWith(map[string]string{"networking.cozystack.io/wholeIP": "false"}), false},
{"absent annotation defaults to passthrough", svcWith(map[string]string{}), true},
{"nil annotations defaults to passthrough", &v1.Service{}, true},
}
for _, c := range cases {
t.Run(c.name, func(t *testing.T) {
if got := wholeIPPassthrough(c.svc); got != c.expect {
t.Errorf("wholeIPPassthrough = %v, want %v", got, c.expect)
}
})
}
}
24 changes: 23 additions & 1 deletion pkg/proxy/dummy.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,10 @@
package proxy

import "fmt"
import (
"fmt"

corev1 "k8s.io/api/core/v1"
)

type DummyProxyProcessor struct{}

Expand All @@ -23,3 +27,21 @@ func (d *DummyProxyProcessor) CleanupRules(KeepMap map[string]string) error {
fmt.Println("CleanupRules called with KeepMap:", KeepMap)
return nil
}

func (d *DummyProxyProcessor) EnsurePortFilter(SvcIP, PodIP string, Ports []corev1.ServicePort) error {
fmt.Printf("EnsurePortFilter called with SvcIP: %s, PodIP: %s, Ports: %+v\n", SvcIP, PodIP, Ports)
return nil
}

func (d *DummyProxyProcessor) DeletePortFilter(SvcIP, PodIP string) error {
fmt.Printf("DeletePortFilter called with SvcIP: %s, PodIP: %s\n", SvcIP, PodIP)
return nil
}

func (d *DummyProxyProcessor) CleanupPortFilters(keep map[string]PortFilterEntry) error {
fmt.Printf("CleanupPortFilters called with %d entries\n", len(keep))
return nil
}

// Compile-time assertion that DummyProxyProcessor satisfies ProxyProcessor.
var _ ProxyProcessor = (*DummyProxyProcessor)(nil)
Loading