Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
105 changes: 94 additions & 11 deletions pkg/ebpf/conntrack/conntrack_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,13 +34,30 @@ type conntrackClient struct {
hydratelocalConntrack bool
localConntrackV4Cache map[utils.ConntrackKey]bool
localConntrackV6Cache map[utils.ConntrackKeyV6]bool

// Consecutive miss tracking for IPv4 - requires 2 consecutive cleanup cycles
// before deletion to avoid race condition during 5-tuple reuse
missingEntriesEvenCycleV4 map[utils.ConntrackKey]bool
missingEntriesOddCycleV4 map[utils.ConntrackKey]bool

// Consecutive miss tracking for IPv6
missingEntriesEvenCycleV6 map[utils.ConntrackKeyV6]bool
missingEntriesOddCycleV6 map[utils.ConntrackKeyV6]bool

// Track current cycle parity (true = even, false = odd)
isEvenCycle bool
}

func NewConntrackClient(conntrackMap goebpfmaps.BpfMap, enableIPv6 bool) *conntrackClient {
return &conntrackClient{
conntrackMap: conntrackMap,
enableIPv6: enableIPv6,
hydratelocalConntrack: true,
conntrackMap: conntrackMap,
enableIPv6: enableIPv6,
hydratelocalConntrack: true,
missingEntriesEvenCycleV4: make(map[utils.ConntrackKey]bool),
missingEntriesOddCycleV4: make(map[utils.ConntrackKey]bool),
missingEntriesEvenCycleV6: make(map[utils.ConntrackKeyV6]bool),
missingEntriesOddCycleV6: make(map[utils.ConntrackKeyV6]bool),
isEvenCycle: true, // Start with even cycle
}
}

