Skip to content

Commit 460c757

Browse files
proxy/nft: batch netlink ops in EnsurePortFilter and CleanupPortFilters
EnsurePortFilter previously called SetAddElements once per port; accumulate elements and issue a single call. CleanupPortFilters previously delegated to DeletePortFilter / EnsurePortFilter in a loop, each performing GetSetElements and Flush — O(N^2) in number of services * ports. Replace with a single-pass diff: fetch current state once per set, compute additions and removals in memory, batch SetAddElements / SetDeleteElements per set, and Flush exactly once. Addresses gemini-code-assist review feedback on PR #11. Signed-off-by: mattia-eleuteri <mattia@hidora.io>
1 parent 0b1a052 commit 460c757

1 file changed

Lines changed: 97 additions & 19 deletions

File tree

pkg/proxy/nft.go

Lines changed: 97 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -594,7 +594,8 @@ func (p *NFTProxyProcessor) EnsurePortFilter(svcIP, podIP string, ports []corev1
594594
return fmt.Errorf("failed to add %s to filtered_pods: %v", podIP, err)
595595
}
596596

597-
// 3. Add allowed (podIP, proto, port) tuples.
597+
// 3. Accumulate allowed (podIP, proto, port) tuples and add them in a single batch.
598+
var elements []nftables.SetElement
598599
for _, sp := range ports {
599600
var protoByte byte
600601
switch sp.Protocol {
@@ -607,10 +608,11 @@ func (p *NFTProxyProcessor) EnsurePortFilter(svcIP, podIP string, ports []corev1
607608
"podIP", podIP, "protocol", sp.Protocol)
608609
continue
609610
}
610-
key := concatPortKey(parsedPodIP, protoByte, uint16(sp.Port))
611-
if err := p.conn.SetAddElements(p.allowedPorts, []nftables.SetElement{{Key: key}}); err != nil {
612-
return fmt.Errorf("failed to add port tuple to allowed_ports (pod=%s proto=%v port=%d): %v",
613-
podIP, sp.Protocol, sp.Port, err)
611+
elements = append(elements, nftables.SetElement{Key: concatPortKey(parsedPodIP, protoByte, uint16(sp.Port))})
612+
}
613+
if len(elements) > 0 {
614+
if err := p.conn.SetAddElements(p.allowedPorts, elements); err != nil {
615+
return fmt.Errorf("failed to add port tuples to allowed_ports for pod %s: %v", podIP, err)
614616
}
615617
}
616618

@@ -651,35 +653,111 @@ func (p *NFTProxyProcessor) DeletePortFilter(svcIP, podIP string) error {
651653

652654
// CleanupPortFilters reconciles port_filter state with the desired snapshot.
653655
// The keep map is keyed by svcIP (for caller convenience) but the on-disk
654-
// nft set keys are pod IPs. We remove any pod IP currently in filtered_pods
655-
// that is not desired, then re-Ensure the desired ones.
656+
// nft set keys are pod IPs. The implementation is a single-pass diff: it
657+
// fetches current state once, computes additions and removals in memory,
658+
// batches SetAddElements / SetDeleteElements per set, and Flushes exactly
659+
// once.
656660
func (p *NFTProxyProcessor) CleanupPortFilters(keep map[string]PortFilterEntry) error {
657661
log.Info("Starting CleanupPortFilters", "keepCount", len(keep))
658662

659-
desired := make(map[string]bool, len(keep))
663+
// 1. Build desired state in memory.
664+
desiredPods := make(map[string]bool, len(keep))
665+
desiredPortKeys := make(map[string][]byte) // hex(key) → key (byte slice)
660666
for _, entry := range keep {
661-
desired[entry.PodIP] = true
667+
parsedPodIP := net.ParseIP(entry.PodIP).To4()
668+
if parsedPodIP == nil {
669+
log.Info("Skipping invalid podIP in cleanup keep map", "podIP", entry.PodIP)
670+
continue
671+
}
672+
desiredPods[entry.PodIP] = true
673+
for _, sp := range entry.Ports {
674+
var protoByte byte
675+
switch sp.Protocol {
676+
case corev1.ProtocolTCP, "":
677+
protoByte = unix.IPPROTO_TCP
678+
case corev1.ProtocolUDP:
679+
protoByte = unix.IPPROTO_UDP
680+
default:
681+
continue
682+
}
683+
key := concatPortKey(parsedPodIP, protoByte, uint16(sp.Port))
684+
desiredPortKeys[fmt.Sprintf("%x", key)] = key
685+
}
662686
}
663687

664-
current, err := p.conn.GetSetElements(p.filteredPods)
688+
// 2. Fetch current state.
689+
currentPods, err := p.conn.GetSetElements(p.filteredPods)
665690
if err != nil {
666691
return fmt.Errorf("failed to list filtered_pods: %v", err)
667692
}
668-
for _, el := range current {
693+
currentPorts, err := p.conn.GetSetElements(p.allowedPorts)
694+
if err != nil {
695+
return fmt.Errorf("failed to list allowed_ports: %v", err)
696+
}
697+
698+
// 3. Compute additions / removals.
699+
var addPods, delPods []nftables.SetElement
700+
var addPorts, delPorts []nftables.SetElement
701+
seenPods := make(map[string]bool, len(currentPods))
702+
for _, el := range currentPods {
669703
ipStr := net.IP(el.Key).String()
670-
if !desired[ipStr] {
671-
log.Info("Removing stale filtered_pod", "podIP", ipStr)
672-
// We don't have the original svcIP in current state; pass empty.
673-
if err := p.DeletePortFilter("", ipStr); err != nil {
674-
return fmt.Errorf("cleanup delete pod %s: %v", ipStr, err)
704+
seenPods[ipStr] = true
705+
if !desiredPods[ipStr] {
706+
delPods = append(delPods, nftables.SetElement{Key: el.Key})
707+
}
708+
}
709+
for podIPStr := range desiredPods {
710+
if !seenPods[podIPStr] {
711+
parsed := net.ParseIP(podIPStr).To4()
712+
if parsed == nil {
713+
continue
675714
}
715+
addPods = append(addPods, nftables.SetElement{Key: parsed})
676716
}
677717
}
678-
for svcIP, entry := range keep {
679-
if err := p.EnsurePortFilter(svcIP, entry.PodIP, entry.Ports); err != nil {
680-
return fmt.Errorf("cleanup ensure %s: %v", svcIP, err)
718+
seenPorts := make(map[string]bool, len(currentPorts))
719+
for _, el := range currentPorts {
720+
keyHex := fmt.Sprintf("%x", el.Key)
721+
seenPorts[keyHex] = true
722+
if _, want := desiredPortKeys[keyHex]; !want {
723+
delPorts = append(delPorts, nftables.SetElement{Key: el.Key})
681724
}
682725
}
726+
for keyHex, key := range desiredPortKeys {
727+
if !seenPorts[keyHex] {
728+
addPorts = append(addPorts, nftables.SetElement{Key: key})
729+
}
730+
}
731+
732+
// 4. Batch ops.
733+
if len(delPods) > 0 {
734+
if err := p.conn.SetDeleteElements(p.filteredPods, delPods); err != nil {
735+
return fmt.Errorf("failed to delete stale filtered_pods: %v", err)
736+
}
737+
}
738+
if len(delPorts) > 0 {
739+
if err := p.conn.SetDeleteElements(p.allowedPorts, delPorts); err != nil {
740+
return fmt.Errorf("failed to delete stale allowed_ports: %v", err)
741+
}
742+
}
743+
if len(addPods) > 0 {
744+
if err := p.conn.SetAddElements(p.filteredPods, addPods); err != nil {
745+
return fmt.Errorf("failed to add filtered_pods: %v", err)
746+
}
747+
}
748+
if len(addPorts) > 0 {
749+
if err := p.conn.SetAddElements(p.allowedPorts, addPorts); err != nil {
750+
return fmt.Errorf("failed to add allowed_ports: %v", err)
751+
}
752+
}
753+
754+
// 5. Single flush.
755+
if err := p.conn.Flush(); err != nil {
756+
return fmt.Errorf("failed to flush CleanupPortFilters: %v", err)
757+
}
758+
log.Info("CleanupPortFilters completed",
759+
"addedPods", len(addPods), "removedPods", len(delPods),
760+
"addedPorts", len(addPorts), "removedPorts", len(delPorts))
683761
return nil
684762
}
685763

0 commit comments

Comments
 (0)