Skip to content

Commit f4a818e

Browse files
authored
feat(packetparser): Allow sampling of packets (#1767)
# Description This PR allows for optional sampling of packet reporting when in high data aggregation level for `packetparser`. By default, all packets are reported but optionally `1 out of n` packets are sampled by random chance with the exception of certain important control flags or when hitting the reporting interval. This allows Retina to scale to high network volume environments at the trade-off of some reporting granularity. The performance impact of this is mostly for workloads with lots of new connections, connections already tracked in the conntrack table rely on #1665 for scalability. The behavior added in #1665 allows for accurate reporting of metrics despite sampling being in place. ## Related Issue #1760 ## Checklist - [X] I have read the [contributing documentation](https://retina.sh/docs/Contributing/overview). - [X] I signed and signed-off the commits (`git commit -S -s ...`). See [this documentation](https://docs.github.com/en/authentication/managing-commit-signature-verification/about-commit-signature-verification) on signing commits. - [X] I have correctly attributed the author(s) of the code. - [X] I have tested the changes locally. - [X] I have followed the project's style guidelines. - [X] I have updated the documentation, if necessary. - [X] I have added tests, if applicable. ## Screenshots (if applicable) or Testing Completed ## Main <img width="1487" height="860" alt="Screenshot 2025-07-22 at 4 51 24 PM" src="https://github.com/user-attachments/assets/72bc7b42-b280-4d10-aa7b-d114b460cd73" /> ## After the change (with default sampling rate of 1) <img width="1487" height="860" alt="Screenshot 2025-07-22 at 4 57 36 PM" src="https://github.com/user-attachments/assets/6c115205-3068-4e97-ac51-9980c088890d" /> ## After the change (with sampling rate of 1000) <img width="1487" height="856" alt="Screenshot 2025-07-22 at 5 04 22 PM" src="https://github.com/user-attachments/assets/b5e6cd5e-9c44-446f-bc1d-996044820f16" /> --- Please refer to the [CONTRIBUTING.md](../CONTRIBUTING.md) file for more information on how to contribute to this project. Signed-off-by: Matthew McKeen <[email protected]>
1 parent bd91072 commit f4a818e

File tree

10 files changed

+102
-41
lines changed

10 files changed

+102
-41
lines changed

deploy/standard/manifests/controller/helm/retina/templates/configmap.yaml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ data:
2525
bypassLookupIPOfInterest: {{ .Values.bypassLookupIPOfInterest }}
2626
dataAggregationLevel: {{ .Values.dataAggregationLevel }}
2727
telemetryInterval: {{ .Values.daemonset.telemetryInterval }}
28+
dataSamplingRate: {{ .Values.dataSamplingRate }}
2829
{{- end}}
2930
---
3031
{{- if .Values.os.windows}}

deploy/standard/manifests/controller/helm/retina/values.yaml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,7 @@ remoteContext: false
5656
enableAnnotations: false
5757
bypassLookupIPOfInterest: false
5858
dataAggregationLevel: "low"
59+
dataSamplingRate: 1
5960

6061
imagePullSecrets: []
6162
nameOverride: "retina"

docs/02-Installation/03-Config.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,7 @@ Apply to both Agent and Operator.
5353
* `enableAnnotations`: Enables gathering of metrics for annotated resources. Resources can be annotated with `retina.sh=observe`. Requires the operator and `operator.enableRetinaEndpoint` to be enabled. By enabling annotations, the agent will not use MetricsConfiguration CRD.
5454
* `bypassLookupIPOfInterest`: If true, plugins like `packetparser` and `dropreason` will bypass IP lookup, generating an event for each packet regardless. `enableAnnotations` will not work if this is true.
5555
* `dataAggregationLevel`: Defines the level of data aggregation for Retina. See [Data Aggregation](../05-Concepts/data-aggregation.md) for more details.
56+
* `dataSamplingRate`: Defines the data sampling rate for `packetparser`. See [Sampling](../03-Metrics/plugins/Linux/packetparser.md#sampling) for more details.
5657

5758
## Operator Configuration
5859

docs/03-Metrics/plugins/Linux/packetparser.md

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,14 @@ The `packetparser` plugin requires the `CAP_NET_ADMIN` and `CAP_SYS_ADMIN` capab
1515

1616
`packetparser` does not produce Basic metrics. In Advanced mode (refer to [Metric Modes](../../modes/modes.md)), the plugin transforms an eBPF result into an enriched `Flow` by adding Pod information based on IP. It then sends the `Flow` to an external channel, enabling *several modules* to generate Pod-Level metrics.
1717

18+
## Sampling
19+
20+
Since `packetparser` produces many enriched `Flow` objects it can be quite expensive for user space to process. Thus, when operating in `high` [data aggregation](../../../05-Concepts/data-aggregation.md) level optional sampling for reported packets is available via the `dataSamplingRate` configuration option.
21+
22+
`dataSamplingRate` is expressed in 1 out of N terms, where N is the `dataSamplingRate` value. For example, if `dataSamplingRate` is 3 1/3rd of packets will be sampled for reporting.
23+
24+
Keep in mind that there are cases where reporting will happen anyways as to ensure metric accuracy.
25+
1826
### Code locations
1927

2028
- Plugin and eBPF code: *pkg/plugin/packetparser/*

pkg/config/config.go

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -24,8 +24,9 @@ const (
2424
)
2525

2626
var (
27-
ErrorTelemetryIntervalTooSmall = fmt.Errorf("telemetryInterval smaller than %v is not allowed", MinTelemetryInterval)
28-
DefaultTelemetryInterval = 15 * time.Minute
27+
ErrorTelemetryIntervalTooSmall = fmt.Errorf("telemetryInterval smaller than %v is not allowed", MinTelemetryInterval)
28+
DefaultTelemetryInterval = 15 * time.Minute
29+
DefaultSamplingRate uint32 = 1
2930
)
3031

3132
func (l *Level) UnmarshalText(text []byte) error {
@@ -75,6 +76,7 @@ type Config struct {
7576
DataAggregationLevel Level `yaml:"dataAggregationLevel"`
7677
MonitorSockPath string `yaml:"monitorSockPath"`
7778
TelemetryInterval time.Duration `yaml:"telemetryInterval"`
79+
DataSamplingRate uint32 `yaml:"dataSamplingRate"`
7880
}
7981

8082
func GetConfig(cfgFilename string) (*Config, error) {
@@ -122,6 +124,12 @@ func GetConfig(cfgFilename string) (*Config, error) {
122124
return nil, ErrorTelemetryIntervalTooSmall
123125
}
124126

127+
// If unset, default sampling rate to 1
128+
if config.DataSamplingRate == 0 {
129+
log.Printf("dataSamplingRate is not set, defaulting to %v", DefaultSamplingRate)
130+
config.DataSamplingRate = DefaultSamplingRate
131+
}
132+
125133
return &config, nil
126134
}
127135

pkg/config/config_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,8 @@ func TestGetConfig(t *testing.T) {
2828
c.RemoteContext ||
2929
c.EnableAnnotations ||
3030
c.TelemetryInterval != 15*time.Minute ||
31-
c.DataAggregationLevel != Low {
31+
c.DataAggregationLevel != Low ||
32+
c.DataSamplingRate != 1 {
3233
t.Errorf("Expeted config should be same as ./testwith/config.yaml; instead got %+v", c)
3334
}
3435
}
@@ -65,6 +66,5 @@ func TestDecodeLevelHook(t *testing.T) {
6566
result, err := decodeLevelHook(reflect.TypeOf(test.input), reflect.TypeOf(Level(0)), test.input)
6667
require.NoError(t, err)
6768
assert.Equal(t, test.expected, result)
68-
6969
}
7070
}

pkg/plugin/conntrack/_cprog/conntrack.c

Lines changed: 59 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -233,8 +233,9 @@ static __always_inline __u8 _ct_get_traffic_direction(__u8 observation_point) {
233233
* @arg key The key to be used to create the new connection.
234234
* @arg observation_point The point in the network stack where the packet is observed.
235235
* @arg is_reply true if the packet is a SYN-ACK packet. False if it is a SYN packet.
236+
* @arg sampled Whether or not the packet was sampled for reporting.
236237
*/
237-
static __always_inline bool _ct_create_new_tcp_connection(struct packet *p, struct ct_v4_key *key, __u8 observation_point, bool is_reply) {
238+
static __always_inline bool _ct_create_new_tcp_connection(struct packet *p, struct ct_v4_key *key, __u8 observation_point, bool is_reply, bool sampled) {
238239
struct ct_entry new_value;
239240
__builtin_memset(&new_value, 0, sizeof(struct ct_entry));
240241
__u64 now = bpf_mono_now();
@@ -245,14 +246,20 @@ static __always_inline bool _ct_create_new_tcp_connection(struct packet *p, stru
245246
new_value.eviction_time = now + CT_SYN_TIMEOUT;
246247
if(is_reply) {
247248
new_value.flags_seen_rx_dir = p->flags;
248-
new_value.last_report_rx_dir = now;
249-
new_value.bytes_seen_since_last_report_rx_dir = 0;
250-
new_value.packets_seen_since_last_report_rx_dir = 0;
249+
new_value.last_report_rx_dir = sampled ? now : 0;
250+
new_value.bytes_seen_since_last_report_rx_dir = !sampled ? p->bytes : 0;
251+
new_value.packets_seen_since_last_report_rx_dir = !sampled;
252+
if (!sampled) {
253+
_ct_record_tcp_flags(p->flags, &new_value.flags_seen_since_last_report_rx_dir);
254+
}
251255
} else {
252256
new_value.flags_seen_tx_dir = p->flags;
253-
new_value.last_report_tx_dir = now;
254-
new_value.bytes_seen_since_last_report_tx_dir = 0;
255-
new_value.packets_seen_since_last_report_tx_dir = 0;
257+
new_value.last_report_tx_dir = sampled ? now : 0;
258+
new_value.bytes_seen_since_last_report_tx_dir = !sampled ? p->bytes : 0;
259+
new_value.packets_seen_since_last_report_tx_dir = !sampled;
260+
if (!sampled) {
261+
_ct_record_tcp_flags(p->flags, &new_value.flags_seen_since_last_report_tx_dir);
262+
}
256263
}
257264
new_value.is_direction_unknown = false;
258265
new_value.traffic_direction = _ct_get_traffic_direction(observation_point);
@@ -273,16 +280,17 @@ static __always_inline bool _ct_create_new_tcp_connection(struct packet *p, stru
273280
p->is_reply = is_reply;
274281
p->traffic_direction = new_value.traffic_direction;
275282
bpf_map_update_elem(&retina_conntrack, key, &new_value, BPF_ANY);
276-
return true;
283+
return sampled;
277284
}
278285

279286
/**
280287
* Create a new UDP connection.
281288
* @arg *p pointer to the packet to be processed.
282289
* @arg key The key to be used to create the new connection.
283290
* @arg observation_point The point in the network stack where the packet is observed.
291+
* @arg sampled Whether or not the packet was sampled for reporting.
284292
*/
285-
static __always_inline bool _ct_handle_udp_connection(struct packet *p, struct ct_v4_key *key, __u8 observation_point) {
293+
static __always_inline bool _ct_handle_udp_connection(struct packet *p, struct ct_v4_key *key, __u8 observation_point, bool sampled) {
286294
if (!p || !key) {
287295
return false;
288296
}
@@ -295,9 +303,9 @@ static __always_inline bool _ct_handle_udp_connection(struct packet *p, struct c
295303
}
296304
new_value.eviction_time = now + CT_CONNECTION_LIFETIME_NONTCP;
297305
new_value.flags_seen_tx_dir = p->flags;
298-
new_value.last_report_tx_dir = now;
299-
new_value.bytes_seen_since_last_report_tx_dir = 0;
300-
new_value.packets_seen_since_last_report_tx_dir = 0;
306+
new_value.last_report_tx_dir = sampled ? now : 0;
307+
new_value.bytes_seen_since_last_report_tx_dir = !sampled ? p->bytes : 0;
308+
new_value.packets_seen_since_last_report_tx_dir = !sampled;
301309
new_value.traffic_direction = _ct_get_traffic_direction(observation_point);
302310
#ifdef ENABLE_CONNTRACK_METRICS
303311
new_value.conntrack_metadata.packets_tx_count = 1;
@@ -310,7 +318,7 @@ static __always_inline bool _ct_handle_udp_connection(struct packet *p, struct c
310318
p->is_reply = false;
311319
p->traffic_direction = new_value.traffic_direction;
312320
bpf_map_update_elem(&retina_conntrack, key, &new_value, BPF_ANY);
313-
return true;
321+
return sampled;
314322
}
315323

316324
/**
@@ -319,18 +327,19 @@ static __always_inline bool _ct_handle_udp_connection(struct packet *p, struct c
319327
* @arg key The key to be used to handle the connection.
320328
* @arg reverse_key The reverse key to be used to handle the connection.
321329
* @arg observation_point The point in the network stack where the packet is observed.
330+
* @arg sampled Whether or not the packet was sampled for reporting.
322331
*/
323-
static __always_inline bool _ct_handle_tcp_connection(struct packet *p, struct ct_v4_key *key, struct ct_v4_key *reverse_key, __u8 observation_point) {
332+
static __always_inline bool _ct_handle_tcp_connection(struct packet *p, struct ct_v4_key *key, struct ct_v4_key *reverse_key, __u8 observation_point, bool sampled) {
324333
if (!p || !key || !reverse_key) {
325334
return false;
326335
}
327336
u8 tcp_handshake = p->flags & (TCP_SYN|TCP_ACK);
328337
if (tcp_handshake == TCP_SYN) {
329338
// We have a SYN, we set `is_reply` to false and we provide `key`
330-
return _ct_create_new_tcp_connection(p, key, observation_point, false);
339+
return _ct_create_new_tcp_connection(p, key, observation_point, false, sampled);
331340
} else if(tcp_handshake == (TCP_SYN|TCP_ACK)) {
332341
// We have a SYN-ACK, we set `is_reply` to true and we provide `reverse_key`
333-
return _ct_create_new_tcp_connection(p, reverse_key, observation_point, true);
342+
return _ct_create_new_tcp_connection(p, reverse_key, observation_point, true, sampled);
334343
}
335344

336345
// The packet is not a SYN packet and the connection corresponding to this packet is not found.
@@ -353,9 +362,12 @@ static __always_inline bool _ct_handle_tcp_connection(struct packet *p, struct c
353362
if (p->flags & TCP_ACK) {
354363
p->is_reply = true;
355364
new_value.flags_seen_rx_dir = p->flags;
356-
new_value.last_report_rx_dir = now;
357-
new_value.bytes_seen_since_last_report_rx_dir = 0;
358-
new_value.packets_seen_since_last_report_rx_dir = 0;
365+
new_value.last_report_rx_dir = sampled ? now : 0;
366+
new_value.bytes_seen_since_last_report_rx_dir = !sampled ? p->bytes : 0;
367+
new_value.packets_seen_since_last_report_rx_dir = !sampled;
368+
if (!sampled) {
369+
_ct_record_tcp_flags(p->flags, &new_value.flags_seen_since_last_report_rx_dir);
370+
}
359371
#ifdef ENABLE_CONNTRACK_METRICS
360372
new_value.conntrack_metadata.bytes_rx_count = p->bytes;
361373
new_value.conntrack_metadata.packets_rx_count = 1;
@@ -364,9 +376,12 @@ static __always_inline bool _ct_handle_tcp_connection(struct packet *p, struct c
364376
} else { // Otherwise, the packet is considered as a packet in the send direction.
365377
p->is_reply = false;
366378
new_value.flags_seen_tx_dir = p->flags;
367-
new_value.last_report_tx_dir = now;
368-
new_value.bytes_seen_since_last_report_tx_dir = 0;
369-
new_value.packets_seen_since_last_report_tx_dir = 0;
379+
new_value.last_report_tx_dir = sampled ? now : 0;
380+
new_value.bytes_seen_since_last_report_tx_dir = !sampled ? p->bytes : 0;
381+
new_value.packets_seen_since_last_report_tx_dir = !sampled;
382+
if (!sampled) {
383+
_ct_record_tcp_flags(p->flags, &new_value.flags_seen_since_last_report_tx_dir);
384+
}
370385
#ifdef ENABLE_CONNTRACK_METRICS
371386
new_value.conntrack_metadata.bytes_tx_count = p->bytes;
372387
new_value.conntrack_metadata.packets_tx_count = 1;
@@ -377,7 +392,7 @@ static __always_inline bool _ct_handle_tcp_connection(struct packet *p, struct c
377392
// Update packet's conntrack metadata.
378393
__builtin_memcpy(&p->conntrack_metadata, &new_value.conntrack_metadata, sizeof(struct conntrackmetadata));
379394
#endif // ENABLE_CONNTRACK_METRICS
380-
return true;
395+
return sampled;
381396
}
382397

383398
/**
@@ -386,17 +401,18 @@ static __always_inline bool _ct_handle_tcp_connection(struct packet *p, struct c
386401
* @arg key The key to be used to handle the connection.
387402
* @arg reverse_key The reverse key to be used to handle the connection.
388403
* @arg observation_point The point in the network stack where the packet is observed.
404+
* @arg sampled Whether or not the packet was sampled for reporting.
389405
*/
390-
static __always_inline struct packetreport _ct_handle_new_connection(struct packet *p, struct ct_v4_key *key, struct ct_v4_key *reverse_key, __u8 observation_point) {
406+
static __always_inline struct packetreport _ct_handle_new_connection(struct packet *p, struct ct_v4_key *key, struct ct_v4_key *reverse_key, __u8 observation_point, bool sampled) {
391407
struct packetreport report;
392408
__builtin_memset(&report, 0, sizeof(struct packetreport));
393409
if (!p || !key || !reverse_key) {
394410
return report;
395411
}
396412
if (key->proto & IPPROTO_TCP) {
397-
report.report = _ct_handle_tcp_connection(p, key, reverse_key, observation_point);
413+
report.report = _ct_handle_tcp_connection(p, key, reverse_key, observation_point, sampled);
398414
} else if (key->proto & IPPROTO_UDP) {
399-
report.report = _ct_handle_udp_connection(p, key, observation_point);
415+
report.report = _ct_handle_udp_connection(p, key, observation_point, sampled);
400416
} else {
401417
report.report = false; // We are not interested in other protocols.
402418
}
@@ -410,9 +426,10 @@ static __always_inline struct packetreport _ct_handle_new_connection(struct pack
410426
* @arg flags The flags of the packet.
411427
* @arg direction The direction of the packet in relation to the connection.
412428
* @arg bytes The size of the packet in bytes.
429+
* @arg sampled Whether or not the packet was sampled for reporting.
413430
* Returns a packetreport struct representing if the packet should be reported to userspace.
414431
*/
415-
static __always_inline struct packetreport _ct_should_report_packet(struct ct_v4_key *key, struct ct_entry *entry, __u8 flags, __u8 direction, __u32 bytes) {
432+
static __always_inline struct packetreport _ct_should_report_packet(struct ct_v4_key *key, struct ct_entry *entry, __u8 flags, __u8 direction, __u32 bytes, bool sampled) {
416433
struct packetreport report;
417434
__builtin_memset(&report, 0, sizeof(struct packetreport));
418435
report.report = false;
@@ -522,21 +539,27 @@ static __always_inline struct packetreport _ct_should_report_packet(struct ct_v4
522539
WRITE_ONCE(entry->eviction_time, now + CT_CONNECTION_LIFETIME_NONTCP);
523540
}
524541

542+
if (flags != seen_flags) {
543+
if (direction == CT_PACKET_DIR_TX) {
544+
WRITE_ONCE(entry->flags_seen_tx_dir, flags);
545+
} else {
546+
WRITE_ONCE(entry->flags_seen_rx_dir, flags);
547+
}
548+
}
549+
525550
// Report if:
526551
// 1. We already decided to report based on protocol-specific rules, or
527-
// 2. New flags have appeared, or
552+
// 2. New flags have appeared and the packet has been sampled, or
528553
// 3. Reporting interval has elapsed
529-
if (should_report || flags != seen_flags || now - last_report >= CT_REPORT_INTERVAL) {
554+
if (should_report || (sampled && flags != seen_flags) || now - last_report >= CT_REPORT_INTERVAL) {
530555
report.report = true;
531556
// Update the connection's state
532557
if (direction == CT_PACKET_DIR_TX) {
533-
WRITE_ONCE(entry->flags_seen_tx_dir, flags);
534558
WRITE_ONCE(entry->last_report_tx_dir, now);
535559
WRITE_ONCE(entry->bytes_seen_since_last_report_tx_dir, 0);
536560
WRITE_ONCE(entry->packets_seen_since_last_report_tx_dir, 0);
537561
__builtin_memset(&entry->flags_seen_since_last_report_tx_dir, 0, sizeof(struct tcpflagscount));
538562
} else {
539-
WRITE_ONCE(entry->flags_seen_rx_dir, flags);
540563
WRITE_ONCE(entry->last_report_rx_dir, now);
541564
WRITE_ONCE(entry->bytes_seen_since_last_report_rx_dir, 0);
542565
WRITE_ONCE(entry->packets_seen_since_last_report_rx_dir, 0);
@@ -565,9 +588,10 @@ static __always_inline struct packetreport _ct_should_report_packet(struct ct_v4
565588
* Process a packet and update the connection tracking map.
566589
* @arg *p pointer to the packet to be processed.
567590
* @arg observation_point The point in the network stack where the packet is observed.
591+
* @arg sampled Whether or not the packet has been sampled for reporting.
568592
* Returns a packetreport struct representing if the packet should be reported to userspace.
569593
*/
570-
static __always_inline __attribute__((unused)) struct packetreport ct_process_packet(struct packet *p, __u8 observation_point) {
594+
static __always_inline __attribute__((unused)) struct packetreport ct_process_packet(struct packet *p, __u8 observation_point, bool sampled) {
571595
if (!p) {
572596
struct packetreport report;
573597
__builtin_memset(&report, 0, sizeof(struct packetreport));
@@ -601,7 +625,7 @@ static __always_inline __attribute__((unused)) struct packetreport ct_process_pa
601625
// Update packet's conntract metadata.
602626
__builtin_memcpy(&p->conntrack_metadata, &entry->conntrack_metadata, sizeof(struct conntrackmetadata));
603627
#endif // ENABLE_CONNTRACK_METRICS
604-
return _ct_should_report_packet(&key, entry, p->flags, CT_PACKET_DIR_TX, p->bytes);
628+
return _ct_should_report_packet(&key, entry, p->flags, CT_PACKET_DIR_TX, p->bytes, sampled);
605629
}
606630

607631
// The connection is not found in the send direction. Check the reply direction by reversing the key.
@@ -623,9 +647,9 @@ static __always_inline __attribute__((unused)) struct packetreport ct_process_pa
623647
// Update packet's conntract metadata.
624648
__builtin_memcpy(&p->conntrack_metadata, &entry->conntrack_metadata, sizeof(struct conntrackmetadata));
625649
#endif // ENABLE_CONNTRACK_METRICS
626-
return _ct_should_report_packet(&reverse_key, entry, p->flags, CT_PACKET_DIR_RX, p->bytes);
650+
return _ct_should_report_packet(&reverse_key, entry, p->flags, CT_PACKET_DIR_RX, p->bytes, sampled);
627651
}
628652

629653
// If the connection is still not found, the connection is new.
630-
return _ct_handle_new_connection(p, &key, &reverse_key, observation_point);
654+
return _ct_handle_new_connection(p, &key, &reverse_key, observation_point, sampled);
631655
}
Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,2 +1,3 @@
11
#define BYPASS_LOOKUP_IP_OF_INTEREST 0
22
#define DATA_AGGREGATION_LEVEL 0
3+
#define DATA_SAMPLING_RATE 1

pkg/plugin/packetparser/_cprog/packetparser.c

Lines changed: 15 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -208,11 +208,24 @@ static void parse(struct __sk_buff *skb, __u8 obs)
208208
p.conntrack_metadata = conntrack_metadata;
209209
#endif // ENABLE_CONNTRACK_METRICS
210210

211+
#ifdef DATA_AGGREGATION_LEVEL
212+
213+
// Calculate sampling
214+
bool sampled __attribute__((unused));
215+
sampled = true;
216+
217+
#ifdef DATA_SAMPLING_RATE
218+
u32 rand __attribute__((unused));
219+
rand = bpf_get_prandom_u32();
220+
if (rand >= UINT32_MAX / DATA_SAMPLING_RATE) {
221+
sampled = false;
222+
}
223+
#endif
224+
211225
// Process the packet in ct
212226
struct packetreport report __attribute__((unused));
213-
report = ct_process_packet(&p, obs);
227+
report = ct_process_packet(&p, obs, sampled);
214228

215-
#ifdef DATA_AGGREGATION_LEVEL
216229
// If the data aggregation level is low, always send the packet to the perf buffer.
217230
#if DATA_AGGREGATION_LEVEL == DATA_AGGREGATION_LEVEL_LOW
218231
p.previously_observed_packets = 0;

pkg/plugin/packetparser/packetparser_linux.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -117,6 +117,10 @@ func (p *packetParser) Generate(ctx context.Context) error {
117117
p.l.Info("data aggregation level", zap.String("level", p.cfg.DataAggregationLevel.String()))
118118
st += fmt.Sprintf("#define DATA_AGGREGATION_LEVEL %d\n", p.cfg.DataAggregationLevel)
119119

120+
// Process packetparser sampling rate.
121+
p.l.Info("sampling rate", zap.Uint32("rate", p.cfg.DataSamplingRate))
122+
st += fmt.Sprintf("#define DATA_SAMPLING_RATE %d\n", p.cfg.DataSamplingRate)
123+
120124
// Generate dynamic header for packetparser.
121125
err = loader.WriteFile(ctx, dynamicHeaderPath, st)
122126
if err != nil {

0 commit comments

Comments
 (0)