diff --git a/pkg/ebpf/conntrack/conntrack_client.go b/pkg/ebpf/conntrack/conntrack_client.go index cd77d9ce..1c50ad88 100644 --- a/pkg/ebpf/conntrack/conntrack_client.go +++ b/pkg/ebpf/conntrack/conntrack_client.go @@ -34,13 +34,30 @@ type conntrackClient struct { hydratelocalConntrack bool localConntrackV4Cache map[utils.ConntrackKey]bool localConntrackV6Cache map[utils.ConntrackKeyV6]bool + + // Consecutive miss tracking for IPv4 - requires 2 consecutive cleanup cycles + // before deletion to avoid race condition during 5-tuple reuse + missingEntriesEvenCycleV4 map[utils.ConntrackKey]bool + missingEntriesOddCycleV4 map[utils.ConntrackKey]bool + + // Consecutive miss tracking for IPv6 + missingEntriesEvenCycleV6 map[utils.ConntrackKeyV6]bool + missingEntriesOddCycleV6 map[utils.ConntrackKeyV6]bool + + // Track current cycle parity (true = even, false = odd) + isEvenCycle bool } func NewConntrackClient(conntrackMap goebpfmaps.BpfMap, enableIPv6 bool) *conntrackClient { return &conntrackClient{ - conntrackMap: conntrackMap, - enableIPv6: enableIPv6, - hydratelocalConntrack: true, + conntrackMap: conntrackMap, + enableIPv6: enableIPv6, + hydratelocalConntrack: true, + missingEntriesEvenCycleV4: make(map[utils.ConntrackKey]bool), + missingEntriesOddCycleV4: make(map[utils.ConntrackKey]bool), + missingEntriesEvenCycleV6: make(map[utils.ConntrackKeyV6]bool), + missingEntriesOddCycleV6: make(map[utils.ConntrackKeyV6]bool), + isEvenCycle: true, // Start with even cycle } } @@ -169,6 +186,7 @@ func (c *conntrackClient) CleanupConntrackMap() { } } // Check if the local cache and kernel cache is in sync + // Consecutive miss approach: require entry to be missing for 2 consecutive cycles for localConntrackEntry, _ := range c.localConntrackV4Cache { newKey := utils.ConntrackKey{} newKey.Source_ip = utils.ConvIPv4ToInt(utils.ConvIntToIPv4(localConntrackEntry.Source_ip)) @@ -179,14 +197,45 @@ func (c *conntrackClient) CleanupConntrackMap() { newKey.Owner_ip = utils.ConvIPv4ToInt(utils.ConvIntToIPv4(localConntrackEntry.Owner_ip)) _, ok := kernelConntrackV4Cache[newKey] if !ok { - // Delete the entry in local cache since kernel entry is still missing so expired case + // Entry missing from kernel - apply consecutive miss logic expiredFlow := localConntrackEntry key := fmt.Sprintf("Conntrack Key : Source IP - %s Source port - %d Dest IP - %s Dest port - %d Protocol - %d Owner IP - %s", utils.ConvIntToIPv4(expiredFlow.Source_ip).String(), expiredFlow.Source_port, utils.ConvIntToIPv4(expiredFlow.Dest_ip).String(), expiredFlow.Dest_port, expiredFlow.Protocol, utils.ConvIntToIPv4(expiredFlow.Owner_ip).String()) - log().Infof("Conntrack cleanup Delete - %s", key) - c.conntrackMap.DeleteMapEntry(uintptr(unsafe.Pointer(&expiredFlow))) - + + if c.isEvenCycle { + // Even cycle: check if also missing in previous odd cycle + if _, existsInOdd := c.missingEntriesOddCycleV4[newKey]; existsInOdd { + // Missing for 2 consecutive cycles - safe to delete + log().Infof("Conntrack cleanup Delete (consecutive miss) - %s", key) + c.conntrackMap.DeleteMapEntry(uintptr(unsafe.Pointer(&expiredFlow))) + } else { + // First miss - track in even cycle set + c.missingEntriesEvenCycleV4[newKey] = true + } + } else { + // Odd cycle: check if also missing in previous even cycle + if _, existsInEven := c.missingEntriesEvenCycleV4[newKey]; existsInEven { + // Missing for 2 consecutive cycles - safe to delete + log().Infof("Conntrack cleanup Delete (consecutive miss) - %s", key) + 
c.conntrackMap.DeleteMapEntry(uintptr(unsafe.Pointer(&expiredFlow))) + } else { + // First miss - track in odd cycle set + c.missingEntriesOddCycleV4[newKey] = true + } + } } } + + // Cycle management: clear previous cycle's tracking and toggle + if c.isEvenCycle { + // Clear previous odd cycle entries + c.missingEntriesOddCycleV4 = make(map[utils.ConntrackKey]bool) + } else { + // Clear previous even cycle entries + c.missingEntriesEvenCycleV4 = make(map[utils.ConntrackKey]bool) + } + // Toggle cycle for next cleanup + c.isEvenCycle = !c.isEvenCycle + //c.localConntrackV4Cache = make(map[utils.ConntrackKey]bool) log().Info("Done cleanup of conntrack map") c.hydratelocalConntrack = true @@ -326,17 +375,51 @@ func (c *conntrackClient) Cleanupv6ConntrackMap() { } // Check if the local cache and kernel cache is in sync + // Consecutive miss approach: require entry to be missing for 2 consecutive cycles for localConntrackEntry, _ := range c.localConntrackV6Cache { _, ok := kernelConntrackV6Cache[localConntrackEntry] if !ok { - // Delete the entry in local cache since kernel entry is still missing so expired case + // Entry missing from kernel - apply consecutive miss logic expiredFlow := localConntrackEntry key := fmt.Sprintf("Conntrack Key : Source IP - %s Source port - %d Dest IP - %s Dest port - %d Protocol - %d Owner IP - %s", utils.ConvByteToIPv6(expiredFlow.Source_ip).String(), expiredFlow.Source_port, utils.ConvByteToIPv6(expiredFlow.Dest_ip).String(), expiredFlow.Dest_port, expiredFlow.Protocol, utils.ConvByteToIPv6(expiredFlow.Owner_ip).String()) - log().Infof("Conntrack cleanup Delete - %s", key) - ceByteSlice := utils.ConvConntrackV6ToByte(expiredFlow) - c.conntrackMap.DeleteMapEntry(uintptr(unsafe.Pointer(&ceByteSlice[0]))) + + if c.isEvenCycle { + // Even cycle: check if also missing in previous odd cycle + if _, existsInOdd := c.missingEntriesOddCycleV6[localConntrackEntry]; existsInOdd { + // Missing for 2 consecutive cycles - safe to delete + log().Infof("Conntrack cleanup Delete (consecutive miss) - %s", key) + ceByteSlice := utils.ConvConntrackV6ToByte(expiredFlow) + c.conntrackMap.DeleteMapEntry(uintptr(unsafe.Pointer(&ceByteSlice[0]))) + } else { + // First miss - track in even cycle set + c.missingEntriesEvenCycleV6[localConntrackEntry] = true + } + } else { + // Odd cycle: check if also missing in previous even cycle + if _, existsInEven := c.missingEntriesEvenCycleV6[localConntrackEntry]; existsInEven { + // Missing for 2 consecutive cycles - safe to delete + log().Infof("Conntrack cleanup Delete (consecutive miss) - %s", key) + ceByteSlice := utils.ConvConntrackV6ToByte(expiredFlow) + c.conntrackMap.DeleteMapEntry(uintptr(unsafe.Pointer(&ceByteSlice[0]))) + } else { + // First miss - track in odd cycle set + c.missingEntriesOddCycleV6[localConntrackEntry] = true + } + } } } + + // Cycle management: clear previous cycle's tracking and toggle + if c.isEvenCycle { + // Clear previous odd cycle entries + c.missingEntriesOddCycleV6 = make(map[utils.ConntrackKeyV6]bool) + } else { + // Clear previous even cycle entries + c.missingEntriesEvenCycleV6 = make(map[utils.ConntrackKeyV6]bool) + } + // Toggle cycle for next cleanup + c.isEvenCycle = !c.isEvenCycle + //Lets cleanup all entries in cache c.localConntrackV6Cache = make(map[utils.ConntrackKeyV6]bool) log().Info("Done cleanup of conntrack map") diff --git a/validation-scripts/KERNEL_TIMEOUT_SETUP.md b/validation-scripts/KERNEL_TIMEOUT_SETUP.md new file mode 100644 index 00000000..a8d75064 --- /dev/null +++ 
b/validation-scripts/KERNEL_TIMEOUT_SETUP.md @@ -0,0 +1,285 @@ +# Kernel Conntrack Timeout Setup Guide + +This guide explains how to set up the kernel conntrack timeout for reproducing the race condition. + +## Why Kernel Timeout Matters + +The race condition **requires** entries to be: +1. ✅ Present in NPA local cache +2. ✅ **Missing from kernel conntrack** ← This is critical! +3. ✅ Cleanup detects the mismatch +4. ✅ New connection reuses the same 5-tuple during cleanup + +Without kernel expiry, entries remain in both kernel and cache, so cleanup won't try to delete them. + +## Setup Steps + +### Step 1: Deploy the Conntrack Tuner + +This DaemonSet runs on all nodes and sets the kernel timeout to 30 seconds. + +```bash +# Apply the tuner DaemonSet +kubectl apply -f k8s_conntrack_tuner.yaml + +# Wait for pods to be ready (one per node) +kubectl wait --for=condition=ready pod -l app=conntrack-tuner -n kube-system --timeout=60s + +# Verify it's running +kubectl get pods -n kube-system -l app=conntrack-tuner +``` + +Expected output: +``` +NAME READY STATUS RESTARTS AGE +conntrack-tuner-abc12 1/1 Running 0 10s +conntrack-tuner-def34 1/1 Running 0 10s +``` + +### Step 2: Verify the Timeout Was Set + +Check the logs to confirm: + +```bash +# Check logs from one of the tuner pods +kubectl logs -n kube-system -l app=conntrack-tuner --tail=20 +``` + +You should see: +``` +======================================== +Conntrack Timeout Tuner Starting +======================================== + +Current TCP established timeout: +180 +seconds + +Setting TCP established timeout to 30 seconds... +New timeout: 30 seconds + +✓ Successfully set conntrack timeout to 30 seconds + +======================================== +Configuration complete on ip-10-0-1-234 +======================================== + +Keeping pod running... (delete DaemonSet to stop) +``` + +### Step 3: Run Your Test + +Now the test will work correctly: + +```bash +# The kernel will expire connections after 30 seconds +# So the test can create the race condition + +python3 conntrack_race_reproducer.py +``` + +### Step 4: Clean Up After Testing + +**IMPORTANT:** Reset the kernel timeout when done! + +```bash +# Step 4a: Delete the tuner DaemonSet +kubectl delete -f k8s_conntrack_tuner.yaml + +# Step 4b: Apply the reset DaemonSet +kubectl apply -f k8s_conntrack_reset.yaml + +# Step 4c: Wait for reset to complete +kubectl wait --for=condition=complete job -l app=conntrack-reset -n kube-system --timeout=60s + +# Step 4d: Verify reset +kubectl logs -n kube-system -l app=conntrack-reset --tail=20 + +# Step 4e: Delete the reset DaemonSet +kubectl delete -f k8s_conntrack_reset.yaml +``` + +## Troubleshooting + +### Issue: Permission Denied + +**Symptom:** +``` +Error: pods is forbidden: User "..." cannot create resource "pods" +``` + +**Solution:** +You need cluster-admin or sufficient RBAC permissions to create privileged DaemonSets in kube-system. 
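+
+If your account lacks these rights, a cluster admin can grant them with an RBAC binding. A minimal sketch, using placeholder names (`conntrack-test-admin` and `<your-user>` are hypothetical; `cluster-admin` is intentionally broad, so scope it down for shared clusters):
+
+```bash
+# Hypothetical example: grant a test user cluster-admin on the cluster
+kubectl create clusterrolebinding conntrack-test-admin \
+  --clusterrole=cluster-admin \
+  --user=<your-user>
+```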
+ +```bash +# Check your permissions +kubectl auth can-i create daemonsets -n kube-system +kubectl auth can-i create pods/exec -n kube-system + +# If "no", ask your cluster admin for permissions or use their account +``` + +### Issue: Pods Not Starting + +**Symptom:** +``` +NAME READY STATUS RESTARTS AGE +conntrack-tuner-abc12 0/1 ContainerCreating 0 30s +``` + +**Solution:** +Check pod events: + +```bash +kubectl describe pod -n kube-system -l app=conntrack-tuner +``` + +Common issues: +- Image pull failures (check network connectivity) +- SecurityContext denied (cluster policy restrictions) +- No nodes match toleration (check node taints) + +### Issue: Conntrack Module Not Loaded + +**Symptom in logs:** +``` +ERROR: Conntrack module not loaded +``` + +**Solution:** +This means netfilter/conntrack isn't loaded on your nodes. This is rare, but if it happens: + +1. Check if conntrack is actually needed in your cluster +2. The module might load automatically when needed +3. Or your kernel might not have netfilter compiled in (very rare) + +### Issue: Timeout Not Changing + +**Symptom in logs:** +``` +✗ Failed to set conntrack timeout +``` + +**Solution:** + +Check if the file is read-only or protected: + +```bash +# Exec into tuner pod +kubectl exec -it -n kube-system -- sh + +# Try manually +echo 30 > /proc/sys/net/netfilter/nf_conntrack_tcp_timeout_established +cat /proc/sys/net/netfilter/nf_conntrack_tcp_timeout_established +``` + +## Security Considerations + +### What This DaemonSet Does + +- ✅ Runs with `privileged: true` security context +- ✅ Uses `hostNetwork` and `hostPID` +- ✅ Modifies kernel parameters on all nodes +- ✅ Uses minimal resources (10m CPU, 16Mi memory) + +### Security Best Practices + +1. **Only run in test/dev clusters**: Don't run this in production +2. **Time-limited**: Delete the DaemonSet after testing +3. **Always reset**: Use the reset DaemonSet to restore defaults +4. **Audit**: This creates audit logs showing privileged access + +### Impact on Cluster + +**While the tuner is running:** +- All TCP connections will expire after 30 seconds of inactivity +- This affects **all pods and services** on affected nodes +- Long-running idle connections may drop +- Reconnection attempts should work normally + +**Why 30 seconds is safe for testing:** +- Most applications reconnect automatically +- Test pods are isolated in their own namespace +- Duration is short (only during active testing) +- Easy to reset back to normal + +## Advanced: Custom Timeout Values + +To use a different timeout value: + +```bash +# Edit k8s_conntrack_tuner.yaml +# Change line: echo 30 > /proc/sys/net/netfilter/... +# To: echo 60 > /proc/sys/net/netfilter/... + +# Then update test_config.yaml: +kernel_timeout: 60 +cleanup_period: 120 # Should be > kernel_timeout +``` + +## Verification Commands + +```bash +# Check if tuner is running +kubectl get ds conntrack-tuner -n kube-system + +# View logs from all tuner pods +kubectl logs -n kube-system -l app=conntrack-tuner --tail=10 --all-containers=true + +# Check how many nodes have the tuner +kubectl get pods -n kube-system -l app=conntrack-tuner -o wide + +# Verify timeout on a specific node +kubectl exec -it -n kube-system -- cat /proc/sys/net/netfilter/nf_conntrack_tcp_timeout_established +``` + +## Complete Test Workflow + +```bash +# 1. Deploy test resources +kubectl apply -f k8s_test_pods.yaml + +# 2. 
Set kernel timeout +kubectl apply -f k8s_conntrack_tuner.yaml +kubectl wait --for=condition=ready pod -l app=conntrack-tuner -n kube-system --timeout=60s + +# 3. Verify timeout is set +kubectl logs -n kube-system -l app=conntrack-tuner --tail=5 + +# 4. Run the test +python3 conntrack_race_reproducer.py + +# 5. Clean up test pods +kubectl delete -f k8s_test_pods.yaml + +# 6. Reset kernel timeout +kubectl delete -f k8s_conntrack_tuner.yaml +kubectl apply -f k8s_conntrack_reset.yaml +kubectl wait --for=condition=ready pod -l app=conntrack-reset -n kube-system --timeout=60s +kubectl delete -f k8s_conntrack_reset.yaml +``` + +## Quick Reference + +| File | Purpose | When to Use | +|------|---------|-------------| +| `k8s_conntrack_tuner.yaml` | Sets timeout to 30s | Before testing | +| `k8s_conntrack_reset.yaml` | Resets timeout to 180s | After testing | +| `test_config.yaml` | Configure test parameters | Adjust for your timing | + +## Questions? + +- **How long to keep tuner running?** Only during active testing (minutes to hours) +- **Does it affect production?** Yes! Only use in dev/test clusters +- **Can I use different timeout?** Yes, edit the YAML files +- **What if I forget to reset?** Cluster will function normally but connections expire faster +- **Is this reversible?** Yes, 100% reversible with reset DaemonSet + +## Summary + +1. ✅ Deploy `k8s_conntrack_tuner.yaml` → Sets 30s timeout +2. ✅ Run `conntrack_race_reproducer.py` → Tests the race +3. ✅ Deploy `k8s_conntrack_reset.yaml` → Resets to 180s +4. ✅ Clean up both DaemonSets → Back to normal + +The kernel timeout is **essential** for reproducing the race condition - this setup makes it easy and safe! diff --git a/validation-scripts/NPA_CLEANUP_ANALYSIS.md b/validation-scripts/NPA_CLEANUP_ANALYSIS.md new file mode 100644 index 00000000..ae29fcaf --- /dev/null +++ b/validation-scripts/NPA_CLEANUP_ANALYSIS.md @@ -0,0 +1,207 @@ +# AWS Network Policy Agent - Conntrack Cleanup Analysis + +## Source Code Investigation + +I analyzed the AWS Network Policy Agent source code to understand the actual cleanup logging patterns. + +### Repository +- **URL**: https://github.com/aws/aws-network-policy-agent +- **File**: `pkg/ebpf/conntrack/conntrack_client.go` +- **Functions**: `CleanupConntrackMap()` and `Cleanupv6ConntrackMap()` + +## Cleanup Process Flow + +### 1. Initial Check +```go +log().Info("Check for any stale entries in the conntrack map") +``` + +### 2. Two-Phase Approach + +#### Phase A: First Run (Hydration) +If `hydratelocalConntrack == true`: +- Reads all entries from eBPF map +- Populates local cache: `localConntrackV4Cache` +- Logs: `"hydrated local conntrack cache"` +- Sets `hydratelocalConntrack = false` + +#### Phase B: Subsequent Runs (Cleanup) +If `hydratelocalConntrack == false`: +- Reads kernel conntrack table via netlink +- Builds `kernelConntrackV4Cache` from kernel data +- Compares local cache vs kernel cache +- For entries in local but NOT in kernel: + - Logs: `"Conntrack cleanup Delete - {details}"` + - Deletes from eBPF map +- Logs: `"Done cleanup of conntrack map"` +- Sets `hydratelocalConntrack = true` for next cycle + +## The Race Condition Window + +``` +T=0: Cleanup starts: "Check for any stale entries..." +T=0-2s: Reading kernel conntrack via netlink (SNAPSHOT taken) +T=2-4s: Comparing local vs kernel cache + ↓ RACE WINDOW: New connections here get missed! +T=4s: Deleting stale entries from eBPF map +T=5s: "Done cleanup of conntrack map" +``` + +### Why the Race Occurs + +1. 
**Snapshot is static**: Taken at T=0-2s +2. **Processing takes time**: T=2-4s to compare caches +3. **New connection during processing**: + - Creates kernel entry at T=3s + - Creates eBPF entry at T=3s + - BUT: Snapshot from T=0-2s doesn't have it +4. **Deletion based on old snapshot**: + - At T=4s, entry found in local cache but NOT in snapshot + - Incorrectly identified as "stale" + - Deleted from eBPF map +5. **Response packet denied**: + - Response arrives at T=5s + - eBPF lookup fails (entry was deleted) + - Traffic denied! + +## Log Patterns for Detection + +### Primary Patterns (AWS NPA Specific) +```python +r"Check for any stale entries in the conntrack map" # Cleanup start +r"Done cleanup of conntrack map" # Cleanup end +r"Conntrack cleanup Delete" # Individual deletions +r"hydrated local conntrack cache" # Cache hydration +``` + +### Fallback Patterns (Generic) +```python +r"cleanup.*conntrack" +r"conntrack.*cleanup" +``` + +## Detection in the Script + +The updated script now looks for these exact AWS NPA log messages: + +```python +def detect_cleanup_cycle(self): + logs = self.kube.get_npa_logs( + self.config.npa_namespace, + self.config.npa_pod_label, + since="10m" + ) + + patterns = [ + r"Check for any stale entries in the conntrack map", + r"Done cleanup of conntrack map", + r"Conntrack cleanup Delete", + r"hydrated local conntrack cache", + # Fallbacks... + ] +``` + +## Cleanup Cycle Timing + +### Controlled By +The cleanup period is controlled by the NPA configuration flag: +``` +--conntrack-cache-cleanup-period=300 +``` +Default: 300 seconds (5 minutes) + +### Detection Strategy +1. **Parse logs** for cleanup start/end messages +2. **Calculate intervals** between consecutive cleanups +3. **Predict next cleanup** based on detected interval +4. **Fallback to config** if detection fails + +## Example Log Output + +``` +2024-12-04 20:00:00 [INFO] Check for any stale entries in the conntrack map +2024-12-04 20:00:01 [INFO] hydrated local conntrack cache +2024-12-04 20:00:01 [INFO] Done cleanup of conntrack map +... +2024-12-04 20:05:00 [INFO] Check for any stale entries in the conntrack map +2024-12-04 20:05:02 [INFO] Conntrack cleanup Delete - Conntrack Key : Source IP - 10.0.1.5 ... +2024-12-04 20:05:02 [INFO] Conntrack cleanup Delete - Conntrack Key : Source IP - 10.0.1.8 ... +2024-12-04 20:05:03 [INFO] Done cleanup of conntrack map +``` + +## Why Detection Might Fail + +### Common Reasons + +1. **No Recent Cleanup** + - Script checks last 10 minutes only + - If cleanup period is 300s, may need to wait + +2. **Log Level Configuration** + - NPA might be configured to not log INFO level + - Check with: `kubectl logs -n kube-system -l k8s-app=aws-node` + +3. **NPA Not Running** + - Verify: `kubectl get pods -n kube-system -l k8s-app=aws-node` + +4. **Different Label** + - Some deployments use different labels + - Check: `kubectl get pods -n kube-system | grep -E 'aws|node|network'` + +## Verification + +To verify the script can now detect cleanups: + +```bash +# Check if NPA logs contain cleanup messages +kubectl logs -n kube-system -l k8s-app=aws-node --tail=500 | grep -E "Check for any stale|Done cleanup|Conntrack cleanup" + +# Run the test +python3 conntrack_race_reproducer.py -v +``` + +If detection succeeds, you'll see: +``` +[INFO] Detecting NPA cleanup cycle timing... 
+[INFO] Detected cleanup interval: ~300s +``` + +If it fails, you'll see: +``` +[WARNING] Could not detect cleanup cycle from logs, will use configured period +``` +This is **normal and OK** - the script will use fallback timing estimation. + +## The Proposed Fix + +### Consecutive Miss Approach + +Instead of deleting on first miss, require 2 consecutive misses: + +```python +# Maintain two sets +missing_entries_even_cycle = set() +missing_entries_odd_cycle = set() +is_even_cycle = True + +# Every cleanup cycle +if is_even_cycle: + for entry in local_cache: + if entry not in kernel_cache: + if entry in missing_entries_odd_cycle: + delete_from_ebpf(entry) # Missing 2x → delete + else: + missing_entries_even_cycle.add(entry) # Track + missing_entries_odd_cycle.clear() + is_even_cycle = False +else: + # Similar for odd cycle... +``` + +This ensures that entries must be missing from kernel for **2 full cleanup cycles** before deletion, virtually eliminating the race. + +## References + +- AWS Network Policy Agent: https://github.com/aws/aws-network-policy-agent +- Conntrack Client: `pkg/ebpf/conntrack/conntrack_client.go` +- Cleanup Functions: `CleanupConntrackMap()` and `Cleanupv6ConntrackMap()` diff --git a/validation-scripts/QUICKSTART.md b/validation-scripts/QUICKSTART.md new file mode 100644 index 00000000..78bee5f6 --- /dev/null +++ b/validation-scripts/QUICKSTART.md @@ -0,0 +1,134 @@ +# Quick Start Guide + +## TL;DR - Get Running in 5 Minutes + +```bash +# 1. Run setup (validates prerequisites) +./setup.sh + +# 2. Deploy test pods +kubectl apply -f k8s_test_pods.yaml + +# 3. Set kernel timeout (REQUIRED for race reproduction) +kubectl apply -f k8s_conntrack_tuner.yaml +kubectl wait --for=condition=ready pod -l app=conntrack-tuner -n kube-system --timeout=60s + +# 4. Wait for pods ready +kubectl wait --for=condition=ready pod --all -n conntrack-test --timeout=60s + +# 5. Run the test +python3 conntrack_race_reproducer.py + +# 6. Clean up kernel timeout (IMPORTANT!) +kubectl delete -f k8s_conntrack_tuner.yaml +kubectl apply -f k8s_conntrack_reset.yaml +kubectl wait --for=condition=ready pod -l app=conntrack-reset -n kube-system --timeout=60s +kubectl delete -f k8s_conntrack_reset.yaml +``` + +## What This Does + +Reproduces a race condition where: +- Kernel expires a connection after 30 seconds (via tuner) +- NPA cleanup snapshots conntrack (missing the expired entry) +- **NEW connection reuses same 5-tuple during cleanup** +- Cleanup deletes eBPF entry for the NEW connection +- Response packets get denied → **RACE TRIGGERED** + +**⚠️ IMPORTANT:** The kernel timeout tuner is **required** - without it, entries won't expire and the race won't occur! + +## Expected Output + +### Success +``` +🔥 RACE CONDITION DETECTED! 3 connections denied +``` + +### No Race (Try More Iterations) +``` +python3 conntrack_race_reproducer.py -i 20 +``` + +## Configuration + +Edit `test_config.yaml` before running: + +```yaml +# Match your cluster +npa_namespace: kube-system +npa_pod_label: app=aws-node + +# For faster testing (requires node access) +kernel_timeout: 30 +cleanup_period: 60 +``` + +## Key Parameters + +| Flag | Purpose | Example | +|------|---------|---------| +| `-i` | More iterations | `python3 conntrack_race_reproducer.py -i 30` | +| `-c` | Custom config | `python3 conntrack_race_reproducer.py -c prod_config.yaml` | +| `-v` | Verbose output | `python3 conntrack_race_reproducer.py -v` | + +## Troubleshooting + +### No race detected? 
+```bash +# Try more iterations +python3 conntrack_race_reproducer.py -i 30 + +# Check NPA is running +kubectl get pods -n kube-system -l app=aws-node + +# Run multiple instances (increases probability) +for i in {1..3}; do python3 conntrack_race_reproducer.py -i 10 & done +wait +``` + +### Pods not starting? +```bash +kubectl describe pod test-client -n conntrack-test +kubectl describe pod test-server -n conntrack-test +``` + +### Permission errors? +```bash +kubectl auth can-i create pods -n conntrack-test +kubectl auth can-i exec pods -n conntrack-test +``` + +## Clean Up + +```bash +kubectl delete -f k8s_test_pods.yaml +``` + +## How It Works (5 Phases) + +1. **Initial Connections** → Populate tables +2. **Kernel Timeout** → Wait for expiry +3. **Cleanup Prediction** → Sync with NPA cycle +4. **Race Trigger** → Spray connections during cleanup +5. **Verification** → Check for denials + +## The Fix (Consecutive Miss Approach) + +Require 2 consecutive cleanup cycles to miss an entry before deletion: +- Even cycle: Check odd set → delete or add to even set +- Odd cycle: Check even set → delete or add to odd set + +**Result:** Race probability → ~0% + +## Files Reference + +- `conntrack_race_reproducer.py` - Main test script +- `test_config.yaml` - Configuration +- `k8s_test_pods.yaml` - Test pods +- `setup.sh` - Prerequisites checker +- `README.md` - Full documentation +- `QUICKSTART.md` - This file + +--- + +For detailed information, see [README.md](README.md) diff --git a/validation-scripts/README.md b/validation-scripts/README.md new file mode 100644 index 00000000..9e62a5e0 --- /dev/null +++ b/validation-scripts/README.md @@ -0,0 +1,398 @@ +# Conntrack Race Condition Reproducer + +This test suite reproduces a rare race condition in the Network Policy Agent's (NPA) conntrack cleanup logic that can lead to denial of legitimate traffic. + +## Problem Overview + +### The Race Condition + +The Network Policy Agent maintains a userspace cache of kernel conntrack entries and periodically cleans up stale entries from the eBPF map. A race condition occurs when: + +1. A connection exists in kernel conntrack, local cache, and eBPF map +2. Kernel expires the entry after timeout (~180s) +3. NPA cleanup cycle starts and snapshots the kernel conntrack table +4. **During cleanup processing**, a new connection reuses the same 5-tuple (source IP:port → dest IP:port + protocol) +5. The new connection creates a fresh kernel entry and eBPF map entry +6. Cleanup deletes the eBPF entry based on the pre-reuse snapshot +7. 
When response arrives, the reverse flow lookup fails → **traffic denied** + +### Impact + +- Rare but impactful in production clusters with high port reuse +- Legitimate traffic gets denied/dropped +- Hard to reproduce due to precise timing requirements +- Only manifests under specific conditions + +## Prerequisites + +### Required Tools + +- `kubectl` configured with cluster access +- Python 3.8 or later +- PyYAML library: `pip install pyyaml` + +### Cluster Requirements + +- Kubernetes cluster with Network Policy Agent deployed +- Permissions to: + - Create namespace and pods + - Execute commands in pods + - View logs from NPA pods +- At least 2 worker nodes (recommended) + +### Optional: Reduce Timeouts for Faster Testing + +To speed up reproduction, you can reduce kernel conntrack timeout (requires node access): + +```bash +# SSH into worker nodes or use a privileged DaemonSet +echo 30 > /proc/sys/net/netfilter/nf_conntrack_tcp_timeout_established +``` + +**Warning:** This affects all connections on the node. Reset after testing: +```bash +echo 180 > /proc/sys/net/netfilter/nf_conntrack_tcp_timeout_established +``` + +## Quick Start + +### 1. Deploy Test Resources + +```bash +# Create test namespace and pods +kubectl apply -f k8s_test_pods.yaml + +# Wait for pods to be ready +kubectl wait --for=condition=ready pod/test-client -n conntrack-test --timeout=60s +kubectl wait --for=condition=ready pod/test-server -n conntrack-test --timeout=60s + +# Verify deployment +kubectl get pods -n conntrack-test +``` + +Expected output: +``` +NAME READY STATUS RESTARTS AGE +test-client 1/1 Running 0 30s +test-server 1/1 Running 0 30s +``` + +### 2. Configure Test Parameters + +Edit `test_config.yaml` to match your environment: + +```yaml +# Adjust these based on your setup +namespace: conntrack-test +npa_namespace: kube-system # Where NPA pods run +npa_pod_label: app=aws-node # Label to find NPA pods + +# Timing (reduced for testing) +kernel_timeout: 30 # Match with node configuration +cleanup_period: 60 # Match with NPA --conntrack-cache-cleanup-period + +# Test intensity +iterations: 10 +connections_per_wave: 50 +``` + +### 3. Run the Test + +```bash +# Basic run +python3 conntrack_race_reproducer.py + +# With custom config +python3 conntrack_race_reproducer.py -c my_config.yaml + +# More iterations for better coverage +python3 conntrack_race_reproducer.py -i 20 + +# Verbose output +python3 conntrack_race_reproducer.py -v +``` + +### 4. Interpret Results + +**Success (Race Detected):** +``` +🔥 RACE CONDITION DETECTED! 
3 connections denied + This indicates the cleanup deleted active connection entries + +TEST SUMMARY +============================================================ +Total attempts: 10 +Race conditions detected: 3 +Success rate: 30.0% + +✓ RACE CONDITION SUCCESSFULLY REPRODUCED +``` + +**No Race Detected:** +``` +✗ No race in attempt 10 + +TEST SUMMARY +============================================================ +Total attempts: 10 +Race conditions detected: 0 +Success rate: 0.0% + +⚠ Race condition not reproduced in any attempts +``` + +## How It Works + +### Test Phases + +Each test attempt goes through 5 phases: + +#### Phase 1: Initial Connections +- Creates `connections_per_wave` connections from client to server +- Populates kernel conntrack, local cache, and eBPF map +- Establishes baseline state + +#### Phase 2: Kernel Timeout +- Waits for `kernel_timeout` seconds +- Kernel expires the connection +- Entry remains in local cache (stale) +- Entry remains in eBPF map (stale) + +#### Phase 3: Cleanup Prediction +- Analyzes NPA logs to detect cleanup cycle timing +- Predicts when next cleanup will occur +- Synchronizes test execution + +#### Phase 4: Race Trigger +- Creates connections during the cleanup window +- Attempts to reuse 5-tuples from Phase 1 +- If timing is right, new connection created while cleanup processes old snapshot +- Cleanup deletes eBPF entry for active connection + +#### Phase 5: Verification +- Checks if connections were denied/refused +- Verifies race condition occurred +- Logs detailed results + +### Detection Methods + +The script detects the race through: + +1. **Connection Failures:** Monitors for refused/timeout errors +2. **Pattern Analysis:** Looks for denials of legitimate traffic +3. **Statistical Approach:** Multiple attempts increase detection probability + +### Why This Approach Works + +Despite using simplified tooling (kubectl + netcat), this approach successfully reproduces the race because: + +- The race window is large (1-2 seconds) +- Statistical spraying covers the timing window +- Port exhaustion forces 5-tuple reuse +- Multiple iterations compensate for timing imprecision + +## Troubleshooting + +### Pods Not Starting + +```bash +# Check pod status +kubectl describe pod test-client -n conntrack-test +kubectl describe pod test-server -n conntrack-test + +# Common issues: +# - Image pull failures: Check network connectivity +# - Resource constraints: Ensure cluster has capacity +``` + +### No Race Detected + +**Possible reasons:** + +1. **Timing misalignment:** NPA cleanup cycle prediction was off +2. **Insufficient iterations:** Increase with `-i 20` or higher +3. **Port reuse not occurring:** OS may be using different ports +4. 
**NPA not running:** Verify NPA pods are active + +**Solutions:** + +```bash +# Verify NPA is running +kubectl get pods -n kube-system -l app=aws-node + +# Check NPA logs for cleanup events +kubectl logs -n kube-system -l app=aws-node --tail=100 | grep -i cleanup + +# Increase test iterations +python3 conntrack_race_reproducer.py -i 30 + +# Run during high cluster load (more port reuse) +# Consider running multiple test instances simultaneously +``` + +### Permission Errors + +```bash +# If you get permission errors, check: +kubectl auth can-i create pods -n conntrack-test +kubectl auth can-i exec pods -n conntrack-test +kubectl auth can-i get pods -n kube-system + +# You may need cluster-admin or specific RBAC permissions +``` + +## Configuration Reference + +### test_config.yaml Parameters + +| Parameter | Default | Description | +|-----------|---------|-------------| +| `namespace` | `conntrack-test` | Test namespace for client/server pods | +| `client_pod` | `test-client` | Name of client pod | +| `server_pod` | `test-server` | Name of server pod | +| `server_port` | `8080` | TCP port for connections | +| `npa_namespace` | `kube-system` | Namespace where NPA runs | +| `npa_pod_label` | `app=aws-node` | Label selector for NPA pods | +| `kernel_timeout` | `30` | Kernel conntrack timeout (seconds) | +| `cleanup_period` | `60` | NPA cleanup cycle period (seconds) | +| `iterations` | `10` | Number of test attempts | +| `connections_per_wave` | `50` | Connections per wave | + +### Environment Variables + +The script respects the following environment variables: + +```bash +# Override kubeconfig location +export KUBECONFIG=/path/to/kubeconfig + +# Increase kubectl timeout +export KUBECTL_TIMEOUT=60 +``` + +## Advanced Usage + +### Running Multiple Tests Simultaneously + +For higher probability of hitting the race: + +```bash +# Terminal 1 +python3 conntrack_race_reproducer.py -i 20 & + +# Terminal 2 +python3 conntrack_race_reproducer.py -i 20 & + +# Terminal 3 +python3 conntrack_race_reproducer.py -i 20 & + +# Wait for all to complete +wait +``` + +### Custom Timing Parameters + +For production environments (slower but more realistic): + +```yaml +# test_config_production.yaml +kernel_timeout: 180 +cleanup_period: 300 +iterations: 5 +connections_per_wave: 100 +``` + +```bash +python3 conntrack_race_reproducer.py -c test_config_production.yaml +``` + +### Analyzing NPA Logs + +To manually verify cleanup timing: + +```bash +# Stream NPA logs +kubectl logs -n kube-system -l app=aws-node -f | grep -i conntrack + +# Look for patterns like: +# - "cleanup started" +# - "removed X stale entries" +# - "conntrack cache cleanup" +``` + +## Proposed Solution + +The **Consecutive Miss Approach** solves this race condition: + +### Solution Overview + +Instead of immediate deletion, require entries to be missing from kernel conntrack for **2 consecutive cleanup cycles** before deletion. + +### Implementation + +Maintain 2 additional sets in userspace: +- `missing_entries_even_cycle_set` +- `missing_entries_odd_cycle_set` + +**Every even cleanup cycle:** +1. When stale entry found → Check if in `missing_entries_odd_cycle_set` +2. If yes → Delete from eBPF map +3. If no → Add to `missing_entries_even_cycle_set` +4. Reset `missing_entries_odd_cycle_set` + +**Every odd cleanup cycle:** +1. When stale entry found → Check if in `missing_entries_even_cycle_set` +2. If yes → Delete from eBPF map +3. If no → Add to `missing_entries_odd_cycle_set` +4. 
Reset `missing_entries_even_cycle_set` + +### Benefits + +- ✅ Userspace-only changes (safe) +- ✅ Reduces race probability to ~0% +- ✅ Simple to implement and understand +- ✅ No performance impact + +### Trade-offs + +- Stale entries remain for up to `2 × conntrack-cache-cleanup-period` +- Slightly increased memory usage for tracking sets + +## Cleanup + +When testing is complete: + +```bash +# Delete test resources +kubectl delete -f k8s_test_pods.yaml + +# Or just delete the namespace +kubectl delete namespace conntrack-test + +# Reset kernel timeout (if modified) +# SSH to nodes and run: +echo 180 > /proc/sys/net/netfilter/nf_conntrack_tcp_timeout_established +``` + +## Files in This Repository + +- `conntrack_race_reproducer.py` - Main test script +- `test_config.yaml` - Configuration file +- `k8s_test_pods.yaml` - Kubernetes test resources +- `README.md` - This file + +## Support and Feedback + +For issues or questions: +1. Check the Troubleshooting section above +2. Review NPA logs for cleanup activity +3. Verify test pod connectivity +4. Try increasing iterations and timeout values + +## License + +This test suite is provided as-is for Network Policy Agent testing purposes. + +--- + +**Note:** This reproducer is designed for testing and validation only. Do not run in production clusters without understanding the impact of modified timeout values and test traffic. diff --git a/validation-scripts/TROUBLESHOOTING.md b/validation-scripts/TROUBLESHOOTING.md new file mode 100644 index 00000000..b48925a8 --- /dev/null +++ b/validation-scripts/TROUBLESHOOTING.md @@ -0,0 +1,288 @@ +# Troubleshooting Guide + +## Server Connection Refused Error + +If you see errors like: +``` +nc: connect to 192.168.72.235 port 8080 (tcp) failed: Connection refused +``` + +### Quick Fix + +The server pod needs time to start listening. Try these steps: + +#### 1. Check if pods are running +```bash +kubectl get pods -n conntrack-test +``` + +Expected output: +``` +NAME READY STATUS RESTARTS AGE +test-client 1/1 Running 0 2m +test-server 1/1 Running 0 2m +``` + +#### 2. Check server logs +```bash +kubectl logs test-server -n conntrack-test +``` + +You should see: +``` +Starting TCP server on port 8080... +``` + +#### 3. Manually test connectivity +```bash +# Get server IP +SERVER_IP=$(kubectl get pod test-server -n conntrack-test -o jsonpath='{.status.podIP}') +echo "Server IP: $SERVER_IP" + +# Test from client pod +kubectl exec -n conntrack-test test-client -- nc -zv $SERVER_IP 8080 +``` + +#### 4. 
If server isn't listening, restart it +```bash +# Delete and recreate pods +kubectl delete pod test-server -n conntrack-test +kubectl wait --for=condition=ready pod/test-server -n conntrack-test --timeout=60s + +# Wait a few seconds for server to start listening +sleep 5 + +# Test again +python3 conntrack_race_reproducer.py +``` + +### Alternative: Use a Different Server Image + +If netcat issues persist, you can modify `k8s_test_pods.yaml` to use Python's built-in HTTP server: + +```yaml + - name: server + image: python:3.9-slim + command: + - /bin/sh + - -c + - | + cat > /tmp/server.py << 'EOF' + import socket + import time + + HOST = '0.0.0.0' + PORT = 8080 + + with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s: + s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) + s.bind((HOST, PORT)) + s.listen() + print(f'Server listening on {HOST}:{PORT}') + + while True: + conn, addr = s.accept() + with conn: + print(f'Connected by {addr}') + data = conn.recv(1024) + if data: + conn.sendall(data) + EOF + + python3 /tmp/server.py +``` + +Then redeploy: +```bash +kubectl delete -f k8s_test_pods.yaml +kubectl apply -f k8s_test_pods.yaml +kubectl wait --for=condition=ready pod --all -n conntrack-test --timeout=60s +sleep 5 # Give server time to start +python3 conntrack_race_reproducer.py +``` + +## Common Issues + +### Issue: Pods not starting + +**Check events:** +```bash +kubectl describe pod test-server -n conntrack-test +kubectl describe pod test-client -n conntrack-test +``` + +**Common causes:** +- Image pull failures (check network/registry access) +- Resource constraints (check cluster capacity) +- ImagePullBackOff (check image availability) + +**Solution:** +```bash +# Try using a different base image +# Edit k8s_test_pods.yaml and change: +# image: nicolaka/netshoot:latest +# to: +# image: busybox:latest +``` + +### Issue: Server listening but connections fail + +**Check if server is actually listening:** +```bash +kubectl exec -n conntrack-test test-server -- netstat -tlnp 2>/dev/null || \ +kubectl exec -n conntrack-test test-server -- ss -tlnp +``` + +**Check iptables/network policies:** +```bash +# List network policies +kubectl get networkpolicies -n conntrack-test + +# If policies are blocking, delete them temporarily +kubectl delete networkpolicy --all -n conntrack-test +``` + +### Issue: Permission errors + +**Check RBAC permissions:** +```bash +kubectl auth can-i create pods -n conntrack-test +kubectl auth can-i exec pods -n conntrack-test +kubectl auth can-i get pods -n kube-system +``` + +**If permissions denied, ask cluster admin for:** +- Pod creation in conntrack-test namespace +- Pod exec in conntrack-test namespace +- View logs from kube-system namespace + +### Issue: NPA not found + +**Check NPA deployment:** +```bash +# List all pods in kube-system +kubectl get pods -n kube-system + +# Look for NPA pods (common names: aws-node, calico, cilium) +kubectl get pods -n kube-system | grep -E 'aws-node|calico|cilium' +``` + +**Update config if NPA has different label:** +```yaml +# In test_config.yaml +npa_namespace: kube-system +npa_pod_label: app=your-npa-label # Change this +``` + +### Issue: Race never detected + +**This is actually somewhat normal!** The race is timing-sensitive. + +**Try these:** + +1. **More iterations:** + ```bash + python3 conntrack_race_reproducer.py -i 30 + ``` + +2. **Run multiple instances:** + ```bash + for i in {1..5}; do + python3 conntrack_race_reproducer.py -i 10 & + done + wait + ``` + +3. 
**Adjust timing in config:** + ```yaml + # Make timing match your environment + kernel_timeout: 30 # Actual kernel timeout + cleanup_period: 60 # Actual NPA cleanup period + connections_per_wave: 100 # More connections + ``` + +4. **Run during high load** (more port reuse) + +5. **Reduce kernel timeout on nodes** (faster testing): + ```bash + # SSH to nodes or use privileged DaemonSet + echo 30 > /proc/sys/net/netfilter/nf_conntrack_tcp_timeout_established + ``` + +## Debug Mode + +Run with verbose logging: +```bash +python3 conntrack_race_reproducer.py -v +``` + +This shows detailed command output and timing information. + +## Manual Test + +Test connectivity manually: +```bash +# Terminal 1: Watch server logs +kubectl logs -f test-server -n conntrack-test + +# Terminal 2: Create connections +SERVER_IP=$(kubectl get pod test-server -n conntrack-test -o jsonpath='{.status.podIP}') + +for i in {1..10}; do + echo "Connection $i" | kubectl exec -n conntrack-test test-client -- nc -v $SERVER_IP 8080 + sleep 0.5 +done +``` + +## Clean Start + +If all else fails, start fresh: +```bash +# Delete everything +kubectl delete namespace conntrack-test +kubectl delete -f k8s_test_pods.yaml + +# Wait for cleanup +sleep 10 + +# Redeploy +kubectl apply -f k8s_test_pods.yaml + +# Wait for ready +kubectl wait --for=condition=ready pod --all -n conntrack-test --timeout=120s + +# Give extra time for server +sleep 10 + +# Verify manually +SERVER_IP=$(kubectl get pod test-server -n conntrack-test -o jsonpath='{.status.podIP}') +kubectl exec -n conntrack-test test-client -- nc -zv $SERVER_IP 8080 + +# If successful, run test +python3 conntrack_race_reproducer.py +``` + +## Getting Help + +If issues persist, gather this information: + +```bash +# Environment info +kubectl version +kubectl get nodes +kubectl get pods -n conntrack-test -o wide + +# Pod details +kubectl describe pod test-server -n conntrack-test +kubectl describe pod test-client -n conntrack-test + +# Pod logs +kubectl logs test-server -n conntrack-test +kubectl logs test-client -n conntrack-test + +# Network info +kubectl get svc -n conntrack-test +kubectl get networkpolicies -n conntrack-test +``` + +Share these details when asking for help. diff --git a/validation-scripts/conntrack_race_reproducer.py b/validation-scripts/conntrack_race_reproducer.py new file mode 100755 index 00000000..cfece925 --- /dev/null +++ b/validation-scripts/conntrack_race_reproducer.py @@ -0,0 +1,1165 @@ +#!/usr/bin/env python3 +""" +Conntrack Race Condition Reproducer + +This script reproduces the race condition in Network Policy Agent's conntrack +cleanup logic where 5-tuple reuse during cleanup causes legitimate traffic denial. + +Race Sequence: +1. Connection established → kernel conntrack + eBPF map entry +2. Kernel expires entry (~180s) → stale in local cache +3. Cleanup cycle starts → snapshot kernel (entry missing) +4. NEW connection with same 5-tuple → creates kernel entry +5. Cleanup deletes eBPF entry (based on old snapshot) +6. 
Response arrives → lookup fails → RACE TRIGGERED + +Author: Network Policy Team +""" + +import argparse +import json +import logging +import math +import subprocess +import sys +import time +import yaml +from concurrent.futures import ThreadPoolExecutor, as_completed +from dataclasses import dataclass +from datetime import datetime, timedelta +from typing import Optional, List, Dict, Tuple +import re + +# Configure logging +logging.basicConfig( + level=logging.INFO, + format='%(asctime)s.%(msecs)03d [%(levelname)s] %(message)s', + datefmt='%Y-%m-%d %H:%M:%S' +) +logger = logging.getLogger(__name__) + + +@dataclass +class TestConfig: + """Configuration for the race condition test""" + namespace: str = "conntrack-test" + client_pod: str = "test-client" + server_pod: str = "test-server" + server_port: int = 8080 + kernel_timeout: int = 30 # Reduced for testing (normally 180s) + cleanup_period: int = 60 # Reduced for testing (normally 300s) + iterations: int = 10 + connections_per_wave: int = 50 + npa_pod_label: str = "k8s-app=aws-node" + npa_namespace: str = "kube-system" + + @classmethod + def from_yaml(cls, filepath: str) -> 'TestConfig': + """Load configuration from YAML file""" + try: + with open(filepath, 'r') as f: + data = yaml.safe_load(f) + return cls(**data) + except FileNotFoundError: + logger.warning(f"Config file {filepath} not found, using defaults") + return cls() + + +class KubeHelper: + """Helper class for Kubernetes operations""" + + @staticmethod + def run_command(cmd: List[str], check=True, capture_output=True) -> subprocess.CompletedProcess: + """Run a shell command and return result""" + try: + result = subprocess.run( + cmd, + check=check, + capture_output=capture_output, + text=True, + timeout=30 + ) + return result + except subprocess.CalledProcessError as e: + logger.error(f"Command failed: {' '.join(cmd)}") + logger.error(f"Error: {e.stderr}") + raise + except subprocess.TimeoutExpired: + logger.error(f"Command timed out: {' '.join(cmd)}") + raise + + @staticmethod + def pod_exists(namespace: str, pod_name: str) -> bool: + """Check if a pod exists""" + cmd = ['kubectl', 'get', 'pod', pod_name, '-n', namespace, '--no-headers'] + result = KubeHelper.run_command(cmd, check=False) + return result.returncode == 0 + + @staticmethod + def pod_ready(namespace: str, pod_name: str) -> bool: + """Check if a pod is ready""" + cmd = ['kubectl', 'get', 'pod', pod_name, '-n', namespace, + '-o', 'jsonpath={.status.conditions[?(@.type=="Ready")].status}'] + result = KubeHelper.run_command(cmd, check=False) + return result.stdout.strip() == "True" + + @staticmethod + def exec_in_pod(namespace: str, pod_name: str, command: List[str]) -> subprocess.CompletedProcess: + """Execute command in a pod""" + cmd = ['kubectl', 'exec', '-n', namespace, pod_name, '--'] + command + return KubeHelper.run_command(cmd) + + @staticmethod + def get_pod_ip(namespace: str, pod_name: str) -> Optional[str]: + """Get pod IP address""" + cmd = ['kubectl', 'get', 'pod', pod_name, '-n', namespace, + '-o', 'jsonpath={.status.podIP}'] + result = KubeHelper.run_command(cmd, check=False) + return result.stdout.strip() if result.returncode == 0 else None + + @staticmethod + def get_npa_logs(namespace: str, label: str, since: str = "5m") -> str: + """Get NPA logs""" + cmd = ['kubectl', 'logs', '-n', namespace, '-l', label, + '--tail=1000', f'--since={since}'] + result = KubeHelper.run_command(cmd, check=False) + return result.stdout if result.returncode == 0 else "" + + @staticmethod + def get_pod_node(namespace: 
str, pod_name: str) -> Optional[str]: + """Get the node where a pod is running""" + cmd = ['kubectl', 'get', 'pod', pod_name, '-n', namespace, + '-o', 'jsonpath={.spec.nodeName}'] + result = KubeHelper.run_command(cmd, check=False) + return result.stdout.strip() if result.returncode == 0 else None + + @staticmethod + def get_tuner_pod_on_node(node_name: str) -> Optional[str]: + """Get the conntrack-tuner pod running on a specific node""" + cmd = ['kubectl', 'get', 'pods', '-n', 'kube-system', + '-l', 'app=conntrack-tuner', + '-o', 'json'] + result = KubeHelper.run_command(cmd, check=False) + if result.returncode != 0: + return None + + try: + pods_data = json.loads(result.stdout) + for pod in pods_data.get('items', []): + if pod.get('spec', {}).get('nodeName') == node_name: + return pod.get('metadata', {}).get('name') + except: + pass + return None + + +class ConntrackRaceReproducer: + """Main class for reproducing conntrack race condition""" + + # Fixed port range for deterministic 5-tuple collision + FIXED_PORT_START = 50000 + FIXED_PORT_COUNT = 20 # Ports 50000-50019 + + def __init__(self, config: TestConfig): + self.config = config + self.kube = KubeHelper() + self.race_detected = False + self.last_cleanup_time: Optional[datetime] = None + self.learned_cleanup_timing: Optional[int] = None # Seconds after kernel timeout + self.last_spray_absolute_time: Optional[float] = None # Absolute wall-clock time of last spray + self.server_node: Optional[str] = None + self.client_node: Optional[str] = None + self.tuner_pod: Optional[str] = None + self.client_tuner_pod: Optional[str] = None # Tuner pod on client's node + self.fixed_ports = list(range(self.FIXED_PORT_START, self.FIXED_PORT_START + self.FIXED_PORT_COUNT)) + + def validate_environment(self) -> bool: + """Validate test environment prerequisites""" + logger.info("=== Validating Environment ===") + + # Check kubectl + try: + self.kube.run_command(['kubectl', 'version', '--client']) + logger.info("✓ kubectl is available") + except Exception as e: + logger.error("✗ kubectl not found or not working") + return False + + # Check namespace + if not self.kube.pod_exists(self.config.namespace, self.config.client_pod): + logger.error(f"✗ Client pod {self.config.client_pod} not found in namespace {self.config.namespace}") + logger.error(" Run: kubectl apply -f k8s_test_pods.yaml") + return False + logger.info(f"✓ Client pod exists") + + if not self.kube.pod_exists(self.config.namespace, self.config.server_pod): + logger.error(f"✗ Server pod {self.config.server_pod} not found in namespace {self.config.namespace}") + return False + logger.info(f"✓ Server pod exists") + + # Check pod readiness + if not self.kube.pod_ready(self.config.namespace, self.config.client_pod): + logger.error(f"✗ Client pod not ready") + return False + logger.info(f"✓ Client pod is ready") + + if not self.kube.pod_ready(self.config.namespace, self.config.server_pod): + logger.error(f"✗ Server pod not ready") + return False + logger.info(f"✓ Server pod is ready") + + # Get pod IPs + server_ip = self.kube.get_pod_ip(self.config.namespace, self.config.server_pod) + if not server_ip: + logger.error("✗ Could not get server pod IP") + return False + logger.info(f"✓ Server IP: {server_ip}") + + # Find client node and tuner pod for conntrack checks + self.client_node = self.kube.get_pod_node(self.config.namespace, self.config.client_pod) + if self.client_node: + logger.info(f"✓ Client node: {self.client_node}") + self.client_tuner_pod = 
self.kube.get_tuner_pod_on_node(self.client_node) + if self.client_tuner_pod: + logger.info(f"✓ Client tuner pod found: {self.client_tuner_pod}") + else: + logger.warning("⚠ Client tuner pod not found - kernel conntrack checks will be limited") + + # Find server node and tuner pod for eBPF polling + self.server_node = self.kube.get_pod_node(self.config.namespace, self.config.server_pod) + if self.server_node: + logger.info(f"✓ Server node: {self.server_node}") + self.tuner_pod = self.kube.get_tuner_pod_on_node(self.server_node) + if self.tuner_pod: + logger.info(f"✓ Server tuner pod found: {self.tuner_pod}") + else: + logger.warning("⚠ Server tuner pod not found - eBPF polling will be limited") + + # Test connectivity + logger.info("Testing server connectivity...") + time.sleep(2) # Give server time to start listening + success, output = self.create_connection(server_ip, self.config.server_port, timeout=5) + if not success: + logger.error(f"✗ Cannot connect to server at {server_ip}:{self.config.server_port}") + logger.error(f" Error: {output}") + logger.error(" The server pod may not be listening yet.") + logger.error(" Try: kubectl logs test-server -n conntrack-test") + logger.error(" Or wait a few seconds and try again") + return False + logger.info(f"✓ Server is accepting connections") + + logger.info("=== Environment validation passed ===\n") + return True + + def detect_cleanup_cycle(self) -> Optional[datetime]: + """Detect NPA cleanup cycle timing from logs""" + logger.info("Detecting NPA cleanup cycle timing...") + + logs = self.kube.get_npa_logs( + self.config.npa_namespace, + self.config.npa_pod_label, + since="10m" + ) + + # Look for cleanup-related log patterns (actual AWS NPA messages) + patterns = [ + r"Check for any stale entries in the conntrack map", # Cleanup start + r"Done cleanup of conntrack map", # Cleanup end + r"Conntrack cleanup Delete", # Individual deletions + r"hydrated local conntrack cache", # Cache hydration + # Fallback patterns for other implementations + r"cleanup.*conntrack", + r"conntrack.*cleanup", + ] + + cleanup_times = [] + for line in logs.split('\n'): + for pattern in patterns: + if re.search(pattern, line, re.IGNORECASE): + # Try to extract timestamp + timestamp_match = re.search(r'\d{4}-\d{2}-\d{2}[T\s]\d{2}:\d{2}:\d{2}', line) + if timestamp_match: + try: + ts = datetime.fromisoformat(timestamp_match.group().replace('T', ' ')) + cleanup_times.append(ts) + except: + pass + + if len(cleanup_times) >= 2: + # Calculate average interval + intervals = [(cleanup_times[i+1] - cleanup_times[i]).total_seconds() + for i in range(len(cleanup_times)-1)] + avg_interval = sum(intervals) / len(intervals) + logger.info(f"Detected cleanup interval: ~{avg_interval:.0f}s") + self.last_cleanup_time = cleanup_times[-1] + return cleanup_times[-1] + + logger.warning("Could not detect cleanup cycle from logs, will use configured period") + return None + + def predict_next_cleanup(self) -> datetime: + """Predict when the next cleanup cycle will occur""" + if self.last_cleanup_time: + next_cleanup = self.last_cleanup_time + timedelta(seconds=self.config.cleanup_period) + else: + # Estimate based on current time + next_cleanup = datetime.now() + timedelta(seconds=self.config.cleanup_period) + + logger.info(f"Next cleanup predicted at: {next_cleanup.strftime('%H:%M:%S')}") + return next_cleanup + + def create_connection(self, target_ip: str, target_port: int, timeout: int = 5) -> Tuple[bool, str]: + """Create a TCP connection from client to server (random source port)""" + 
try: + # Use nc (netcat) to create connection + # Note: We use -w flag instead of timeout command to avoid exit code issues + cmd = ['nc', '-v', '-w', str(timeout), target_ip, str(target_port)] + result = self.kube.exec_in_pod( + self.config.namespace, + self.config.client_pod, + cmd + ) + output = result.stdout + result.stderr + # Check if connection succeeded (even if nc returns non-zero) + if 'succeeded' in output.lower() or 'connected' in output.lower(): + return True, output + return True, output + except subprocess.CalledProcessError as e: + # Even on error, check if connection actually succeeded + output = e.stdout + e.stderr if hasattr(e, 'stdout') else str(e) + if 'succeeded' in output.lower() or 'connected' in output.lower(): + return True, output + return False, output + except Exception as e: + return False, str(e) + + def create_connection_with_port(self, target_ip: str, target_port: int, source_port: int, timeout: int = 2) -> Tuple[bool, str]: + """Create a TCP connection bound to a specific source port""" + try: + # Python one-liner to create connection with specific source port + # Enhanced error reporting to distinguish failure types + python_cmd = f"""python3 -c " +import socket +import sys +import errno +try: + sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) + sock.settimeout({timeout}) + sock.bind(('0.0.0.0', {source_port})) + sock.connect(('{target_ip}', {target_port})) + print('Connected from port {source_port}') + sock.close() + sys.exit(0) +except socket.timeout: + print('TIMEOUT: Connection timed out') + sys.exit(1) +except ConnectionRefusedError: + print('REFUSED: Connection refused by server') + sys.exit(1) +except OSError as e: + if e.errno == errno.EADDRINUSE: + print('BIND_ERROR: Address already in use') + elif e.errno == errno.EADDRNOTAVAIL: + print('BIND_ERROR: Cannot assign requested address') + else: + print(f'OS_ERROR: {{e}}') + sys.exit(1) +except Exception as e: + print(f'ERROR: {{type(e).__name__}}: {{e}}') + sys.exit(1) +" """ + + result = self.kube.exec_in_pod( + self.config.namespace, + self.config.client_pod, + ['sh', '-c', python_cmd] + ) + output = result.stdout + result.stderr + + if 'Connected' in output: + return True, output + return False, output + + except subprocess.CalledProcessError as e: + output = e.stdout + e.stderr if hasattr(e, 'stdout') else str(e) + return False, output + except Exception as e: + return False, str(e) + + def create_connection_wave(self, target_ip: str, count: int) -> Dict[str, int]: + """Create multiple connections in quick succession (random ports)""" + results = {'success': 0, 'failed': 0, 'timeout': 0} + + logger.info(f"Creating {count} connections...") + for i in range(count): + success, output = self.create_connection(target_ip, self.config.server_port, timeout=2) + + if success: + results['success'] += 1 + elif 'timeout' in output.lower(): + results['timeout'] += 1 + else: + results['failed'] += 1 + + # Brief pause to avoid overwhelming (minimal throttling) + if i % 1000 == 0 and i > 0: + time.sleep(0.01) + # Progress indicator every 500 connections + if i % 1000 == 0: + logger.info(f" Created {i}/{count} connections...") + + logger.info(f"Wave complete: {results['success']} success, {results['failed']} failed, {results['timeout']} timeout") + return results + + def create_connection_wave_fixed_ports(self, target_ip: str, ports: List[int]) -> Dict[str, int]: + """Create connections using specific source ports for deterministic 5-tuple 
collision""" + results = {'success': 0, 'failed': 0, 'timeout': 0} + + logger.info(f"Creating {len(ports)} connections with fixed ports {ports[0]}-{ports[-1]} (4 parallel threads)...") + + def create_single_connection(port): + """Helper function for parallel execution""" + success, output = self.create_connection_with_port(target_ip, self.config.server_port, port, timeout=2) + return port, success, output + + # Use ThreadPoolExecutor with 4 parallel threads + max_workers = 4 + completed = 0 + + with ThreadPoolExecutor(max_workers=max_workers) as executor: + # Submit all connection tasks + futures = {executor.submit(create_single_connection, port): port for port in ports} + + # Process results as they complete + for future in as_completed(futures): + port, success, output = future.result() + + if success: + results['success'] += 1 + elif 'timeout' in output.lower(): + results['timeout'] += 1 + else: + results['failed'] += 1 + logger.debug(f" Port {port} failed: {output[:100]}") + + completed += 1 + # Progress indicator every 20 connections + if completed % 20 == 0: + logger.info(f" Created {completed}/{len(ports)} connections...") + + logger.info(f"Wave complete: {results['success']} success, {results['failed']} failed, {results['timeout']} timeout") + return results + + def poll_ebpf_for_cleanup(self, server_ip: str, poll_duration: int = 120) -> Optional[int]: + """ + Poll eBPF conntrack map to detect when cleanup occurs. + Returns: seconds elapsed when cleanup detected, or None if not detected + """ + if not self.tuner_pod: + logger.warning("Cannot poll eBPF - tuner pod not available") + return None + + logger.info(f"[eBPF Polling] Monitoring conntrack map for {poll_duration}s...") + logger.info(f"[eBPF Polling] Tracking total entry count in map") + + start_time = time.time() + last_entry_count = -1 + poll_interval = 2 # Check every 2 seconds + + try: + while (time.time() - start_time) < poll_duration: + elapsed = int(time.time() - start_time) + + # Query eBPF map via tuner pod (pinned path) + cmd = ['bpftool', 'map', 'dump', 'pinned', '/sys/fs/bpf/globals/aws/maps/global_aws_conntrack_map'] + try: + result = self.kube.exec_in_pod('kube-system', self.tuner_pod, cmd) + output = result.stdout + + # Count total entries in map (each entry has a "key:" line) + entry_count = output.count('key:') + + # Detect significant drop (cleanup occurred) + # Require at least 10 entries initially to avoid false positives + # Detect cleanup if entry count drops by 10% or more + if last_entry_count >= 10 and entry_count < (last_entry_count * 0.9): + logger.info(f"[eBPF Polling] ✓ CLEANUP DETECTED at T+{elapsed}s!") + logger.info(f"[eBPF Polling] Total entry count dropped: {last_entry_count} → {entry_count}") + return elapsed + + if entry_count != last_entry_count: + logger.info(f"[eBPF Polling] T+{elapsed}s: {entry_count} total entries") + last_entry_count = entry_count + + except subprocess.CalledProcessError as e: + logger.debug(f"[eBPF Polling] Query failed (may be normal): {e}") + + time.sleep(poll_interval) + + logger.warning(f"[eBPF Polling] No cleanup detected in {poll_duration}s window") + return None + + except Exception as e: + logger.error(f"[eBPF Polling] Error: {e}") + return None + + def ip_to_hex_be(self, ip_str: str) -> str: + """Convert IP address to big-endian hex representation (network byte order)""" + parts = ip_str.split('.') + # Big-endian: normal byte order (no reversal) + return ' '.join([f'{int(p):02x}' for p in parts]) + + def port_to_hex_le(self, port: int) -> str: + """Convert 
port to little-endian hex representation""" + # Port is uint16, little-endian + return f'{port & 0xff:02x} {(port >> 8) & 0xff:02x}' + + def check_kernel_conntrack(self, source_ip: str, source_port: int, + dest_ip: str, dest_port: int, + protocol: str = "tcp") -> bool: + """ + Check if a 5-tuple exists in kernel conntrack table. + Returns: True if found, False otherwise + + Reads /proc/net/nf_conntrack directly instead of using conntrack command. + This avoids needing the conntrack binary and works with just file system access. + """ + if not self.client_tuner_pod: + logger.debug("No client tuner pod available - cannot check kernel conntrack") + return False + + try: + # Read /proc/net/nf_conntrack directly + # Format: ipv4 2 tcp 6 117 ESTABLISHED src=X.X.X.X dst=Y.Y.Y.Y sport=NNNN dport=MMMM ... + cmd = ['cat', '/proc/net/nf_conntrack'] + + result = self.kube.exec_in_pod( + 'kube-system', + self.client_tuner_pod, + cmd + ) + + output = result.stdout + + # Search for the specific 5-tuple + # Look for lines containing our source IP, dest IP, source port, and dest port + # The format can have src/dst in either order depending on connection direction + for line in output.split('\n'): + if protocol in line.lower(): + # Check if this line has our 5-tuple (forward or reverse direction) + has_src_ip = f'src={source_ip}' in line + has_dst_ip = f'dst={dest_ip}' in line + has_sport = f'sport={source_port}' in line + has_dport = f'dport={dest_port}' in line + + if has_src_ip and has_dst_ip and has_sport and has_dport: + return True + + return False + + except subprocess.CalledProcessError: + # Command failed or file not accessible + return False + except Exception as e: + logger.debug(f"Error checking kernel conntrack: {e}") + return False + + def check_ebpf_map(self, source_ip: str, source_port: int, + dest_ip: str, dest_port: int, + owner_ip: str, protocol: str = "tcp") -> bool: + """ + Check if a 5-tuple exists in eBPF conntrack map. 
+ Returns: True if found, False otherwise + """ + if not self.tuner_pod: + logger.debug("No tuner pod available - cannot check eBPF map") + return False + + try: + # Query eBPF map + cmd = ['bpftool', 'map', 'dump', 'pinned', + '/sys/fs/bpf/globals/aws/maps/global_aws_conntrack_map'] + + result = self.kube.exec_in_pod('kube-system', self.tuner_pod, cmd) + output = result.stdout + + # Build expected hex pattern for the 5-tuple + # Actual format: source_ip(4) source_port(2) padding(2) dest_ip(4) dest_port(2) protocol(1) padding(1) owner_ip(4) + proto_byte = "06" if protocol == "tcp" else "11" + padding_2 = "00 00" # 2-byte padding after source port + padding_1 = "00" # 1-byte padding after protocol + + # Convert IPs to big-endian (network byte order), ports to little-endian + src_ip_hex = self.ip_to_hex_be(source_ip) + dst_ip_hex = self.ip_to_hex_be(dest_ip) + owner_ip_hex = self.ip_to_hex_be(owner_ip) + src_port_hex = self.port_to_hex_le(source_port) + dst_port_hex = self.port_to_hex_le(dest_port) + + # Build pattern with padding bytes + pattern = f"{src_ip_hex} {src_port_hex} {padding_2} {dst_ip_hex} {dst_port_hex} {proto_byte} {padding_1} {owner_ip_hex}" + pattern_normalized = pattern.replace(' ', '').lower() + + # Normalize output for comparison (remove spaces, lowercase) + output_normalized = output.replace(' ', '').replace('\n', '').lower() + + # Search for pattern + if pattern_normalized in output_normalized: + return True + + return False + + except subprocess.CalledProcessError: + return False + except Exception as e: + logger.debug(f"Error checking eBPF map: {e}") + return False + + def check_5tuple_in_conntrack(self, source_ip: str, source_port: int, + dest_ip: str, dest_port: int, + protocol: str = "tcp") -> Tuple[bool, bool]: + """ + Check if a 5-tuple exists in kernel conntrack AND eBPF map. + Returns: (in_kernel, in_ebpf) + """ + # Check kernel conntrack + in_kernel = self.check_kernel_conntrack(source_ip, source_port, dest_ip, dest_port, protocol) + + # Check eBPF map (try both owner IPs: source and dest) + # The NPA code stores entries with both source and dest as owner + in_ebpf_src = self.check_ebpf_map(source_ip, source_port, dest_ip, dest_port, source_ip, protocol) + in_ebpf_dst = self.check_ebpf_map(source_ip, source_port, dest_ip, dest_port, dest_ip, protocol) + in_ebpf = in_ebpf_src or in_ebpf_dst + + return (in_kernel, in_ebpf) + + def verify_all_connections(self, client_ip: str, server_ip: str, ports: List[int]) -> Dict[str, List[int]]: + """ + Check all ports in kernel conntrack and eBPF map for validation. + Optimized: reads kernel and eBPF data ONCE, then checks all ports in memory. 
+ Returns: dict with lists of ports in each state + """ + results = { + 'kernel_only': [], # In kernel but not eBPF (RACE pattern) + 'ebpf_only': [], # In eBPF but not kernel (unusual) + 'both': [], # In both (normal active) + 'neither': [] # In neither (expired/cleaned) + } + + logger.info(f"\n [Validation] Checking ALL {len(ports)} connections in kernel/eBPF...") + logger.info(f" Optimized: Reading data sources once, then checking in memory...") + + # Step 1: Read kernel conntrack ONCE + logger.info(f" Reading kernel conntrack table...") + kernel_entries = set() + if self.client_tuner_pod: + try: + cmd = ['cat', '/proc/net/nf_conntrack'] + result = self.kube.exec_in_pod('kube-system', self.client_tuner_pod, cmd) + + # Parse and cache all relevant entries + for line in result.stdout.split('\n'): + if 'tcp' in line.lower(): + # Extract all ports from this line + for port in ports: + if (f'src={client_ip}' in line and f'dst={server_ip}' in line and + f'sport={port}' in line and f'dport={self.config.server_port}' in line): + kernel_entries.add(port) + break + + logger.info(f" Found {len(kernel_entries)} entries in kernel conntrack") + except Exception as e: + logger.warning(f" Failed to read kernel conntrack: {e}") + + # Step 2: Read eBPF map ONCE + logger.info(f" Reading eBPF conntrack map...") + ebpf_entries = set() + if self.tuner_pod: + try: + cmd = ['bpftool', 'map', 'dump', 'pinned', + '/sys/fs/bpf/globals/aws/maps/global_aws_conntrack_map'] + result = self.kube.exec_in_pod('kube-system', self.tuner_pod, cmd) + output = result.stdout + + # DEBUG: Show raw eBPF map output sample + logger.info(f" [DEBUG] Raw eBPF map output (first 500 chars):") + logger.info(f" {output[:500]}...") + logger.info(f" [DEBUG] Total output length: {len(output)} chars") + logger.info(f" [DEBUG] Number of 'key:' entries: {output.count('key:')}") + + # Normalize output once + output_normalized = output.replace(' ', '').replace('\n', '').lower() + + # Check all ports against the single eBPF dump + proto_byte = "06" # TCP + padding_2 = "00 00" # 2-byte padding after source port + padding_1 = "00" # 1-byte padding after protocol + dst_ip_hex = self.ip_to_hex_be(server_ip) + dst_port_hex = self.port_to_hex_le(self.config.server_port) + src_ip_hex = self.ip_to_hex_be(client_ip) + + # DEBUG: Show example pattern for first port + first_port = ports[0] + first_port_hex = self.port_to_hex_le(first_port) + example_pattern_src = f"{src_ip_hex} {first_port_hex} {padding_2} {dst_ip_hex} {dst_port_hex} {proto_byte} {padding_1} {src_ip_hex}" + example_pattern_dst = f"{src_ip_hex} {first_port_hex} {padding_2} {dst_ip_hex} {dst_port_hex} {proto_byte} {padding_1} {dst_ip_hex}" + logger.info(f" [DEBUG] Example search patterns for port {first_port}:") + logger.info(f" Pattern (src owner): {example_pattern_src}") + logger.info(f" Pattern (dst owner): {example_pattern_dst}") + logger.info(f" Normalized (src): {example_pattern_src.replace(' ', '').lower()}") + logger.info(f" Normalized (dst): {example_pattern_dst.replace(' ', '').lower()}") + + for port in ports: + src_port_hex = self.port_to_hex_le(port) + + # Try with source as owner (with padding bytes) + pattern_src = f"{src_ip_hex} {src_port_hex} {padding_2} {dst_ip_hex} {dst_port_hex} {proto_byte} {padding_1} {src_ip_hex}" + # Try with dest as owner + pattern_dst = f"{src_ip_hex} {src_port_hex} {padding_2} {dst_ip_hex} {dst_port_hex} {proto_byte} {padding_1} {dst_ip_hex}" + + pattern_src_norm = pattern_src.replace(' ', '').lower() + pattern_dst_norm = pattern_dst.replace(' ', 
'').lower() + + if pattern_src_norm in output_normalized or pattern_dst_norm in output_normalized: + ebpf_entries.add(port) + + logger.info(f" Found {len(ebpf_entries)} entries in eBPF map") + except Exception as e: + logger.warning(f" Failed to read eBPF map: {e}") + + # Step 3: Categorize all ports based on cached data (instant) + logger.info(f" Categorizing {len(ports)} ports...") + for port in ports: + in_kernel = port in kernel_entries + in_ebpf = port in ebpf_entries + + if in_kernel and not in_ebpf: + results['kernel_only'].append(port) + elif in_ebpf and not in_kernel: + results['ebpf_only'].append(port) + elif in_kernel and in_ebpf: + results['both'].append(port) + else: + results['neither'].append(port) + + logger.info(f" ✓ Categorization complete") + return results + + def verify_connection_denial(self) -> bool: + """Check if connections are being denied (race condition occurred)""" + server_ip = self.kube.get_pod_ip(self.config.namespace, self.config.server_pod) + + # Try a simple connection + success, output = self.create_connection(server_ip, self.config.server_port, timeout=3) + + if not success and ('refused' in output.lower() or 'timeout' in output.lower()): + logger.warning("⚠ Connection denied - potential race condition!") + return True + + return False + + def execute_race_attempt(self, attempt_num: int, is_learning_phase: bool) -> bool: + """Execute a single attempt to trigger the race condition""" + logger.info(f"\n{'='*70}") + if is_learning_phase: + logger.info(f"ITERATION {attempt_num} - LEARNING PHASE") + logger.info(f" Will poll eBPF to learn cleanup timing") + else: + logger.info(f"ITERATION {attempt_num} - OPTIMIZED (using learned timing)") + logger.info(f"{'='*70}") + + server_ip = self.kube.get_pod_ip(self.config.namespace, self.config.server_pod) + kernel_timeout_start = None + + # Phase 1: Create 100 connections with FIXED source ports to populate + logger.info(f"\n[Phase 1] Creating {len(self.fixed_ports)} connections with FIXED ports {self.fixed_ports[0]}-{self.fixed_ports[-1]}") + logger.info(f" 5-tuples: ({self.config.client_pod}:50000-50099 → {server_ip}:8080)") + results = self.create_connection_wave_fixed_ports(server_ip, self.fixed_ports) + + # Phase 2: Wait 30s for kernel timeout + logger.info(f"\n[Phase 2] Waiting {self.config.kernel_timeout}s for kernel timeout") + logger.info(f" Kernel entries will expire, but remain in local cache...") + kernel_timeout_start = time.time() + + for remaining in range(self.config.kernel_timeout, 0, -5): + logger.info(f" T+{self.config.kernel_timeout - remaining}s / {self.config.kernel_timeout}s") + time.sleep(5) + + logger.info(f"✓ Kernel timeout complete (entries stale in cache)") + + # Phase 3: Poll eBPF (Iteration 1 only) OR use learned timing (Iterations 2-5) + if is_learning_phase: + logger.info(f"\n[Phase 3] LEARNING MODE - Polling eBPF to detect cleanup") + logger.info(f" Will monitor conntrack map until cleanup is detected...") + + cleanup_offset = self.poll_ebpf_for_cleanup(server_ip, poll_duration=120) + + if cleanup_offset: + self.learned_cleanup_timing = cleanup_offset + # First cleanup absolute: kernel_timeout + cleanup_offset + first_cleanup_abs = self.config.kernel_timeout + cleanup_offset + # Next cleanup: first + period + next_cleanup_abs = first_cleanup_abs + self.config.cleanup_period + # Spray 55-65s from first cleanup (5s before next cleanup) + spray_start_abs = first_cleanup_abs + 55 + + logger.info(f"[Phase 3] ✓ LEARNED: Cleanup at T+{cleanup_offset}s after kernel timeout") + 
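+                # Worked example (assuming kernel_timeout=30s and cleanup_period=60s): a detected
+                # cleanup_offset of 20s gives first_cleanup_abs=50s, next_cleanup_abs=110s, and a
+                # spray start of 105s absolute, i.e. roughly 5s before the predicted next cleanup.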
logger.info(f"[Phase 3] First cleanup: T+{first_cleanup_abs}s absolute") + logger.info(f"[Phase 3] Next cleanup: T+{next_cleanup_abs}s absolute") + logger.info(f"[Phase 3] Will spray at T+{spray_start_abs}s (55s from first cleanup)") + + # Calculate wait time + elapsed = time.time() - kernel_timeout_start + wait_time = max(0, spray_start_abs - elapsed) + + if wait_time > 0: + logger.info(f"[Phase 3] Waiting {wait_time:.1f}s to reach spray window...") + time.sleep(wait_time) + else: + logger.warning("[Phase 3] Could not detect cleanup via eBPF polling") + logger.warning("[Phase 3] Falling back to estimated timing (T+55-60)") + + # Wait to T+55 + elapsed = time.time() - kernel_timeout_start + wait_time = max(0, 55 - elapsed) + if wait_time > 0: + logger.info(f"[Phase 3] Waiting {wait_time:.1f}s to reach T+55...") + time.sleep(wait_time) + else: + logger.info(f"\n[Phase 3] OPTIMIZED MODE - Synchronize with 60s cleanup cycle") + + if self.last_spray_absolute_time is not None: + # Calculate next spray time based on 60s cleanup cycle + current_time = time.time() + elapsed_since_last_spray = current_time - self.last_spray_absolute_time + + # Find next valid spray window (nearest multiple of 60s from last spray) + cycles_passed = math.ceil(elapsed_since_last_spray / self.config.cleanup_period) + next_spray_absolute_time = self.last_spray_absolute_time + (cycles_passed * self.config.cleanup_period) + + wait_time = next_spray_absolute_time - current_time + + logger.info(f"[Phase 3] Last spray was {elapsed_since_last_spray:.1f}s ago") + logger.info(f"[Phase 3] Next spray in {wait_time:.1f}s (cycle #{cycles_passed} from first spray)") + logger.info(f"[Phase 3] Target: {cycles_passed * self.config.cleanup_period}s from first spray") + + if wait_time > 0: + logger.info(f"[Phase 3] Waiting {wait_time:.1f}s to synchronize with cleanup cycle...") + time.sleep(wait_time) + else: + logger.warning(f"[Phase 3] Already past target time by {-wait_time:.1f}s, spraying immediately") + else: + logger.warning("[Phase 3] No previous spray timing available, using default T+55-60") + elapsed = time.time() - kernel_timeout_start + wait_time = max(0, 55 - elapsed) + if wait_time > 0: + time.sleep(wait_time) + + # Phase 4: Spray connections with SAME FIXED PORTS + current_offset = time.time() - kernel_timeout_start + + # Calculate target window for display + if self.learned_cleanup_timing: + first_cleanup_abs = self.config.kernel_timeout + self.learned_cleanup_timing + spray_target = f"T+{first_cleanup_abs + 55}s-{first_cleanup_abs + 65}s (targeting next cleanup)" + else: + spray_target = "T+55-65s (estimated)" + + logger.info(f"\n[Phase 4] CONNECTION SPRAY at T+{current_offset:.1f}s (Target: {spray_target})") + logger.info(f" Reusing SAME {len(self.fixed_ports)} fixed ports for 100% 5-tuple collision!") + logger.info(f" 5-tuples: ({self.config.client_pod}:50000-50099 → {server_ip}:8080) [SAME as Phase 1]") + + # Record spray start time for synchronizing future iterations + self.last_spray_absolute_time = time.time() + + spray_results = [] + spray_count = len(self.fixed_ports) # 100 connections with fixed ports + + def create_spray_connection(port): + """Helper function for parallel spray execution""" + success, output = self.create_connection_with_port(server_ip, self.config.server_port, port, timeout=1) + return success, output, port + + # Use ThreadPoolExecutor with 4 parallel threads + max_workers = 4 + completed = 0 + + with ThreadPoolExecutor(max_workers=max_workers) as executor: + # Submit all spray connection 
tasks + futures = {executor.submit(create_spray_connection, port): port for port in self.fixed_ports} + + # Process results as they complete + for future in as_completed(futures): + result = future.result() + spray_results.append(result) + + completed += 1 + if completed % 20 == 0: + elapsed_spray = time.time() - kernel_timeout_start + logger.info(f" Sprayed {completed}/{spray_count} (T+{elapsed_spray:.1f}s)") + + # Phase 5: Check for race condition + logger.info(f"\n[Phase 5] Analyzing results (categorizing failure types)") + time.sleep(0.05) + + # Categorize failures by type + race_denials = [] # REFUSED - actual race condition + bind_errors = [] # BIND_ERROR - parallelism issues + timeouts = [] # TIMEOUT - network/timeout issues + other_errors = [] # Other failures + + for success, output, port in spray_results: + if success: + continue + + # Categorize based on error message + if 'REFUSED:' in output: + race_denials.append(port) + logger.info(output) + elif 'BIND_ERROR:' in output: + bind_errors.append(port) + elif 'TIMEOUT:' in output: + timeouts.append(port) + else: + other_errors.append((port, output[:80])) + + successful = spray_count - len(race_denials) - len(bind_errors) - len(timeouts) - len(other_errors) + + # Log categorized results + logger.info(f" Spray results breakdown:") + logger.info(f" ✓ Success: {successful}/{spray_count}") + logger.info(f" ✗ REFUSED (Race condition): {len(race_denials)}/{spray_count}") + logger.info(f" ⚠ BIND_ERROR (Parallelism): {len(bind_errors)}/{spray_count}") + logger.info(f" ⏱ TIMEOUT: {len(timeouts)}/{spray_count}") + logger.info(f" ? OTHER: {len(other_errors)}/{spray_count}") + + # Warn about parallelism issues if present + if len(bind_errors) > 0: + logger.warning(f" ⚠ {len(bind_errors)} bind errors detected - may need to reduce parallelism") + logger.warning(f" Affected ports: {bind_errors[:5]}{'...' if len(bind_errors) > 5 else ''}") + + # FIRST ITERATION ONLY: Comprehensive validation of ALL connections + if is_learning_phase: + logger.info(f"\n[Phase 5 - Full Validation] Checking ALL {spray_count} connections (First iteration only)") + logger.info(f" This validates our kernel/eBPF verification logic...") + + client_ip = self.kube.get_pod_ip(self.config.namespace, self.config.client_pod) + validation_results = self.verify_all_connections(client_ip, server_ip, self.fixed_ports) + + logger.info(f"\n Full Validation Results:") + logger.info(f" 🔥 Kernel ONLY (race pattern): {len(validation_results['kernel_only'])}/{spray_count} ports") + logger.info(f" ✓ BOTH (active connections): {len(validation_results['both'])}/{spray_count} ports") + logger.info(f" ✗ NEITHER (expired/cleaned): {len(validation_results['neither'])}/{spray_count} ports") + logger.info(f" ? eBPF ONLY (unusual): {len(validation_results['ebpf_only'])}/{spray_count} ports") + + if len(validation_results['kernel_only']) > 0: + logger.info(f"\n Kernel-only ports (race candidates): {validation_results['kernel_only'][:20]}{'...' 
if len(validation_results['kernel_only']) > 20 else ''}") + + race_detected = False + confirmed_race_denials = [] + + # Verify REFUSED errors by checking kernel vs eBPF conntrack + if len(race_denials) > 0: + logger.info(f"\n Verifying {len(race_denials)} REFUSED connections against kernel/eBPF conntrack...") + + # Get client IP + client_ip = self.kube.get_pod_ip(self.config.namespace, self.config.client_pod) + + # Sample verification (check up to 10 ports to avoid overwhelming the system) + sample_size = min(len(race_denials), 10) + sample_ports = race_denials[:sample_size] + + for port in sample_ports: + # Check if this 5-tuple is in kernel but NOT in eBPF + in_kernel, in_ebpf = self.check_5tuple_in_conntrack( + client_ip, port, server_ip, self.config.server_port + ) + + if in_kernel and not in_ebpf: + # CONFIRMED RACE CONDITION! + confirmed_race_denials.append(port) + logger.error(f" ✓ Port {port}: CONFIRMED RACE (kernel: ✓, eBPF: ✗)") + elif not in_kernel and not in_ebpf: + logger.info(f" • Port {port}: Both expired (kernel: ✗, eBPF: ✗) - normal cleanup") + elif in_kernel and in_ebpf: + logger.warning(f" ⚠ Port {port}: Both present (kernel: ✓, eBPF: ✓) - server/other issue") + else: + logger.warning(f" ? Port {port}: Unusual state (kernel: ✗, eBPF: ✓)") + + # If we found ANY confirmed race conditions in the sample, it's a race + if len(confirmed_race_denials) > 0: + logger.error(f"\n 🔥 CONFIRMED RACE CONDITION!") + logger.error(f" {len(confirmed_race_denials)}/{sample_size} sampled ports show race pattern") + logger.error(f" (kernel has entry, eBPF missing) → Cleanup deleted active entries") + logger.error(f" Confirmed ports: {confirmed_race_denials}") + race_detected = True + else: + logger.warning(f" ⚠ REFUSED errors detected but could not confirm race pattern") + logger.warning(f" This may be a server issue or timing mismatch") + + # Legacy logging for backward compatibility + if len(race_denials) > 0: + logger.info(f"\n Summary: {len(race_denials)}/{spray_count} connections REFUSED") + logger.info(f" Affected ports: {race_denials[:10]}{'...' if len(race_denials) > 10 else ''}") + + if len(timeouts) > 10: # Significant number of timeouts + logger.warning(f"⚠ High timeout rate: {len(timeouts)}/{spray_count}") + logger.warning(f" May indicate race condition, network congestion, or server overload") + + # Additional verification + if self.verify_connection_denial(): + logger.error(f"🔥 RACE CONDITION DETECTED via verification check") + race_detected = True + + if race_detected: + self.race_detected = True + return True + + logger.info(f"✓ No race detected in this iteration") + return False + + def run_test(self) -> bool: + """Run the complete race condition reproduction test""" + logger.info("\n" + "="*70) + logger.info("CONNTRACK RACE CONDITION REPRODUCER - MULTI-ITERATION STRATEGY") + logger.info("="*70) + + # Validate environment + if not self.validate_environment(): + logger.error("Environment validation failed. 
Please fix issues and retry.") + return False + + # Explain strategy + logger.info("\n" + "="*70) + logger.info("TEST STRATEGY") + logger.info("="*70) + logger.info("Iteration 1 (Learning Phase):") + logger.info(" • Phase 1: Create 100 connections with FIXED ports (50000-50099)") + logger.info(" • Phase 2: Wait 30s (kernel timeout)") + logger.info(" • Phase 3: Poll eBPF → detect cleanup → STOP polling") + logger.info(" • Phase 4: Spray with SAME ports (100% 5-tuple collision)") + logger.info("") + logger.info("Iterations 2-5 (Optimized):") + logger.info(" • Phase 1: Create 100 connections with FIXED ports") + logger.info(" • Phase 2: Wait 30s") + logger.info(" • Phase 3: Skip polling (use learned timing)") + logger.info(" • Phase 4: Spray with SAME ports (guaranteed collision)") + logger.info("="*70) + + # Run 5 iterations + num_iterations = min(self.config.iterations, 5) + logger.info(f"\nExecuting {num_iterations} iterations...\n") + + attempts_with_race = 0 + + for i in range(1, num_iterations + 1): + is_learning = (i == 1) + + try: + if self.execute_race_attempt(i, is_learning_phase=is_learning): + attempts_with_race += 1 + logger.info(f"\n✓ Race detected in iteration {i}") + else: + logger.info(f"\n✗ No race in iteration {i}") + + except KeyboardInterrupt: + logger.warning("\n\nTest interrupted by user") + break + except Exception as e: + logger.error(f"Error in iteration {i}: {e}") + import traceback + traceback.print_exc() + continue + + # Print summary + logger.info("\n" + "="*70) + logger.info("TEST SUMMARY") + logger.info("="*70) + logger.info(f"Total iterations: {num_iterations}") + logger.info(f"Race conditions detected: {attempts_with_race}") + logger.info(f"Success rate: {(attempts_with_race/num_iterations)*100:.1f}%") + + if self.learned_cleanup_timing: + logger.info(f"\nLearned Timing:") + logger.info(f" Cleanup occurs at T+{self.learned_cleanup_timing}s after kernel timeout") + + if self.race_detected: + logger.info("\n✓ RACE CONDITION SUCCESSFULLY REPRODUCED") + logger.info("\nThe race occurs when:") + logger.info(" 1. Kernel expires a connection (30s timeout in test)") + logger.info(" 2. NPA's cleanup cycle snapshots kernel conntrack") + logger.info(" 3. A NEW connection reuses the same 5-tuple") + logger.info(" 4. Cleanup deletes the eBPF entry (based on old snapshot)") + logger.info(" 5. 
Response packet lookup fails → traffic denied") + logger.info("\n💡 Proposed fix: Require 2 consecutive misses before deletion") + return True + else: + logger.warning("\n⚠ Race condition not reproduced") + logger.info("\nPossible reasons:") + logger.info(" - Cleanup timing doesn't align with spray window") + logger.info(" - eBPF map updates may be protecting against race") + logger.info(" - Port reuse not occurring frequently enough") + logger.info("\nConsider:") + logger.info(" - Check if conntrack-tuner is running") + logger.info(" - Verify kernel timeout is actually 30s") + logger.info(" - Monitor eBPF map directly: bpftool map dump name conntrack_map") + return False + + +def main(): + """Main entry point""" + parser = argparse.ArgumentParser( + description='Reproduce conntrack race condition in Network Policy Agent', + formatter_class=argparse.RawDescriptionHelpFormatter, + epilog=""" +Examples: + # Run with default configuration + python3 conntrack_race_reproducer.py + + # Run with custom config file + python3 conntrack_race_reproducer.py -c my_config.yaml + + # Run with more iterations + python3 conntrack_race_reproducer.py -i 20 + + # Verbose output + python3 conntrack_race_reproducer.py -v + """ + ) + + parser.add_argument('-c', '--config', default='test_config.yaml', + help='Path to configuration file (default: test_config.yaml)') + parser.add_argument('-i', '--iterations', type=int, + help='Number of test iterations (overrides config)') + parser.add_argument('-v', '--verbose', action='store_true', + help='Enable verbose logging') + + args = parser.parse_args() + + if args.verbose: + logger.setLevel(logging.DEBUG) + + # Load configuration + config = TestConfig.from_yaml(args.config) + + # Override with CLI args + if args.iterations: + config.iterations = args.iterations + + # Create reproducer and run test + reproducer = ConntrackRaceReproducer(config) + + try: + success = reproducer.run_test() + sys.exit(0 if success else 1) + except KeyboardInterrupt: + logger.warning("\nTest interrupted by user") + sys.exit(130) + except Exception as e: + logger.error(f"Unexpected error: {e}", exc_info=True) + sys.exit(1) + + +if __name__ == '__main__': + main() diff --git a/validation-scripts/k8s_conntrack_reset.yaml b/validation-scripts/k8s_conntrack_reset.yaml new file mode 100644 index 00000000..39afa62b --- /dev/null +++ b/validation-scripts/k8s_conntrack_reset.yaml @@ -0,0 +1,83 @@ +--- +# Reset DaemonSet - Restores default conntrack timeouts (180s) +# +# USAGE: +# 1. First delete the tuner: kubectl delete ds conntrack-tuner -n kube-system +# 2. Apply this reset: kubectl apply -f k8s_conntrack_reset.yaml +# 3. Wait for completion: kubectl wait --for=condition=ready pod -l app=conntrack-reset -n kube-system --timeout=60s +# 4. 
Clean up: kubectl delete -f k8s_conntrack_reset.yaml +apiVersion: apps/v1 +kind: DaemonSet +metadata: + name: conntrack-reset + namespace: kube-system + labels: + app: conntrack-reset + purpose: cleanup +spec: + selector: + matchLabels: + app: conntrack-reset + template: + metadata: + labels: + app: conntrack-reset + spec: + hostNetwork: true + hostPID: true + tolerations: + - operator: Exists + containers: + - name: reset + image: busybox:latest + securityContext: + privileged: true + command: ["/bin/sh"] + args: + - -c + - | + echo "========================================" + echo "Conntrack Timeout Reset Starting" + echo "Node: $(hostname)" + echo "========================================" + echo "" + + if [ -f /proc/sys/net/netfilter/nf_conntrack_tcp_timeout_established ]; then + # Show current timeout + CURRENT=$(cat /proc/sys/net/netfilter/nf_conntrack_tcp_timeout_established) + echo "Current timeout: ${CURRENT} seconds" + + # Reset to default (180s) + echo "Resetting to 180 seconds..." + echo 180 > /proc/sys/net/netfilter/nf_conntrack_tcp_timeout_established + + # Verify + NEW_TIMEOUT=$(cat /proc/sys/net/netfilter/nf_conntrack_tcp_timeout_established) + echo "New timeout: ${NEW_TIMEOUT} seconds" + echo "" + + if [ "$NEW_TIMEOUT" = "180" ]; then + echo "✓ Successfully reset conntrack timeout to 180 seconds" + else + echo "✗ Failed to reset conntrack timeout" + exit 1 + fi + else + echo "ℹ Conntrack module not loaded, nothing to reset" + fi + + echo "" + echo "========================================" + echo "Reset complete on $(hostname)" + echo "========================================" + + # Exit after one-time execution + exit 0 + resources: + requests: + cpu: 10m + memory: 16Mi + limits: + cpu: 50m + memory: 32Mi + restartPolicy: OnFailure diff --git a/validation-scripts/k8s_conntrack_tuner.yaml b/validation-scripts/k8s_conntrack_tuner.yaml new file mode 100644 index 00000000..b60c2f36 --- /dev/null +++ b/validation-scripts/k8s_conntrack_tuner.yaml @@ -0,0 +1,196 @@ +--- +# Privileged DaemonSet to set kernel conntrack timeout on all nodes +# This is required for faster testing of the race condition +apiVersion: apps/v1 +kind: DaemonSet +metadata: + name: conntrack-tuner + namespace: kube-system + labels: + app: conntrack-tuner + purpose: testing +spec: + selector: + matchLabels: + app: conntrack-tuner + template: + metadata: + labels: + app: conntrack-tuner + spec: + hostNetwork: true + hostPID: true + # Tolerate all taints to run on all nodes + tolerations: + - operator: Exists + containers: + - name: tuner + # Using Alpine with bpftool from static binary + image: alpine:latest + securityContext: + privileged: true + volumeMounts: + - name: bpffs + mountPath: /sys/fs/bpf + command: ["/bin/sh"] + args: + - -c + - | + echo "========================================" + echo "Conntrack Timeout Tuner Starting" + echo "========================================" + echo "" + + # Install wget (busybox has it but Alpine's might be better) + echo "Installing bpftool from static binary..." + + # Download static bpftool binary directly (no package manager) + echo " Downloading..." + if wget -q -O /tmp/bpftool.tar.gz https://github.com/libbpf/bpftool/releases/download/v7.3.0/bpftool-v7.3.0-amd64.tar.gz 2>&1; then + echo " ✓ Download complete" + + echo " Extracting..." 
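+          # Assumption: the release tarball unpacks a single 'bpftool' binary at its top level;
+          # if the archive layout differs, adjust the extraction path accordingly.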
+ if tar -xzf /tmp/bpftool.tar.gz -C /usr/local/bin/ 2>&1; then + chmod +x /usr/local/bin/bpftool + rm /tmp/bpftool.tar.gz + echo "✓ bpftool installed successfully" + + if [ -x /usr/local/bin/bpftool ]; then + echo " Version: $(bpftool --version 2>&1 | head -1 || echo 'unable to get version')" + else + echo " ⚠ bpftool extracted but not executable" + fi + else + echo " ⚠ Failed to extract bpftool" + fi + else + echo " ⚠ Failed to download bpftool" + echo " eBPF polling will not be available" + fi + echo "" + + # Check if conntrack module is loaded + if [ ! -f /proc/sys/net/netfilter/nf_conntrack_tcp_timeout_established ]; then + echo "ERROR: Conntrack module not loaded" + echo "This is expected if netfilter/conntrack is not in use" + exit 1 + fi + + # Show current timeout + echo "Current TCP established timeout:" + cat /proc/sys/net/netfilter/nf_conntrack_tcp_timeout_established + echo "seconds" + echo "" + + # Set new timeout + echo "Setting TCP established timeout to 30 seconds..." + echo 30 > /proc/sys/net/netfilter/nf_conntrack_tcp_timeout_established + + # Verify + NEW_TIMEOUT=$(cat /proc/sys/net/netfilter/nf_conntrack_tcp_timeout_established) + echo "New timeout: ${NEW_TIMEOUT} seconds" + echo "" + + if [ "$NEW_TIMEOUT" = "30" ]; then + echo "✓ Successfully set conntrack timeout to 30 seconds" + else + echo "✗ Failed to set conntrack timeout" + exit 1 + fi + + echo "" + echo "Available tools for debugging:" + echo " - bpftool: $(which bpftool 2>/dev/null || echo 'not found')" + if [ -n "$(which bpftool 2>/dev/null)" ]; then + echo " Version: $(bpftool --version 2>&1 | head -1)" + fi + echo "" + echo "========================================" + echo "Configuration complete on $(hostname)" + echo "========================================" + echo "" + echo "To reset to default (180s):" + echo " echo 180 > /proc/sys/net/netfilter/nf_conntrack_tcp_timeout_established" + echo "" + echo "To query eBPF maps:" + echo " bpftool map list" + echo " bpftool map dump name conntrack_map" + echo "" + echo "Keeping pod running... (delete DaemonSet to stop)" + + # Keep the pod running + while true; do + sleep 3600 + done + resources: + requests: + cpu: 10m + memory: 16Mi + limits: + cpu: 50m + memory: 32Mi + volumes: + - name: bpffs + hostPath: + path: /sys/fs/bpf + type: DirectoryOrCreate + restartPolicy: Always + +--- +# Reset DaemonSet - Run this to restore default timeouts +# NOTE: Do not apply this at the same time as conntrack-tuner! +# Usage: +# 1. Delete conntrack-tuner first: kubectl delete -f k8s_conntrack_tuner.yaml +# 2. Then apply this: kubectl apply -f k8s_conntrack_reset.yaml +# 3. Wait for completion +# 4. Delete this: kubectl delete -f k8s_conntrack_reset.yaml +apiVersion: apps/v1 +kind: DaemonSet +metadata: + name: conntrack-reset + namespace: kube-system + labels: + app: conntrack-reset + purpose: cleanup +spec: + selector: + matchLabels: + app: conntrack-reset + template: + metadata: + labels: + app: conntrack-reset + spec: + hostNetwork: true + hostPID: true + tolerations: + - operator: Exists + containers: + - name: reset + image: busybox:latest + securityContext: + privileged: true + command: ["/bin/sh"] + args: + - -c + - | + echo "Resetting conntrack timeout to default (180s)..." 
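+          # Note: the upstream kernel default for nf_conntrack_tcp_timeout_established is
+          # typically 432000s (5 days); 180s is simply the baseline value this test setup restores.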
+ + if [ -f /proc/sys/net/netfilter/nf_conntrack_tcp_timeout_established ]; then + echo 180 > /proc/sys/net/netfilter/nf_conntrack_tcp_timeout_established + NEW_TIMEOUT=$(cat /proc/sys/net/netfilter/nf_conntrack_tcp_timeout_established) + echo "✓ Reset to: ${NEW_TIMEOUT} seconds on $(hostname)" + else + echo "✓ Conntrack module not loaded on $(hostname), nothing to reset" + fi + + # Exit after one-time execution + exit 0 + resources: + requests: + cpu: 10m + memory: 16Mi + limits: + cpu: 50m + memory: 32Mi + restartPolicy: OnFailure diff --git a/validation-scripts/k8s_test_pods.yaml b/validation-scripts/k8s_test_pods.yaml new file mode 100644 index 00000000..8d17c7ea --- /dev/null +++ b/validation-scripts/k8s_test_pods.yaml @@ -0,0 +1,146 @@ +--- +# Namespace for test resources +apiVersion: v1 +kind: Namespace +metadata: + name: conntrack-test + labels: + purpose: conntrack-race-testing + +--- +# Server Pod - Runs a simple TCP server +apiVersion: v1 +kind: Pod +metadata: + name: test-server + namespace: conntrack-test + labels: + app: test-server + role: server +spec: + containers: + - name: server + image: nicolaka/netshoot:latest + command: + - /bin/sh + - -c + - | + # Start a simple TCP echo server on port 8080 + echo "Starting TCP server on port 8080..." + while true; do + echo "Connection accepted at $(date)" | nc -l -p 8080 + done + ports: + - containerPort: 8080 + name: tcp-server + protocol: TCP + resources: + requests: + memory: "64Mi" + cpu: "100m" + limits: + memory: "128Mi" + cpu: "200m" + restartPolicy: Always + +--- +# Server Service - Exposes the server pod +apiVersion: v1 +kind: Service +metadata: + name: test-server-svc + namespace: conntrack-test +spec: + selector: + app: test-server + ports: + - port: 8080 + targetPort: 8080 + protocol: TCP + name: tcp + type: ClusterIP + +--- +# Client Pod - Used to generate connections +apiVersion: v1 +kind: Pod +metadata: + name: test-client + namespace: conntrack-test + labels: + app: test-client + role: client +spec: + containers: + - name: client + image: nicolaka/netshoot:latest + command: + - /bin/sh + - -c + - | + # Keep the pod running + while true; do + sleep 3600 + done + resources: + requests: + memory: "64Mi" + cpu: "100m" + limits: + memory: "128Mi" + cpu: "200m" + restartPolicy: Always + +--- +# Optional: NetworkPolicy for testing +# Uncomment if you want to test with actual network policies +# apiVersion: networking.k8s.io/v1 +# kind: NetworkPolicy +# metadata: +# name: test-netpol +# namespace: conntrack-test +# spec: +# podSelector: +# matchLabels: +# app: test-server +# policyTypes: +# - Ingress +# - Egress +# ingress: +# - from: +# - podSelector: +# matchLabels: +# app: test-client +# ports: +# - protocol: TCP +# port: 8080 +# egress: +# - to: +# - podSelector: +# matchLabels: +# app: test-client + +--- +# ConfigMap with test utilities (optional) +apiVersion: v1 +kind: ConfigMap +metadata: + name: test-scripts + namespace: conntrack-test +data: + simple-server.sh: | + #!/bin/sh + # Simple TCP server that logs connections + echo "Starting TCP server on port 8080..." 
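+    # Each nc -l invocation typically serves a single client and then exits (exact behavior
+    # depends on the nc variant); the loop below restarts the listener after every connection.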
+ while true; do + echo "$(date): Accepting connection" | nc -l -p 8080 + done + + connection-test.sh: | + #!/bin/sh + # Test script to verify connectivity + TARGET=${1:-test-server-svc} + PORT=${2:-8080} + echo "Testing connection to $TARGET:$PORT" + echo "TEST" | nc -v -w 2 $TARGET $PORT + echo "Connection test complete" diff --git a/validation-scripts/setup.sh b/validation-scripts/setup.sh new file mode 100755 index 00000000..706c030a --- /dev/null +++ b/validation-scripts/setup.sh @@ -0,0 +1,121 @@ +#!/bin/bash +# +# Setup script for Conntrack Race Condition Reproducer +# This script validates prerequisites and helps set up the test environment +# + +set -e + +# Colors for output +RED='\033[0;31m' +GREEN='\033[0;32m' +YELLOW='\033[1;33m' +NC='\033[0m' # No Color + +echo "==================================================" +echo "Conntrack Race Condition Reproducer - Setup" +echo "==================================================" +echo "" + +# Check kubectl +echo -n "Checking kubectl... " +if command -v kubectl &> /dev/null; then + KUBECTL_VERSION=$(kubectl version --client --short 2>/dev/null || kubectl version --client 2>&1 | grep "Client Version" | cut -d: -f2) + echo -e "${GREEN}✓${NC} Found: $KUBECTL_VERSION" +else + echo -e "${RED}✗${NC} kubectl not found" + echo "Please install kubectl: https://kubernetes.io/docs/tasks/tools/" + exit 1 +fi + +# Check kubectl cluster access +echo -n "Checking cluster access... " +if kubectl cluster-info &> /dev/null; then + echo -e "${GREEN}✓${NC} Connected to cluster" + CLUSTER_NAME=$(kubectl config current-context 2>/dev/null || echo "unknown") + echo " Current context: $CLUSTER_NAME" +else + echo -e "${RED}✗${NC} Cannot connect to cluster" + echo "Please configure kubectl with cluster credentials" + exit 1 +fi + +# Check Python +echo -n "Checking Python... " +if command -v python3 &> /dev/null; then + PYTHON_VERSION=$(python3 --version) + echo -e "${GREEN}✓${NC} Found: $PYTHON_VERSION" +else + echo -e "${RED}✗${NC} python3 not found" + echo "Please install Python 3.8 or later" + exit 1 +fi + +# Check/Install PyYAML +echo -n "Checking PyYAML... " +if python3 -c "import yaml" 2>/dev/null; then + echo -e "${GREEN}✓${NC} Installed" +else + echo -e "${YELLOW}!${NC} Not found, installing..." + pip3 install pyyaml + if python3 -c "import yaml" 2>/dev/null; then + echo -e "${GREEN}✓${NC} Successfully installed PyYAML" + else + echo -e "${RED}✗${NC} Failed to install PyYAML" + echo "Please install manually: pip3 install pyyaml" + exit 1 + fi +fi + +# Check permissions +echo "" +echo "Checking Kubernetes permissions..." + +echo -n " Can create namespace... " +if kubectl auth can-i create namespace &> /dev/null; then + echo -e "${GREEN}✓${NC}" +else + echo -e "${YELLOW}!${NC} May need additional permissions" +fi + +echo -n " Can create pods... " +if kubectl auth can-i create pods -n conntrack-test &> /dev/null; then + echo -e "${GREEN}✓${NC}" +else + echo -e "${YELLOW}!${NC} May need additional permissions" +fi + +echo -n " Can exec into pods... " +if kubectl auth can-i create pods/exec -n conntrack-test &> /dev/null; then + echo -e "${GREEN}✓${NC}" +else + echo -e "${YELLOW}!${NC} May need additional permissions" +fi + +# Make script executable +echo "" +echo "Making Python script executable..." +chmod +x conntrack_race_reproducer.py +echo -e "${GREEN}✓${NC} Done" + +echo "" +echo "==================================================" +echo "Setup complete!" 
+echo "==================================================" +echo "" +echo "Next steps:" +echo "" +echo "1. Deploy test resources:" +echo " kubectl apply -f k8s_test_pods.yaml" +echo "" +echo "2. Wait for pods to be ready:" +echo " kubectl wait --for=condition=ready pod/test-client -n conntrack-test --timeout=60s" +echo " kubectl wait --for=condition=ready pod/test-server -n conntrack-test --timeout=60s" +echo "" +echo "3. (Optional) Edit test_config.yaml to match your environment" +echo "" +echo "4. Run the reproducer:" +echo " python3 conntrack_race_reproducer.py" +echo "" +echo "For more information, see README.md" +echo "" diff --git a/validation-scripts/test_config.yaml b/validation-scripts/test_config.yaml new file mode 100644 index 00000000..42eb839b --- /dev/null +++ b/validation-scripts/test_config.yaml @@ -0,0 +1,29 @@ +# Conntrack Race Condition Test Configuration + +# Kubernetes settings +namespace: conntrack-test +client_pod: test-client +server_pod: test-server +server_port: 8080 + +# NPA (Network Policy Agent) settings +npa_namespace: kube-system +npa_pod_label: k8s-app=aws-node # Adjust based on your NPA deployment + +# Timing parameters (in seconds) +# Note: For faster testing, these are reduced from production values +kernel_timeout: 30 # Kernel conntrack timeout (production: ~180s) +cleanup_period: 60 # NPA cleanup cycle period (production: 300s) + +# Test parameters +iterations: 5 # Number of test attempts +connections_per_wave: 20 # Connections created per wave (Phase 1) + +# Advanced settings (optional) +# retry_delay: 5 # Delay between retries (seconds) +# max_wait_time: 600 # Maximum time to wait for cleanup cycle (seconds) + +# Notes: +# - Reduce kernel_timeout for faster testing (requires node configuration) +# - Increase iterations to improve race detection probability +# - Adjust cleanup_period to match your NPA configuration