Skip to content
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
269 changes: 269 additions & 0 deletions router/core/graphql_sse_regression_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,269 @@
package core

import (
"context"
"fmt"
"net/http"
"net/http/httptest"
"strings"
"sync"
"testing"
"time"

"github.com/stretchr/testify/require"
graphql_datasource "github.com/wundergraph/graphql-go-tools/v2/pkg/engine/datasource/graphql_datasource"
"github.com/wundergraph/graphql-go-tools/v2/pkg/engine/resolve"
)

type aliasingSubscriptionUpdater struct {
mu sync.Mutex
updates [][]byte
firstUpdate chan struct{}
completed chan struct{}
closed chan struct{}
}

func newAliasingSubscriptionUpdater() *aliasingSubscriptionUpdater {
return &aliasingSubscriptionUpdater{
firstUpdate: make(chan struct{}, 1),
completed: make(chan struct{}, 1),
closed: make(chan struct{}, 1),
}
}

func (a *aliasingSubscriptionUpdater) Update(data []byte) {
a.mu.Lock()
a.updates = append(a.updates, data)
isFirst := len(a.updates) == 1
a.mu.Unlock()

if isFirst {
select {
case a.firstUpdate <- struct{}{}:
default:
}
}
}

func (a *aliasingSubscriptionUpdater) UpdateSubscription(_ resolve.SubscriptionIdentifier, data []byte) {
a.Update(data)
}

func (a *aliasingSubscriptionUpdater) Complete() {
select {
case a.completed <- struct{}{}:
default:
}
}

func (a *aliasingSubscriptionUpdater) Close(_ resolve.SubscriptionCloseKind) {
select {
case a.closed <- struct{}{}:
default:
}
}

func (a *aliasingSubscriptionUpdater) CloseSubscription(_ resolve.SubscriptionCloseKind, _ resolve.SubscriptionIdentifier) {
}

func (a *aliasingSubscriptionUpdater) Subscriptions() map[context.Context]resolve.SubscriptionIdentifier {
return nil
}

func (a *aliasingSubscriptionUpdater) Updates() []string {
a.mu.Lock()
defer a.mu.Unlock()

out := make([]string, len(a.updates))
for i := range a.updates {
out[i] = string(a.updates[i])
}
return out
}

func TestGraphQLSubscriptionClient_SSESingleLinePayloadsRemainStableAfterNextRead(t *testing.T) {
serverDone := make(chan struct{})
updater := newAliasingSubscriptionUpdater()

firstPayload := `{"data":{"tokenPriceUpdated":{"priceUsd":1.0001}}}`
secondPayload := `{"data":{"tokenPriceUpdated":{"priceUsd":1.0002}}}`

server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
flusher, ok := w.(http.Flusher)
require.True(t, ok)

w.Header().Set("Content-Type", "text/event-stream")
w.Header().Set("Cache-Control", "no-cache")
w.Header().Set("Connection", "keep-alive")

_, _ = fmt.Fprintf(w, "event: next\ndata: %s\n\n", firstPayload)
flusher.Flush()

select {
case <-updater.firstUpdate:
case <-time.After(time.Second):
t.Fatal("timed out waiting for first SSE update to be observed")
}

_, _ = fmt.Fprintf(w, "event: next\ndata: %s\n\n", secondPayload)
flusher.Flush()

_, _ = fmt.Fprint(w, "event: complete\n\n")
flusher.Flush()

close(serverDone)
}))
Comment on lines +91 to +115
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

🧩 Analysis chain

🏁 Script executed:

#!/bin/bash
# Read-only verification: find FailNow-style assertions in handler goroutines.
rg -nP 'httptest\.NewServer\(http\.HandlerFunc\(func\(' router/core/graphql_sse_regression_test.go
rg -nP '\brequire\.[A-Za-z_]+\(\s*t\s*,' router/core/graphql_sse_regression_test.go
rg -nP '\bt\.Fatalf?\(' router/core/graphql_sse_regression_test.go

Repository: wundergraph/cosmo

Length of output: 1280


🏁 Script executed:

cat -n router/core/graphql_sse_regression_test.go | sed -n '85,220p'

Repository: wundergraph/cosmo

Length of output: 4772


Avoid require/t.Fatal inside httptest handler goroutines.

require.True(t, ok) at lines 93 and 191, and t.Fatal(...) at lines 105 and 203 run in handler goroutines spawned by httptest.NewServer. Calling these assertion/failure methods outside the main test goroutine will cause a runtime panic. Route handler errors to a channel and assert in the main test goroutine instead.

💡 Suggested fix pattern
+	handlerErr := make(chan error, 1)
 	server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
 		flusher, ok := w.(http.Flusher)
-		require.True(t, ok)
+		if !ok {
+			select { case handlerErr <- fmt.Errorf("response writer does not implement http.Flusher"): default: }
+			return
+		}
 ...
-		_, _ = fmt.Fprintf(w, "event: next\ndata: %s\n\n", firstPayload)
+		if _, err := fmt.Fprintf(w, "event: next\ndata: %s\n\n", firstPayload); err != nil {
+			select { case handlerErr <- fmt.Errorf("write first payload: %w", err): default: }
+			return
+		}
 		flusher.Flush()
 ...
-		case <-time.After(time.Second):
-			t.Fatal("timed out waiting for first SSE update to be observed")
+		case <-time.After(time.Second):
+			select { case handlerErr <- fmt.Errorf("timed out waiting for first SSE update"): default: }
+			return
 		}
 ...
 	}))
 ...
