Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions docs/architecture.md
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,8 @@
- latest provider-source locations
- latest trackable locations and active motion state used for collision work
- optional per-trackable Kalman filter state and retained samples
- proximity hysteresis state
- fence membership state
- proximity hysteresis state
- fence membership state, including transient exit-tolerance and exit-delay bookkeeping
- collision pair state

## Event Fan-Out
Expand Down Expand Up @@ -51,6 +51,7 @@ Implications:
- WebSocket fan-out coalesces multiple internal events into fewer wrapper messages and drops outbound payloads for slow subscribers instead of tearing the connection down immediately
- hub-issued UUIDs for REST-managed resources, derived fence/collision events, and RPC caller IDs now use UUIDv7 so emitted identifiers are time-sortable
- internal hub events carry the persisted `origin_hub_id` so downstream transports preserve source provenance
- fence exits are still driven by accepted location updates, but the decision path now applies per-fence/provider/trackable tolerance bands and exit debounce before emitting `region_exit`

## Observability Boundaries
- `internal/observability` owns OpenTelemetry resource setup, OTLP exporters, lifecycle management, and the small internal instrumentation API used by the rest of the runtime.
Expand Down
36 changes: 35 additions & 1 deletion internal/hub/processing_state.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,9 @@ type expiringKalmanTrack struct {
}

type expiringFenceMembership struct {
expiresAt time.Time
expiresAt time.Time
toleranceStartedAt time.Time
exitPendingSince time.Time
}

// ProcessingState keeps transient decision state in memory.
Expand Down Expand Up @@ -170,6 +172,38 @@ func (s *ProcessingState) SetInsideFence(trackableID, fenceID string, ttl time.D
current[fenceID] = expiringFenceMembership{expiresAt: s.nowUTC().Add(ttl)}
}

func (s *ProcessingState) FenceMembershipState(trackableID, fenceID string) (expiringFenceMembership, bool) {
s.mu.Lock()
defer s.mu.Unlock()
current, ok := s.fenceMembership[trackableID]
if !ok {
return expiringFenceMembership{}, false
}
membership, ok := current[fenceID]
if !ok || !membership.expiresAt.After(s.nowUTC()) {
delete(current, fenceID)
if len(current) == 0 {
delete(s.fenceMembership, trackableID)
}
return expiringFenceMembership{}, false
}
return membership, true
}

func (s *ProcessingState) UpdateFenceMembershipState(trackableID, fenceID string, ttl time.Duration, update func(*expiringFenceMembership)) {
s.mu.Lock()
defer s.mu.Unlock()
current := s.fenceMembership[trackableID]
if current == nil {
current = map[string]expiringFenceMembership{}
s.fenceMembership[trackableID] = current
}
membership := current[fenceID]
membership.expiresAt = s.nowUTC().Add(ttl)
update(&membership)
current[fenceID] = membership
}

