
Commit a52298b

fix: add enricher ip check (#1959)
# Description

* Created a private function to construct the enricher (non-singleton), so new tests can create their own enricher instances.
* The public interface still creates a singleton (no logic change).
* Added a check to the enricher for a nil IP.
* Added a test.
* Refactored the existing test.

## Related Issue

#1909

## Checklist

- [x] I have read the [contributing documentation](https://retina.sh/docs/Contributing/overview).
- [x] I signed and signed-off the commits (`git commit -S -s ...`). See [this documentation](https://docs.github.com/en/authentication/managing-commit-signature-verification/about-commit-signature-verification) on signing commits.
- [x] I have correctly attributed the author(s) of the code.
- [x] I have tested the changes locally.
- [x] I have followed the project's style guidelines.
- [x] I have updated the documentation, if necessary.
- [x] I have added tests, if applicable.

## Screenshots (if applicable) or Testing Completed

Prometheus metrics:

<img width="1854" height="937" alt="image" src="https://github.com/user-attachments/assets/dc547267-33ce-496f-958d-3fa683b5f0a0" />

Agent pod log:

```bash
Defaulted container "retina" out of: retina, init-retina (init)
Starting Retina Agent
starting Retina daemon with legacy control plane v1.0.0-rc5-17-g2c1764d
init client-go
api server: https://lx-non-cil-lx-non-cilium-9b8218-51dc7ale.hcp.uksouth.azmk8s.io:443
init logger
ts=2025-12-08T09:52:04.625Z level=info caller=metrics/metrics.go:198 msg="Metrics initialized"
ts=2025-12-08T09:52:04.625Z level=info caller=standard/daemon.go:142 msg="{data aggregation level 15 0 low <nil>}"
ts=2025-12-08T09:52:04.625Z level=info caller=standard/daemon.go:160 msg="telemetry disabled"
ts=2025-12-08T09:52:04.625Z level=info caller=standard/daemon.go:178 msg="Remote context is disabled, only pods deployed on the same node as retina-agent will be monitored"
ts=2025-12-08T09:52:04.625Z level=info caller=standard/daemon.go:199 msg="pod selector when remote context is disabled{pod selector 15 0 spec.nodeName=aks-nodepool1-24136058-vmss000000,status.podIP!=10.224.0.5 <nil>}"
ts=2025-12-08T09:52:04.701Z level=info caller=standard/daemon.go:231 msg="Kubernetes server version: v1.33.5"
ts=2025-12-08T09:52:04.701Z level=info caller=standard/daemon.go:253 msg="Initializing Pod controller"
ts=2025-12-08T09:52:04.702Z level=info caller=pod/controller.go:89 msg="Setting up Pod controller"
ts=2025-12-08T09:52:04.702Z level=info caller=standard/daemon.go:269 msg="Initializing Node controller"
ts=2025-12-08T09:52:04.702Z level=info caller=node/controller.go:94 msg="Setting up Node controller"
ts=2025-12-08T09:52:04.702Z level=info caller=standard/daemon.go:275 msg="Initializing Service controller"
ts=2025-12-08T09:52:04.702Z level=info caller=service/controller.go:95 msg="Setting up Service controller"
ts=2025-12-08T09:52:04.702Z level=info caller=standard/daemon.go:282 msg="Initializing MetricsConfig namespaceController"
ts=2025-12-08T09:52:04.702Z level=info caller=namespace/namespace_controller.go:105 msg="Setting up Namespace controller"
ts=2025-12-08T09:52:04.702Z level=info caller=pluginmanager/pluginmanager.go:55 msg="plugin manager has pod level enabled"
ts=2025-12-08T09:52:04.702Z level=info caller=controllermanager/controllermanager.go:72 msg="Initializing controller manager ..."
ts=2025-12-08T09:52:04.702Z level=info caller=servermanager/servermanager.go:33 msg="Initializing HTTP server ..."
ts=2025-12-08T09:52:04.702Z level=info caller=server/server.go:42 msg="Setting up handlers"
ts=2025-12-08T09:52:04.702Z level=info caller=server/server.go:57 msg="Completed handler setup"
ts=2025-12-08T09:52:04.702Z level=info caller=servermanager/servermanager.go:37 msg="HTTP server initialized..."
ts=2025-12-08T09:52:04.702Z level=info caller=standard/daemon.go:314 msg="Started controller manager"
ts=2025-12-08T09:52:04.702Z level=info caller=servermanager/servermanager.go:42 msg="Starting HTTP server ..." host=0.0.0.0 port=10093
ts=2025-12-08T09:52:04.702Z level=info caller=server/server.go:69 msg="starting HTTP server... on " addr=0.0.0.0:10093
ts=2025-12-08T09:52:04.702Z level=info caller=pluginmanager/pluginmanager.go:118 msg="Starting plugin manager ..."
ts=2025-12-08T09:52:04.702Z level=info caller=pluginmanager/pluginmanager.go:130 msg="starting watchers"
ts=2025-12-08T09:52:04.702Z level=info caller=watchermanager/watchermanager.go:44 msg="watcher started" watcher_type=*endpoint.EndpointWatcher
ts=2025-12-08T09:52:04.702Z level=info caller=watchermanager/watchermanager.go:44 msg="watcher started" watcher_type=*apiserver.ApiServerWatcher
ts=2025-12-08T09:52:04.703Z level=info caller=server/server.go:208 msg="Starting metrics server"
ts=2025-12-08T09:52:04.703Z level=info caller=manager/server.go:83 msg="starting server" name="health probe" addr=[::]:18081
ts=2025-12-08T09:52:04.703Z level=info caller=server/server.go:247 msg="Serving metrics server" bindAddress=:18080 secure=false
ts=2025-12-08T09:52:04.703Z level=info caller=linuxutil/linuxutil_linux.go:48 msg="Initializing linuxutil plugin..."
ts=2025-12-08T09:52:04.703Z level=info caller=pluginmanager/pluginmanager.go:110 msg="Reconciled plugin" name=linuxutil
ts=2025-12-08T09:52:04.703Z level=info caller=dns/dns_linux.go:86 msg="Stopped dns plugin"
ts=2025-12-08T09:52:04.703Z level=info caller=controller/controller.go:204 msg="Starting EventSource" controller=node controllerGroup= controllerKind=Node source="kind source: *v1.Node"
ts=2025-12-08T09:52:04.703Z level=info caller=controller/controller.go:204 msg="Starting EventSource" controller=service controllerGroup= controllerKind=Service source="kind source: *v1.Service"
ts=2025-12-08T09:52:04.703Z level=info caller=conntrack/conntrack_linux.go:106 msg="Starting Conntrack GC loop"
ts=2025-12-08T09:52:04.703Z level=info caller=controller/controller.go:204 msg="Starting EventSource" controller=pod controllerGroup= controllerKind=Pod source="kind source: *v1.Pod"
ts=2025-12-08T09:52:04.703Z level=info caller=controller/controller.go:204 msg="Starting EventSource" controller=namespace controllerGroup= controllerKind=Namespace source="kind source: *v1.Namespace"
ts=2025-12-08T09:52:04.703Z level=info caller=pluginmanager/pluginmanager.go:167 msg="starting plugin linuxutil"
ts=2025-12-08T09:52:04.703Z level=info caller=linuxutil/linuxutil_linux.go:63 msg="Running linuxutil plugin..."
ts=2025-12-08T09:52:04.717Z level=info caller=dns/dns_linux.go:60 msg="Initialized dns plugin"
ts=2025-12-08T09:52:04.717Z level=info caller=pluginmanager/pluginmanager.go:110 msg="Reconciled plugin" name=dns
ts=2025-12-08T09:52:04.717Z level=info caller=packetparser/packetparser_linux.go:117 msg="data aggregation level" level=low
ts=2025-12-08T09:52:04.717Z level=info caller=pluginmanager/pluginmanager.go:167 msg="starting plugin dns"
ts=2025-12-08T09:52:04.717Z level=info caller=packetparser/packetparser_linux.go:125 msg="PacketParser header generated at" path=/go/src/github.com/microsoft/retina/pkg/plugin/packetparser/_cprog/dynamic.h
ts=2025-12-08T09:52:04.803Z level=info caller=controller/controller.go:239 msg="Starting Controller" controller=pod controllerGroup= controllerKind=Pod
ts=2025-12-08T09:52:04.803Z level=info caller=controller/controller.go:248 msg="Starting workers" controller=pod controllerGroup= controllerKind=Pod worker count=1
ts=2025-12-08T09:52:04.803Z level=info caller=controller/controller.go:239 msg="Starting Controller" controller=node controllerGroup= controllerKind=Node
ts=2025-12-08T09:52:04.803Z level=info caller=controller/controller.go:248 msg="Starting workers" controller=node controllerGroup= controllerKind=Node worker count=1
ts=2025-12-08T09:52:04.803Z level=info caller=pod/controller.go:39 msg="Reconciling Pod" Pod=kube-system/microsoft-defender-collector-ds-hrs4s
ts=2025-12-08T09:52:04.804Z level=info caller=pod/controller.go:39 msg="Reconciling Pod" Pod=kube-system/microsoft-defender-publisher-ds-49k77
ts=2025-12-08T09:52:04.804Z level=info caller=pod/controller.go:39 msg="Reconciling Pod" Pod=kube-system/prometheus-kube-prometheus-operator-64cf9f887f-2grmj
ts=2025-12-08T09:52:04.804Z level=info caller=pod/controller.go:39 msg="Reconciling Pod" Pod=gatekeeper-system/gatekeeper-controller-f465c8d88-5ktwr
ts=2025-12-08T09:52:04.804Z level=info caller=pod/controller.go:39 msg="Reconciling Pod" Pod=kube-system/alertmanager-prometheus-kube-prometheus-alertmanager-0
ts=2025-12-08T09:52:04.804Z level=info caller=pod/controller.go:39 msg="Reconciling Pod" Pod=kube-system/azure-policy-webhook-79f4f8b5b6-nvrrh
ts=2025-12-08T09:52:04.804Z level=info caller=pod/controller.go:39 msg="Reconciling Pod" Pod=kube-system/metrics-server-5554f5bfbd-lxrtg
ts=2025-12-08T09:52:04.816Z level=info caller=controller/controller.go:239 msg="Starting Controller" controller=namespace controllerGroup= controllerKind=Namespace
ts=2025-12-08T09:52:04.816Z level=info caller=controller/controller.go:248 msg="Starting workers" controller=namespace controllerGroup= controllerKind=Namespace worker count=1
ts=2025-12-08T09:52:04.816Z level=info caller=controller/controller.go:239 msg="Starting Controller" controller=service controllerGroup= controllerKind=Service
ts=2025-12-08T09:52:04.816Z level=info caller=controller/controller.go:248 msg="Starting workers" controller=service controllerGroup= controllerKind=Service worker count=1
ts=2025-12-08T09:52:05.702Z level=info caller=metrics/metrics_module.go:157 msg="Reconciling metric module" spec= specError="unsupported value type"
ts=2025-12-08T09:52:05.702Z level=info caller=metrics/metrics_module.go:346 msg="Appending namespaces to include list" namespaces=
ts=2025-12-08T09:52:05.702Z level=info caller=metrics/metrics_module.go:361 msg="Current included namespaces" namespaces= namespacesError="unsupported value type"
ts=2025-12-08T09:52:05.702Z level=info caller=metrics/metrics_module.go:380 msg="Namespaces to add" namespaces=
ts=2025-12-08T09:52:05.702Z level=info caller=metrics/metrics_module.go:381 msg="Namespaces to remove" namespaces=
ts=2025-12-08T09:52:05.702Z level=info caller=metrics/metrics_module.go:346 msg="Appending namespaces to include list" namespaces=
ts=2025-12-08T09:52:05.702Z level=info caller=metrics/metrics_module.go:361 msg="Current included namespaces" namespaces= namespacesError="unsupported value type"
ts=2025-12-08T09:52:05.702Z level=info caller=metrics/metrics_module.go:380 msg="Namespaces to add" namespaces=
ts=2025-12-08T09:52:05.702Z level=info caller=metrics/metrics_module.go:381 msg="Namespaces to remove" namespaces=
ts=2025-12-08T09:52:05.702Z level=info caller=metrics/forward.go:43 msg="Creating forward count metrics" options= optionsError="unsupported value type"
ts=2025-12-08T09:52:05.702Z level=info caller=metrics/forward.go:43 msg="Creating forward count metrics" options= optionsError="unsupported value type"
ts=2025-12-08T09:52:05.702Z level=info caller=metrics/drops.go:38 msg="Creating drop count metrics" options= optionsError="unsupported value type"
ts=2025-12-08T09:52:05.702Z level=info caller=metrics/drops.go:38 msg="Creating drop count metrics" options= optionsError="unsupported value type"
ts=2025-12-08T09:52:05.702Z level=info caller=metrics/tcpflags.go:37 msg="Creating TCP Flags count metrics" options= optionsError="unsupported value type"
ts=2025-12-08T09:52:05.702Z level=info caller=metrics/tcpretrans.go:37 msg="Creating TCP retransmit count metrics" options= optionsError="unsupported value type"
ts=2025-12-08T09:52:05.702Z level=info caller=metrics/dns.go:44 msg="Creating DNS count metrics" options= optionsError="unsupported value type"
ts=2025-12-08T09:52:05.702Z level=info caller=metrics/dns.go:44 msg="Creating DNS count metrics" options= optionsError="unsupported value type"
ts=2025-12-08T09:52:05.702Z level=info caller=metrics/tcpretrans.go:57 msg="src labels" labels=direction,ip,namespace,podname,workload_kind,workload_name
ts=2025-12-08T09:52:05.702Z level=info caller=metrics/dns.go:76 msg="src labels" labels=query_type,query,ip,namespace,podname,workload_kind,workload_name
ts=2025-12-08T09:52:05.702Z level=info caller=metrics/dns.go:91 msg="src labels" labels=return_code,query_type,query,response,num_response,ip,namespace,podname,workload_kind,workload_name
ts=2025-12-08T09:52:05.702Z level=info caller=metrics/forward.go:80 msg="src labels" labels=direction,ip,namespace,podname,workload_kind,workload_name
ts=2025-12-08T09:52:05.702Z level=info caller=metrics/forward.go:80 msg="src labels" labels=direction,ip,namespace,podname,workload_kind,workload_name
ts=2025-12-08T09:52:05.702Z level=info caller=metrics/drops.go:72 msg="src labels" labels=reason,direction,ip,namespace,podname,workload_kind,workload_name
ts=2025-12-08T09:52:05.703Z level=info caller=metrics/drops.go:72 msg="src labels" labels=reason,direction,ip,namespace,podname,workload_kind,workload_name
ts=2025-12-08T09:52:05.738Z level=info caller=packetparser/packetparser_linux.go:163 msg="PacketParser metric compiled"
ts=2025-12-08T09:52:05.739Z level=info caller=packetparser/packetparser_linux.go:290 msg="Stopping packet parser"
ts=2025-12-08T09:52:05.739Z level=info caller=packetparser/packetparser_linux.go:332 msg="Stopped packet parser"
ts=2025-12-08T09:52:06.559Z level=info caller=common/common_linux.go:79 msg="perf reader created" Map=PerfEventArray(retina_packetparser_events)#35 PageSize=4096 BufferSize=131072
ts=2025-12-08T09:52:06.559Z level=info caller=pluginmanager/pluginmanager.go:110 msg="Reconciled plugin" name=packetparser
ts=2025-12-08T09:52:06.559Z level=info caller=pluginmanager/pluginmanager.go:167 msg="starting plugin packetparser"
ts=2025-12-08T09:52:06.559Z level=info caller=packetparser/packetparser_linux.go:239 msg="Starting packet parser"
ts=2025-12-08T09:52:06.559Z level=info caller=packetparser/packetparser_linux.go:241 msg="setting up enricher since pod level is enabled"
ts=2025-12-08T09:52:06.559Z level=info caller=packetparser/packetparser_linux.go:261 msg="Attaching bpf program to default interface of k8s Node in node namespace"
ts=2025-12-08T09:52:06.560Z level=info caller=packetparser/packetparser_linux.go:272 msg="Attaching Packetparser" outgoingLink.Index=2 outgoingLink.Name=eth0 outgoingLink.HardwareAddr=7c:ed:8d:99:17:be
ts=2025-12-08T09:52:06.724Z level=info caller=packetparser/packetparser_linux.go:543 msg="Started packet parser"
ts=2025-12-08T09:52:07.534Z level=info caller=dropreason/dropreason_linux.go:125 msg="DropReason metric compiled"
ts=2025-12-08T09:52:07.535Z level=info caller=dropreason/ebpfsetup_linux.go:69 msg="Distro check:" isMariner=false
ts=2025-12-08T09:52:07.535Z level=info caller=dropreason/ebpfsetup_linux.go:78 msg="Detected kernel" version=5.15.0
ts=2025-12-08T09:52:07.535Z level=info caller=dropreason/ebpfsetup_linux.go:82 msg="Ftrace status" enabled=true
ts=2025-12-08T09:52:07.922Z level=info caller=common/common_linux.go:79 msg="perf reader created" Map=PerfEventArray(retina_dropreason_events)#56 PageSize=4096 BufferSize=65536
ts=2025-12-08T09:52:08.120Z level=info caller=dropreason/ebpfsetup_linux.go:124 msg="Attached kprobe" program=__nf_conntrack_confirm
ts=2025-12-08T09:52:08.146Z level=info caller=dropreason/ebpfsetup_linux.go:124 msg="Attached kprobe" program=inet_csk_accept
ts=2025-12-08T09:52:08.232Z level=info caller=dropreason/ebpfsetup_linux.go:124 msg="Attached kprobe" program=nf_hook_slow
ts=2025-12-08T09:52:08.321Z level=info caller=dropreason/ebpfsetup_linux.go:124 msg="Attached kprobe" program=nf_nat_inet_fn
ts=2025-12-08T09:52:08.422Z level=info caller=dropreason/ebpfsetup_linux.go:137 msg="Attached kretprobe" program=nf_hook_slow
ts=2025-12-08T09:52:08.457Z level=info caller=dropreason/ebpfsetup_linux.go:137 msg="Attached kretprobe" program=inet_csk_accept
ts=2025-12-08T09:52:08.541Z level=info caller=dropreason/ebpfsetup_linux.go:137 msg="Attached kretprobe" program=tcp_v4_connect
ts=2025-12-08T09:52:08.717Z level=info caller=dropreason/ebpfsetup_linux.go:137 msg="Attached kretprobe" program=nf_nat_inet_fn
ts=2025-12-08T09:52:08.830Z level=info caller=dropreason/ebpfsetup_linux.go:137 msg="Attached kretprobe" program=__nf_conntrack_confirm
ts=2025-12-08T09:52:08.831Z level=info caller=pluginmanager/pluginmanager.go:110 msg="Reconciled plugin" name=dropreason
ts=2025-12-08T09:52:08.831Z level=info caller=pluginmanager/pluginmanager.go:167 msg="starting plugin dropreason"
ts=2025-12-08T09:52:08.831Z level=info caller=dropreason/dropreason_linux.go:182 msg="Start listening for drop reason events..."
ts=2025-12-08T09:52:08.831Z level=info caller=packetforward/packetforward_linux.go:104 msg="Packet forwarding metric header generated"
ts=2025-12-08T09:52:08.831Z level=info caller=dropreason/dropreason_linux.go:188 msg="setting up enricher since pod level is enabled"
ts=2025-12-08T09:52:09.538Z level=info caller=packetforward/packetforward_linux.go:132 msg="Packet forwarding metric compiled"
ts=2025-12-08T09:52:09.539Z level=info caller=packetforward/packetforward_linux.go:173 msg="Packet forwarding metric initialized"
ts=2025-12-08T09:52:09.539Z level=info caller=pluginmanager/pluginmanager.go:110 msg="Reconciled plugin" name=packetforward
ts=2025-12-08T09:52:09.539Z level=info caller=pluginmanager/pluginmanager.go:173 msg="successfully started pluginmanager"
ts=2025-12-08T09:52:09.539Z level=info caller=pluginmanager/pluginmanager.go:167 msg="starting plugin packetforward"
ts=2025-12-08T09:52:09.539Z level=info caller=packetforward/packetforward_linux.go:178 msg="Start collecting packet forward metrics"
ts=2025-12-08T09:52:34.743Z level=info caller=log/warning_handler.go:65 msg="v1 Endpoints is deprecated in v1.33+; use discovery.k8s.io/v1 EndpointSlice"
ts=2025-12-08T09:52:34.747Z level=info caller=apiserver/apiserver.go:131 msg="New Apiserver IPs:" ip=4.250.221.129
ts=2025-12-08T09:52:34.747Z level=info caller=apiserver/apiserver.go:131 msg="New Apiserver IPs:" ip=10.0.0.1
ts=2025-12-08T09:53:04.713Z level=info caller=log/warning_handler.go:65 msg="v1 Endpoints is deprecated in v1.33+; use discovery.k8s.io/v1 EndpointSlice"
ts=2025-12-08T09:53:34.716Z level=info caller=log/warning_handler.go:65 msg="v1 Endpoints is deprecated in v1.33+; use discovery.k8s.io/v1 EndpointSlice"
ts=2025-12-08T09:54:04.715Z level=info caller=log/warning_handler.go:65 msg="v1 Endpoints is deprecated in v1.33+; use discovery.k8s.io/v1 EndpointSlice"
```

## Additional Notes

Add any additional notes or context about the pull request here.

---

Please refer to the [CONTRIBUTING.md](../CONTRIBUTING.md) file for more information on how to contribute to this project.

---------

Signed-off-by: Alex Castilio dos Santos <[email protected]>
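For readers unfamiliar with the pattern the first two bullets describe, here is a minimal, self-contained sketch of a `sync.Once` singleton wrapping an unexported constructor. The `Cache` interface and the trimmed-down `Enricher` struct below are simplified placeholders, not the real Retina types; the actual change is in the `pkg/enricher` diff further down.

```go
package main

import (
	"context"
	"fmt"
	"sync"
)

// Simplified placeholders; not the real Retina types.
type Cache interface{}

type Enricher struct {
	ctx   context.Context
	cache Cache
}

var (
	once      sync.Once
	singleton *Enricher
)

// New keeps the public behaviour unchanged: the first call builds the
// instance, every later call returns the same one.
func New(ctx context.Context, c Cache) *Enricher {
	once.Do(func() {
		singleton = newEnricher(ctx, c)
	})
	return singleton
}

// newEnricher builds an independent instance; tests can call it directly
// without touching the package-level singleton state.
func newEnricher(ctx context.Context, c Cache) *Enricher {
	return &Enricher{ctx: ctx, cache: c}
}

func main() {
	ctx := context.Background()
	a := New(ctx, nil)
	b := New(ctx, nil)          // same instance as a
	c := newEnricher(ctx, nil)  // fresh instance, e.g. for a test
	fmt.Println(a == b, a == c) // true false
}
```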
1 parent 59c1655 commit a52298b

2 files changed: +159 −58 lines


pkg/enricher/enricher.go

Lines changed: 30 additions, 11 deletions

@@ -40,23 +40,28 @@ type Enricher struct {
 	outputRing *container.Ring
 }
 
-func New(ctx context.Context, cache cache.CacheInterface) *Enricher {
+func New(ctx context.Context, c cache.CacheInterface) *Enricher {
 	once.Do(func() {
-		ir := container.NewRing(container.Capacity1023)
-		e = &Enricher{
-			ctx:        ctx,
-			l:          log.Logger().Named("enricher"),
-			cache:      cache,
-			inputRing:  ir,
-			Reader:     container.NewRingReader(ir, ir.OldestWrite()),
-			outputRing: container.NewRing(container.Capacity1023),
-		}
-		initialized = true
+		e = newEnricher(ctx, c)
 	})
 
 	return e
 }
 
+func newEnricher(ctx context.Context, c cache.CacheInterface) *Enricher {
+	ir := container.NewRing(container.Capacity1023)
+	enricher := &Enricher{
+		ctx:        ctx,
+		l:          log.Logger().Named("enricher"),
+		cache:      c,
+		inputRing:  ir,
+		Reader:     container.NewRingReader(ir, ir.OldestWrite()),
+		outputRing: container.NewRing(container.Capacity1023),
+	}
+	initialized = true
+	return enricher
+}
+
 func Instance() *Enricher {
 	return e
 }
@@ -100,7 +105,21 @@ func (e *Enricher) Run() {
 
 // enrich takes the flow and enriches it with the information from the cache
 func (e *Enricher) enrich(ev *v1.Event) {
+	if ev == nil {
+		e.l.Debug("received nil event to enrich")
+		return
+	}
+
 	flow := ev.Event.(*flow.Flow)
+	if flow == nil {
+		e.l.Debug("received nil flow to enrich", zap.Any("event", ev))
+		return
+	}
+
+	if flow.GetIP() == nil {
+		e.l.Debug("flow IP is nil", zap.Any("flow", flow))
+		return
+	}
 
 	// IPversion is a enum in the flow proto
 	// 0: IPVersion_IP_NOT_USED
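The guards added to `enrich` drop events that are nil, carry a nil flow, or carry a flow with no IP block, rather than letting a later field access fail. Below is a rough, self-contained illustration of that early-return style; the `Event`, `Flow`, and `IP` types are simplified stand-ins and not the Hubble/flow types Retina actually uses.

```go
package main

import (
	"errors"
	"fmt"
)

// Simplified stand-ins for the event and flow types handled by the enricher.
type IP struct{ Source, Destination string }

type Flow struct{ IP *IP }

// GetIP mimics the nil-safe accessor style of generated protobuf getters.
func (f *Flow) GetIP() *IP {
	if f == nil {
		return nil
	}
	return f.IP
}

type Event struct{ Flow *Flow }

// enrich returns an error instead of enriching when the event is unusable,
// mirroring the early-return guards added in the diff above.
func enrich(ev *Event) error {
	if ev == nil {
		return errors.New("received nil event to enrich")
	}
	fl := ev.Flow
	if fl == nil {
		return errors.New("received nil flow to enrich")
	}
	if fl.GetIP() == nil {
		return errors.New("flow IP is nil")
	}
	// ... cache lookups and endpoint enrichment would happen here ...
	return nil
}

func main() {
	fmt.Println(enrich(nil))                            // received nil event to enrich
	fmt.Println(enrich(&Event{}))                       // received nil flow to enrich
	fmt.Println(enrich(&Event{Flow: &Flow{}}))          // flow IP is nil
	fmt.Println(enrich(&Event{Flow: &Flow{IP: &IP{}}})) // <nil>
}
```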

pkg/enricher/enricher_test.go

Lines changed: 129 additions, 47 deletions

@@ -22,6 +22,118 @@ import (
 	"google.golang.org/protobuf/types/known/timestamppb"
 )
 
+var (
+	// number of events
+	eventsGeneratedCount = 5
+
+	// construct the endpoints
+	sourcePod = common.NewRetinaEndpoint("pod1", "ns1", &common.IPAddresses{
+		IPv4:       net.IPv4(1, 1, 1, 1),
+		OtherIPv4s: []net.IP{net.IPv4(1, 1, 1, 2)},
+	})
+	destPod = common.NewRetinaEndpoint("pod2", "ns2", &common.IPAddresses{
+		IPv4:       net.IPv4(2, 2, 2, 2),
+		OtherIPv4s: []net.IP{net.IPv4(2, 2, 2, 3)},
+	})
+	// sourceIP = sourcePod.NetIPs().IPv4.String()
+	// destIP = destPod.NetIPs().IPv4.String()
+	sourceIP = sourcePod.NetIPs().OtherIPv4s[0].String()
+	destIP   = destPod.NetIPs().OtherIPv4s[0].String()
+
+	// construct events
+	normal = &v1.Event{
+		Timestamp: timestamppb.Now(),
+		Event: &flow.Flow{
+			Time: timestamppb.Now(),
+			IP: &flow.IP{
+				IpVersion:   1,
+				Source:      sourceIP,
+				Destination: destIP,
+			},
+		},
+	}
+	nilFlow = &v1.Event{
+		Timestamp: timestamppb.Now(),
+		Event:     nil,
+	}
+	nilIP = &v1.Event{
+		Timestamp: timestamppb.Now(),
+		Event: &flow.Flow{
+			Time: timestamppb.Now(),
+			IP:   nil,
+		},
+	}
+	events = []*v1.Event{
+		normal, nilFlow, nilIP,
+	}
+)
+
+func writeEventToEnricher(t *testing.T, e *Enricher, ev *v1.Event) {
+	t.Helper()
+	var wg sync.WaitGroup
+	wg.Add(1)
+	go func() {
+		for range eventsGeneratedCount {
+			l := log.Logger().Named("addev")
+			l.Info("Adding event", zap.Any("event", ev))
+			time.Sleep(100 * time.Millisecond)
+			e.Write(ev)
+		}
+		wg.Done()
+	}()
+	e.Run()
+	wg.Wait()
+}
+
+func TestEnricher(t *testing.T) {
+	opts := log.GetDefaultLogOpts()
+	opts.Level = "debug"
+	_, err := log.SetupZapLogger(opts)
+	require.NoError(t, err)
+
+	c := cache.New(pubsub.New())
+
+	err = c.UpdateRetinaEndpoint(sourcePod)
+	require.NoError(t, err)
+
+	err = c.UpdateRetinaEndpoint(destPod)
+	require.NoError(t, err)
+
+	e := newEnricher(context.Background(), c)
+
+	var wg sync.WaitGroup
+	defer wg.Wait()
+
+	ctx, cancel := context.WithCancel(context.Background())
+	defer cancel()
+
+	for _, ev := range events {
+		writeEventToEnricher(t, e, ev)
+	}
+
+	l := log.Logger().Named("test-enricher")
+
+	l.Info("Starting to read from enricher")
+	wg.Add(1)
+	go func() {
+		oreader := e.ExportReader()
+		for {
+			ev := oreader.NextFollow(ctx)
+			if ev == nil {
+				l.Info("No more events to read from enricher")
+				break
+			}
+
+			l.Info("One Received event", zap.Any("event", ev))
+			assertEqualEndpoint(t, sourcePod, ev.Event.(*flow.Flow).GetSource())
+			assertEqualEndpoint(t, destPod, ev.Event.(*flow.Flow).GetDestination())
+		}
+		wg.Done()
+	}()
+
+	time.Sleep(3 * time.Second)
+}
+
 func TestEnricherSecondaryIPs(t *testing.T) {
 	evCount := 20
 	// by design per ring, the last written item is not readable
@@ -37,52 +149,25 @@ func TestEnricherSecondaryIPs(t *testing.T) {
 	ctx, cancel := context.WithCancel(context.Background())
 	c := cache.New(pubsub.New())
 
-	// construct the source endpoint
-	sourceEndpoints := common.NewRetinaEndpoint("pod1", "ns1", nil)
-	sourceEndpoints.SetLabels(map[string]string{
-		"app": "app1",
-	})
-
-	sourceEndpoints.SetOwnerRefs([]*common.OwnerReference{
-		{
-			Kind: "Pod",
-			Name: "pod1-deployment",
-		},
-	})
-
-	sourceEndpoints.SetIPs(&common.IPAddresses{
-		IPv4:       net.IPv4(1, 1, 1, 1),
-		OtherIPv4s: []net.IP{net.IPv4(1, 1, 1, 2)},
-	})
-	err := c.UpdateRetinaEndpoint(sourceEndpoints)
+	err := c.UpdateRetinaEndpoint(sourcePod)
 	require.NoError(t, err)
 
-	// construct the destination endpoint
-	destEndpoints := common.NewRetinaEndpoint("pod2", "ns2", nil)
-	destEndpoints.SetLabels(map[string]string{"app": "app2"})
-	destEndpoints.SetOwnerRefs([]*common.OwnerReference{
-		{
-			Kind: "Pod",
-			Name: "pod2-deployment",
-		},
-	})
-	destEndpoints.SetIPs(&common.IPAddresses{
-		IPv4:       net.IPv4(2, 2, 2, 2),
-		OtherIPv4s: []net.IP{net.IPv4(2, 2, 2, 3)},
-	})
-	err = c.UpdateRetinaEndpoint(destEndpoints)
+	err = c.UpdateRetinaEndpoint(destPod)
 	require.NoError(t, err)
 
-	// get the enricher
-	e := New(ctx, c)
+	// create new enricher (not using singleton here)
+	e := newEnricher(ctx, c)
 	var wg sync.WaitGroup
 
 	wg.Add(1)
 	go func() {
 		for i := 0; i < evCount; i++ {
 			// The Event Source IP is the secondary IP of the source endpoint
+			secondarySourceIP := sourcePod.NetIPs().OtherIPv4s[0].String()
 			// The Event Destination IP is the secondary IP of the destination endpoint
-			addEvent(e, "1.1.1.2", "2.2.2.3")
+			secondaryDestIP := destPod.NetIPs().OtherIPv4s[0].String()
+
+			addEvent(e, secondarySourceIP, secondaryDestIP)
 		}
 		wg.Done()
 	}()
@@ -101,12 +186,8 @@ func TestEnricherSecondaryIPs(t *testing.T) {
 
 			l.Info("One Received event", zap.Any("event", ev))
 			// check whether the event is enriched correctly
-			sourceFlowEndPoint := ev.Event.(*flow.Flow).GetSource()
-			assert.Equal(t, sourceEndpoints.Namespace(), sourceFlowEndPoint.GetNamespace())
-			assert.Equal(t, sourceEndpoints.Name(), sourceFlowEndPoint.GetPodName())
-			destFlowEndPoint := ev.Event.(*flow.Flow).GetDestination()
-			assert.Equal(t, destEndpoints.Namespace(), destFlowEndPoint.GetNamespace())
-			assert.Equal(t, destEndpoints.Name(), destFlowEndPoint.GetPodName())
+			assertEqualEndpoint(t, sourcePod, ev.Event.(*flow.Flow).GetSource())
+			assertEqualEndpoint(t, destPod, ev.Event.(*flow.Flow).GetDestination())
 			count++
 		}
 		assert.Equal(t, expectedOutputCount, count, "one")
@@ -123,12 +204,8 @@ func TestEnricherSecondaryIPs(t *testing.T) {
 				break
 			}
 			// check whether the event is enriched correctly
-			sourceFlowEndPoint := ev.Event.(*flow.Flow).GetSource()
-			assert.Equal(t, sourceEndpoints.Namespace(), sourceFlowEndPoint.GetNamespace())
-			assert.Equal(t, sourceEndpoints.Name(), sourceFlowEndPoint.GetPodName())
-			destFlowEndPoint := ev.Event.(*flow.Flow).GetDestination()
-			assert.Equal(t, destEndpoints.Namespace(), destFlowEndPoint.GetNamespace())
-			assert.Equal(t, destEndpoints.Name(), destFlowEndPoint.GetPodName())
+			assertEqualEndpoint(t, sourcePod, ev.Event.(*flow.Flow).GetSource())
+			assertEqualEndpoint(t, destPod, ev.Event.(*flow.Flow).GetDestination())
 			count++
 		}
 		assert.Equal(t, expectedOutputCount, count, "two")
@@ -157,3 +234,8 @@ func addEvent(e *Enricher, sourceIP, destIP string) {
 	time.Sleep(100 * time.Millisecond)
 	e.Write(ev)
 }
+
+func assertEqualEndpoint(t *testing.T, expected *common.RetinaEndpoint, actual *flow.Endpoint) {
+	assert.Equal(t, expected.Namespace(), actual.GetNamespace())
+	assert.Equal(t, expected.Name(), actual.GetPodName())
+}
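The test refactor above moves the endpoint and event fixtures to package-level variables and funnels the repeated namespace/pod-name checks through `assertEqualEndpoint`. As a purely illustrative sketch that is not part of this commit, the three event fixtures could also be driven through table-based subtests against fresh, non-singleton enrichers, reusing the helpers defined in the test file:

```go
// Illustrative sketch only; not part of this commit. It reuses the fixtures
// (normal, nilFlow, nilIP, sourcePod, destPod) and helpers (newEnricher,
// writeEventToEnricher) from the test file above.
func TestEnrichEventCases(t *testing.T) {
	opts := log.GetDefaultLogOpts()
	_, err := log.SetupZapLogger(opts)
	require.NoError(t, err)

	cases := []struct {
		name string
		ev   *v1.Event
	}{
		{name: "normal flow", ev: normal},
		{name: "nil flow", ev: nilFlow},
		{name: "nil IP", ev: nilIP},
	}

	for _, tc := range cases {
		t.Run(tc.name, func(t *testing.T) {
			c := cache.New(pubsub.New())
			require.NoError(t, c.UpdateRetinaEndpoint(sourcePod))
			require.NoError(t, c.UpdateRetinaEndpoint(destPod))

			// Each subtest gets its own non-singleton enricher; events the
			// enricher cannot handle should be dropped without panicking.
			e := newEnricher(context.Background(), c)
			writeEventToEnricher(t, e, tc.ev)
		})
	}
}
```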
