Skip to content

Commit 2052d4d

Browse files
authored
Merge pull request #60 from BAIXIONGSODA/feat/sse-support
feat: add Server-Sent Events (SSE) support
2 parents 2b081dc + 4dc9b07 commit 2052d4d

5 files changed

Lines changed: 611 additions & 0 deletions

File tree

apps/koa-esm/src/index.js

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -77,6 +77,46 @@ router.get('/fetch', async (ctx) => {
7777
ctx.body = data
7878
})
7979

80+
// SSE test endpoint - server side
81+
router.get('/sse-server', async (ctx) => {
82+
ctx.set('Content-Type', 'text/event-stream')
83+
ctx.set('Cache-Control', 'no-cache')
84+
ctx.set('Connection', 'keep-alive')
85+
86+
ctx.status = 200
87+
ctx.respond = false
88+
89+
const res = ctx.res
90+
let count = 0
91+
const interval = setInterval(() => {
92+
count++
93+
res.write(`event: message\nid: ${count}\ndata: {"count": ${count}, "time": "${new Date().toISOString()}"}\n\n`)
94+
if (count >= 5) {
95+
clearInterval(interval)
96+
res.end()
97+
}
98+
}, 500)
99+
})
100+
101+
// SSE test endpoint - fetch SSE from external source
102+
router.get('/sse-fetch', async (ctx) => {
103+
// Fetch SSE from our own server
104+
const response = await fetch('http://localhost:3001/sse-server')
105+
106+
// Consume the stream to capture events
107+
const reader = response.body.getReader()
108+
const decoder = new TextDecoder()
109+
let result = ''
110+
111+
while (true) {
112+
const { done, value } = await reader.read()
113+
if (done) break
114+
result += decoder.decode(value, { stream: true })
115+
}
116+
117+
ctx.body = { message: 'SSE stream consumed', data: result }
118+
})
119+
80120
app.use(router.routes())
81121
app.listen(3001)
82122

packages/network-debugger/src/core/fetch.test.ts

