Skip to content

Commit 6080857

Browse files
authored
Merge pull request #2077 from songrx1997/label-propagation-metrics
Add label propagation metrics calculation logic
2 parents 7bab346 + 2a56103 commit 6080857

File tree

10 files changed

+342
-55
lines changed

10 files changed

+342
-55
lines changed

pkg/neg/controller.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -325,6 +325,7 @@ func (c *Controller) Run(stopCh <-chan struct{}) {
325325
wait.Until(c.gc, c.gcPeriod, stopCh)
326326
}()
327327
go c.reflector.Run(stopCh)
328+
go c.syncerMetrics.Run(stopCh)
328329
<-stopCh
329330
}
330331

pkg/neg/metrics/label_propagation_metrics.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,8 @@ const (
2525
annotationSize = "annotation_size_per_endpoint"
2626
labelErrorNumber = "label_propagation_error_count"
2727
numberOfEndpoints = "number_of_endpoints"
28+
epWithAnnotation = "with_annotation"
29+
totalEndpoints = "total"
2830
)
2931

3032
var (
@@ -33,7 +35,7 @@ var (
3335
}
3436

3537
endpointAnnotationLabels = []string{
36-
"with_annotation",
38+
"feature",
3739
}
3840

3941
NumberOfEndpoints = prometheus.NewGaugeVec(

pkg/neg/metrics/metrics.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -163,6 +163,10 @@ func RegisterMetrics() {
163163
prometheus.MustRegister(InitializationLatency)
164164
prometheus.MustRegister(SyncerStaleness)
165165
prometheus.MustRegister(EPSStaleness)
166+
prometheus.MustRegister(NumberOfEndpoints)
167+
prometheus.MustRegister(LabelPropagationError)
168+
prometheus.MustRegister(LabelNumber)
169+
prometheus.MustRegister(AnnotationSize)
166170

167171
RegisterSyncerMetrics()
168172
})

pkg/neg/metrics/metrics_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -90,7 +90,7 @@ func TestComputeLabelMetrics(t *testing.T) {
9090
collector.syncerLabelProagationStats = tc.syncerLabelProagationStats
9191
out := collector.computeLabelMetrics()
9292
if diff := cmp.Diff(out, tc.expect); diff != "" {
93-
t.Errorf("For test case %s, got %+v, want %+v, diff: %s", tc.desc, out, tc.expect, diff)
93+
t.Errorf("For test case %s, (-want +got):\n%s", tc.desc, diff)
9494
}
9595
}
9696
}

pkg/neg/metrics/neg_metrics_collector.go

