Skip to content

Commit 2f1b9f7

Browse files
author
New year
committed
conn: change design log
1 parent da8a234 commit 2f1b9f7

File tree

3 files changed

+404
-156
lines changed

3 files changed

+404
-156
lines changed

connection.go

Lines changed: 85 additions & 156 deletions
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,6 @@ const (
3232
const shutdownEventKey = "box.shutdown"
3333

3434
type ConnEventKind int
35-
type ConnLogKind int
3635

3736
var (
3837
errUnknownRequest = errors.New("the passed connected request doesn't belong " +
@@ -50,19 +49,6 @@ const (
5049
Shutdown
5150
// Either reconnect attempts exhausted, or explicit Close is called.
5251
Closed
53-
54-
// LogReconnectFailed is logged when reconnect attempt failed.
55-
// LogReconnectFailed ConnLogKind = iota + 1
56-
// LogLastReconnectFailed is logged when last reconnect attempt failed,
57-
// connection will be closed after that.
58-
// LogLastReconnectFailed
59-
// LogUnexpectedResultId is logged when response with unknown id was received.
60-
// Most probably it is due to request timeout.
61-
LogUnexpectedResultId
62-
// LogWatchEventReadFailed is logged when failed to read a watch event.
63-
// LogWatchEventReadFailed
64-
// LogBoxSessionPushUnsupported is logged when response type turned IPROTO_CHUNK.
65-
// LogBoxSessionPushUnsupported
6652
)
6753

6854
// ConnEvent is sent throw Notify channel specified in Opts.
@@ -80,126 +66,6 @@ type connWatchEvent struct {
8066

8167
var epoch = time.Now()
8268

83-
type ConnLogEvent interface{}
84-
85-
// Events of logger
86-
type LogReconnectFailed struct {
87-
Reconnects uint
88-
MaxReconnects uint
89-
Err error
90-
}
91-
92-
type LogLastReconnectFailed struct {
93-
Err error
94-
}
95-
type LogUnexpectedResultID struct {
96-
Header Header
97-
}
98-
type LogWatchEventReadFailed struct {
99-
Err error
100-
}
101-
type LogBoxSessionPushUnsupported struct {
102-
Header Header
103-
}
104-
105-
// Logger is logger type expected to be passed in options.
106-
type Logger interface {
107-
Report(event ConnLogEvent, conn *Connection)
108-
}
109-
110-
type defaultSlogLogger struct {
111-
l *slog.Logger
112-
}
113-
114-
func NewDefaultSlogLogger(l *slog.Logger) Logger {
115-
if l == nil {
116-
l = slog.Default()
117-
}
118-
return &defaultSlogLogger{l: l}
119-
}
120-
121-
type defaultLogger struct{}
122-
123-
func attrsToAny(attrs []slog.Attr) []any {
124-
out := make([]any, 0, len(attrs)*2)
125-
for _, a := range attrs {
126-
out = append(out, a.Key, a.Value.Any())
127-
}
128-
return out
129-
}
130-
131-
func (d *defaultSlogLogger) Report(event ConnLogEvent, conn *Connection) {
132-
level, msg, attrs := eventToSlog(event, conn)
133-
args := attrsToAny(attrs)
134-
ctx := context.Background()
135-
switch level {
136-
case slog.LevelError:
137-
d.l.Error(msg, args...)
138-
case slog.LevelWarn:
139-
d.l.Warn(msg, args...)
140-
case slog.LevelInfo:
141-
d.l.Info(msg, args...)
142-
case slog.LevelDebug:
143-
d.l.Debug(msg, args...)
144-
default:
145-
d.l.Log(ctx, level, msg, args...)
146-
}
147-
}
148-
149-
func eventToSlog(event ConnLogEvent, conn *Connection) (slog.Level, string, []slog.Attr) {
150-
addr := "<nil>"
151-
if conn != nil && conn.Addr() != nil {
152-
addr = fmt.Sprintf("%s", conn.Addr())
153-
}
154-
155-
switch e := event.(type) {
156-
case LogReconnectFailed:
157-
msg := "tarantool: reconnect failed"
158-
attrs := []slog.Attr{
159-
slog.Int("reconnects", int(e.Reconnects)),
160-
slog.Int("max_reconnects", int(e.MaxReconnects)),
161-
slog.String("addr", addr),
162-
}
163-
if e.Err != nil {
164-
attrs = append(attrs, slog.String("error", e.Err.Error()))
165-
}
166-
return slog.LevelWarn, msg, attrs
167-
168-
case LogLastReconnectFailed:
169-
msg := "tarantool: last reconnect failed, giving it up"
170-
attrs := []slog.Attr{slog.String("addr", addr)}
171-
if e.Err != nil {
172-
attrs = append(attrs, slog.String("error", e.Err.Error()))
173-
}
174-
return slog.LevelError, msg, attrs
175-
176-
case LogUnexpectedResultID:
177-
msg := "tarantool: unexpected response request id (probably cancelled request)"
178-
attrs := []slog.Attr{
179-
slog.String("addr", addr),
180-
slog.Any("request_id", e.Header.RequestId),
181-
}
182-
return slog.LevelWarn, msg, attrs
183-
184-
case LogWatchEventReadFailed:
185-
msg := "tarantool: unable to parse watch event"
186-
attrs := []slog.Attr{}
187-
if e.Err != nil {
188-
attrs = append(attrs, slog.String("error", e.Err.Error()))
189-
}
190-
return slog.LevelWarn, msg, attrs
191-
192-
case LogBoxSessionPushUnsupported:
193-
msg := "tarantool: unsupported box.session.push()"
194-
attrs := []slog.Attr{slog.Any("request_id", e.Header.RequestId), slog.String("addr", addr)}
195-
return slog.LevelInfo, msg, attrs
196-
197-
default:
198-
// unknown event: log type and value
199-
return slog.LevelInfo, "tarantool: unexpected event", []slog.Attr{slog.Any("event", event), slog.String("addr", addr)}
200-
}
201-
}
202-
20369
// Connection is a handle with a single connection to a Tarantool instance.
20470
//
20571
// It is created and configured with Connect function, and could not be
@@ -442,7 +308,7 @@ func Connect(ctx context.Context, dialer Dialer, opts Opts) (conn *Connection, e
442308
}
443309

444310
if conn.opts.Logger == nil {
445-
conn.opts.Logger = defaultLogger{}
311+
conn.opts.Logger = NewSlogLogger(slog.Default())
446312
}
447313

448314
conn.cond = sync.NewCond(&conn.mutex)
@@ -483,6 +349,29 @@ func Connect(ctx context.Context, dialer Dialer, opts Opts) (conn *Connection, e
483349
return conn, err
484350
}
485351

352+
func (conn *Connection) logEvent(event LogEvent) {
353+
if conn.opts.Logger != nil {
354+
conn.opts.Logger.Report(event, conn)
355+
}
356+
}
357+
358+
// stateToString преобразует состояние соединения в строку
359+
func (conn *Connection) stateToString() string {
360+
state := atomic.LoadUint32(&conn.state)
361+
switch state {
362+
case connDisconnected:
363+
return "disconnected"
364+
case connConnected:
365+
return "connected"
366+
case connShutdown:
367+
return "shutdown"
368+
case connClosed:
369+
return "closed"
370+
default:
371+
return "unknown"
372+
}
373+
}
374+
486375
// ConnectedNow reports if connection is established at the moment.
487376
func (conn *Connection) ConnectedNow() bool {
488377
return atomic.LoadUint32(&conn.state) == connConnected
@@ -652,10 +541,20 @@ func pack(h *smallWBuf, enc *msgpack.Encoder, reqid uint32,
652541
return
653542
}
654543

544+
// connect обновленная версия с новым логированием
655545
func (conn *Connection) connect(ctx context.Context) error {
656546
var err error
657547
if conn.c == nil && conn.state == connDisconnected {
658548
if err = conn.dial(ctx); err == nil {
549+
// Определяем номер попытки переподключения
550+
var reconnects uint = 0
551+
//TODO нужно отслеживать количество переподключений
552+
553+
// Логируем успешное подключение
554+
conn.logEvent(ConnectedEvent{
555+
baseEvent: newBaseEvent(conn.addr),
556+
Reconnects: reconnects,
557+
})
659558
conn.notify(Connected)
660559
return nil
661560
}
@@ -666,24 +565,30 @@ func (conn *Connection) connect(ctx context.Context) error {
666565
return err
667566
}
668567

568+
// closeConnection обновленная версия с новым логированием
669569
func (conn *Connection) closeConnection(neterr error, forever bool) (err error) {
670570
conn.lockShards()
671571
defer conn.unlockShards()
572+
672573
if forever {
673574
if conn.state != connClosed {
674575
close(conn.control)
675576
atomic.StoreUint32(&conn.state, connClosed)
676577
conn.cond.Broadcast()
677-
// Free the resources.
678-
if conn.shutdownWatcher != nil {
679-
go conn.shutdownWatcher.Unregister()
680-
conn.shutdownWatcher = nil
681-
}
578+
// Логируем закрытие соединения
579+
conn.logEvent(ClosedEvent{
580+
baseEvent: newBaseEvent(conn.addr),
581+
})
682582
conn.notify(Closed)
683583
}
684584
} else {
685585
atomic.StoreUint32(&conn.state, connDisconnected)
686586
conn.cond.Broadcast()
587+
// Логируем отключение
588+
conn.logEvent(DisconnectedEvent{
589+
baseEvent: newBaseEvent(conn.addr),
590+
Reason: neterr,
591+
})
687592
conn.notify(Disconnected)
688593
}
689594
if conn.c != nil {
@@ -715,6 +620,7 @@ func (conn *Connection) getDialTimeout() time.Duration {
715620
return dialTimeout
716621
}
717622

623+
// runReconnects обновленная версия с новым логированием
718624
func (conn *Connection) runReconnects(ctx context.Context) error {
719625
dialTimeout := conn.getDialTimeout()
720626
var reconnects uint
@@ -728,14 +634,6 @@ func (conn *Connection) runReconnects(ctx context.Context) error {
728634
cancel()
729635

730636
if err != nil {
731-
// The error will most likely be the one that Dialer
732-
// returns to us due to the context being cancelled.
733-
// Although this is not guaranteed. For example,
734-
// if the dialer may throw another error before checking
735-
// the context, and the context has already been
736-
// canceled. Or the context was not canceled after
737-
// the error was thrown, but before the context was
738-
// checked here.
739637
if ctx.Err() != nil {
740638
return err
741639
}
@@ -747,23 +645,33 @@ func (conn *Connection) runReconnects(ctx context.Context) error {
747645
return nil
748646
}
749647

750-
conn.opts.Logger.Report(LogReconnectFailed, conn, reconnects, err)
648+
// Новое логирование события
649+
conn.logEvent(ReconnectFailedEvent{
650+
baseEvent: newBaseEvent(conn.addr),
651+
Reconnects: reconnects,
652+
MaxReconnects: conn.opts.MaxReconnects,
653+
Error: err,
654+
IsInitial: conn.addr == nil,
655+
})
656+
751657
conn.notify(ReconnectFailed)
752658
reconnects++
753659
conn.mutex.Unlock()
754660

755661
select {
756662
case <-ctx.Done():
757-
// Since the context is cancelled, we don't need to do anything.
758-
// Conn.connect() will return the correct error.
759663
case <-t.C:
760664
}
761665

762666
conn.mutex.Lock()
763667
}
764668

765-
conn.opts.Logger.Report(LogLastReconnectFailed, conn, err)
766-
// mark connection as closed to avoid reopening by another goroutine
669+
// Новое логирование события последней неудачной попытки
670+
conn.logEvent(LastReconnectFailedEvent{
671+
baseEvent: newBaseEvent(conn.addr),
672+
Error: err,
673+
})
674+
767675
return ClientError{ErrConnectionClosed, "last reconnect failed"}
768676
}
769677

@@ -917,6 +825,7 @@ func readWatchEvent(reader io.Reader) (connWatchEvent, error) {
917825
return event, nil
918826
}
919827

828+
// reader обновленная версия с новым логированием
920829
func (conn *Connection) reader(r io.Reader, c Conn) {
921830
events := make(chan connWatchEvent, 1024)
922831
defer close(events)
@@ -953,11 +862,19 @@ func (conn *Connection) reader(r io.Reader, c Conn) {
953862
ErrProtocolError,
954863
fmt.Sprintf("failed to decode IPROTO_EVENT: %s", err),
955864
}
956-
conn.opts.Logger.Report(LogWatchEventReadFailed, conn, err)
865+
// Новое логирование события
866+
conn.logEvent(WatchEventReadFailedEvent{
867+
baseEvent: newBaseEvent(conn.addr),
868+
Error: err,
869+
})
957870
}
958871
continue
959872
} else if code == iproto.IPROTO_CHUNK {
960-
conn.opts.Logger.Report(LogBoxSessionPushUnsupported, conn, header)
873+
// Новое логирование события
874+
conn.logEvent(BoxSessionPushUnsupportedEvent{
875+
baseEvent: newBaseEvent(conn.addr),
876+
RequestId: header.RequestId,
877+
})
961878
} else {
962879
if fut = conn.fetchFuture(header.RequestId); fut != nil {
963880
if err := fut.SetResponse(header, &buf); err != nil {
@@ -968,7 +885,11 @@ func (conn *Connection) reader(r io.Reader, c Conn) {
968885
}
969886

970887
if fut == nil {
971-
conn.opts.Logger.Report(LogUnexpectedResultId, conn, header)
888+
// Новое логирование события
889+
conn.logEvent(UnexpectedResultIdEvent{
890+
baseEvent: newBaseEvent(conn.addr),
891+
RequestId: header.RequestId,
892+
})
972893
}
973894
}
974895
}
@@ -1214,6 +1135,7 @@ func (conn *Connection) getFutureImp(reqid uint32, fetch bool) *Future {
12141135
}
12151136
}
12161137

1138+
// timeouts обновленная версия с новым логированием
12171139
func (conn *Connection) timeouts() {
12181140
timeout := conn.opts.Timeout
12191141
t := time.NewTimer(timeout)
@@ -1247,6 +1169,13 @@ func (conn *Connection) timeouts() {
12471169
})
12481170
conn.markDone(fut)
12491171
shard.bufmut.Unlock()
1172+
1173+
// Логируем таймаут
1174+
conn.logEvent(TimeoutEvent{
1175+
baseEvent: newBaseEvent(conn.addr),
1176+
RequestId: fut.requestId,
1177+
Timeout: timeout,
1178+
})
12501179
}
12511180
if pair.first != nil && pair.first.timeout < minNext {
12521181
minNext = pair.first.timeout

0 commit comments

Comments
 (0)