Skip to content

Commit b2a5ab5

Browse files
committed
feat(packetparser): Allow sampling of packets
Signed-off-by: Matthew McKeen <[email protected]>
1 parent 1983b55 commit b2a5ab5

File tree

8 files changed

+94
-42
lines changed

8 files changed

+94
-42
lines changed

deploy/standard/manifests/controller/helm/retina/templates/configmap.yaml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ data:
2525
bypassLookupIPOfInterest: {{ .Values.bypassLookupIPOfInterest }}
2626
dataAggregationLevel: {{ .Values.dataAggregationLevel }}
2727
telemetryInterval: {{ .Values.daemonset.telemetryInterval }}
28+
dataSamplingRate: {{ .Values.dataSamplingRate }}
2829
{{- end}}
2930
---
3031
{{- if .Values.os.windows}}

deploy/standard/manifests/controller/helm/retina/values.yaml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,7 @@ remoteContext: false
5656
enableAnnotations: false
5757
bypassLookupIPOfInterest: false
5858
dataAggregationLevel: "low"
59+
dataSamplingRate: 1
5960

6061
imagePullSecrets: []
6162
nameOverride: "retina"

pkg/config/config.go

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -24,8 +24,9 @@ const (
2424
)
2525

2626
var (
27-
ErrorTelemetryIntervalTooSmall = fmt.Errorf("telemetryInterval smaller than %v is not allowed", MinTelemetryInterval)
28-
DefaultTelemetryInterval = 15 * time.Minute
27+
ErrorTelemetryIntervalTooSmall = fmt.Errorf("telemetryInterval smaller than %v is not allowed", MinTelemetryInterval)
28+
DefaultTelemetryInterval = 15 * time.Minute
29+
DefaultSamplingRate uint32 = 1
2930
)
3031