+	select {
+	case err := <-handlerErr:
+		require.NoError(t, err)
+	default:
+	}

Also applies to: lines 189–215

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@router/core/graphql_sse_regression_test.go` around lines 91 - 115, The
httptest handler is calling test assertions (require.True / t.Fatal) inside its
goroutine which can panic; instead capture any handler-side errors and signals
and send them over a channel to be asserted in the main test goroutine. Modify
the server handler created in httptest.NewServer (the anonymous
http.HandlerFunc) to replace require.True(t, ok) and t.Fatal(...) with sending
an error or boolean into a dedicated errCh (or doneCh) and close/send on
serverDone/updater-firstUpdate channels as needed, then in the main test
goroutine receive from errCh and perform the require/assert (e.g., check the
flusher ok, timeout errors) and fail the test there. Ensure you reference the
handler's flusher check and timeouts around updater.firstUpdate and serverDone
so all assertions happen in the main test goroutine.

defer server.Close()

engineCtx, engineCancel := context.WithCancel(context.Background())
defer engineCancel()

requestCtx, requestCancel := context.WithCancel(context.Background())
defer requestCancel()

client := graphql_datasource.NewGraphQLSubscriptionClient(
http.DefaultClient,
http.DefaultClient,
engineCtx,
graphql_datasource.WithReadTimeout(time.Millisecond),
)

done := make(chan error, 1)
go func() {
done <- client.Subscribe(resolve.NewContext(requestCtx), graphql_datasource.GraphQLSubscriptionOptions{
URL: server.URL,
Body: graphql_datasource.GraphQLBody{
Query: `subscription { tokenPriceUpdated { priceUsd } }`,
},
UseSSE: true,
}, updater)
}()

select {
case <-updater.completed:
case <-time.After(time.Second):
t.Fatal("timed out waiting for completion")
}

select {
case <-updater.closed:
case <-time.After(time.Second):
t.Fatal("timed out waiting for close")
}

select {
case err := <-done:
require.NoError(t, err)
case <-time.After(time.Second):
t.Fatal("timed out waiting for subscription client to exit")
}

select {
case <-serverDone:
case <-time.After(time.Second):
t.Fatal("timed out waiting for SSE server to finish")
}

updates := updater.Updates()
require.Len(t, updates, 2)
require.Equal(t, firstPayload, updates[0], "first SSE payload should remain stable after the next event is read")
require.Equal(t, secondPayload, updates[1])
}

func TestGraphQLSubscriptionClient_SSEPayloadsRemainStableAcrossBurstReads(t *testing.T) {
const payloadCount = 64

expectedPayloads := make([]string, 0, payloadCount)
for i := 0; i < payloadCount; i++ {
expectedPayloads = append(expectedPayloads, fmt.Sprintf(
`{"data":{"tokenPriceUpdated":{"sequence":%d,"priceUsd":1.%04d,"note":"%s"}}}`,
i,
1000+i,
strings.Repeat(string(rune('a'+(i%26))), 8+i),
))
}

serverDone := make(chan struct{})
updater := newAliasingSubscriptionUpdater()

server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
flusher, ok := w.(http.Flusher)
require.True(t, ok)

w.Header().Set("Content-Type", "text/event-stream")
w.Header().Set("Cache-Control", "no-cache")
w.Header().Set("Connection", "keep-alive")

_, _ = fmt.Fprintf(w, "event: next\ndata: %s\n\n", expectedPayloads[0])
flusher.Flush()

select {
case <-updater.firstUpdate:
case <-time.After(time.Second):
t.Fatal("timed out waiting for first SSE update to be observed")
}

for i := 1; i < len(expectedPayloads); i++ {
_, _ = fmt.Fprintf(w, "event: next\ndata: %s\n\n", expectedPayloads[i])
flusher.Flush()
}

_, _ = fmt.Fprint(w, "event: complete\n\n")
flusher.Flush()

close(serverDone)
}))
defer server.Close()

engineCtx, engineCancel := context.WithCancel(context.Background())
defer engineCancel()

requestCtx, requestCancel := context.WithCancel(context.Background())
defer requestCancel()

client := graphql_datasource.NewGraphQLSubscriptionClient(
http.DefaultClient,
http.DefaultClient,
engineCtx,
graphql_datasource.WithReadTimeout(time.Millisecond),
)

done := make(chan error, 1)
go func() {
done <- client.Subscribe(resolve.NewContext(requestCtx), graphql_datasource.GraphQLSubscriptionOptions{
URL: server.URL,
Body: graphql_datasource.GraphQLBody{
Query: `subscription { tokenPriceUpdated { sequence priceUsd note } }`,
},
UseSSE: true,
}, updater)
}()

select {
case <-updater.completed:
case <-time.After(2 * time.Second):
t.Fatal("timed out waiting for completion")
}

select {
case <-updater.closed:
case <-time.After(2 * time.Second):
t.Fatal("timed out waiting for close")
}

select {
case err := <-done:
require.NoError(t, err)
case <-time.After(2 * time.Second):
t.Fatal("timed out waiting for subscription client to exit")
}

select {
case <-serverDone:
case <-time.After(2 * time.Second):
t.Fatal("timed out waiting for SSE server to finish")
}

updates := updater.Updates()
require.Equal(t, expectedPayloads, updates, "every queued SSE payload should remain stable after later events are read")
}
Loading