Expand Down Expand Up @@ -169,6 +186,7 @@ func (c *conntrackClient) CleanupConntrackMap() {
}
}
// Check if the local cache and kernel cache is in sync
// Consecutive miss approach: require entry to be missing for 2 consecutive cycles
for localConntrackEntry, _ := range c.localConntrackV4Cache {
newKey := utils.ConntrackKey{}
newKey.Source_ip = utils.ConvIPv4ToInt(utils.ConvIntToIPv4(localConntrackEntry.Source_ip))
Expand All @@ -179,14 +197,45 @@ func (c *conntrackClient) CleanupConntrackMap() {
newKey.Owner_ip = utils.ConvIPv4ToInt(utils.ConvIntToIPv4(localConntrackEntry.Owner_ip))
_, ok := kernelConntrackV4Cache[newKey]
if !ok {
// Delete the entry in local cache since kernel entry is still missing so expired case
// Entry missing from kernel - apply consecutive miss logic
expiredFlow := localConntrackEntry
key := fmt.Sprintf("Conntrack Key : Source IP - %s Source port - %d Dest IP - %s Dest port - %d Protocol - %d Owner IP - %s", utils.ConvIntToIPv4(expiredFlow.Source_ip).String(), expiredFlow.Source_port, utils.ConvIntToIPv4(expiredFlow.Dest_ip).String(), expiredFlow.Dest_port, expiredFlow.Protocol, utils.ConvIntToIPv4(expiredFlow.Owner_ip).String())
log().Infof("Conntrack cleanup Delete - %s", key)
c.conntrackMap.DeleteMapEntry(uintptr(unsafe.Pointer(&expiredFlow)))


if c.isEvenCycle {
// Even cycle: check if also missing in previous odd cycle
if _, existsInOdd := c.missingEntriesOddCycleV4[newKey]; existsInOdd {
// Missing for 2 consecutive cycles - safe to delete
log().Infof("Conntrack cleanup Delete (consecutive miss) - %s", key)
c.conntrackMap.DeleteMapEntry(uintptr(unsafe.Pointer(&expiredFlow)))
} else {
// First miss - track in even cycle set
c.missingEntriesEvenCycleV4[newKey] = true
}
} else {
// Odd cycle: check if also missing in previous even cycle
if _, existsInEven := c.missingEntriesEvenCycleV4[newKey]; existsInEven {
// Missing for 2 consecutive cycles - safe to delete
log().Infof("Conntrack cleanup Delete (consecutive miss) - %s", key)
c.conntrackMap.DeleteMapEntry(uintptr(unsafe.Pointer(&expiredFlow)))
} else {
// First miss - track in odd cycle set
c.missingEntriesOddCycleV4[newKey] = true
}
}
}
}

// Cycle management: clear previous cycle's tracking and toggle
if c.isEvenCycle {
// Clear previous odd cycle entries
c.missingEntriesOddCycleV4 = make(map[utils.ConntrackKey]bool)
} else {
// Clear previous even cycle entries
c.missingEntriesEvenCycleV4 = make(map[utils.ConntrackKey]bool)
}
// Toggle cycle for next cleanup
c.isEvenCycle = !c.isEvenCycle

//c.localConntrackV4Cache = make(map[utils.ConntrackKey]bool)
log().Info("Done cleanup of conntrack map")
c.hydratelocalConntrack = true
Expand Down Expand Up @@ -326,17 +375,51 @@ func (c *conntrackClient) Cleanupv6ConntrackMap() {

}
// Check if the local cache and kernel cache is in sync
// Consecutive miss approach: require entry to be missing for 2 consecutive cycles
for localConntrackEntry, _ := range c.localConntrackV6Cache {
_, ok := kernelConntrackV6Cache[localConntrackEntry]
if !ok {
// Delete the entry in local cache since kernel entry is still missing so expired case
// Entry missing from kernel - apply consecutive miss logic
expiredFlow := localConntrackEntry
key := fmt.Sprintf("Conntrack Key : Source IP - %s Source port - %d Dest IP - %s Dest port - %d Protocol - %d Owner IP - %s", utils.ConvByteToIPv6(expiredFlow.Source_ip).String(), expiredFlow.Source_port, utils.ConvByteToIPv6(expiredFlow.Dest_ip).String(), expiredFlow.Dest_port, expiredFlow.Protocol, utils.ConvByteToIPv6(expiredFlow.Owner_ip).String())
log().Infof("Conntrack cleanup Delete - %s", key)
ceByteSlice := utils.ConvConntrackV6ToByte(expiredFlow)
c.conntrackMap.DeleteMapEntry(uintptr(unsafe.Pointer(&ceByteSlice[0])))

if c.isEvenCycle {
// Even cycle: check if also missing in previous odd cycle
if _, existsInOdd := c.missingEntriesOddCycleV6[localConntrackEntry]; existsInOdd {
// Missing for 2 consecutive cycles - safe to delete
log().Infof("Conntrack cleanup Delete (consecutive miss) - %s", key)
ceByteSlice := utils.ConvConntrackV6ToByte(expiredFlow)
c.conntrackMap.DeleteMapEntry(uintptr(unsafe.Pointer(&ceByteSlice[0])))
} else {
// First miss - track in even cycle set
c.missingEntriesEvenCycleV6[localConntrackEntry] = true
}
} else {
// Odd cycle: check if also missing in previous even cycle
if _, existsInEven := c.missingEntriesEvenCycleV6[localConntrackEntry]; existsInEven {
// Missing for 2 consecutive cycles - safe to delete
log().Infof("Conntrack cleanup Delete (consecutive miss) - %s", key)
ceByteSlice := utils.ConvConntrackV6ToByte(expiredFlow)
c.conntrackMap.DeleteMapEntry(uintptr(unsafe.Pointer(&ceByteSlice[0])))
} else {
// First miss - track in odd cycle set
c.missingEntriesOddCycleV6[localConntrackEntry] = true
}
}
}
}

// Cycle management: clear previous cycle's tracking and toggle
if c.isEvenCycle {
// Clear previous odd cycle entries
c.missingEntriesOddCycleV6 = make(map[utils.ConntrackKeyV6]bool)
} else {
// Clear previous even cycle entries
c.missingEntriesEvenCycleV6 = make(map[utils.ConntrackKeyV6]bool)
}
// Toggle cycle for next cleanup
c.isEvenCycle = !c.isEvenCycle

//Lets cleanup all entries in cache
c.localConntrackV6Cache = make(map[utils.ConntrackKeyV6]bool)
log().Info("Done cleanup of conntrack map")
Expand Down
Loading