-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathcache.go
More file actions
197 lines (174 loc) · 6.03 KB
/
cache.go
File metadata and controls
197 lines (174 loc) · 6.03 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
package the_cachex
import (
"context"
"errors"
"log/slog"
"reflect"
"sync"
"sync/atomic"
"time"
)
// cacheListener is a struct used to subscribe to the cache for request deduplication.
// A waiting caller registers one listener on an inflight entry; the producing
// goroutine attempts a non-blocking send on one of the channels and then closes both.
type cacheListener struct {
// isCompleted is signaled (then closed) when the producer stored data successfully.
isCompleted chan int
// isError carries the producer's error (then is closed) when the fetch failed.
isError chan error
}
// cacheEntry represents an internal entry of the cache.
type cacheEntry[T any] struct {
// isCurrentlyBeingProcessed is a flag indicating if the cache entry is currently being processed.
// Accessed under dataMutex after the entry is published to the map.
isCurrentlyBeingProcessed bool
// cacheListeners holds the listeners interested to learn about this cache.
// Appended to by waiters and iterated by the producer, under dataMutex.
cacheListeners []cacheListener
// expiryAt is the timestamp of when this cache entry should expire,
// in nanoseconds since the Unix epoch (set via time.Time.UnixNano).
expiryAt int64
// dataMutex protects access to the data field (and the other mutable fields above).
dataMutex sync.Mutex
// data is the cached data.
data T
}
// Cache offers a simple API for caching stuff, with TTL-based expiry and
// deduplication of concurrent fetches for the same key.
type Cache[T any] struct {
// defaultTTL represents the default given TTL.
defaultTTL time.Duration
// internalCache holds the internal cache data, mapping a cache key (string)
// to a *cacheEntry[T].
internalCache sync.Map
// hitsCounter tracks how many cache hits occurred for this cache.
hitsCounter atomic.Int32
// missesCounter tracks how many cache misses occurred for this cache.
missesCounter atomic.Int32
// zeroT represents an empty T, returned on errors.
zeroT T
}
// NewCache builds a new Cache.
// defaultTTL is the default time to live for each entry in the cache.
func NewCache[T any](defaultTTL time.Duration) *Cache[T] {
	// zeroT (and every other omitted field) is zero-initialized by the
	// composite literal; a separate `var zero T` is redundant.
	return &Cache[T]{defaultTTL: defaultTTL}
}
// Cache caches the given function's result.
// The result is only cached if the error is nil, non-nil errors are not cached.
// Concurrent calls for the same cacheKey are deduplicated: one call runs
// cacheFunction while the others wait for its outcome (or their ctx).
// An optional ttlOverride can be given in order to override the ttl for the cache entry.
//
// NOTE(review): a stored value equal to T's zero value cannot be told apart
// from "no data was stored"; such entries surface as a cache failure below.
func (c *Cache[T]) Cache(ctx context.Context, cacheKey string, cacheFunction func(ctx2 context.Context) (T, error), ttlOverride ...time.Duration) (T, error) {
	ttlToSet := c.defaultTTL
	if len(ttlOverride) > 0 {
		ttlToSet = ttlOverride[0]
	}
	// Build internal cache entry struct. The TTL clock starts now, so time
	// spent inside cacheFunction counts against the entry's lifetime.
	theInternalCacheEntry := cacheEntry[T]{
		cacheListeners:            make([]cacheListener, 0),
		expiryAt:                  time.Now().Add(ttlToSet).UnixNano(),
		isCurrentlyBeingProcessed: true,
		data:                      c.zeroT,
	}
	// Load the cached value or store a placeholder to dedupe requests.
	mapEntry, ok := c.internalCache.LoadOrStore(cacheKey, &theInternalCacheEntry)
	if ok {
		// We have gotten something from the internal cache, need to validate it.
		cachedEntry, ok := mapEntry.(*cacheEntry[T])
		// Cache entry is not expired
		if ok && time.Now().UnixNano() < cachedEntry.expiryAt {
			// Inspect the entry under its mutex: reading data or the
			// processing flag without it races with the producing goroutine.
			cachedEntry.dataMutex.Lock()
			if !cachedEntry.isCurrentlyBeingProcessed {
				data := cachedEntry.data
				cachedEntry.dataMutex.Unlock()
				if reflect.DeepEqual(data, c.zeroT) {
					// Processing finished but no data was stored: we raced
					// with the error path (or a zero value was cached).
					return c.zeroT, errors.New("failed to grab entry from cache")
				}
				c.hitsCounter.Add(1)
				return data, nil
			}
			// Entry is inflight: register this fetch as a listener.
			// Capacity 1 guarantees the producer's non-blocking send always
			// lands, so a result is never silently dropped by the close.
			listener := cacheListener{
				isCompleted: make(chan int, 1),
				isError:     make(chan error, 1),
			}
			cachedEntry.cacheListeners = append(cachedEntry.cacheListeners, listener)
			cachedEntry.dataMutex.Unlock()
			select {
			case <-ctx.Done():
				return c.zeroT, ctx.Err()
			case err, received := <-listener.isError:
				if received && err != nil {
					return c.zeroT, err
				}
				// Closed without a value: the producer succeeded.
				c.hitsCounter.Add(1)
				cachedEntry.dataMutex.Lock()
				data := cachedEntry.data
				cachedEntry.dataMutex.Unlock()
				return data, nil
			case <-listener.isCompleted:
				// Both channels may be ready at once (the producer closes
				// them together); double-check no error was delivered.
				select {
				case err, received := <-listener.isError:
					if received && err != nil {
						return c.zeroT, err
					}
				default:
				}
				c.hitsCounter.Add(1)
				cachedEntry.dataMutex.Lock()
				data := cachedEntry.data
				cachedEntry.dataMutex.Unlock()
				return data, nil
			}
		}
		// If cache entry is expired, request dedup for ongoing stuff might fail here, should not happen often
		if ok && time.Now().UnixNano() >= cachedEntry.expiryAt {
			// Drop the stale entry and retry; the retry will either observe
			// a fresh entry stored by another caller or fetch itself.
			c.internalCache.Delete(cacheKey)
			c.missesCounter.Add(1)
			return c.Cache(ctx, cacheKey, cacheFunction, ttlToSet)
		}
	}
	// We need to fetch data here because we don't have it in the cache.
	data, err := cacheFunction(ctx)
	if err != nil {
		// We had an error, first delete the cache key then announce the others.
		c.internalCache.Delete(cacheKey)
		// Announce listeners of errors if possible
		theInternalCacheEntry.dataMutex.Lock()
		theInternalCacheEntry.isCurrentlyBeingProcessed = false
		for _, listener := range theInternalCacheEntry.cacheListeners {
			select {
			case listener.isError <- err:
				slog.Debug("sent close message to listener")
			default:
				slog.Debug("could not sent close message to listener")
			}
			close(listener.isError)
			close(listener.isCompleted)
		}
		theInternalCacheEntry.dataMutex.Unlock()
		return c.zeroT, err
	}
	theInternalCacheEntry.dataMutex.Lock()
	// Set the data
	theInternalCacheEntry.data = data
	theInternalCacheEntry.isCurrentlyBeingProcessed = false
	// Announce listeners that data is available
	for _, listener := range theInternalCacheEntry.cacheListeners {
		select {
		case listener.isCompleted <- 1:
			slog.Debug("sent completed message to listener")
		default:
			slog.Debug("could not sent completed message to listener")
		}
		close(listener.isCompleted)
		close(listener.isError)
	}
	theInternalCacheEntry.dataMutex.Unlock()
	c.missesCounter.Add(1)
	return data, nil
}
// Stats returns the statistics of the cache: cumulative hit and miss
// counters, plus the number of currently stored, non-expired entries.
func (c *Cache[T]) Stats() (hits int, misses int, entries int) {
	// Note: we use range here to check if keys have been expired to report the correct entries counter.
	// This may not be ideal, could use a background goroutine.
	// expiryAt is stored in nanoseconds (UnixNano), so it must be compared
	// against UnixNano() — comparing Unix() seconds would always be smaller
	// and would count expired entries too. Hoisted out of the loop so one
	// snapshot is used for all entries.
	now := time.Now().UnixNano()
	c.internalCache.Range(func(key, value interface{}) bool {
		entry, ok := value.(*cacheEntry[T])
		if ok {
			if now < entry.expiryAt {
				entries += 1
			}
		}
		return true
	})
	return int(c.hitsCounter.Load()), int(c.missesCounter.Load()), entries
}