Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
82 changes: 31 additions & 51 deletions pkg/daemon/daemon.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,14 +41,14 @@ type Daemon interface {
}

type daemon struct {
config config.DaemonConfig
watcher watcher.Watcher
kubeClient k8sClient.Client
guidPool guid.Pool
smClient plugins.SubnetManagerClient
guidPodNetworkMap map[string]string // allocated guid mapped to the pod and network
lastPkeyAPICallTimestamp time.Time // timestamp of the last initiated pkey modification API call
lastPkeyAPICallTimestampMutex sync.Mutex // protects lastPkeyAPICallTimestamp
config config.DaemonConfig
watcher watcher.Watcher
kubeClient k8sClient.Client
guidPool guid.Pool
smClient plugins.SubnetManagerClient
guidPodNetworkMap map[string]string // allocated guid mapped to the pod and network
pkeyUpdateBaseline time.Time // SM's last_updated observed at our last successful gate-pass
pkeyUpdateBaselineMutex sync.Mutex // protects pkeyUpdateBaseline
}

// Temporary struct used to proceed pods' networks
Expand All @@ -64,70 +64,50 @@ type networksMap struct {
}

// canProceedWithPkeyModification avoids making pkey modification calls before the previous call is completed.
// It queries the SM's pkey last_updated timestamp and compares it to the last time
// the client has made a call to the pkey API.
// If last_updated timestamp > stored API call timestamp, our previous operation likely completed and we can proceed.
// It compares the SM's pkey last_updated value against the baseline captured at our previous call —
// both timestamps come from the same SM endpoint, so any clock skew between the SM host and ib-kubernetes
// (or between two different SM endpoints) is irrelevant.
// If last_updated has advanced past the baseline, our previous op completed and we can proceed.
func (d *daemon) canProceedWithPkeyModification() bool {
d.lastPkeyAPICallTimestampMutex.Lock()
defer d.lastPkeyAPICallTimestampMutex.Unlock()
d.pkeyUpdateBaselineMutex.Lock()
defer d.pkeyUpdateBaselineMutex.Unlock()
lastPkeyUpdateTimestamp, err := d.smClient.GetLastPKeyUpdateTimestamp()
lastAPICallTimestamp := d.lastPkeyAPICallTimestamp
if err != nil {
log.Warn().Msgf("failed to get SM pkey last_updated timestamp for canProceedWithPkeyModification check: %v", err)
// If we can't get the timestamp, don't proceed
return false
}

// SM returns null for last_updated when no PKey updates have been done yet
// In this case, we can proceed since there's nothing in progress
// SM returns null for last_updated when no PKey updates have been done yet.
// Nothing is in progress; proceed without recording a baseline so the next call
// also takes this fast path until SM reports a real value.
if lastPkeyUpdateTimestamp.IsZero() {
log.Debug().Msgf("SM pkey last_updated is null (no updates yet), proceeding with pkey modification call")
d.updateLastPkeyAPICallTimestamp()
log.Debug().Msg("SM pkey last_updated is null (no updates yet), proceeding")
return true
}

// First call from our side - no previous timestamp stored
if lastAPICallTimestamp.IsZero() {
log.Debug().Msgf("no previous timestamp stored locally, proceeding with SM call")
d.updateLastPkeyAPICallTimestamp()
baseline := d.pkeyUpdateBaseline

// First call after startup: capture current last_updated as the baseline and proceed.
if baseline.IsZero() {
log.Debug().Msgf("no baseline stored yet, capturing last_updated=%v and proceeding", lastPkeyUpdateTimestamp)
d.pkeyUpdateBaseline = lastPkeyUpdateTimestamp
return true
}

// Check if last_updated timestamp >= stored API call timestamp
// If the SM's last_updated has reached or passed our stored API call timestamp,
// our previous operation likely completed
if !lastPkeyUpdateTimestamp.Before(lastAPICallTimestamp) {
log.Debug().Msgf("SM pkey last_updated %v >= stored timestamp %v, proceeding",
lastPkeyUpdateTimestamp, lastAPICallTimestamp)
d.updateLastPkeyAPICallTimestamp()
// If SM's last_updated has advanced past the baseline, our previous op completed.
if lastPkeyUpdateTimestamp.After(baseline) {
log.Debug().Msgf("SM pkey last_updated %v advanced past baseline %v, proceeding",
lastPkeyUpdateTimestamp, baseline)
d.pkeyUpdateBaseline = lastPkeyUpdateTimestamp
return true
}

log.Info().Msgf(
"pkey last_updated %v < stored timestamp %v, skipping this cycle (previous op may still be in progress)",
lastPkeyUpdateTimestamp, lastAPICallTimestamp)
"pkey last_updated %v has not advanced past baseline %v, skipping this cycle (previous op may still be in progress)",
lastPkeyUpdateTimestamp, baseline)
return false
}

// updateLastPkeyAPICallTimestamp updates the stored server side timestamp when a pkey modification API call was made.
func (d *daemon) updateLastPkeyAPICallTimestamp() {
currentTimestamp, err := d.smClient.GetServerTime()
if err != nil {
log.Warn().Msgf("failed to get SM current time: %v", err)
return
}

if currentTimestamp.IsZero() {
log.Warn().Msg("failed to get SM current time: returned time is 0")
return
}

if currentTimestamp.After(d.lastPkeyAPICallTimestamp) {
d.lastPkeyAPICallTimestamp = currentTimestamp
}
log.Debug().Msgf("Updated last pkey modification timestamp to %v", currentTimestamp)
}

// Exponential backoff ~26 sec + 6 * <api call time>
// NOTE: k8s client has built in exponential backoff, which ib-kubernetes don't use.
// In case client's backoff was configured time may dramatically increase.
Expand Down
159 changes: 27 additions & 132 deletions pkg/daemon/daemon_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,159 +92,72 @@ var _ = Describe("Daemon", func() {
}
})