Lines changed: 24 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ import (
2828
type SyncerMetricsCollector interface {
2929
UpdateSyncer(key negtypes.NegSyncerKey, result *negtypes.NegSyncResult)
3030
SetSyncerEPMetrics(key negtypes.NegSyncerKey, epState *negtypes.SyncerEPStat)
31+
SetLabelPropagationStats(key negtypes.NegSyncerKey, labelstatLabelPropagationStats LabelPropagationStats)
3132
}
3233

3334
type SyncerMetrics struct {
@@ -50,11 +51,12 @@ type SyncerMetrics struct {
5051
// NewNegMetricsCollector initializes SyncerMetrics. The caller must invoke Run to compute and export metrics periodically.
5152
func NewNegMetricsCollector(exportInterval time.Duration, logger klog.Logger) *SyncerMetrics {
5253
return &SyncerMetrics{
53-
syncerStatusMap: make(map[negtypes.NegSyncerKey]string),
54-
syncerEndpointStateMap: make(map[negtypes.NegSyncerKey]negtypes.StateCountMap),
55-
syncerEPSStateMap: make(map[negtypes.NegSyncerKey]negtypes.StateCountMap),
56-
metricsInterval: exportInterval,
57-
logger: logger.WithName("NegMetricsCollector"),
54+
syncerStatusMap: make(map[negtypes.NegSyncerKey]string),
55+
syncerEndpointStateMap: make(map[negtypes.NegSyncerKey]negtypes.StateCountMap),
56+
syncerEPSStateMap: make(map[negtypes.NegSyncerKey]negtypes.StateCountMap),
57+
syncerLabelProagationStats: make(map[negtypes.NegSyncerKey]LabelPropagationStats),
58+
metricsInterval: exportInterval,
59+
logger: logger.WithName("NegMetricsCollector"),
5860
}
5961
}
6062

@@ -79,6 +81,10 @@ func (sm *SyncerMetrics) Run(stopCh <-chan struct{}) {
7981

8082
// export exports syncer metrics.
8183
func (sm *SyncerMetrics) export() {
84+
lpMetrics := sm.computeLabelMetrics()
85+
NumberOfEndpoints.WithLabelValues(totalEndpoints).Set(float64(lpMetrics.NumberOfEndpoints))
86+
NumberOfEndpoints.WithLabelValues(epWithAnnotation).Set(float64(lpMetrics.EndpointsWithAnnotation))
87+
sm.logger.V(3).Info("Exporting syncer related metrics", "Number of Endpoints", lpMetrics.NumberOfEndpoints)
8288
}
8389

8490
// UpdateSyncer updates the status of the corresponding syncer based on the syncResult.
@@ -87,7 +93,7 @@ func (sm *SyncerMetrics) UpdateSyncer(key negtypes.NegSyncerKey, syncResult *neg
8793
defer sm.mu.Unlock()
8894
if sm.syncerStatusMap == nil {
8995
sm.syncerStatusMap = make(map[negtypes.NegSyncerKey]string)
90-
sm.logger.V(3).Info("Syncer Metrics failed to initialize correctly, reinitializing syncerStatusMap: %v", sm.syncerStatusMap)
96+
sm.logger.V(3).Info("Syncer Metrics failed to initialize correctly, reinitializing syncerStatusMap")
9197
}
9298
sm.syncerStatusMap[key] = string(syncResult.Result)
9399
}
@@ -98,17 +104,27 @@ func (sm *SyncerMetrics) SetSyncerEPMetrics(key negtypes.NegSyncerKey, endpointS
98104
defer sm.mu.Unlock()
99105
if sm.syncerEndpointStateMap == nil {
100106
sm.syncerEndpointStateMap = make(map[negtypes.NegSyncerKey]negtypes.StateCountMap)
101-
sm.logger.V(3).Info("Syncer Metrics failed to initialize correctly, reinitializing syncerEPStateMap: %v", sm.syncerEndpointStateMap)
107+
sm.logger.V(3).Info("Syncer Metrics failed to initialize correctly, reinitializing syncerEPStateMap")
102108
}
103109
sm.syncerEndpointStateMap[key] = endpointStat.EndpointStateCount
104110

105111
if sm.syncerEPSStateMap == nil {
106112
sm.syncerEPSStateMap = make(map[negtypes.NegSyncerKey]negtypes.StateCountMap)
107-
sm.logger.V(3).Info("Syncer Metrics failed to initialize correctly, reinitializing syncerEPSStateMap: %v", sm.syncerEPSStateMap)
113+
sm.logger.V(3).Info("Syncer Metrics failed to initialize correctly, reinitializing syncerEPSStateMap")
108114
}
109115
sm.syncerEPSStateMap[key] = endpointStat.EndpointSliceStateCount
110116
}
111117

118+
func (sm *SyncerMetrics) SetLabelPropagationStats(key negtypes.NegSyncerKey, labelstatLabelPropagationStats LabelPropagationStats) {
119+
sm.mu.Lock()
120+
defer sm.mu.Unlock()
121+
if sm.syncerLabelProagationStats == nil {
122+
sm.syncerLabelProagationStats = make(map[negtypes.NegSyncerKey]LabelPropagationStats)
123+
sm.logger.V(3).Info("Syncer Metrics failed to initialize correctly, reinitializing syncerLabelProagationStats")
124+
}
125+
sm.syncerLabelProagationStats[key] = labelstatLabelPropagationStats
126+
}
127+
112128
// computeLabelMetrics aggregates label propagation metrics.
113129
func (sm *SyncerMetrics) computeLabelMetrics() LabelPropagationMetrics {
114130
sm.mu.Lock()

pkg/neg/syncers/labels/labels.go

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ import (
2222
"fmt"
2323

2424
v1 "k8s.io/api/core/v1"
25+
"k8s.io/ingress-gce/pkg/neg/metrics"
2526
negtypes "k8s.io/ingress-gce/pkg/neg/types"
2627
"k8s.io/ingress-gce/pkg/utils"
2728
)
@@ -44,6 +45,12 @@ type PodLabelMap map[string]string
4445
// EndpointPodLabelMap is a map of network endpoint, endpoint annotations.
4546
type EndpointPodLabelMap map[negtypes.NetworkEndpoint]PodLabelMap
4647

48+
const (
49+
Truncated = "truncated"
50+
TruncationFailure = "truncation_failed"
51+
OtherError = "other_error"
52+
)
53+
4754
var (
4855
ErrLabelTruncated = errors.New("label is truncated")
4956
ErrLabelTruncationFailed = errors.New("failed to truncate label")
@@ -68,6 +75,7 @@ func GetPodLabelMap(pod *v1.Pod, lpConfig PodLabelPropagationConfig) (PodLabelMa
6875
labelVal, err := truncatePodLabel(lpKey, val, label.MaxLabelSizeBytes)
6976
if err != nil {
7077
errs = append(errs, err)
78+
publishLabelPropagationTruncationMetrics(err)
7179
}
7280

7381
// Add the label to the map only if the truncation result is valid
@@ -82,6 +90,16 @@ func GetPodLabelMap(pod *v1.Pod, lpConfig PodLabelPropagationConfig) (PodLabelMa
8290
return labelMap, nil
8391
}
8492

93+
// publishLabelPropagationTruncationMetrics publishes errors that occurred during
94+
// label truncation.
95+
func publishLabelPropagationTruncationMetrics(err error) {
96+
if errors.Is(err, ErrLabelTruncated) {
97+
metrics.PublishLabelPropagationError(Truncated)
98+
} else if errors.Is(err, ErrLabelTruncationFailed) {
99+
metrics.PublishLabelPropagationError(TruncationFailure)
100+
}
101+
}
102+
85103
// truncatePodLabel calculates the potentially truncated label value to ensure that len(key) + len(label) <= maxTotalSize.
86104
// It will return:
87105
//
@@ -100,3 +118,13 @@ func truncatePodLabel(key, label string, maxTotalSize int) (string, error) {
100118
truncatedVal := string(labelBytes[:maxTotalSize-len(keyBytes)])
101119
return truncatedVal, fmt.Errorf("%w: `%s:%s` is truncated to `%s:%s` because the total length exceeded the limit, length: %d, limit: %d", ErrLabelTruncated, key, label, key, truncatedVal, len(key)+len(label), maxTotalSize)
102120
}
121+
122+
// GetPodLabelMapSize calculates the size of a podLabelMap as the total byte length of all its keys and values.
123+
func GetPodLabelMapSize(podLabelMap PodLabelMap) int {
124+
var res int
125+
for key, val := range podLabelMap {
126+
res += len([]byte(key))
127+
res += len([]byte(val))
128+
}
129+
return res
130+
}

pkg/neg/syncers/transaction.go

Lines changed: 31 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -233,7 +233,7 @@ func (s *transactionSyncer) syncInternalImpl() error {
233233
}
234234
s.logger.V(2).Info("Sync NEG", "negSyncerKey", s.NegSyncerKey.String(), "endpointsCalculatorMode", s.endpointsCalculator.Mode())
235235

236-
currentMap, err := retrieveExistingZoneNetworkEndpointMap(s.NegSyncerKey.NegName, s.zoneGetter, s.cloud, s.NegSyncerKey.GetAPIVersion(), s.endpointsCalculator.Mode())
236+
currentMap, currentPodLabelMap, err := retrieveExistingZoneNetworkEndpointMap(s.NegSyncerKey.NegName, s.zoneGetter, s.cloud, s.NegSyncerKey.GetAPIVersion(), s.endpointsCalculator.Mode())
237237
if err != nil {
238238
return err
239239
}
@@ -312,8 +312,11 @@ func (s *transactionSyncer) syncInternalImpl() error {
312312
// Only fetch label from pod for L7 endpoints
313313
if flags.F.EnableNEGLabelPropagation && s.NegType == negtypes.VmIpPortEndpointType {
314314
endpointPodLabelMap = getEndpointPodLabelMap(addEndpoints, endpointPodMap, s.podLister, s.podLabelPropagationConfig, s.recorder, s.logger)
315+
publishAnnotationSizeMetrics(addEndpoints, endpointPodLabelMap)
315316
}
316317

318+
s.syncCollector.SetLabelPropagationStats(s.NegSyncerKey, collectLabelStats(currentPodLabelMap, endpointPodLabelMap, targetMap))
319+
317320
if s.needCommit() {
318321
s.commitPods(committedEndpoints, endpointPodMap)
319322
}
@@ -866,11 +869,13 @@ func getEndpointPodLabelMap(endpoints map[string]negtypes.NetworkEndpointSet, en
866869
key := fmt.Sprintf("%s/%s", endpointPodMap[endpoint].Namespace, endpointPodMap[endpoint].Name)
867870
obj, ok, err := podLister.GetByKey(key)
868871
if err != nil || !ok {
872+
metrics.PublishLabelPropagationError(labels.OtherError)
869873
logger.Error(err, "getEndpointPodLabelMap: error getting pod", "pod", key, "exist", ok)
870874
continue
871875
}
872876
pod, ok := obj.(*v1.Pod)
873877
if !ok {
878+
metrics.PublishLabelPropagationError(labels.OtherError)
874879
logger.Error(nil, "expected type *v1.Pod", "pod", key, "type", fmt.Sprintf("%T", obj))
875880
continue
876881
}
@@ -883,3 +888,28 @@ func getEndpointPodLabelMap(endpoints map[string]negtypes.NetworkEndpointSet, en
883888
}
884889
return endpointPodLabelMap
885890
}
891+
892+
// publishAnnotationSizeMetrics goes through all the endpoints to be attached
893+
// and publishes annotation size metrics.
894+
func publishAnnotationSizeMetrics(endpoints map[string]negtypes.NetworkEndpointSet, endpointPodLabelMap labels.EndpointPodLabelMap) {
895+
for _, endpointSet := range endpoints {
896+
for endpoint := range endpointSet {
897+
labelMap := endpointPodLabelMap[endpoint]
898+
metrics.PublishAnnotationMetrics(labels.GetPodLabelMapSize(labelMap), len(labelMap))
899+
}
900+
}
901+
}
902+
903+
// collectLabelStats calculates the number of endpoints and the number of endpoints with annotations.
904+
func collectLabelStats(currentPodLabelMap, addPodLabelMap labels.EndpointPodLabelMap, targetEndpointMap map[string]negtypes.NetworkEndpointSet) metrics.LabelPropagationStats {
905+
labelPropagationStats := metrics.LabelPropagationStats{}
906+
for _, endpointSet := range targetEndpointMap {
907+
for endpoint := range endpointSet {
908+
labelPropagationStats.NumberOfEndpoints += 1
909+
if currentPodLabelMap[endpoint] != nil || addPodLabelMap[endpoint] != nil {
910+
labelPropagationStats.EndpointsWithAnnotation += 1
911+
}
912+
}
913+
}
914+
return labelPropagationStats
915+
}

pkg/neg/syncers/transaction_test.go

Lines changed: 106 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1532,7 +1532,7 @@ func TestUnknownNodes(t *testing.T) {
15321532
}
15331533

15341534
// Check that unknown zone did not cause endpoints to be removed
1535-
out, err := retrieveExistingZoneNetworkEndpointMap(testNegName, zoneGetter, fakeCloud, meta.VersionGA, negtypes.L7Mode)
1535+
out, _, err := retrieveExistingZoneNetworkEndpointMap(testNegName, zoneGetter, fakeCloud, meta.VersionGA, negtypes.L7Mode)
15361536
if err != nil {
15371537
t.Errorf("errored retrieving existing network endpoints")
15381538
}
@@ -1761,7 +1761,7 @@ func TestEnableDegradedMode(t *testing.T) {
17611761
(s.syncer.(*syncer)).stopped = false
17621762
tc.modify(s)
17631763

1764-
out, err := retrieveExistingZoneNetworkEndpointMap(tc.negName, zoneGetter, fakeCloud, meta.VersionGA, negtypes.L7Mode)
1764+
out, _, err := retrieveExistingZoneNetworkEndpointMap(tc.negName, zoneGetter, fakeCloud, meta.VersionGA, negtypes.L7Mode)
17651765
if err != nil {
17661766
t.Errorf("errored retrieving existing network endpoints")
17671767
}
@@ -1777,7 +1777,7 @@ func TestEnableDegradedMode(t *testing.T) {
17771777
t.Errorf("after syncInternal, error state is %v, expected to be %v", s.inErrorState(), tc.expectedInErrorState)
17781778
}
17791779
err = wait.PollImmediate(time.Second, 3*time.Second, func() (bool, error) {
1780-
out, err = retrieveExistingZoneNetworkEndpointMap(tc.negName, zoneGetter, fakeCloud, meta.VersionGA, negtypes.L7Mode)
1780+
out, _, err = retrieveExistingZoneNetworkEndpointMap(tc.negName, zoneGetter, fakeCloud, meta.VersionGA, negtypes.L7Mode)
17811781
if err != nil {
17821782
return false, err
17831783
}
@@ -1910,6 +1910,109 @@ func TestGetEndpointPodLabelMap(t *testing.T) {
19101910
}
19111911
}
19121912

1913+
func TestCollectLabelStats(t *testing.T) {
1914+
t.Parallel()
1915+
1916+
testIP1 := "1.2.3.4"
1917+
testIP2 := "1.2.3.5"
1918+
testIP3 := "1.2.3.6"
1919+
testIP4 := "1.2.3.7"
1920+
testPort := int64(80)
1921+
endpoint1 := negtypes.NetworkEndpoint{IP: testIP1, Node: negtypes.TestInstance1, Port: strconv.Itoa(int(testPort))}
1922+
endpoint2 := negtypes.NetworkEndpoint{IP: testIP2, Node: negtypes.TestInstance2, Port: strconv.Itoa(int(testPort))}
1923+
endpoint3 := negtypes.NetworkEndpoint{IP: testIP3, Node: negtypes.TestInstance3, Port: strconv.Itoa(int(testPort))}
1924+
endpoint4 := negtypes.NetworkEndpoint{IP: testIP4, Node: negtypes.TestInstance4, Port: strconv.Itoa(int(testPort))}
1925+
1926+
for _, tc := range []struct {
1927+
desc string
1928+
curLabelMap labels.EndpointPodLabelMap
1929+
addLabelMap labels.EndpointPodLabelMap
1930+
targetEndpointMap map[string]negtypes.NetworkEndpointSet
1931+
expect metrics.LabelPropagationStats
1932+
}{
1933+
{
1934+
desc: "Empty inputs",
1935+
curLabelMap: labels.EndpointPodLabelMap{},
1936+
addLabelMap: labels.EndpointPodLabelMap{},
1937+
targetEndpointMap: map[string]negtypes.NetworkEndpointSet{},
1938+
expect: metrics.LabelPropagationStats{
1939+
EndpointsWithAnnotation: 0,
1940+
NumberOfEndpoints: 0,
1941+
},
1942+
},
1943+
{
1944+
desc: "No new endpoints to be added",
1945+
curLabelMap: labels.EndpointPodLabelMap{
1946+
endpoint1: labels.PodLabelMap{
1947+
"foo": "bar",
1948+
},
1949+
},
1950+
addLabelMap: labels.EndpointPodLabelMap{},
1951+
targetEndpointMap: map[string]negtypes.NetworkEndpointSet{
1952+
testZone1: negtypes.NewNetworkEndpointSet(
1953+
endpoint1,
1954+
endpoint2,
1955+
),
1956+
},
1957+
expect: metrics.LabelPropagationStats{
1958+
EndpointsWithAnnotation: 1,
1959+
NumberOfEndpoints: 2,
1960+
},
1961+
},
1962+
{
1963+
desc: "Some endpoints to be added",
1964+
curLabelMap: labels.EndpointPodLabelMap{
1965+
endpoint1: labels.PodLabelMap{
1966+
"foo": "bar",
1967+
},
1968+
},
1969+
addLabelMap: labels.EndpointPodLabelMap{
1970+
endpoint3: labels.PodLabelMap{
1971+
"foo": "bar",
1972+
},
1973+
},
1974+
targetEndpointMap: map[string]negtypes.NetworkEndpointSet{
1975+
testZone1: negtypes.NewNetworkEndpointSet(
1976+
endpoint1,
1977+
endpoint2,
1978+
),
1979+
testZone2: negtypes.NewNetworkEndpointSet(
1980+
endpoint3,
1981+
endpoint4,
1982+
),
1983+
},
1984+
expect: metrics.LabelPropagationStats{
1985+
EndpointsWithAnnotation: 2,
1986+
NumberOfEndpoints: 4,
1987+
},
1988+
},
1989+
{
1990+
desc: "Only newly added endpoints",
1991+
curLabelMap: labels.EndpointPodLabelMap{},
1992+
addLabelMap: labels.EndpointPodLabelMap{
1993+
endpoint3: labels.PodLabelMap{
1994+
"foo": "bar",
1995+
},
1996+
},
1997+
targetEndpointMap: map[string]negtypes.NetworkEndpointSet{
1998+
testZone2: negtypes.NewNetworkEndpointSet(
1999+
endpoint3,
2000+
endpoint4,
2001+
),
2002+
},
2003+
expect: metrics.LabelPropagationStats{
2004+
EndpointsWithAnnotation: 1,
2005+
NumberOfEndpoints: 2,
2006+
},
2007+
},
2008+
} {
2009+
out := collectLabelStats(tc.curLabelMap, tc.addLabelMap, tc.targetEndpointMap)
2010+
if diff := cmp.Diff(out, tc.expect); diff != "" {
2011+
t.Errorf("For test case %s: (-want +got): \n%s", tc.desc, diff)
2012+
}
2013+
}
2014+
}
2015+
19132016
func newL4ILBTestTransactionSyncer(fakeGCE negtypes.NetworkEndpointGroupCloud, mode negtypes.EndpointsCalculatorMode) (negtypes.NegSyncer, *transactionSyncer) {
19142017
negsyncer, ts := newTestTransactionSyncer(fakeGCE, negtypes.VmIpEndpointType, false)
19152018
ts.endpointsCalculator = GetEndpointsCalculator(ts.nodeLister, ts.podLister, ts.zoneGetter, ts.NegSyncerKey, mode, klog.TODO(), ts.enableDualStackNEG)

0 commit comments

Comments
 (0)