Skip to content

Commit 69a394a

Browse files
authored
Do not block main server thread when launching streaming threads (#5)
Instead, handle streaming queue in a new thread (which serialises writes to the connection socket, and handlers errors such as IOError, Errno::EPIPE triggering callbacks. This is so that the server's request thread (ie Puma) can be quickly returned to the pool. Servers like Falcon (fibers instead of threads) should not have this problem, but they should still work fine with this (they will spawn an extra fiber, but that should be cheap). Possible issues: This change decouples the server's thread pool from Datastar's streaming threads, which ATM are unbounded (an app with long-lived streams could potentially spawn thousands of threads even if the server is configured with a limited pool. This can be problematic, because: * The server can run out of resourced * If the streams rely on services such as database connections, they could quickly drain those connection pools. Possible solution: provide configuration for a separate, Datastar-specific thread-pool so that it can be tweaked as per available resources (such as database pools)
1 parent 5bb5eda commit 69a394a

4 files changed

Lines changed: 76 additions & 21 deletions

File tree

lib/datastar/dispatcher.rb

Lines changed: 38 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -285,7 +285,7 @@ def stream_one(streamer)
285285
proc do |socket|
286286
generator = ServerSentEventGenerator.new(socket, signals:, view_context: @view_context)
287287
@on_connect.each { |callable| callable.call(generator) }
288-
handling_errors(generator, socket) do
288+
handling_sync_errors(generator, socket) do
289289
streamer.call(generator)
290290
end
291291
ensure
@@ -313,17 +313,23 @@ def stream_many(streamer)
313313
@on_connect.each { |callable| callable.call(conn_generator) }
314314

315315
threads = @streamers.map do |streamer|
316+
duped_signals = signs.dup.freeze
316317
@executor.spawn do
317318
# TODO: Review thread-safe view context
318-
generator = ServerSentEventGenerator.new(@queue, signals: signs, view_context: @view_context)
319+
generator = ServerSentEventGenerator.new(@queue, signals: duped_signals, view_context: @view_context)
319320
streamer.call(generator)
320321
@queue << :done
321322
rescue StandardError => e
322323
@queue << e
323324
end
324325
end
325326

326-
handling_errors(conn_generator, socket) do
327+
# Now launch the control thread that actually writes to the socket
328+
# We don't want to block the main thread, so that servers like Puma
329+
# which have a limited thread pool can keep serving other requests
330+
# Other streamers will push any StandardError exceptions to the queue
331+
# So we handle them here
332+
@executor.spawn do
327333
done_count = 0
328334
threads_size = @heartbeat_on ? threads.size - 1 : threads.size
329335

@@ -332,24 +338,46 @@ def stream_many(streamer)
332338
done_count += 1
333339
@queue << nil if done_count == threads_size
334340
elsif data.is_a?(Exception)
335-
raise data
341+
handle_streaming_error(data, socket)
342+
@queue << nil
336343
else
337-
socket << data
344+
# Here we attempt writing to the actual socket
345+
# which may raise an IOError if the client disconnected
346+
begin
347+
socket << data
348+
rescue Exception => e
349+
handle_streaming_error(e, socket)
350+
@queue << nil
351+
end
338352
end
339353
end
354+
355+
ensure
356+
@on_server_disconnect.each { |callable| callable.call(conn_generator) }
357+
@executor.stop(threads) if threads
358+
socket.close
340359
end
341-
ensure
342-
@executor.stop(threads) if threads
343-
socket.close
344360
end
345361
end
346362

347-
# Run a streaming block while handling errors
363+
# Handle errors caught during streaming
364+
# @param error [Exception] the error that occurred
365+
# @param socket [IO] the socket to pass to error handlers
366+
def handle_streaming_error(error, socket)
367+
case error
368+
when IOError, Errno::EPIPE, Errno::ECONNRESET
369+
@on_client_disconnect.each { |callable| callable.call(socket) }
370+
when Exception
371+
@on_error.each { |callable| callable.call(error) }
372+
end
373+
end
374+
375+
# Run a block while handling errors
348376
# @param generator [ServerSentEventGenerator]
349377
# @param socket [IO]
350378
# @yield
351379
# @api private
352-
def handling_errors(generator, socket, &)
380+
def handling_sync_errors(generator, socket, &)
353381
yield
354382

355383
@on_server_disconnect.each { |callable| callable.call(generator) }

lib/datastar/server_sent_event_generator.rb

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,9 @@ class ServerSentEventGenerator
2424

2525
attr_reader :signals
2626

27+
# @param stream [IO, Queue] The IO stream or Queue to write to
28+
# @option signals [Hash] A hash of signals (params)
29+
# @option view_context [Object] The view context for rendering elements, if applicable.
2730
def initialize(stream, signals:, view_context: nil)
2831
@stream = stream
2932
@signals = signals

spec/dispatcher_spec.rb

Lines changed: 33 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -2,20 +2,35 @@
22

33
class TestSocket
44
attr_reader :lines, :open
5-
def initialize
5+
6+
def initialize(open: true)
67
@lines = []
7-
@open = true
8+
@open = open
9+
@finish = Thread::Queue.new
810
end
911

1012
def <<(line)
13+
raise Errno::EPIPE, 'Socket closed' unless @open
14+
1115
@lines << line
1216
end
1317

14-
def close = @open = false
18+
def close
19+
@open = false
20+
@finish << true
21+
end
1522

1623
def split_lines
1724
@lines.join.split("\n")
1825
end
26+
27+
# Streams run in threads
28+
# we can call this to signal the end of the stream
29+
# in tests
30+
def wait_for_close(&)
31+
@finish.pop
32+
yield if block_given?
33+
end
1934
end
2035

2136
RSpec.describe Datastar::Dispatcher do
@@ -407,6 +422,8 @@ def self.render_in(view_context) = %(<div id="foo">\n<span>#{view_context}</span
407422
end
408423

409424
dispatcher.response.body.call(socket)
425+
426+
socket.wait_for_close
410427
expect(socket.open).to be(false)
411428
expect(socket.lines.size).to eq(2)
412429
expect(socket.lines[0]).to eq("event: datastar-patch-elements\ndata: elements <div id=\"foo\">\ndata: elements <span>hello</span>\ndata: elements </div>\n\n")
@@ -448,15 +465,17 @@ def self.render_in(view_context) = %(<div id="foo">\n<span>#{view_context}</span
448465
block_called = false
449466
dispatcher.on_client_disconnect { |conn| connected = false }
450467

451-
socket = TestSocket.new
452-
allow(socket).to receive(:<<).with("\n").and_raise(Errno::EPIPE, 'Socket closed')
468+
socket = TestSocket.new(open: false)
469+
# allow(socket).to receive(:<<).with("\n").and_raise(Errno::EPIPE, 'Socket closed')
453470

454471
dispatcher.stream do |sse|
455472
sleep 10
456473
block_called = true
457474
end
458475

459476
dispatcher.response.body.call(socket)
477+
socket.wait_for_close
478+
460479
expect(connected).to be(false)
461480
expect(block_called).to be(false)
462481
end
@@ -467,8 +486,7 @@ def self.render_in(view_context) = %(<div id="foo">\n<span>#{view_context}</span
467486
block_called = false
468487
dispatcher.on_client_disconnect { |conn| connected = false }
469488

470-
socket = TestSocket.new
471-
allow(socket).to receive(:<<).with("\n").and_raise(Errno::EPIPE, 'Socket closed')
489+
socket = TestSocket.new(open: false)
472490

473491
dispatcher.stream do |sse|
474492
sleep 0.001
@@ -496,6 +514,7 @@ def self.render_in(view_context) = %(<div id="foo">\n<span>#{view_context}</span
496514
end
497515
socket = TestSocket.new
498516
dispatcher.response.body.call(socket)
517+
socket.wait_for_close
499518

500519
expect(signals['user']['name']).to eq('joe')
501520
end
@@ -520,10 +539,10 @@ def self.render_in(view_context) = %(<div id="foo">\n<span>#{view_context}</span
520539
dispatcher.stream do |sse|
521540
sse.patch_signals(foo: 'bar')
522541
end
523-
socket = TestSocket.new
524-
allow(socket).to receive(:<<).and_raise(Errno::EPIPE, 'Socket closed')
542+
socket = TestSocket.new(open: false)
525543

526544
dispatcher.response.body.call(socket)
545+
socket.wait_for_close
527546
expect(events).to eq([true, false])
528547
end
529548

@@ -536,10 +555,10 @@ def self.render_in(view_context) = %(<div id="foo">\n<span>#{view_context}</span
536555
dispatcher.stream do |sse|
537556
sse.check_connection!
538557
end
539-
socket = TestSocket.new
540-
allow(socket).to receive(:<<).with("\n").and_raise(Errno::EPIPE, 'Socket closed')
558+
socket = TestSocket.new(open: false)
541559

542560
dispatcher.response.body.call(socket)
561+
socket.wait_for_close
543562
expect(events).to eq([true, false])
544563
end
545564

@@ -555,6 +574,7 @@ def self.render_in(view_context) = %(<div id="foo">\n<span>#{view_context}</span
555574
socket = TestSocket.new
556575

557576
dispatcher.response.body.call(socket)
577+
socket.wait_for_close
558578
expect(events).to eq([true, false])
559579
end
560580

@@ -570,6 +590,7 @@ def self.render_in(view_context) = %(<div id="foo">\n<span>#{view_context}</span
570590
allow(socket).to receive(:<<).and_raise(ArgumentError, 'Invalid argument')
571591

572592
dispatcher.response.body.call(socket)
593+
socket.wait_for_close
573594
expect(errors.first).to be_a(ArgumentError)
574595
expect(Datastar.config.logger).to have_received(:error).with(/ArgumentError \(Invalid argument\):/)
575596
end
@@ -584,6 +605,7 @@ def self.render_in(view_context) = %(<div id="foo">\n<span>#{view_context}</span
584605
sse.patch_signals(foo: 'bar')
585606
end
586607
dispatcher.response.body.call(socket)
608+
socket.wait_for_close
587609
expect(errs.first).to be_a(ArgumentError)
588610
end
589611
end

spec/support/dispatcher_examples.rb

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ module DispatcherExamples
1919

2020
socket = TestSocket.new
2121
dispatcher.response.body.call(socket)
22+
socket.wait_for_close
2223
expect(socket.open).to be(false)
2324
expect(socket.lines.size).to eq(2)
2425
expect(socket.lines[0]).to eq("event: datastar-patch-signals\ndata: signals {\"foo\":\"bar\"}\n\n")
@@ -45,6 +46,7 @@ module DispatcherExamples
4546

4647
socket = TestSocket.new
4748
dispatcher.response.body.call(socket)
49+
socket.wait_for_close
4850
expect(errs.first).to be_a(ArgumentError)
4951
Thread.report_on_exception = true
5052
end

0 commit comments

Comments
 (0)