Skip to content

Commit 4dd2bba

Browse files
committed
proxy: Close client connection if session is expired / closed.
This handles cases where during development the machine is suspended and on resume, the session is expired but the connection is still open. As for the expired session the internal context is closed, the session can no longer be used.
1 parent c27ad30 commit 4dd2bba

File tree

3 files changed

+60
-11
lines changed

3 files changed

+60
-11
lines changed

proxy/proxy_server.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1382,6 +1382,8 @@ func (s *ProxyServer) DeleteSession(id uint64) {
13821382
func (s *ProxyServer) deleteSessionLocked(id uint64) {
13831383
if session, found := s.sessions[id]; found {
13841384
delete(s.sessions, id)
1385+
s.sessionsLock.Unlock()
1386+
defer s.sessionsLock.Lock()
13851387
session.Close()
13861388
statsSessionsCurrent.Dec()
13871389
}

proxy/proxy_server_test.go

Lines changed: 43 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@ import (
3333
"net/http/httptest"
3434
"os"
3535
"strings"
36+
"sync/atomic"
3637
"testing"
3738
"time"
3839

@@ -353,8 +354,11 @@ func TestProxyCreateSession(t *testing.T) {
353354
}
354355

355356
type HangingTestMCU struct {
356-
t *testing.T
357-
ctx context.Context
357+
t *testing.T
358+
ctx context.Context
359+
creating chan struct{}
360+
created chan struct{}
361+
cancelled atomic.Bool
358362
}
359363

360364
func NewHangingTestMCU(t *testing.T) *HangingTestMCU {
@@ -364,8 +368,10 @@ func NewHangingTestMCU(t *testing.T) *HangingTestMCU {
364368
})
365369

366370
return &HangingTestMCU{
367-
t: t,
368-
ctx: ctx,
371+
t: t,
372+
ctx: ctx,
373+
creating: make(chan struct{}),
374+
created: make(chan struct{}),
369375
}
370376
}
371377

@@ -393,8 +399,14 @@ func (m *HangingTestMCU) NewPublisher(ctx context.Context, listener signaling.Mc
393399
ctx2, cancel := context.WithTimeout(m.ctx, testTimeout*2)
394400
defer cancel()
395401

402+
m.creating <- struct{}{}
403+
defer func() {
404+
m.created <- struct{}{}
405+
}()
406+
396407
select {
397408
case <-ctx.Done():
409+
m.cancelled.Store(true)
398410
return nil, ctx.Err()
399411
case <-ctx2.Done():
400412
return nil, errors.New("Should have been cancelled before")
@@ -405,8 +417,14 @@ func (m *HangingTestMCU) NewSubscriber(ctx context.Context, listener signaling.M
405417
ctx2, cancel := context.WithTimeout(m.ctx, testTimeout*2)
406418
defer cancel()
407419

420+
m.creating <- struct{}{}
421+
defer func() {
422+
m.created <- struct{}{}
423+
}()
424+
408425
select {
409426
case <-ctx.Done():
427+
m.cancelled.Store(true)
410428
return nil, ctx.Err()
411429
case <-ctx2.Done():
412430
return nil, errors.New("Should have been cancelled before")
@@ -419,7 +437,8 @@ func TestProxyCancelOnClose(t *testing.T) {
419437
require := require.New(t)
420438
proxy, key, server := newProxyServerForTest(t)
421439

422-
proxy.mcu = NewHangingTestMCU(t)
440+
mcu := NewHangingTestMCU(t)
441+
proxy.mcu = mcu
423442

424443
ctx, cancel := context.WithTimeout(context.Background(), testTimeout)
425444
defer cancel()
@@ -436,22 +455,35 @@ func TestProxyCancelOnClose(t *testing.T) {
436455
_, err := client.RunUntilLoad(ctx, 0)
437456
assert.NoError(err)
438457

439-
require.NoError(client.SendCommand(&signaling.CommandProxyClientMessage{
440-
Type: "create-publisher",
441-
StreamType: signaling.StreamTypeVideo,
458+
require.NoError(client.WriteJSON(&signaling.ProxyClientMessage{
459+
Id: "2345",
460+
Type: "command",
461+
Command: &signaling.CommandProxyClientMessage{
462+
Type: "create-publisher",
463+
StreamType: signaling.StreamTypeVideo,
464+
},
442465
}))
443466

444467
// Simulate expired session while request is still being processed.
445468
go func() {
469+
<-mcu.creating
446470
if session := proxy.GetSession(1); assert.NotNil(session) {
447471
session.Close()
448472
}
449473
}()
450474

451475
if message, err := client.RunUntilMessage(ctx); assert.NoError(err) {
452-
if err := checkMessageType(message, "error"); assert.NoError(err) {
453-
assert.Equal("internal_error", message.Error.Code)
454-
assert.Equal(context.Canceled.Error(), message.Error.Message)
476+
if err := checkMessageType(message, "bye"); assert.NoError(err) {
477+
assert.Equal("session_closed", message.Bye.Reason)
455478
}
456479
}
480+
481+
if message, err := client.RunUntilMessage(ctx); assert.Error(err) {
482+
assert.True(websocket.IsCloseError(err, websocket.CloseNormalClosure, websocket.CloseNoStatusReceived), "expected close error, got %+v", err)
483+
} else {
484+
t.Errorf("expected error, got %+v", message)
485+
}
486+
487+
<-mcu.created
488+
assert.True(mcu.cancelled.Load())
457489
}

proxy/proxy_session.go

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -104,9 +104,24 @@ func (s *ProxySession) MarkUsed() {
104104
}
105105

106106
func (s *ProxySession) Close() {
107+
prev := s.SetClient(nil)
108+
if prev != nil {
109+
reason := "session_closed"
110+
if s.IsExpired() {
111+
reason = "session_expired"
112+
}
113+
prev.SendMessage(&signaling.ProxyServerMessage{
114+
Type: "bye",
115+
Bye: &signaling.ByeProxyServerMessage{
116+
Reason: reason,
117+
},
118+
})
119+
}
120+
107121
s.closeFunc()
108122
s.clearPublishers()
109123
s.clearSubscribers()
124+
s.proxy.DeleteSession(s.Sid())
110125
}
111126

112127
func (s *ProxySession) SetClient(client *ProxyClient) *ProxyClient {

0 commit comments

Comments
 (0)