Skip to content

Commit 4cd3c68

Browse files
committed
keep-alive in prometheus output
1 parent 26fb142 commit 4cd3c68

1 file changed

Lines changed: 32 additions & 7 deletions

File tree

plugin/output/prometheus/prometheus.go

Lines changed: 32 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,8 @@ package prometheus
22

33
import (
44
"context"
5+
"net"
6+
"net/http"
57
"strings"
68
"time"
79

@@ -187,13 +189,19 @@ type Plugin struct {
187189
ctx context.Context
188190
cancel context.CancelFunc
189191

190-
client *promwrite.Client
192+
client PrometheusClient
191193
batcher *pipeline.RetriableBatcher
192194

193195
// plugin metrics
194196
sendErrorMetric prometheus.Counter
195197

196198
collector *metricCollector
199+
200+
router *pipeline.Router
201+
}
202+
203+
type PrometheusClient interface {
204+
Write(ctx context.Context, req *promwrite.WriteRequest, options ...promwrite.WriteOption) (*promwrite.WriteResponse, error)
197205
}
198206

199207
func init() {
@@ -228,22 +236,28 @@ func (p *Plugin) Start(config pipeline.AnyConfig, params *pipeline.OutputPluginP
228236
MetricCtl: params.MetricCtl,
229237
}
230238

239+
p.router = params.Router
231240
backoffOpts := pipeline.BackoffOpts{
232-
MinRetention: p.config.Retention_,
233-
Multiplier: float64(p.config.RetentionExponentMultiplier),
234-
AttemptNum: p.config.Retry,
241+
MinRetention: p.config.Retention_,
242+
Multiplier: float64(p.config.RetentionExponentMultiplier),
243+
AttemptNum: p.config.Retry,
244+
IsDeadQueueAvailable: p.router.IsDeadQueueAvailable(),
235245
}
236246

237-
onError := func(err error) {
247+
onError := func(err error, events []*pipeline.Event) {
238248
var level zapcore.Level
239-
if p.config.FatalOnFailedInsert {
249+
if p.config.FatalOnFailedInsert && !p.router.IsDeadQueueAvailable() {
240250
level = zapcore.FatalLevel
241251
} else {
242252
level = zapcore.ErrorLevel
243253
}
244254

245255
p.logger.Log(level, "can't send data to Prometheus", zap.Error(err),
246256
zap.Int("retries", p.config.Retry))
257+
258+
for i := range events {
259+
p.router.Fail(events[i])
260+
}
247261
}
248262

249263
p.batcher = pipeline.NewRetriableBatcher(
@@ -373,5 +387,16 @@ func (p *Plugin) registerMetrics(ctl *metric.Ctl) {
373387
}
374388

375389
func (p *Plugin) prepareClient() {
376-
p.client = promwrite.NewClient(p.config.Endpoint)
390+
customClient := &http.Client{
391+
Transport: &http.Transport{
392+
DialContext: (&net.Dialer{
393+
Timeout: p.config.ConnectionTimeout_,
394+
KeepAlive: p.config.KeepAlive.MaxConnDuration_,
395+
}).DialContext,
396+
IdleConnTimeout: p.config.KeepAlive.MaxIdleConnDuration_,
397+
},
398+
Timeout: p.config.RequestTimeout_,
399+
}
400+
401+
p.client = promwrite.NewClient(p.config.Endpoint, promwrite.HttpClient(customClient))
377402
}

0 commit comments

Comments
 (0)