Lines changed: 230 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -535,5 +535,235 @@ describe('core/fetch.ts', () => {
535535
expect(mockFetch).toHaveBeenCalledWith('https://example.com/api', options)
536536
})
537537
})
538+
539+
describe('SSE (Server-Sent Events) 处理', () => {
540+
// Helper to create a mock SSE response
541+
function createMockSSEResponse(events: string[], options: { delay?: number } = {}) {
542+
const { delay = 0 } = options
543+
const mockHeaders = new Headers({
544+
'content-type': 'text/event-stream'
545+
})
546+
547+
// Create a mock ReadableStream
548+
let readerIndex = 0
549+
const encoder = new TextEncoder()
550+
const chunks = events.map((event) => encoder.encode(event))
551+
552+
const mockReader = {
553+
read: vi.fn().mockImplementation(async () => {
554+
if (delay > 0) {
555+
await new Promise((resolve) => setTimeout(resolve, delay))
556+
}
557+
if (readerIndex >= chunks.length) {
558+
return { done: true, value: undefined }
559+
}
560+
const value = chunks[readerIndex]
561+
readerIndex++
562+
return { done: false, value }
563+
})
564+
}
565+
566+
const mockBody = {
567+
getReader: vi.fn().mockReturnValue(mockReader)
568+
}
569+
570+
return {
571+
status: 200,
572+
headers: mockHeaders,
573+
body: mockBody,
574+
clone: vi.fn().mockReturnValue({
575+
body: mockBody,
576+
arrayBuffer: vi.fn().mockResolvedValue(new ArrayBuffer(0))
577+
})
578+
} as unknown as Response
579+
}
580+
581+
test('正确识别 SSE 响应 (text/event-stream)', async () => {
582+
const mockResponse = createMockSSEResponse(['data: hello\n\n'])
583+
const mockFetch = vi.fn().mockResolvedValue(mockResponse)
584+
const { mockMainProcess } = createMockMainProcess()
585+
586+
const proxyFn = fetchProxyFactory(mockFetch, mockMainProcess as never)
587+
const result = await proxyFn('https://example.com/sse')
588+
589+
// SSE 响应应该直接返回,不等待流结束
590+
expect(result).toBe(mockResponse)
591+
})
592+
593+
test('解析简单的 SSE 事件', async () => {
594+
const mockResponse = createMockSSEResponse(['data: hello world\n\n'])
595+
const mockFetch = vi.fn().mockResolvedValue(mockResponse)
596+
const mockSend = vi.fn()
597+
const mockSendRequest = vi.fn().mockReturnThis()
598+
const mockMainProcess = {
599+
sendRequest: mockSendRequest,
600+
send: mockSend
601+
}
602+
603+
const proxyFn = fetchProxyFactory(mockFetch, mockMainProcess as never)
604+
await proxyFn('https://example.com/sse')
605+
606+
// 等待流处理完成
607+
await new Promise((resolve) => setTimeout(resolve, 50))
608+
609+
expect(mockSend).toHaveBeenCalledWith({
610+
type: 'eventSourceMessage',
611+
data: {
612+
requestId: expect.any(String),
613+
eventName: 'message',
614+
eventId: '',
615+
data: 'hello world'
616+
}
617+
})
618+
})
619+
620+
test('解析带有 event 类型的 SSE 事件', async () => {
621+
const mockResponse = createMockSSEResponse(['event: custom\ndata: test data\n\n'])
622+
const mockFetch = vi.fn().mockResolvedValue(mockResponse)
623+
const mockSend = vi.fn()
624+
const mockSendRequest = vi.fn().mockReturnThis()
625+
const mockMainProcess = {
626+
sendRequest: mockSendRequest,
627+
send: mockSend
628+
}
629+
630+
const proxyFn = fetchProxyFactory(mockFetch, mockMainProcess as never)
631+
await proxyFn('https://example.com/sse')
632+
633+
await new Promise((resolve) => setTimeout(resolve, 50))
634+
635+
expect(mockSend).toHaveBeenCalledWith({
636+
type: 'eventSourceMessage',
637+
data: {
638+
requestId: expect.any(String),
639+
eventName: 'custom',
640+
eventId: '',
641+
data: 'test data'
642+
}
643+
})
644+
})
645+
646+
test('解析带有 id 的 SSE 事件', async () => {
647+
const mockResponse = createMockSSEResponse(['id: 123\ndata: with id\n\n'])
648+
const mockFetch = vi.fn().mockResolvedValue(mockResponse)
649+
const mockSend = vi.fn()
650+
const mockSendRequest = vi.fn().mockReturnThis()
651+
const mockMainProcess = {
652+
sendRequest: mockSendRequest,
653+
send: mockSend
654+
}
655+
656+
const proxyFn = fetchProxyFactory(mockFetch, mockMainProcess as never)
657+
await proxyFn('https://example.com/sse')
658+
659+
await new Promise((resolve) => setTimeout(resolve, 50))
660+
661+
expect(mockSend).toHaveBeenCalledWith({
662+
type: 'eventSourceMessage',
663+
data: {
664+
requestId: expect.any(String),
665+
eventName: 'message',
666+
eventId: '123',
667+
data: 'with id'
668+
}
669+
})
670+
})
671+
672+
test('处理多行 data', async () => {
673+
const mockResponse = createMockSSEResponse(['data: line1\ndata: line2\n\n'])
674+
const mockFetch = vi.fn().mockResolvedValue(mockResponse)
675+
const mockSend = vi.fn()
676+
const mockSendRequest = vi.fn().mockReturnThis()
677+
const mockMainProcess = {
678+
sendRequest: mockSendRequest,
679+
send: mockSend
680+
}
681+
682+
const proxyFn = fetchProxyFactory(mockFetch, mockMainProcess as never)
683+
await proxyFn('https://example.com/sse')
684+
685+
await new Promise((resolve) => setTimeout(resolve, 50))
686+
687+
expect(mockSend).toHaveBeenCalledWith({
688+
type: 'eventSourceMessage',
689+
data: {
690+
requestId: expect.any(String),
691+
eventName: 'message',
692+
eventId: '',
693+
data: 'line1\nline2'
694+
}
695+
})
696+
})
697+
698+
test('处理多个连续的 SSE 事件', async () => {
699+
const mockResponse = createMockSSEResponse([
700+
'data: first\n\n',
701+
'data: second\n\n',
702+
'data: third\n\n'
703+
])
704+
const mockFetch = vi.fn().mockResolvedValue(mockResponse)
705+
const mockSend = vi.fn()
706+
const mockSendRequest = vi.fn().mockReturnThis()
707+
const mockMainProcess = {
708+
sendRequest: mockSendRequest,
709+
send: mockSend
710+
}
711+
712+
const proxyFn = fetchProxyFactory(mockFetch, mockMainProcess as never)
713+
await proxyFn('https://example.com/sse')
714+
715+
await new Promise((resolve) => setTimeout(resolve, 100))
716+
717+
const eventSourceCalls = mockSend.mock.calls.filter(
718+
(call) => call[0]?.type === 'eventSourceMessage'
719+
)
720+
expect(eventSourceCalls.length).toBe(3)
721+
expect(eventSourceCalls[0][0].data.data).toBe('first')
722+
expect(eventSourceCalls[1][0].data.data).toBe('second')
723+
expect(eventSourceCalls[2][0].data.data).toBe('third')
724+
})
725+
726+
test('SSE 流结束后发送 endRequest', async () => {
727+
const mockResponse = createMockSSEResponse(['data: test\n\n'])
728+
const mockFetch = vi.fn().mockResolvedValue(mockResponse)
729+
const mockSend = vi.fn()
730+
const mockSendRequest = vi.fn().mockReturnThis()
731+
const mockMainProcess = {
732+
sendRequest: mockSendRequest,
733+
send: mockSend
734+
}
735+
736+
const proxyFn = fetchProxyFactory(mockFetch, mockMainProcess as never)
737+
await proxyFn('https://example.com/sse')
738+
739+
await new Promise((resolve) => setTimeout(resolve, 50))
740+
741+
expect(mockSendRequest).toHaveBeenCalledWith('endRequest', expect.any(RequestDetail))
742+
})
743+
744+
test('非 SSE 响应不触发 eventSourceMessage', async () => {
745+
const mockResponse = createMockResponse({
746+
headers: { 'content-type': 'application/json' },
747+
body: '{"key": "value"}'
748+
})
749+
const mockFetch = vi.fn().mockResolvedValue(mockResponse)
750+
const mockSend = vi.fn()
751+
const mockSendRequest = vi.fn().mockReturnThis()
752+
const mockMainProcess = {
753+
sendRequest: mockSendRequest,
754+
send: mockSend
755+
}
756+
757+
const proxyFn = fetchProxyFactory(mockFetch, mockMainProcess as never)
758+
await proxyFn('https://example.com/api')
759+
760+
await new Promise((resolve) => setTimeout(resolve, 50))
761+
762+
const eventSourceCalls = mockSend.mock.calls.filter(
763+
(call) => call[0]?.type === 'eventSourceMessage'
764+
)
765+
expect(eventSourceCalls.length).toBe(0)
766+
})
767+
})
538768
})
539769
})

0 commit comments

Comments
 (0)