It("returns false and updates timestamp when SM returns error (fail open)", func() {
It("returns false when SM returns error (fail closed)", func() {
smClient.On("GetLastPKeyUpdateTimestamp").Return(time.Time{}, errors.New("connection error"))
serverTime := time.Date(2024, time.January, 15, 10, 30, 0, 0, time.UTC)
smClient.On("GetServerTime").Return(serverTime, nil)

result := d.canProceedWithPkeyModification()

Expect(result).To(BeFalse())
// Baseline must not be advanced when we couldn't read SM
Expect(d.pkeyUpdateBaseline.IsZero()).To(BeTrue())
})

It("returns true when SM last_updated is null (zero time)", func() {
It("returns true when SM last_updated is null (no updates yet) without recording a baseline", func() {
smClient.On("GetLastPKeyUpdateTimestamp").Return(time.Time{}, nil)
serverTime := time.Date(2024, time.January, 15, 10, 30, 0, 0, time.UTC)
smClient.On("GetServerTime").Return(serverTime, nil)

result := d.canProceedWithPkeyModification()

Expect(result).To(BeTrue())
Expect(d.lastPkeyAPICallTimestamp).To(Equal(serverTime))
// Baseline stays zero so subsequent calls also take the fast path until SM has a real value
Expect(d.pkeyUpdateBaseline.IsZero()).To(BeTrue())
})

It("returns true when no previous local timestamp stored", func() {
It("captures current last_updated as baseline on first call after startup", func() {
lastUpdated := time.Date(2024, time.January, 15, 10, 0, 0, 0, time.UTC)
smClient.On("GetLastPKeyUpdateTimestamp").Return(lastUpdated, nil)
serverTime := time.Date(2024, time.January, 15, 10, 30, 0, 0, time.UTC)
smClient.On("GetServerTime").Return(serverTime, nil)

// lastPkeyAPICallTimestamp is zero (default)
// pkeyUpdateBaseline is zero (default)
result := d.canProceedWithPkeyModification()

Expect(result).To(BeTrue())
Expect(d.lastPkeyAPICallTimestamp).To(Equal(serverTime))
Expect(d.pkeyUpdateBaseline).To(Equal(lastUpdated))
})

It("returns true when SM last_updated > stored timestamp (previous op completed)", func() {
// Stored timestamp from previous API call
d.lastPkeyAPICallTimestamp = time.Date(2024, time.January, 15, 10, 0, 0, 0, time.UTC)
It("returns true and advances baseline when SM last_updated advanced past baseline", func() {
d.pkeyUpdateBaseline = time.Date(2024, time.January, 15, 10, 0, 0, 0, time.UTC)

// SM reports a newer last_updated - meaning our previous call completed
smLastUpdated := time.Date(2024, time.January, 15, 10, 5, 0, 0, time.UTC)
smClient.On("GetLastPKeyUpdateTimestamp").Return(smLastUpdated, nil)

newServerTime := time.Date(2024, time.January, 15, 10, 30, 0, 0, time.UTC)
smClient.On("GetServerTime").Return(newServerTime, nil)

result := d.canProceedWithPkeyModification()

Expect(result).To(BeTrue())
Expect(d.lastPkeyAPICallTimestamp).To(Equal(newServerTime))
Expect(d.pkeyUpdateBaseline).To(Equal(smLastUpdated))
})

It("returns false when SM last_updated < stored timestamp (previous op still in progress)", func() {
// Stored timestamp from previous API call
d.lastPkeyAPICallTimestamp = time.Date(2024, time.January, 15, 10, 0, 0, 0, time.UTC)
It("returns false when SM last_updated has not advanced (previous op still in progress)", func() {
baseline := time.Date(2024, time.January, 15, 10, 0, 0, 0, time.UTC)
d.pkeyUpdateBaseline = baseline

// SM reports an older last_updated - meaning our previous call hasn't completed
smLastUpdated := time.Date(2024, time.January, 15, 9, 55, 0, 0, time.UTC)
smClient.On("GetLastPKeyUpdateTimestamp").Return(smLastUpdated, nil)

result := d.canProceedWithPkeyModification()

Expect(result).To(BeFalse())
// Timestamp should NOT be updated when returning false
Expect(d.lastPkeyAPICallTimestamp).To(Equal(time.Date(2024, time.January, 15, 10, 0, 0, 0, time.UTC)))
Expect(d.pkeyUpdateBaseline).To(Equal(baseline))
})

It("returns true when SM last_updated equals stored timestamp (previous op completed)", func() {
storedTime := time.Date(2024, time.January, 15, 10, 0, 0, 0, time.UTC)
d.lastPkeyAPICallTimestamp = storedTime
It("returns false when SM last_updated equals baseline (no progress since last call)", func() {
baseline := time.Date(2024, time.January, 15, 10, 0, 0, 0, time.UTC)
d.pkeyUpdateBaseline = baseline

// SM reports the same last_updated - operation completed at exactly our stored time
smClient.On("GetLastPKeyUpdateTimestamp").Return(storedTime, nil)
newServerTime := time.Date(2024, time.January, 15, 10, 30, 0, 0, time.UTC)
smClient.On("GetServerTime").Return(newServerTime, nil)
smClient.On("GetLastPKeyUpdateTimestamp").Return(baseline, nil)

result := d.canProceedWithPkeyModification()

Expect(result).To(BeTrue())
Expect(d.lastPkeyAPICallTimestamp).To(Equal(newServerTime))
})
})

Context("updateLastPkeyAPICallTimestamp", func() {
var (
d *daemon
smClient *smMocks.SubnetManagerClient
)

BeforeEach(func() {
smClient = &smMocks.SubnetManagerClient{}
d = &daemon{
smClient: smClient,
}
})

It("updates timestamp when GetServerTime succeeds", func() {
serverTime := time.Date(2024, time.January, 15, 10, 30, 0, 0, time.UTC)
smClient.On("GetServerTime").Return(serverTime, nil)

d.updateLastPkeyAPICallTimestamp()

Expect(d.lastPkeyAPICallTimestamp).To(Equal(serverTime))
})

It("does not update timestamp when GetServerTime returns error", func() {
originalTime := time.Date(2024, time.January, 15, 9, 0, 0, 0, time.UTC)
d.lastPkeyAPICallTimestamp = originalTime

smClient.On("GetServerTime").Return(time.Time{}, errors.New("connection error"))

d.updateLastPkeyAPICallTimestamp()

// Timestamp should remain unchanged
Expect(d.lastPkeyAPICallTimestamp).To(Equal(originalTime))
})

It("does not update timestamp when GetServerTime returns zero time", func() {
originalTime := time.Date(2024, time.January, 15, 9, 0, 0, 0, time.UTC)
d.lastPkeyAPICallTimestamp = originalTime

smClient.On("GetServerTime").Return(time.Time{}, nil)

d.updateLastPkeyAPICallTimestamp()

// Timestamp should remain unchanged
Expect(d.lastPkeyAPICallTimestamp).To(Equal(originalTime))
})

It("only updates if new time is after existing timestamp", func() {
// Set a future timestamp
futureTime := time.Date(2025, time.January, 15, 10, 0, 0, 0, time.UTC)
d.lastPkeyAPICallTimestamp = futureTime

// Server returns an older time
olderServerTime := time.Date(2024, time.January, 15, 10, 30, 0, 0, time.UTC)
smClient.On("GetServerTime").Return(olderServerTime, nil)

d.updateLastPkeyAPICallTimestamp()

// Should NOT update because server time is older
Expect(d.lastPkeyAPICallTimestamp).To(Equal(futureTime))
})

It("updates when new time is after existing timestamp", func() {
olderTime := time.Date(2024, time.January, 15, 9, 0, 0, 0, time.UTC)
d.lastPkeyAPICallTimestamp = olderTime

newerServerTime := time.Date(2024, time.January, 15, 10, 30, 0, 0, time.UTC)
smClient.On("GetServerTime").Return(newerServerTime, nil)

d.updateLastPkeyAPICallTimestamp()

Expect(d.lastPkeyAPICallTimestamp).To(Equal(newerServerTime))
Expect(result).To(BeFalse())
Expect(d.pkeyUpdateBaseline).To(Equal(baseline))
})
})

Expand All @@ -261,26 +174,9 @@ var _ = Describe("Daemon", func() {
}
})

It("handles GetServerTime failure gracefully when proceeding", func() {
// SM returns null last_updated (should proceed)
smClient.On("GetLastPKeyUpdateTimestamp").Return(time.Time{}, nil)
// But GetServerTime fails
smClient.On("GetServerTime").Return(time.Time{}, errors.New("server time error"))

result := d.canProceedWithPkeyModification()

// Should still return true (proceed) but timestamp won't be updated
Expect(result).To(BeTrue())
Expect(d.lastPkeyAPICallTimestamp.IsZero()).To(BeTrue())
})

It("concurrent calls are serialized by mutex", func() {
// This test verifies the mutex protects the timestamp
serverTime := time.Date(2024, time.January, 15, 10, 30, 0, 0, time.UTC)
smClient.On("GetLastPKeyUpdateTimestamp").Return(time.Time{}, nil)
smClient.On("GetServerTime").Return(serverTime, nil)

// Run multiple goroutines to test mutex protection
done := make(chan bool, 10)
for i := 0; i < 10; i++ {
go func() {
Expand All @@ -290,13 +186,12 @@ var _ = Describe("Daemon", func() {
}()
}

// Wait for all goroutines
for i := 0; i < 10; i++ {
<-done
}

// Timestamp should be set correctly
Expect(d.lastPkeyAPICallTimestamp).To(Equal(serverTime))
// last_updated was zero throughout, so baseline must still be zero
Expect(d.pkeyUpdateBaseline.IsZero()).To(BeTrue())
})
})

Expand Down Expand Up @@ -548,7 +443,7 @@ var _ = Describe("Daemon", func() {
// - Set a stored timestamp
// - SM returns last_updated <= stored timestamp
storedTime := time.Date(2024, time.January, 15, 10, 0, 0, 0, time.UTC)
d.lastPkeyAPICallTimestamp = storedTime
d.pkeyUpdateBaseline = storedTime

// SM reports an older last_updated - previous op still in progress
smLastUpdated := time.Date(2024, time.January, 15, 9, 55, 0, 0, time.UTC)
Expand Down Expand Up @@ -582,7 +477,7 @@ var _ = Describe("Daemon", func() {

// First call: previous op in progress
storedTime := time.Date(2024, time.January, 15, 10, 0, 0, 0, time.UTC)
d.lastPkeyAPICallTimestamp = storedTime
d.pkeyUpdateBaseline = storedTime

smLastUpdated := time.Date(2024, time.January, 15, 9, 55, 0, 0, time.UTC)
// First call - returns older timestamp (in progress)
Expand Down Expand Up @@ -666,7 +561,7 @@ var _ = Describe("Daemon", func() {

// Setup: Simulate previous operation still in progress
storedTime := time.Date(2024, time.January, 15, 10, 0, 0, 0, time.UTC)
d.lastPkeyAPICallTimestamp = storedTime
d.pkeyUpdateBaseline = storedTime

smLastUpdated := time.Date(2024, time.January, 15, 9, 55, 0, 0, time.UTC)
smClient.On("GetLastPKeyUpdateTimestamp").Return(smLastUpdated, nil)
Expand Down
Empty file modified tag_build_and_push.sh
100644 → 100755
Empty file.
Loading