func (s *ProcessingState) ClearInsideFence(trackableID, fenceID string) {
s.mu.Lock()
defer s.mu.Unlock()
Expand Down
222 changes: 205 additions & 17 deletions internal/hub/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -219,6 +219,18 @@ type resolvedProximity struct {
Sticky bool
}

type fenceExitPolicy struct {
ExitTolerance float64
ToleranceTimeout time.Duration
ToleranceTimeoutActive bool
ExitDelay time.Duration
}

type fenceContainment struct {
Inside bool
OutsideDistance float64
}

func (s *Service) processingState() *ProcessingState {
if s.state == nil {
s.state = NewProcessingState(s.now)
Expand Down Expand Up @@ -1514,30 +1526,33 @@ func (s *Service) publishFenceEvents(ctx context.Context, location gen.Location)
if err != nil {
return nil
}
provider, hasProvider := s.providerByID(ctx, location.ProviderId)
fences, err := s.fenceCandidatesForLocation(ctx, location)
if err != nil {
return err
}
locationCandidates := make(map[string]gen.Fence, len(fences))
locationContains := make(map[string]bool, len(fences))
locationContains := make(map[string]fenceContainment, len(fences))
for _, fence := range fences {
fenceID := fence.Id.String()
locationCandidates[fenceID] = fence
inside, err := fenceContainsPoint(fence, point)
containment, err := fenceContainmentForPoint(fence, point)
if err != nil {
continue
}
locationContains[fenceID] = inside
locationContains[fenceID] = containment
}
now := time.Now().UTC()
now := s.processingState().nowUTC()
for _, trackableID := range *location.Trackables {
trackable, err := s.trackableByID(ctx, trackableID)
hasTrackable := err == nil
activeFenceIDs := s.processingState().ListInsideFences(trackableID)
activeFenceSet := make(map[string]struct{}, len(activeFenceIDs))
for _, fenceID := range activeFenceIDs {
activeFenceSet[fenceID] = struct{}{}
}
trackableCandidates := make(map[string]gen.Fence, len(locationCandidates)+len(activeFenceIDs))
trackableContains := make(map[string]bool, len(locationContains)+len(activeFenceIDs))
trackableContains := make(map[string]fenceContainment, len(locationContains)+len(activeFenceIDs))
for fenceID, fence := range locationCandidates {
trackableCandidates[fenceID] = fence
trackableContains[fenceID] = locationContains[fenceID]
Expand All @@ -1556,17 +1571,18 @@ func (s *Service) publishFenceEvents(ctx context.Context, location gen.Location)
continue
}
trackableCandidates[fenceID] = fence
inside, err := fenceContainsPoint(fence, point)
containment, err := fenceContainmentForPoint(fence, point)
if err != nil {
continue
}
trackableContains[fenceID] = inside
trackableContains[fenceID] = containment
}
for fenceID, fence := range trackableCandidates {
inside := trackableContains[fenceID]
containment := trackableContains[fenceID]
_, wasInside := activeFenceSet[fenceID]
policy := resolveFenceExitPolicy(fence, trackable, hasTrackable, provider, hasProvider)
switch {
case inside && !wasInside:
case containment.Inside && !wasInside:
s.processingState().SetInsideFence(trackableID, fenceID, s.cfg.LocationTTL)
event := gen.FenceEvent{
EventType: gen.RegionEntry,
Expand All @@ -1581,9 +1597,30 @@ func (s *Service) publishFenceEvents(ctx context.Context, location gen.Location)
span.RecordError(err)
return err
}
case inside && wasInside:
case containment.Inside && wasInside:
s.processingState().SetInsideFence(trackableID, fenceID, s.cfg.LocationTTL)
case !inside && wasInside:
case !containment.Inside && wasInside:
membership, ok := s.processingState().FenceMembershipState(trackableID, fenceID)
if !ok {
continue
}
if !exitReady(containment, policy, now, &membership) {
s.processingState().UpdateFenceMembershipState(trackableID, fenceID, s.cfg.LocationTTL, func(state *expiringFenceMembership) {
state.toleranceStartedAt = membership.toleranceStartedAt
state.exitPendingSince = membership.exitPendingSince
})
continue
}
if membership.exitPendingSince.IsZero() {
membership.exitPendingSince = now
s.processingState().UpdateFenceMembershipState(trackableID, fenceID, s.cfg.LocationTTL, func(state *expiringFenceMembership) {
state.toleranceStartedAt = membership.toleranceStartedAt
state.exitPendingSince = membership.exitPendingSince
})
}
if now.Sub(membership.exitPendingSince) < policy.ExitDelay {
continue
}
s.processingState().ClearInsideFence(trackableID, fenceID)
event := gen.FenceEvent{
EventType: gen.RegionExit,
Expand All @@ -1605,13 +1642,96 @@ func (s *Service) publishFenceEvents(ctx context.Context, location gen.Location)
return nil
}

func exitReady(containment fenceContainment, policy fenceExitPolicy, now time.Time, membership *expiringFenceMembership) bool {
if containment.Inside {
membership.toleranceStartedAt = time.Time{}
membership.exitPendingSince = time.Time{}
return false
}
if policy.ExitTolerance <= 0 || containment.OutsideDistance > policy.ExitTolerance {
membership.toleranceStartedAt = time.Time{}
return true
}
if membership.toleranceStartedAt.IsZero() {
membership.toleranceStartedAt = now
}
if !policy.ToleranceTimeoutActive {
membership.exitPendingSince = time.Time{}
return false
}
if now.Sub(membership.toleranceStartedAt) >= policy.ToleranceTimeout {
return true
}
membership.exitPendingSince = time.Time{}
return false
}

func (s *Service) fenceCandidatesForLocation(ctx context.Context, location gen.Location) ([]gen.Fence, error) {
if cache := s.metadataCache(); cache != nil {
return cache.FenceCandidates(location)
}
return s.ListFences(ctx)
}

func (s *Service) providerByID(ctx context.Context, id string) (gen.LocationProvider, bool) {
if cache := s.metadataCache(); cache != nil {
return cache.ProviderByID(id)
}
provider, err := s.GetProvider(ctx, strings.TrimSpace(id))
if err != nil {
return gen.LocationProvider{}, false
}
return provider, true
}

func resolveFenceExitPolicy(fence gen.Fence, trackable gen.Trackable, hasTrackable bool, provider gen.LocationProvider, hasProvider bool) fenceExitPolicy {
policy := fenceExitPolicy{}
applyFencePolicyOverride(&policy, fence.ExitTolerance, fence.ToleranceTimeout, fence.ExitDelay)
if hasProvider {
applyFencePolicyOverride(&policy, provider.ExitTolerance, provider.ToleranceTimeout, provider.ExitDelay)
}
if hasTrackable {
applyFencePolicyOverride(&policy, trackable.ExitTolerance, trackable.ToleranceTimeout, trackable.ExitDelay)
}
return policy
}

func applyFencePolicyOverride(policy *fenceExitPolicy, exitTolerance *gen.PositiveNumber, toleranceTimeout *gen.PositiveOrMinusOne, exitDelay *gen.PositiveOrMinusOne) {
if exitTolerance != nil && *exitTolerance > 0 {
policy.ExitTolerance = float64(*exitTolerance)
}
if timeout, ok, disabled := decodePositiveOrMinusOneDuration(toleranceTimeout); ok {
if disabled {
policy.ToleranceTimeout = 0
policy.ToleranceTimeoutActive = false
} else {
policy.ToleranceTimeout = timeout
policy.ToleranceTimeoutActive = true
}
}
if delay, ok, disabled := decodePositiveOrMinusOneDuration(exitDelay); ok {
if disabled {
policy.ExitDelay = 0
} else {
policy.ExitDelay = delay
}
}
}

func decodePositiveOrMinusOneDuration(value *gen.PositiveOrMinusOne) (time.Duration, bool, bool) {
if value == nil {
return 0, false, false
}
if disabled, err := value.AsPositiveOrMinusOne0(); err == nil && disabled == gen.Minus1 {
return 0, true, true
}
positive, err := value.AsPositiveNumber()
if err != nil || positive <= 0 {
return 0, false, false
}
return time.Duration(float64(positive) * float64(time.Millisecond)), true, false
}

func (s *Service) fenceByID(ctx context.Context, fenceID string) (gen.Fence, bool) {
if cache := s.metadataCache(); cache != nil {
fence, ok := cache.FenceByID(fenceID)
Expand Down Expand Up @@ -2730,28 +2850,48 @@ func point2D(point gen.Point) ([2]float64, error) {
return [2]float64{}, errors.New("invalid coordinates")
}

func fenceContainsPoint(fence gen.Fence, point [2]float64) (bool, error) {
func fenceContainmentForPoint(fence gen.Fence, point [2]float64) (fenceContainment, error) {
if p, err := fence.Region.AsPoint(); err == nil {
center, err := point2D(p)
if err != nil {
return false, err
return fenceContainment{}, err
}
radius := 0.0
if fence.Radius != nil {
radius = float64(*fence.Radius)
}
dx := point[0] - center[0]
dy := point[1] - center[1]
return dx*dx+dy*dy <= radius*radius, nil
distance := math.Sqrt(dx*dx + dy*dy)
if distance <= radius {
return fenceContainment{Inside: true}, nil
}
return fenceContainment{OutsideDistance: distance - radius}, nil
}
polygon, err := fence.Region.AsPolygon()
if err != nil {
return false, err
return fenceContainment{}, err
}
if len(polygon.Coordinates) == 0 || len(polygon.Coordinates[0]) == 0 {
return false, errors.New("empty polygon")
return fenceContainment{}, errors.New("empty polygon")
}
inside := pointInRing(point, polygon.Coordinates[0])
if inside {
return fenceContainment{Inside: true}, nil
}
distance, err := pointToRingDistance(point, polygon.Coordinates[0])
if err != nil {
return fenceContainment{}, err
}
return fenceContainment{OutsideDistance: distance}, nil
}

func fenceContainsPoint(fence gen.Fence, point [2]float64) (bool, error) {
containment, err := fenceContainmentForPoint(fence, point)
if err != nil {
return false, err
}
return pointInRing(point, polygon.Coordinates[0]), nil
return containment.Inside, nil
}

func pointInRing(point [2]float64, ring []gen.GeoJsonPosition) bool {
Expand All @@ -2778,6 +2918,54 @@ func pointInRing(point [2]float64, ring []gen.GeoJsonPosition) bool {
return inside
}

func pointToRingDistance(point [2]float64, ring []gen.GeoJsonPosition) (float64, error) {
if len(ring) == 0 {
return 0, errors.New("empty ring")
}
minDistance := math.Inf(1)
last := len(ring) - 1
for i := 0; i < len(ring); i++ {
start, err := geoPoint(ring[last])
if err != nil {
last = i
continue
}
end, err := geoPoint(ring[i])
if err != nil {
last = i
continue
}
distance := pointToSegmentDistance(point, start, end)
if distance < minDistance {
minDistance = distance
}
last = i
}
if math.IsInf(minDistance, 1) {
return 0, errors.New("ring has no valid coordinates")
}
return minDistance, nil
}

func pointToSegmentDistance(point, start, end [2]float64) float64 {
dx := end[0] - start[0]
dy := end[1] - start[1]
if dx == 0 && dy == 0 {
return math.Sqrt((point[0]-start[0])*(point[0]-start[0]) + (point[1]-start[1])*(point[1]-start[1]))
}
t := ((point[0]-start[0])*dx + (point[1]-start[1])*dy) / (dx*dx + dy*dy)
if t < 0 {
t = 0
} else if t > 1 {
t = 1
}
closestX := start[0] + t*dx
closestY := start[1] + t*dy
dx = point[0] - closestX
dy = point[1] - closestY
return math.Sqrt(dx*dx + dy*dy)
}

func geoPoint(pos gen.GeoJsonPosition) ([2]float64, error) {
coords2d, err := pos.AsGeoJsonPosition2D()
if err == nil && len(coords2d) == 2 {
Expand Down
Loading