Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion cmd/thanos/endpointset.go
Original file line number Diff line number Diff line change
Expand Up @@ -197,6 +197,7 @@ func setupEndpointSet(
dnsSDInterval time.Duration,
unhealthyTimeout time.Duration,
endpointTimeout time.Duration,
queryTimeout time.Duration,
dialOpts []grpc.DialOption,
injectTestAddresses []string,
queryConnMetricLabels ...string,
Expand Down Expand Up @@ -364,7 +365,7 @@ func setupEndpointSet(
specs = append(specs, query.NewGRPCEndpointSpec(addr, false, dialOpts...))
}
return removeDuplicateEndpointSpecs(specs)
}, unhealthyTimeout, endpointTimeout, queryConnMetricLabels...)
}, unhealthyTimeout, endpointTimeout, queryTimeout, queryConnMetricLabels...)

g.Add(func() error {
return runutil.Repeat(endpointTimeout, ctx.Done(), func() error {
Expand Down
1 change: 1 addition & 0 deletions cmd/thanos/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -291,6 +291,7 @@ func registerQuery(app *extkingpin.App) {
time.Duration(*dnsSDInterval),
time.Duration(*unhealthyStoreTimeout),
time.Duration(*endpointInfoTimeout),
time.Duration(*queryTimeout),
dialOpts,
*injectTestAddresses,
*queryConnMetricLabels...,
Expand Down
1 change: 1 addition & 0 deletions cmd/thanos/rule.go
Original file line number Diff line number Diff line change
Expand Up @@ -450,6 +450,7 @@ func runRule(
conf.query.dnsSDInterval,
5*time.Minute,
5*time.Second,
conf.evalInterval,
dialOpts,
[]string{},
)
Expand Down
23 changes: 18 additions & 5 deletions pkg/query/endpointset.go
Original file line number Diff line number Diff line change
Expand Up @@ -215,6 +215,7 @@ type EndpointSet struct {
endpointSpecs func() map[string]*GRPCEndpointSpec
endpointInfoTimeout time.Duration
unhealthyEndpointTimeout time.Duration
gcTimeout time.Duration

updateMtx sync.Mutex

Expand All @@ -241,6 +242,7 @@ func NewEndpointSet(
endpointSpecs func() []*GRPCEndpointSpec,
unhealthyEndpointTimeout time.Duration,
endpointInfoTimeout time.Duration,
queryTimeout time.Duration,
endpointMetricLabels ...string,
) *EndpointSet {
endpointsMetric := newEndpointSetNodeCollector(logger, endpointMetricLabels...)
Expand All @@ -261,6 +263,7 @@ func NewEndpointSet(
logger: log.With(logger, "component", "endpointset"),
endpointsMetric: endpointsMetric,
endpointInfoTimeout: endpointInfoTimeout,
gcTimeout: max(queryTimeout, endpointInfoTimeout, unhealthyEndpointTimeout),
unhealthyEndpointTimeout: unhealthyEndpointTimeout,
endpointSpecs: func() map[string]*GRPCEndpointSpec {
res := make(map[string]*GRPCEndpointSpec)
Expand Down Expand Up @@ -337,7 +340,7 @@ func (e *EndpointSet) Update(ctx context.Context) {

e.updateEndpoint(ctx, spec, newRef)
if !newRef.isQueryable() {
newRef.Close()
newRef.Close(e.gcTimeout)
return
}

Expand Down Expand Up @@ -369,7 +372,7 @@ func (e *EndpointSet) Update(ctx context.Context) {
}
for addr, er := range staleRefs {
level.Info(er.logger).Log("msg", unhealthyEndpointMessage, "address", er.addr, "extLset", labelpb.PromLabelSetsToString(er.LabelSets()))
er.Close()
er.Close(e.gcTimeout)
delete(e.endpoints, addr)
}
level.Debug(e.logger).Log("msg", "updated endpoints", "activeEndpoints", len(e.endpoints))
Expand Down Expand Up @@ -562,7 +565,7 @@ func (e *EndpointSet) Close() {
defer e.endpointsMtx.Unlock()

for _, ef := range e.endpoints {
ef.Close()
ef.Close(e.gcTimeout)
}
e.endpoints = map[string]*endpointRef{}
}
Expand Down Expand Up @@ -819,8 +822,18 @@ func (er *endpointRef) Addr() (string, bool) {
return er.addr, false
}

func (er *endpointRef) Close() {
runutil.CloseWithLogOnErr(er.logger, er.cc, "endpoint %v connection closed", er.addr)
// Close schedules the endpoint's gRPC connection to be torn down after gcDelay.
//
// NOTE(GiedriusS): We cannot close the gRPC connection immediately. Someone might
// still be using it even if we do locking. Two options exist:
//  1. Garbage-collect the connection in the background; the open question is WHEN to close it.
//  2. Guarantee no more calls are made to this endpointRef before Close runs. That is not
//     feasible because SendMsg() is async and the connection might still be in use even after
//     Series() has returned (I think?); refactoring every client to reference counting would
//     be a lot of work.
//
// Only option 1 is practical for now, so we let in-flight calls finish by delaying the
// actual close by gcDelay (the callers pass the maximum of the relevant timeouts).
func (er *endpointRef) Close(gcDelay time.Duration) {
	level.Info(er.logger).Log("msg", "waiting for gRPC calls to finish before closing", "addr", er.addr, "gcDelay", gcDelay)
	closeConn := func() {
		runutil.CloseWithLogOnErr(er.logger, er.cc, "endpoint %v connection closed", er.addr)
	}
	// Fire-and-forget timer: the connection is reaped once gcDelay elapses.
	time.AfterFunc(gcDelay, closeConn)
}

func (er *endpointRef) apisPresent() []string {
Expand Down
49 changes: 43 additions & 6 deletions pkg/query/endpointset_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,16 +9,20 @@ import (
"fmt"
"math"
"net"
"os"
"strings"
"sync"
"testing"
"testing/synctest"
"time"

"github.com/efficientgo/core/testutil"
"github.com/go-kit/log"
"github.com/pkg/errors"
"github.com/stretchr/testify/require"
"golang.org/x/sync/errgroup"
"google.golang.org/grpc"
"google.golang.org/grpc/connectivity"
"google.golang.org/grpc/credentials/insecure"

promtestutil "github.com/prometheus/client_golang/prometheus/testutil"
Expand Down Expand Up @@ -680,7 +684,7 @@ func TestEndpointSetUpdate_AvailabilityScenarios(t *testing.T) {
}
return specs
},
time.Minute, 2*time.Second)
time.Minute, 2*time.Second, 10*time.Second)
defer endpointSet.Close()

// Initial update.
Expand Down Expand Up @@ -1049,7 +1053,7 @@ func TestEndpointSet_Update_NoneAvailable(t *testing.T) {
}
return specs
},
time.Minute, 2*time.Second)
time.Minute, 2*time.Second, 10*time.Second)
defer endpointSet.Close()

// Should not matter how many of these we run.
Expand Down Expand Up @@ -1160,7 +1164,7 @@ func TestEndpoint_Update_QuerierStrict(t *testing.T) {
NewGRPCEndpointSpec(discoveredEndpointAddr[1], false, testGRPCOpts...),
NewGRPCEndpointSpec(discoveredEndpointAddr[2], true, testGRPCOpts...),
}
}, time.Minute, 1*time.Second)
}, time.Minute, 1*time.Second, 10*time.Second)
defer endpointSet.Close()

// Initial update.
Expand Down Expand Up @@ -1341,7 +1345,7 @@ func TestEndpointSet_APIs_Discovery(t *testing.T) {

return tc.states[currentState].endpointSpec()
},
time.Minute, 2*time.Second)
time.Minute, 2*time.Second, 10*time.Second)

defer endpointSet.Close()

Expand Down Expand Up @@ -1526,14 +1530,14 @@ func TestUpdateEndpointStateForgetsPreviousErrors(t *testing.T) {
}

func makeEndpointSet(discoveredEndpointAddr []string, strict bool, now nowFunc, metricLabels ...string) *EndpointSet {
endpointSet := NewEndpointSet(now, nil, nil,
endpointSet := NewEndpointSet(now, log.NewLogfmtLogger(os.Stderr), nil,
func() (specs []*GRPCEndpointSpec) {
for _, addr := range discoveredEndpointAddr {
specs = append(specs, NewGRPCEndpointSpec(addr, strict, testGRPCOpts...))
}
return specs
},
time.Minute, time.Second, metricLabels...)
time.Minute, time.Second, 10*time.Second, metricLabels...)
return endpointSet
}

Expand Down Expand Up @@ -1829,3 +1833,36 @@ func TestEndpointSet_WaitForFirstUpdate(t *testing.T) {
testutil.Equals(t, 1, len(endpointSet.GetStoreClients()))
})
}