3132
func (l *Level) UnmarshalText(text []byte) error {
@@ -75,6 +76,7 @@ type Config struct {
7576
DataAggregationLevel Level `yaml:"dataAggregationLevel"`
7677
MonitorSockPath string `yaml:"monitorSockPath"`
7778
TelemetryInterval time.Duration `yaml:"telemetryInterval"`
79+
DataSamplingRate uint32 `yaml:"dataSamplingRate"`
7880
}
7981

8082
func GetConfig(cfgFilename string) (*Config, error) {
@@ -122,6 +124,12 @@ func GetConfig(cfgFilename string) (*Config, error) {
122124
return nil, ErrorTelemetryIntervalTooSmall
123125
}
124126

127+
// If unset, default sampling rate to 1
128+
if config.DataSamplingRate == 0 {
129+
log.Printf("dataSamplingRate is not set, defaulting to %v", DefaultSamplingRate)
130+
config.DataSamplingRate = DefaultSamplingRate
131+
}
132+
125133
return &config, nil
126134
}
127135

pkg/config/config_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,8 @@ func TestGetConfig(t *testing.T) {
2828
c.RemoteContext ||
2929
c.EnableAnnotations ||
3030
c.TelemetryInterval != 15*time.Minute ||
31-
c.DataAggregationLevel != Low {
31+
c.DataAggregationLevel != Low ||
32+
c.DataSamplingRate != 1 {
3233
t.Errorf("Expeted config should be same as ./testwith/config.yaml; instead got %+v", c)
3334
}
3435
}
@@ -65,6 +66,5 @@ func TestDecodeLevelHook(t *testing.T) {
6566
result, err := decodeLevelHook(reflect.TypeOf(test.input), reflect.TypeOf(Level(0)), test.input)
6667
require.NoError(t, err)
6768
assert.Equal(t, test.expected, result)
68-
6969
}
7070
}

pkg/plugin/conntrack/_cprog/conntrack.c

Lines changed: 60 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -233,8 +233,9 @@ static __always_inline __u8 _ct_get_traffic_direction(__u8 observation_point) {
233233
* @arg key The key to be used to create the new connection.
234234
* @arg observation_point The point in the network stack where the packet is observed.
235235
* @arg is_reply true if the packet is a SYN-ACK packet. False if it is a SYN packet.
236+
* @arg sampled Whether or not the packet was sampled for reporting.
236237
*/
237-
static __always_inline bool _ct_create_new_tcp_connection(struct packet *p, struct ct_v4_key key, __u8 observation_point, bool is_reply) {
238+
static __always_inline bool _ct_create_new_tcp_connection(struct packet *p, struct ct_v4_key key, __u8 observation_point, bool is_reply, bool sampled) {
238239
struct ct_entry new_value;
239240
__builtin_memset(&new_value, 0, sizeof(struct ct_entry));
240241
__u64 now = bpf_mono_now();
@@ -245,14 +246,20 @@ static __always_inline bool _ct_create_new_tcp_connection(struct packet *p, stru
245246
new_value.eviction_time = now + CT_SYN_TIMEOUT;
246247
if(is_reply) {
247248
new_value.flags_seen_rx_dir = p->flags;
248-
new_value.last_report_rx_dir = now;
249-
new_value.bytes_seen_since_last_report_rx_dir = 0;
250-
new_value.packets_seen_since_last_report_rx_dir = 0;
249+
new_value.last_report_rx_dir = sampled ? now : 0;
250+
new_value.bytes_seen_since_last_report_rx_dir = !sampled ? p->bytes : 0;
251+
new_value.packets_seen_since_last_report_rx_dir = !sampled;
252+
if (!sampled) {
253+
_ct_record_tcp_flags(p->flags, &new_value.flags_seen_since_last_report_rx_dir);
254+
}
251255
} else {
252256
new_value.flags_seen_tx_dir = p->flags;
253-
new_value.last_report_tx_dir = now;
254-
new_value.bytes_seen_since_last_report_tx_dir = 0;
255-
new_value.packets_seen_since_last_report_tx_dir = 0;
257+
new_value.last_report_tx_dir = sampled ? now : 0;
258+
new_value.bytes_seen_since_last_report_tx_dir = !sampled ? p->bytes : 0;
259+
new_value.packets_seen_since_last_report_tx_dir = !sampled;
260+
if (!sampled) {
261+
_ct_record_tcp_flags(p->flags, &new_value.flags_seen_since_last_report_tx_dir);
262+
}
256263
}
257264
new_value.is_direction_unknown = false;
258265
new_value.traffic_direction = _ct_get_traffic_direction(observation_point);
@@ -273,16 +280,17 @@ static __always_inline bool _ct_create_new_tcp_connection(struct packet *p, stru
273280
p->is_reply = is_reply;
274281
p->traffic_direction = new_value.traffic_direction;
275282
bpf_map_update_elem(&retina_conntrack, &key, &new_value, BPF_ANY);
276-
return true;
283+
return sampled;
277284
}
278285

279286
/**
280287
* Create a new UDP connection.
281288
* @arg *p pointer to the packet to be processed.
282289
* @arg key The key to be used to create the new connection.
283290
* @arg observation_point The point in the network stack where the packet is observed.
291+
* @arg sampled Whether or not the packet was sampled for reporting.
284292
*/
285-
static __always_inline bool _ct_handle_udp_connection(struct packet *p, struct ct_v4_key key, __u8 observation_point) {
293+
static __always_inline bool _ct_handle_udp_connection(struct packet *p, struct ct_v4_key key, __u8 observation_point, bool sampled) {
286294
struct ct_entry new_value;
287295
__builtin_memset(&new_value, 0, sizeof(struct ct_entry));
288296
__u64 now = bpf_mono_now();
@@ -292,9 +300,9 @@ static __always_inline bool _ct_handle_udp_connection(struct packet *p, struct c
292300
}
293301
new_value.eviction_time = now + CT_CONNECTION_LIFETIME_NONTCP;
294302
new_value.flags_seen_tx_dir = p->flags;
295-
new_value.last_report_tx_dir = now;
296-
new_value.bytes_seen_since_last_report_tx_dir = 0;
297-
new_value.packets_seen_since_last_report_tx_dir = 0;
303+
new_value.last_report_tx_dir = sampled ? now : 0;
304+
new_value.bytes_seen_since_last_report_tx_dir = !sampled ? p->bytes : 0;
305+
new_value.packets_seen_since_last_report_tx_dir = !sampled;
298306
new_value.traffic_direction = _ct_get_traffic_direction(observation_point);
299307
#ifdef ENABLE_CONNTRACK_METRICS
300308
new_value.conntrack_metadata.packets_tx_count = 1;
@@ -307,7 +315,7 @@ static __always_inline bool _ct_handle_udp_connection(struct packet *p, struct c
307315
p->is_reply = false;
308316
p->traffic_direction = new_value.traffic_direction;
309317
bpf_map_update_elem(&retina_conntrack, &key, &new_value, BPF_ANY);
310-
return true;
318+
return sampled;
311319
}
312320

313321
/**
@@ -316,15 +324,16 @@ static __always_inline bool _ct_handle_udp_connection(struct packet *p, struct c
316324
* @arg key The key to be used to handle the connection.
317325
* @arg reverse_key The reverse key to be used to handle the connection.
318326
* @arg observation_point The point in the network stack where the packet is observed.
327+
* @arg sampled Whether or not the packet was sampled for reporting.
319328
*/
320-
static __always_inline bool _ct_handle_tcp_connection(struct packet *p, struct ct_v4_key key, struct ct_v4_key reverse_key, __u8 observation_point) {
329+
static __always_inline bool _ct_handle_tcp_connection(struct packet *p, struct ct_v4_key key, struct ct_v4_key reverse_key, __u8 observation_point, bool sampled) {
321330
u8 tcp_handshake = p->flags & (TCP_SYN|TCP_ACK);
322331
if (tcp_handshake == TCP_SYN) {
323332
// We have a SYN, we set `is_reply` to false and we provide `key`
324-
return _ct_create_new_tcp_connection(p, key, observation_point, false);
333+
return _ct_create_new_tcp_connection(p, key, observation_point, false, sampled);
325334
} else if(tcp_handshake == TCP_SYN|TCP_ACK) {
326335
// We have a SYN-ACK, we set `is_reply` to true and we provide `reverse_key`
327-
return _ct_create_new_tcp_connection(p, reverse_key, observation_point, true);
336+
return _ct_create_new_tcp_connection(p, reverse_key, observation_point, true, sampled);
328337
}
329338

330339
// The packet is not a SYN packet and the connection corresponding to this packet is not found.
@@ -347,9 +356,12 @@ static __always_inline bool _ct_handle_tcp_connection(struct packet *p, struct c
347356
if (p->flags & TCP_ACK) {
348357
p->is_reply = true;
349358
new_value.flags_seen_rx_dir = p->flags;
350-
new_value.last_report_rx_dir = now;
351-
new_value.bytes_seen_since_last_report_rx_dir = 0;
352-
new_value.packets_seen_since_last_report_rx_dir = 0;
359+
new_value.last_report_rx_dir = sampled ? now : 0;
360+
new_value.bytes_seen_since_last_report_rx_dir = !sampled ? p->bytes : 0;
361+
new_value.packets_seen_since_last_report_rx_dir = !sampled;
362+
if (!sampled) {
363+
_ct_record_tcp_flags(p->flags, &new_value.flags_seen_since_last_report_rx_dir);
364+
}
353365
#ifdef ENABLE_CONNTRACK_METRICS
354366
new_value.conntrack_metadata.bytes_rx_count = p->bytes;
355367
new_value.conntrack_metadata.packets_rx_count = 1;
@@ -358,9 +370,12 @@ static __always_inline bool _ct_handle_tcp_connection(struct packet *p, struct c
358370
} else { // Otherwise, the packet is considered as a packet in the send direction.
359371
p->is_reply = false;
360372
new_value.flags_seen_tx_dir = p->flags;
361-
new_value.last_report_tx_dir = now;
362-
new_value.bytes_seen_since_last_report_tx_dir = 0;
363-
new_value.packets_seen_since_last_report_tx_dir = 0;
373+
new_value.last_report_tx_dir = sampled ? now : 0;
374+
new_value.bytes_seen_since_last_report_tx_dir = !sampled ? p->bytes : 0;
375+
new_value.packets_seen_since_last_report_tx_dir = !sampled;
376+
if (!sampled) {
377+
_ct_record_tcp_flags(p->flags, &new_value.flags_seen_since_last_report_tx_dir);
378+
}
364379
#ifdef ENABLE_CONNTRACK_METRICS
365380
new_value.conntrack_metadata.bytes_tx_count = p->bytes;
366381
new_value.conntrack_metadata.packets_tx_count = 1;
@@ -371,7 +386,7 @@ static __always_inline bool _ct_handle_tcp_connection(struct packet *p, struct c
371386
// Update packet's conntrack metadata.
372387
__builtin_memcpy(&p->conntrack_metadata, &new_value.conntrack_metadata, sizeof(struct conntrackmetadata));
373388
#endif // ENABLE_CONNTRACK_METRICS
374-
return true;
389+
return sampled;
375390
}
376391

377392
/**
@@ -380,14 +395,15 @@ static __always_inline bool _ct_handle_tcp_connection(struct packet *p, struct c
380395
* @arg key The key to be used to handle the connection.
381396
* @arg reverse_key The reverse key to be used to handle the connection.
382397
* @arg observation_point The point in the network stack where the packet is observed.
398+
* @arg sampled Whether or not the packet was sampled for reporting.
383399
*/
384-
static __always_inline struct packetreport _ct_handle_new_connection(struct packet *p, struct ct_v4_key key, struct ct_v4_key reverse_key, __u8 observation_point) {
400+
static __always_inline struct packetreport _ct_handle_new_connection(struct packet *p, struct ct_v4_key key, struct ct_v4_key reverse_key, __u8 observation_point, bool sampled) {
385401
struct packetreport report;
386402
__builtin_memset(&report, 0, sizeof(struct packetreport));
387403
if (key.proto & IPPROTO_TCP) {
388-
report.report = _ct_handle_tcp_connection(p, key, reverse_key, observation_point);
404+
report.report = _ct_handle_tcp_connection(p, key, reverse_key, observation_point, sampled);
389405
} else if (key.proto & IPPROTO_UDP) {
390-
report.report = _ct_handle_udp_connection(p, key, observation_point);
406+
report.report = _ct_handle_udp_connection(p, key, observation_point, sampled);
391407
} else {
392408
report.report = false; // We are not interested in other protocols.
393409
}
@@ -401,9 +417,10 @@ static __always_inline struct packetreport _ct_handle_new_connection(struct pack
401417
* @arg flags The flags of the packet.
402418
* @arg direction The direction of the packet in relation to the connection.
403419
* @arg bytes The size of the packet in bytes.
420+
* @arg sampled Whether or not the packet was sampled for reporting.
404421
* Returns a packetreport struct representing if the packet should be reported to userspace.
405422
*/
406-
static __always_inline struct packetreport _ct_should_report_packet(struct ct_v4_key *key, struct ct_entry *entry, __u8 flags, __u8 direction, __u32 bytes) {
423+
static __always_inline struct packetreport _ct_should_report_packet(struct ct_v4_key *key, struct ct_entry *entry, __u8 flags, __u8 direction, __u32 bytes, bool sampled) {
407424
struct packetreport report;
408425
__builtin_memset(&report, 0, sizeof(struct packetreport));
409426
report.report = false;
@@ -513,21 +530,27 @@ static __always_inline struct packetreport _ct_should_report_packet(struct ct_v4
513530
WRITE_ONCE(entry->eviction_time, now + CT_CONNECTION_LIFETIME_NONTCP);
514531
}
515532

533+
if (flags != seen_flags) {
534+
if (direction == CT_PACKET_DIR_TX) {
535+
WRITE_ONCE(entry->flags_seen_tx_dir, flags);
536+
} else {
537+
WRITE_ONCE(entry->flags_seen_rx_dir, flags);
538+
}
539+
}
540+
516541
// Report if:
517542
// 1. We already decided to report based on protocol-specific rules, or
518-
// 2. New flags have appeared, or
543+
// 2. New flags have appeared and the packet has been sampled, or
519544
// 3. Reporting interval has elapsed
520-
if (should_report || flags != seen_flags || now - last_report >= CT_REPORT_INTERVAL) {
545+
if (should_report || (sampled && flags != seen_flags) || now - last_report >= CT_REPORT_INTERVAL) {
521546
report.report = true;
522547
// Update the connection's state
523548
if (direction == CT_PACKET_DIR_TX) {
524-
WRITE_ONCE(entry->flags_seen_tx_dir, flags);
525549
WRITE_ONCE(entry->last_report_tx_dir, now);
526550
WRITE_ONCE(entry->bytes_seen_since_last_report_tx_dir, 0);
527551
WRITE_ONCE(entry->packets_seen_since_last_report_tx_dir, 0);
528552
__builtin_memset(&entry->flags_seen_since_last_report_tx_dir, 0, sizeof(struct tcpflagscount));
529553
} else {
530-
WRITE_ONCE(entry->flags_seen_rx_dir, flags);
531554
WRITE_ONCE(entry->last_report_rx_dir, now);
532555
WRITE_ONCE(entry->bytes_seen_since_last_report_rx_dir, 0);
533556
WRITE_ONCE(entry->packets_seen_since_last_report_rx_dir, 0);
@@ -556,9 +579,10 @@ static __always_inline struct packetreport _ct_should_report_packet(struct ct_v4
556579
* Process a packet and update the connection tracking map.
557580
* @arg *p pointer to the packet to be processed.
558581
* @arg observation_point The point in the network stack where the packet is observed.
582+
* @arg sampled Whether or not the packet has been sampled for reporting.
559583
* Returns a packetreport struct representing if the packet should be reported to userspace.
560584
*/
561-
static __always_inline __attribute__((unused)) struct packetreport ct_process_packet(struct packet *p, __u8 observation_point) {
585+
static __always_inline __attribute__((unused)) struct packetreport ct_process_packet(struct packet *p, __u8 observation_point, bool sampled) {
562586
if (!p) {
563587
struct packetreport report;
564588
__builtin_memset(&report, 0, sizeof(struct packetreport));
@@ -592,7 +616,7 @@ static __always_inline __attribute__((unused)) struct packetreport ct_process_pa
592616
// Update packet's conntract metadata.
593617
__builtin_memcpy(&p->conntrack_metadata, &entry->conntrack_metadata, sizeof(struct conntrackmetadata));
594618
#endif // ENABLE_CONNTRACK_METRICS
595-
return _ct_should_report_packet(&key, entry, p->flags, CT_PACKET_DIR_TX, p->bytes);
619+
return _ct_should_report_packet(&key, entry, p->flags, CT_PACKET_DIR_TX, p->bytes, sampled);
596620
}
597621

598622
// The connection is not found in the send direction. Check the reply direction by reversing the key.
@@ -614,9 +638,9 @@ static __always_inline __attribute__((unused)) struct packetreport ct_process_pa
614638
// Update packet's conntract metadata.
615639
__builtin_memcpy(&p->conntrack_metadata, &entry->conntrack_metadata, sizeof(struct conntrackmetadata));
616640
#endif // ENABLE_CONNTRACK_METRICS
617-
return _ct_should_report_packet(&reverse_key, entry, p->flags, CT_PACKET_DIR_RX, p->bytes);
641+
return _ct_should_report_packet(&reverse_key, entry, p->flags, CT_PACKET_DIR_RX, p->bytes, sampled);
618642
}
619643

620644
// If the connection is still not found, the connection is new.
621-
return _ct_handle_new_connection(p, key, reverse_key, observation_point);
622-
}
645+
return _ct_handle_new_connection(p, key, reverse_key, observation_point, sampled);
646+
}
Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,2 +1,3 @@
11
#define BYPASS_LOOKUP_IP_OF_INTEREST 0
22
#define DATA_AGGREGATION_LEVEL 0
3+
#define DATA_SAMPLING_RATE 1

pkg/plugin/packetparser/_cprog/packetparser.c

Lines changed: 15 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -208,11 +208,24 @@ static void parse(struct __sk_buff *skb, __u8 obs)
208208
p.conntrack_metadata = conntrack_metadata;
209209
#endif // ENABLE_CONNTRACK_METRICS
210210

211+
#ifdef DATA_AGGREGATION_LEVEL
212+
213+
// Calculate sampling
214+
bool sampled __attribute__((unused));
215+
sampled = true;
216+
217+
#ifdef DATA_SAMPLING_RATE
218+
u32 rand __attribute__((unused));
219+
rand = bpf_get_prandom_u32();
220+
if (rand >= UINT32_MAX / DATA_SAMPLING_RATE) {
221+
sampled = false;
222+
}
223+
#endif
224+
211225
// Process the packet in ct
212226
struct packetreport report __attribute__((unused));
213-
report = ct_process_packet(&p, obs);
227+
report = ct_process_packet(&p, obs, sampled);
214228

215-
#ifdef DATA_AGGREGATION_LEVEL
216229
// If the data aggregation level is low, always send the packet to the perf buffer.
217230
#if DATA_AGGREGATION_LEVEL == DATA_AGGREGATION_LEVEL_LOW
218231
p.previously_observed_packets = 0;

pkg/plugin/packetparser/packetparser_linux.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -117,6 +117,10 @@ func (p *packetParser) Generate(ctx context.Context) error {
117117
p.l.Info("data aggregation level", zap.String("level", p.cfg.DataAggregationLevel.String()))
118118
st += fmt.Sprintf("#define DATA_AGGREGATION_LEVEL %d\n", p.cfg.DataAggregationLevel)
119119

120+
// Process packetparser sampling rate.
121+
p.l.Info("sampling rate", zap.Uint32("rate", p.cfg.DataSamplingRate))
122+
st += fmt.Sprintf("#define DATA_SAMPLING_RATE %d\n", p.cfg.DataSamplingRate)
123+
120124
// Generate dynamic header for packetparser.
121125
err = loader.WriteFile(ctx, dynamicHeaderPath, st)
122126
if err != nil {

0 commit comments

Comments
 (0)