main.go (2 changes: 1 addition & 1 deletion)
@@ -141,7 +141,7 @@ func main() {

npMode, isMultiNICEnabled := lo.Must2(getNetworkPolicyConfigsFromIpamd(log))

-ebpfClient := lo.Must1(ebpf.NewBpfClient(nodeIP, ctrlConfig.EnablePolicyEventLogs, ctrlConfig.EnableCloudWatchLogs,
+ebpfClient := lo.Must1(ebpf.NewBpfClient(nodeIP, ctrlConfig.EnablePolicyEventLogs, ctrlConfig.PolicyEventLogsScope, ctrlConfig.EnableCloudWatchLogs,
ctrlConfig.EnableIPv6, ctrlConfig.ConntrackCacheCleanupPeriod, ctrlConfig.ConntrackCacheTableSize, npMode, isMultiNICEnabled))
ebpfClient.ReAttachEbpfProbes()

pkg/config/controller_config.go (7 changes: 6 additions & 1 deletion)
@@ -21,7 +21,9 @@ const (
defaultMaxConcurrentReconciles = 3
defaultConntrackCacheCleanupPeriod = 300
defaultConntrackCacheTableSize = 512 * 1024
defaultPolicyEventLogsScope = "ACCEPT"
flagEnablePolicyEventLogs = "enable-policy-event-logs"
flagPolicyEventLogsScope = "policy-event-logs-scope"
flagEnableCloudWatchLogs = "enable-cloudwatch-logs"
flagEnableIPv6 = "enable-ipv6"
flagEnableNetworkPolicy = "enable-network-policy"
@@ -58,6 +60,8 @@ type ControllerConfig struct {
RuntimeConfig RuntimeConfig
// Configuration for enabling profiling
EnableProfiling bool
// Scope of policy event logs: ACCEPT (default) or DENY
PolicyEventLogsScope string
}

func (cfg *ControllerConfig) BindFlags(fs *pflag.FlagSet) {
@@ -80,7 +84,8 @@ func (cfg *ControllerConfig) BindFlags(fs *pflag.FlagSet) {
"Cleanup interval for network policy agent conntrack cache")
fs.IntVar(&cfg.ConntrackCacheTableSize, flagConntrackCacheTableSize, defaultConntrackCacheTableSize, ""+
"Table size for network policy agent conntrack cache")

fs.StringVar(&cfg.PolicyEventLogsScope, flagPolicyEventLogsScope, defaultPolicyEventLogsScope, ""+
"Set the policy event logs scope, if set to ACCEPT both ACCEPT and DENY events are generated, if set to DENY only DENY events are generated - ACCEPT, DENY")
cfg.RuntimeConfig.BindFlags(fs)
}
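
The flag value is threaded through to NewBpfClient as a plain string and compared only against "DENY", so any other value falls back to ACCEPT behavior. A minimal validation sketch (hypothetical, not part of this diff) could reject unexpected values early:

```go
package config

import "fmt"

// validatePolicyEventLogsScope is a hypothetical helper (not in this PR):
// today any value other than "DENY" silently behaves like "ACCEPT".
func validatePolicyEventLogsScope(scope string) error {
	switch scope {
	case "ACCEPT", "DENY":
		return nil
	default:
		return fmt.Errorf("invalid policy-event-logs-scope %q: valid values are ACCEPT, DENY", scope)
	}
}
```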

pkg/ebpf/bpf_client.go (65 changes: 56 additions & 9 deletions)
@@ -38,20 +38,23 @@ var (
EVENTS_V6_BINARY = "v6events.bpf.o"
AWS_CONNTRACK_MAP = "aws_conntrack_map"
AWS_EVENTS_MAP = "policy_events"
AWS_EVENTS_SCOPE_MAP = "policy_events_scope"
EKS_CLI_BINARY = "aws-eks-na-cli"
EKS_V6_CLI_BINARY = "aws-eks-na-cli-v6"
hostBinaryPath = "/host/opt/cni/bin/"
IPv4_HOST_MASK = "/32"
IPv6_HOST_MASK = "/128"
CONNTRACK_MAP_PIN_PATH = "/sys/fs/bpf/globals/aws/maps/global_aws_conntrack_map"
POLICY_EVENTS_MAP_PIN_PATH = "/sys/fs/bpf/globals/aws/maps/global_policy_events"
POLICY_EVENTS_SCOPE_MAP_PIN_PATH = "/sys/fs/bpf/globals/aws/maps/global_policy_events_scope"
CATCH_ALL_PROTOCOL corev1.Protocol = "ANY_IP_PROTOCOL"
POD_VETH_PREFIX = "eni"
POLICIES_APPLIED = 0
DEFAULT_ALLOW = 1
DEFAULT_DENY = 2
POD_STATE_MAP_KEY = 0
CLUSTER_POLICY_POD_STATE_MAP_KEY = 1
POLICY_EVENTS_SCOPE_MAP_KEY = 0
BRANCH_ENI_VETH_PREFIX = "vlan"
INTERFACE_COUNT_UNKNOWN = -1 // Used when caller doesn't know interface count
INTERFACE_COUNT_DEFAULT = 1 // Default single interface
@@ -85,6 +88,10 @@ type pod_state struct {
state uint8
}

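// policy_scope mirrors struct policy_scope in the BPF programs: a single
// byte selecting which policy verdicts are published to the events ring buffer.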
type policy_scope struct {
scope uint8
}

func msSince(start time.Time) float64 {
return float64(time.Since(start) / time.Millisecond)
}
@@ -115,9 +122,10 @@ type BPFContext struct {
conntrackMapInfo goebpfmaps.BpfMap
}

-func NewBpfClient(nodeIP string, enablePolicyEventLogs, enableCloudWatchLogs bool,
+func NewBpfClient(nodeIP string, enablePolicyEventLogs bool, policyEventsLogsScope string, enableCloudWatchLogs bool,
enableIPv6 bool, conntrackTTL int, conntrackTableSize int, networkPolicyMode string, isMultiNICEnabled bool) (*bpfClient, error) {
var conntrackMap goebpfmaps.BpfMap
var policyEventsScopeMap goebpfmaps.BpfMap

ebpfClient := &bpfClient{
// Maps PolicyEndpoint resource to it's eBPF context
@@ -146,7 +154,7 @@ func NewBpfClient
ebpfClient.hostMask = ingressBinary, egressBinary, hostMask

bpfBinaries := []string{eventsBinary, ingressBinary, egressBinary, cliBinary}
-isConntrackMapPresent, isPolicyEventsMapPresent := false, false
+isConntrackMapPresent, isPolicyEventsMapPresent, isPolicyEventsScopeMapPresent := false, false, false
var err error

ebpfClient.bpfSDKClient = goelf.New()
@@ -181,7 +189,7 @@ func NewBpfClient
var interfaceNametoIngressPinPath map[string]string
var interfaceNametoEgressPinPath map[string]string
eventBufferFD := 0
-isConntrackMapPresent, isPolicyEventsMapPresent, eventBufferFD, interfaceNametoIngressPinPath, interfaceNametoEgressPinPath, err = ebpfClient.recoverBPFState(ebpfClient.bpfTCClient, ebpfClient.bpfSDKClient, ebpfClient.policyEndpointeBPFContext,
+isConntrackMapPresent, isPolicyEventsMapPresent, isPolicyEventsScopeMapPresent, eventBufferFD, interfaceNametoIngressPinPath, interfaceNametoEgressPinPath, err = ebpfClient.recoverBPFState(ebpfClient.bpfTCClient, ebpfClient.bpfSDKClient, ebpfClient.policyEndpointeBPFContext,
ebpfClient.globalMaps, ingressUpdateRequired, egressUpdateRequired, eventsUpdateRequired)
if err != nil {
//Log the error and move on
@@ -196,7 +204,7 @@ func NewBpfClient
// - Current events binary packaged with network policy agent is different than the one installed
// during the previous installation (or)
// - Either Conntrack Map, Events Map (or) Events Scope Map is currently missing on the node
-if eventsUpdateRequired || (!isConntrackMapPresent || !isPolicyEventsMapPresent) {
+if eventsUpdateRequired || (!isConntrackMapPresent || !isPolicyEventsMapPresent || !isPolicyEventsScopeMapPresent) {
log().Info("Install the default global maps")
eventsProbe := EVENTS_BINARY
if enableIPv6 {
@@ -226,6 +234,10 @@ func NewBpfClient
if mapName == AWS_EVENTS_MAP {
eventBufferFD = int(mapInfo.MapFD)
}
if mapName == AWS_EVENTS_SCOPE_MAP {
policyEventsScopeMap = mapInfo
isPolicyEventsScopeMapPresent = true
}
}
}

@@ -244,6 +256,36 @@ func NewBpfClient
ebpfClient.conntrackClient = conntrack.NewConntrackClient(conntrackMap, enableIPv6)
log().Info("Initialized Conntrack client")

// If present, update the PolicyEventsScope map with the configured scope
if isPolicyEventsScopeMapPresent {
// Prefer a handle recovered from a previous installation; on a fresh
// install the handle was already captured when the default maps were loaded.
recoveredPolicyEventsScopeMap, ok := ebpfClient.globalMaps.Load(POLICY_EVENTS_SCOPE_MAP_PIN_PATH)
if ok {
policyEventsScopeMap = recoveredPolicyEventsScopeMap.(goebpfmaps.BpfMap)
log().Info("Derived existing policyEventsScopeMap identifier")
} else if policyEventsScopeMap.MapFD == 0 {
log().Errorf("Unable to get policyEventsScopeMap post recovery")
sdkAPIErr.WithLabelValues("RecoveryFailed").Inc()
return nil, fmt.Errorf("policyEventsScopeMap not found post recovery")
}

key := uint32(POLICY_EVENTS_SCOPE_MAP_KEY)
scope := uint8(utils.ACCEPT.Index())

if policyEventsLogsScope == "DENY" {
scope = uint8(utils.DENY.Index())
}

value := policy_scope{scope: scope}
err := policyEventsScopeMap.CreateUpdateMapEntry(uintptr(unsafe.Pointer(&key)), uintptr(unsafe.Pointer(&value)), 0)
if err != nil {
log().Errorf("Policy Events Scope Map update failed: %v", err)
sdkAPIErr.WithLabelValues("updateEbpfMap-policy-events-scope").Inc()
} else {
log().Infof("Updated Policy Events Scope Map: key=%d value=%v", key, value)
}
}

if enablePolicyEventLogs {
err = events.ConfigurePolicyEventsLogging(enableCloudWatchLogs, eventBufferFD, enableIPv6)
if err != nil {
@@ -379,8 +421,8 @@ func checkAndUpdateBPFBinaries
}

func (l *bpfClient) recoverBPFState(bpfTCClient tc.BpfTc, eBPFSDKClient goelf.BpfSDKClient, policyEndpointeBPFContext *sync.Map, globalMaps *sync.Map, updateIngressProbe,
-updateEgressProbe, updateEventsProbe bool) (bool, bool, int, map[string]string, map[string]string, error) {
-isConntrackMapPresent, isPolicyEventsMapPresent := false, false
+updateEgressProbe, updateEventsProbe bool) (bool, bool, bool, int, map[string]string, map[string]string, error) {
+isConntrackMapPresent, isPolicyEventsMapPresent, isPolicyEventsScopeMapPresent := false, false, false
eventsMapFD := 0
var interfaceNametoIngressPinPath = make(map[string]string)
var interfaceNametoEgressPinPath = make(map[string]string)
@@ -392,7 +434,7 @@ func (l *bpfClient) recoverBPFState
if err != nil {
log().Errorf("failed to recover global maps %v", err)
sdkAPIErr.WithLabelValues("RecoverGlobalMaps").Inc()
-return isConntrackMapPresent, isPolicyEventsMapPresent, eventsMapFD, interfaceNametoIngressPinPath, interfaceNametoEgressPinPath, nil
+return isConntrackMapPresent, isPolicyEventsMapPresent, isPolicyEventsScopeMapPresent, eventsMapFD, interfaceNametoIngressPinPath, interfaceNametoEgressPinPath, nil
}
log().Infof("Total no of global maps recovered count: %d", len(recoveredGlobalMaps))
for globalMapName, globalMap := range recoveredGlobalMaps {
@@ -407,6 +449,11 @@ func (l *bpfClient) recoverBPFState
eventsMapFD = int(globalMap.MapFD)
log().Infof("Policy event Map is already present on the node Recovered FD: %d", eventsMapFD)
}
if globalMapName == POLICY_EVENTS_SCOPE_MAP_PIN_PATH {
log().Info("Policy event scope Map is already present on the node")
isPolicyEventsScopeMapPresent = true
globalMaps.Store(globalMapName, globalMap)
}
}
}

@@ -505,7 +552,7 @@ func (l *bpfClient) recoverBPFState
if err != nil {
log().Errorf("GetAllBpfProgramsAndMaps failed %v", err)
sdkAPIErr.WithLabelValues("GetAllBpfProgramsAndMaps").Inc()
-return isConntrackMapPresent, isPolicyEventsMapPresent, eventsMapFD, interfaceNametoIngressPinPath, interfaceNametoEgressPinPath, err
+return isConntrackMapPresent, isPolicyEventsMapPresent, isPolicyEventsScopeMapPresent, eventsMapFD, interfaceNametoIngressPinPath, interfaceNametoEgressPinPath, err
}
log().Infof("GetAllBpfProgramsAndMaps returned %d", len(bpfState))
progIdToPinPath := make(map[int]string)
@@ -543,7 +590,7 @@ func (l *bpfClient) recoverBPFState
log().Info("Collected all data for reattaching probes")
}

-return isConntrackMapPresent, isPolicyEventsMapPresent, eventsMapFD, interfaceNametoIngressPinPath, interfaceNametoEgressPinPath, nil
+return isConntrackMapPresent, isPolicyEventsMapPresent, isPolicyEventsScopeMapPresent, eventsMapFD, interfaceNametoIngressPinPath, interfaceNametoEgressPinPath, nil
}

func (l *bpfClient) ReAttachEbpfProbes() error {
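The scope byte written by NewBpfClient above is read on the datapath by publishPolicyEvent (added to the BPF programs below). Assuming utils.DENY.Index() is 0 and utils.ACCEPT.Index() is 1, with verdicts encoded as 0 = deny and 1 = accept, the publish decision reduces to this sketch:

```go
package main

import "fmt"

// shouldPublish mirrors the BPF-side gate (publish when scope >= verdict),
// assuming scope DENY=0, ACCEPT=1 and verdict deny=0, accept=1. A missing
// scope map entry publishes everything.
func shouldPublish(scopePresent bool, scope, verdict uint8) bool {
	return !scopePresent || scope >= verdict
}

func main() {
	fmt.Println(shouldPublish(true, 0, 0))  // DENY scope, deny verdict   -> true
	fmt.Println(shouldPublish(true, 0, 1))  // DENY scope, accept verdict -> false
	fmt.Println(shouldPublish(true, 1, 1))  // ACCEPT scope, accept       -> true
	fmt.Println(shouldPublish(false, 0, 1)) // no scope entry             -> true
}
```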
pkg/ebpf/c/tc.v4egress.bpf.c (32 changes: 22 additions & 10 deletions)
@@ -125,6 +125,10 @@ struct pod_state {
__u8 state; // 0 => POLICIES_APPLIED, 1 => DEFAULT_ALLOW, 2 => DEFAULT_DENY
};

struct policy_scope {
__u8 scope;
};

struct bpf_map_def_pvt SEC("maps") egress_pod_state_map = {
.type = BPF_MAP_TYPE_HASH,
.key_size = sizeof(__u32), // network policy key 0, cluster policy key 1
@@ -136,6 +140,15 @@ struct bpf_map_def_pvt SEC("maps") egress_pod_state_map = {

struct bpf_map_def_pvt aws_conntrack_map;
struct bpf_map_def_pvt policy_events;
struct bpf_map_def_pvt policy_events_scope;

static void publishPolicyEvent(struct data_t *evt) {
__u32 plsc_key = 0;
struct policy_scope *plsc = bpf_map_lookup_elem(&policy_events_scope, &plsc_key);
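// Publish when scope >= verdict (verdict: 0 = deny, 1 = accept). A DENY
// scope keeps only deny events; ACCEPT, or a missing entry, keeps all.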
if (plsc == NULL || plsc->scope >= evt->verdict) {
bpf_ringbuf_output(&policy_events, evt, sizeof(*evt), 0);
}
}

static __always_inline int evaluateClusterPolicyByLookUp(struct keystruct trie_key, struct conntrack_key flow_key, __u32 *admin_tier_priority, __u8 *baseline_tier_action, __u32 *baseline_tier_priority) {

@@ -254,15 +267,15 @@ static __always_inline int evaluateFlow
case ACTION_DENY: {
evt->verdict = 0;
evt->tier = ADMIN_TIER;
-bpf_ringbuf_output(&policy_events, evt, sizeof(*evt), 0);
+publishPolicyEvent(evt);
return BPF_DROP;
}
case ACTION_ALLOW: {
flow_val.val = pod_state_val;
bpf_map_update_elem(&aws_conntrack_map, &flow_key, &flow_val, 0);
evt->verdict = 1;
evt->tier = ADMIN_TIER;
-bpf_ringbuf_output(&policy_events, evt, sizeof(*evt), 0);
+publishPolicyEvent(evt);
return BPF_OK;
}
default:
@@ -278,13 +291,13 @@ static __always_inline int evaluateFlow
bpf_map_update_elem(&aws_conntrack_map, &flow_key, &flow_val, 0); // 0 - BPF_ANY
evt->verdict = 1;
evt->tier = NETWORK_POLICY_TIER;
-bpf_ringbuf_output(&policy_events, evt, sizeof(*evt), 0);
+publishPolicyEvent(evt);
return BPF_OK;
}
case ACTION_DENY:{
evt->verdict = 0;
evt->tier = NETWORK_POLICY_TIER;
-bpf_ringbuf_output(&policy_events, evt, sizeof(*evt), 0);
+publishPolicyEvent(evt);
return BPF_DROP;
}
case ACTION_PASS:
@@ -296,15 +309,15 @@ static __always_inline int evaluateFlow
case ACTION_DENY: {
evt->verdict = 0;
evt->tier = BASELINE_TIER;
-bpf_ringbuf_output(&policy_events, evt, sizeof(*evt), 0);
+publishPolicyEvent(evt);
return BPF_DROP;
}
case ACTION_ALLOW: {
flow_val.val = pod_state_val;
bpf_map_update_elem(&aws_conntrack_map, &flow_key, &flow_val, 0);
evt->verdict = 1;
evt->tier = BASELINE_TIER;
-bpf_ringbuf_output(&policy_events, evt, sizeof(*evt), 0);
+publishPolicyEvent(evt);
return BPF_OK;
}
case ACTION_PASS: {
@@ -314,13 +327,13 @@ static __always_inline int evaluateFlow
bpf_map_update_elem(&aws_conntrack_map, &flow_key, &flow_val, 0);
evt->verdict = 1;
evt->tier = DEFAULT_TIER;
-bpf_ringbuf_output(&policy_events, evt, sizeof(*evt), 0);
+publishPolicyEvent(evt);
return BPF_OK;
}
case DEFAULT_DENY: {
evt->verdict = 0;
evt->tier = DEFAULT_TIER;
-bpf_ringbuf_output(&policy_events, evt, sizeof(*evt), 0);
+publishPolicyEvent(evt);
return BPF_DROP;
}
}
@@ -349,7 +362,6 @@ int handle_egress(struct __sk_buff *skb)
__builtin_memset(&src_ip, 0, sizeof(src_ip));
__builtin_memset(&reverse_flow_key, 0, sizeof(reverse_flow_key));


struct ethhdr *ether = data;
if (data + sizeof(*ether) > data_end) {
return BPF_OK;
@@ -431,7 +443,7 @@ int handle_egress(struct __sk_buff *skb)
if ((pst == NULL) || (clusterpolicy_pst == NULL)) {
evt.verdict = 0;
evt.tier = ERROR_TIER;
-bpf_ringbuf_output(&policy_events, &evt, sizeof(evt), 0);
+publishPolicyEvent(&evt);
return BPF_DROP;
}