// TestEndpointCloseGCTime verifies that closing an EndpointSet does not tear down
// the underlying gRPC connection immediately, but only after gcTimeout has elapsed
// (the delayed garbage-collection behavior of endpointRef.Close).
func TestEndpointCloseGCTime(t *testing.T) {
	// Spin up one fake sidecar endpoint so the set has a live gRPC connection.
	endpoints, err := startTestEndpoints([]testEndpointMeta{
		{
			InfoResponse: sidecarInfo,
			extlsetFn: func(addr string) []labelpb.ZLabelSet {
				return labelpb.ZLabelSetsFromPromLabels(
					labels.FromStrings("addr", addr),
				)
			},
		},
	})
	testutil.Ok(t, err)
	t.Cleanup(endpoints.Close)

	discoveredEndpointAddr := endpoints.EndpointAddresses()
	// makeEndpointSet uses unhealthyTimeout=1m, endpointInfoTimeout=1s, queryTimeout=10s,
	// so gcTimeout = max of the three = 1 minute.
	endpointSet := makeEndpointSet(discoveredEndpointAddr, false, time.Now)
	endpointSet.Update(context.Background())

	// Run the delay-sensitive part inside a synctest bubble so time is virtual
	// and the 1-minute GC delay completes instantly in wall-clock terms.
	synctest.Test(t, func(t *testing.T) {
		now := time.Now()
		er := endpointSet.endpoints[discoveredEndpointAddr[0]]

		// Close schedules (not performs) the connection teardown via time.AfterFunc.
		endpointSet.Close()
		time.Sleep(endpointSet.gcTimeout)
		// Under synctest, Sleep advances the fake clock by exactly gcTimeout.
		elapsed := time.Since(now)
		testutil.Assert(t, elapsed == 1*time.Minute, "expected gcTimeout to be 1 minute, got %v", elapsed)

		// Wait for the AfterFunc goroutine to finish before inspecting the connection.
		synctest.Wait()
		state := er.cc.GetState()
		testutil.Assert(t, state == connectivity.Shutdown, "expected connection state to be Shutdown, got %v", state)
	})
}
4 changes: 4 additions & 0 deletions pkg/testutil/custom/custom.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,10 @@ func TolerantVerifyLeakMain(m *testing.M) {
goleak.IgnoreTopFunction("k8s.io/klog.(*loggingT).flushDaemon"),
// https://github.com/baidubce/bce-sdk-go/blob/9a8c1139e6a3ad23080b9b8c51dec88df8ce3cda/util/log/logger.go#L359
goleak.IgnoreTopFunction("github.com/baidubce/bce-sdk-go/util/log.NewLogger.func1"),
// gRPC conns are not immediately reaped so we need to ignore these.
goleak.IgnoreTopFunction(`google.golang.org/grpc/internal/grpcsync.(*CallbackSerializer).run`),
goleak.IgnoreTopFunction(`google.golang.org/grpc.(*addrConn).resetTransport`),
goleak.IgnoreTopFunction(`google.golang.org/grpc.(*addrConn).resetTransportAndUnlock`),
)
}

Expand Down
Loading