diff --git a/.changeset/hibernate-then-suspend.md b/.changeset/hibernate-then-suspend.md new file mode 100644 index 0000000000..6b18ca0957 --- /dev/null +++ b/.changeset/hibernate-then-suspend.md @@ -0,0 +1,5 @@ +--- +'@core/sync-service': minor +--- + +Add hibernate-then-suspend behavior for Consumer processes. When suspend is enabled, consumers now hibernate first (triggering GC) before suspending. Adds `shape_suspend_after` config (default 60s) to control the delay between hibernation and suspension. Any activity cancels the pending suspend timer, restarting the cycle. diff --git a/integration-tests/tests/shape-suspension-resumption.lux b/integration-tests/tests/shape-suspension-resumption.lux index 198917c27c..e3035151ba 100644 --- a/integration-tests/tests/shape-suspension-resumption.lux +++ b/integration-tests/tests/shape-suspension-resumption.lux @@ -20,7 +20,7 @@ # Start Electric and wait for it to finish initialization. # Set a very small hibernation timeout so consumers will shutdown quickly -[invoke setup_electric_with_env "ELECTRIC_SHAPE_HIBERNATE_AFTER=200ms ELECTRIC_SHAPE_SUSPEND_CONSUMER=true"] +[invoke setup_electric_with_env "ELECTRIC_SHAPE_HIBERNATE_AFTER=200ms ELECTRIC_SHAPE_SUSPEND_AFTER=200ms ELECTRIC_SHAPE_SUSPEND_CONSUMER=true"] [shell electric] [timeout 10] ??[debug] Replication client started streaming diff --git a/packages/sync-service/config/runtime.exs b/packages/sync-service/config/runtime.exs index 955218d8e0..89be072e01 100644 --- a/packages/sync-service/config/runtime.exs +++ b/packages/sync-service/config/runtime.exs @@ -164,6 +164,9 @@ shape_hibernate_after = shape_enable_suspend? = env!("ELECTRIC_SHAPE_SUSPEND_CONSUMER", :boolean, nil) +shape_suspend_after = + env!("ELECTRIC_SHAPE_SUSPEND_AFTER", &Electric.Config.parse_human_readable_time!/1, nil) + system_metrics_poll_interval = env!( "ELECTRIC_SYSTEM_METRICS_POLL_INTERVAL", @@ -264,6 +267,7 @@ config :electric, env!("ELECTRIC_SUBQUERY_BUFFER_MAX_TRANSACTIONS", :integer, nil), shape_hibernate_after: shape_hibernate_after, shape_enable_suspend?: shape_enable_suspend?, + shape_suspend_after: shape_suspend_after, storage_dir: storage_dir, storage: storage_spec, cleanup_interval_ms: diff --git a/packages/sync-service/lib/electric/application.ex b/packages/sync-service/lib/electric/application.ex index 2221552035..086dd0d85f 100644 --- a/packages/sync-service/lib/electric/application.ex +++ b/packages/sync-service/lib/electric/application.ex @@ -145,6 +145,7 @@ defmodule Electric.Application do cleanup_interval_ms: get_env(opts, :cleanup_interval_ms), shape_hibernate_after: get_env(opts, :shape_hibernate_after), shape_enable_suspend?: get_env(opts, :shape_enable_suspend?), + shape_suspend_after: get_env(opts, :shape_suspend_after), conn_max_requests: get_env(opts, :conn_max_requests), handler_fullsweep_after: get_env(opts, :handler_fullsweep_after), process_spawn_opts: get_env(opts, :process_spawn_opts) diff --git a/packages/sync-service/lib/electric/config.ex b/packages/sync-service/lib/electric/config.ex index c33ac3c271..290fbfa4f3 100644 --- a/packages/sync-service/lib/electric/config.ex +++ b/packages/sync-service/lib/electric/config.ex @@ -85,10 +85,15 @@ defmodule Electric.Config do otel_sampling_ratio: 0.01, metrics_sampling_ratio: 1, ## Memory + # After this duration of inactivity, consumer processes will hibernate + # to allow garbage collection shape_hibernate_after: :timer.seconds(30), - # Should we terminate consumer processes after `shape_hibernate_after` ms - # or just hibernate them? + # If enabled, terminate (suspend) consumer processes after hibernating. + # This frees memory more aggressively than hibernation alone. shape_enable_suspend?: false, + # After hibernating, wait this duration before suspending (terminating). + # Only applies when shape_enable_suspend? is true. + shape_suspend_after: :timer.minutes(10), # Sets max_requests for Bandit handler processes: # https://hexdocs.pm/bandit/Bandit.html#t:http_1_options/0 # "The maximum number of requests to serve in a single HTTP/{1,2} diff --git a/packages/sync-service/lib/electric/shapes/consumer.ex b/packages/sync-service/lib/electric/shapes/consumer.ex index c00721e323..61ea5b0ea8 100644 --- a/packages/sync-service/lib/electric/shapes/consumer.ex +++ b/packages/sync-service/lib/electric/shapes/consumer.ex @@ -186,8 +186,8 @@ defmodule Electric.Shapes.Consumer do def handle_call(:await_snapshot_start, from, state) do Logger.debug("Starting a wait on the snapshot #{state.shape_handle} for #{inspect(from)}}") - - {:noreply, State.add_waiter(state, from), state.hibernate_after} + state = State.add_waiter(state, from) + {:noreply, state, state.hibernate_after} end def handle_call({:handle_event, event, trace_context}, _from, state) do @@ -205,9 +205,8 @@ defmodule Electric.Shapes.Consumer do def handle_call({:subscribe_materializer, pid}, _from, state) do Logger.debug("Subscribing materializer for #{state.shape_handle}") Process.monitor(pid, tag: :materializer_down) - - {:reply, {:ok, state.latest_offset}, %{state | materializer_subscribed?: true}, - state.hibernate_after} + state = %{state | materializer_subscribed?: true} + {:reply, {:ok, state.latest_offset}, state, state.hibernate_after} end def handle_call({:stop, reason}, _from, state) do @@ -386,30 +385,37 @@ defmodule Electric.Shapes.Consumer do {:stop, reason, state} end - # Set a new value for hibernate after and set a timeout between - # hibernate_after and max_timeout in order to spread - # consumer suspend events. - def handle_info({:configure_suspend, hibernate_after, jitter_period}, state) do - {:noreply, %{state | hibernate_after: hibernate_after}, - Enum.random(hibernate_after..jitter_period)} + # Set new values for hibernate_after and suspend_after, and set a jittered + # timeout between hibernate_after and jitter_period to spread hibernation + # events. Each consumer will hibernate at the jittered timeout, then schedule + # suspension for suspend_after ms later. + def handle_info({:configure_suspend, hibernate_after, suspend_after, jitter_period}, state) do + state = %{state | hibernate_after: hibernate_after, suspend_after: suspend_after} + {:noreply, state, Enum.random(hibernate_after..jitter_period)} end def handle_info(:timeout, state) do - # we can only suspend (terminate) the consumer process if - # - # 1. Consumer suspend has been enabled in the stack config - # 2. we're not waiting for snapshot information - # 3. we are not part of a subquery dependency tree, that is either - # a. we have no dependent shapes - # b. we don't have a materializer subscribed - - if consumer_suspend_enabled?(state) and consumer_can_suspend?(state) do + state = %{state | writer: ShapeCache.Storage.hibernate(state.writer)} + + state = + if consumer_suspend_enabled?(state) and consumer_can_suspend?(state), + do: schedule_suspend_timer(state), + else: state + + {:noreply, state, :hibernate} + end + + # Suspend timer uses a generation number to handle stale timers. If activity + # occurred after the timer was scheduled, a new timer with a higher generation + # will have been scheduled, making this one stale (generation mismatch). + def handle_info({:suspend_timeout, generation}, state) do + if generation == state.suspend_generation and + consumer_suspend_enabled?(state) and consumer_can_suspend?(state) do Logger.debug(fn -> ["Suspending consumer ", to_string(state.shape_handle)] end) {:stop, ShapeCleaner.consumer_suspend_reason(), state} else - state = %{state | writer: ShapeCache.Storage.hibernate(state.writer)} - - {:noreply, state, :hibernate} + # Stale timer or conditions changed - just restart the hibernate timeout + {:noreply, state, state.hibernate_after} end end @@ -422,6 +428,14 @@ defmodule Electric.Shapes.Consumer do not state.materializer_subscribed? end + defp schedule_suspend_timer(%{suspend_after: nil} = state), do: state + + defp schedule_suspend_timer(%{suspend_after: suspend_after, suspend_generation: gen} = state) do + next_gen = gen + 1 + :erlang.send_after(suspend_after, self(), {:suspend_timeout, next_gen}) + %{state | suspend_generation: next_gen} + end + @impl GenServer def terminate(reason, state) do Logger.debug(fn -> diff --git a/packages/sync-service/lib/electric/shapes/consumer/state.ex b/packages/sync-service/lib/electric/shapes/consumer/state.ex index 9414f07571..2b366021b2 100644 --- a/packages/sync-service/lib/electric/shapes/consumer/state.ex +++ b/packages/sync-service/lib/electric/shapes/consumer/state.ex @@ -41,7 +41,13 @@ defmodule Electric.Shapes.Consumer.State do # When a {Storage, :flushed, offset} message arrives during a pending # transaction, we defer the notification and store the max flushed offset # here. Multiple deferred notifications are collapsed into a single most recent offset. - pending_flush_offset: nil + pending_flush_offset: nil, + # Generation counter for suspend timers - incremented each time we schedule + # a new suspend timer. When a timer fires, it checks if its generation matches + # the current one; if not, activity occurred and the timer is stale (ignored). + suspend_generation: 0, + # How long after hibernation to suspend (in ms) + suspend_after: nil ] @type pg_snapshot() :: SnapshotQuery.pg_snapshot() @@ -95,6 +101,12 @@ defmodule Electric.Shapes.Consumer.State do :shape_hibernate_after, Electric.Config.default(:shape_hibernate_after) ), + suspend_after: + Electric.StackConfig.lookup( + stack_id, + :shape_suspend_after, + Electric.Config.default(:shape_suspend_after) + ), buffering?: true } end diff --git a/packages/sync-service/lib/electric/shapes/consumer_registry.ex b/packages/sync-service/lib/electric/shapes/consumer_registry.ex index 935420b2d0..b98ba330ae 100644 --- a/packages/sync-service/lib/electric/shapes/consumer_registry.ex +++ b/packages/sync-service/lib/electric/shapes/consumer_registry.ex @@ -217,33 +217,35 @@ defmodule Electric.Shapes.ConsumerRegistry do disabled, because the configuration message will have the side-effect of waking all consumers from hibernation. - The `jitter_period` value allows for spreading the suspension of existing - consumers over a large time period to avoid a sudden rush of consumer - shutdowns after `hibernate_after` ms. + The `jitter_period` value allows for spreading the hibernation of existing + consumers over a time period to avoid a sudden rush of hibernation events. + Each consumer picks a random timeout between `hibernate_after` and `jitter_period`, + then hibernates and schedules suspension for `suspend_after` ms later. To re-enable consumer suspend: - # set the hibernation timeout to 1 minute but phase the suspension of - # existing consumers over a 20 minute period - Electric.Shapes.ConsumerRegistry.enable_suspend(stack_id, 60_000, 60_000 * 20) + # hibernation timeout: 1 min, suspend timeout: 4 min, jitter window: 20 min + # Consumers will hibernate between 1-20 min, then suspend 4 min after hibernating + Electric.Shapes.ConsumerRegistry.enable_suspend(stack_id, 60_000, 4 * 60_000, 60_000 * 20) Disabling suspension is as easy as: Electric.StackConfig.put(stack_id, :shape_enable_suspend?, false) """ - @spec enable_suspend(stack_id(), pos_integer(), pos_integer()) :: + @spec enable_suspend(stack_id(), pos_integer(), pos_integer(), pos_integer()) :: consumer_count :: non_neg_integer() - def enable_suspend(stack_id, hibernate_after, jitter_period) - when is_integer(hibernate_after) and is_integer(jitter_period) and - jitter_period > hibernate_after do + def enable_suspend(stack_id, hibernate_after, suspend_after, jitter_period) + when is_integer(hibernate_after) and is_integer(suspend_after) and + is_integer(jitter_period) and jitter_period > hibernate_after do Electric.StackConfig.put(stack_id, :shape_hibernate_after, hibernate_after) + Electric.StackConfig.put(stack_id, :shape_suspend_after, suspend_after) Electric.StackConfig.put(stack_id, :shape_enable_suspend?, true) :ets.foldl( fn {_shape_handle, pid}, n -> if Process.alive?(pid), - do: send(pid, {:configure_suspend, hibernate_after, jitter_period}) + do: send(pid, {:configure_suspend, hibernate_after, suspend_after, jitter_period}) n + 1 end, diff --git a/packages/sync-service/lib/electric/stack_config.ex b/packages/sync-service/lib/electric/stack_config.ex index cc614cd055..462d474953 100644 --- a/packages/sync-service/lib/electric/stack_config.ex +++ b/packages/sync-service/lib/electric/stack_config.ex @@ -30,6 +30,7 @@ defmodule Electric.StackConfig do snapshot_timeout_to_first_data: :timer.seconds(30), shape_hibernate_after: Electric.Config.default(:shape_hibernate_after), shape_enable_suspend?: Electric.Config.default(:shape_enable_suspend?), + shape_suspend_after: Electric.Config.default(:shape_suspend_after), chunk_bytes_threshold: Electric.ShapeCache.LogChunker.default_chunk_size_threshold(), feature_flags: [], process_spawn_opts: %{} diff --git a/packages/sync-service/lib/electric/stack_supervisor.ex b/packages/sync-service/lib/electric/stack_supervisor.ex index af22b52e35..98d3ba27e1 100644 --- a/packages/sync-service/lib/electric/stack_supervisor.ex +++ b/packages/sync-service/lib/electric/stack_supervisor.ex @@ -136,6 +136,10 @@ defmodule Electric.StackSupervisor do type: :boolean, default: Electric.Config.default(:shape_enable_suspend?) ], + shape_suspend_after: [ + type: :non_neg_integer, + default: Electric.Config.default(:shape_suspend_after) + ], snapshot_timeout_to_first_data: [ type: :pos_integer, default: Electric.Config.default(:snapshot_timeout_to_first_data) @@ -352,6 +356,7 @@ defmodule Electric.StackSupervisor do shape_hibernate_after = Keyword.fetch!(config.tweaks, :shape_hibernate_after) shape_enable_suspend? = Keyword.fetch!(config.tweaks, :shape_enable_suspend?) + shape_suspend_after = Keyword.fetch!(config.tweaks, :shape_suspend_after) process_spawn_opts = Keyword.fetch!(config.tweaks, :process_spawn_opts) shape_cache_opts = [ @@ -401,6 +406,7 @@ defmodule Electric.StackSupervisor do inspector: inspector, shape_hibernate_after: shape_hibernate_after, shape_enable_suspend?: shape_enable_suspend?, + shape_suspend_after: shape_suspend_after, process_spawn_opts: process_spawn_opts, feature_flags: Map.get(config, :feature_flags, []) ]}, diff --git a/packages/sync-service/test/electric/shapes/consumer_test.exs b/packages/sync-service/test/electric/shapes/consumer_test.exs index 40b27576c5..1f68110b5d 100644 --- a/packages/sync-service/test/electric/shapes/consumer_test.exs +++ b/packages/sync-service/test/electric/shapes/consumer_test.exs @@ -598,6 +598,12 @@ defmodule Electric.Shapes.ConsumerTest do Map.get(ctx, :hibernate_after, 10_000) ) + Electric.StackConfig.put( + ctx.stack_id, + :shape_suspend_after, + Map.get(ctx, :shape_suspend_after, 60_000) + ) + if not Map.get(ctx, :allow_subqueries, true) do Electric.StackConfig.put(ctx.stack_id, :feature_flags, []) end @@ -1375,9 +1381,10 @@ defmodule Electric.Shapes.ConsumerTest do assert_receive {:flush_boundary_updated, 301}, 1_000 end - @tag hibernate_after: 10, with_pure_file_storage_opts: [flush_period: 1] + @tag hibernate_after: 10, shape_suspend_after: 20 + @tag with_pure_file_storage_opts: [flush_period: 1] @tag suspend: true - test "should terminate after :hibernate_after ms", ctx do + test "should suspend after hibernate_after + shape_suspend_after ms", ctx do register_as_replication_client(ctx.stack_id) {shape_handle, _} = ShapeCache.get_or_create_shape_handle(@shape1, ctx.stack_id) @@ -1409,7 +1416,8 @@ defmodule Electric.Shapes.ConsumerTest do refute Consumer.whereis(ctx.stack_id, shape_handle) end - @tag hibernate_after: 10, with_pure_file_storage_opts: [flush_period: 1] + @tag hibernate_after: 10, shape_suspend_after: 10 + @tag with_pure_file_storage_opts: [flush_period: 1] @tag suspend: true test "should hibernate not suspend if has dependencies", ctx do register_as_replication_client(ctx.stack_id) @@ -1494,7 +1502,8 @@ defmodule Electric.Shapes.ConsumerTest do assert Consumer.whereis(ctx.stack_id, shape_handle) - Shapes.ConsumerRegistry.enable_suspend(ctx.stack_id, 5, 10) + # hibernate_after=5, shape_suspend_after=5, jitter_period=10 + Shapes.ConsumerRegistry.enable_suspend(ctx.stack_id, 5, 5, 10) Process.sleep(60) @@ -1503,6 +1512,118 @@ defmodule Electric.Shapes.ConsumerTest do refute Consumer.whereis(ctx.stack_id, shape_handle) end + @tag hibernate_after: 10, shape_suspend_after: 150, with_pure_file_storage_opts: [flush_period: 1] + @tag suspend: true + test "should hibernate first then suspend after shape_suspend_after ms", ctx do + register_as_replication_client(ctx.stack_id) + + {shape_handle, _} = ShapeCache.get_or_create_shape_handle(@shape1, ctx.stack_id) + lsn1 = Lsn.from_integer(300) + + :started = ShapeCache.await_snapshot_start(shape_handle, ctx.stack_id) + + consumer_pid = Consumer.whereis(ctx.stack_id, shape_handle) + assert is_pid(consumer_pid) + ref = Process.monitor(consumer_pid) + + txn = + complete_txn_fragment(2, lsn1, [ + %Changes.NewRecord{ + relation: {"public", "test_table"}, + record: %{"id" => "21"}, + log_offset: LogOffset.new(lsn1, 0) + } + ]) + + assert :ok = ShapeLogCollector.handle_event(txn, ctx.stack_id) + + assert_receive {:flush_boundary_updated, 300}, 1_000 + + # Wait for hibernate_after (10ms) + small buffer + # Suspend won't happen until hibernate_after + shape_suspend_after = 10 + 150 = 160ms + Process.sleep(50) + + # Should be hibernated, not suspended yet (we're at ~50ms, suspend at ~160ms) + assert {:current_function, {:gen_server, :loop_hibernate, 4}} = + Process.info(consumer_pid, :current_function) + + refute_receive {:DOWN, ^ref, :process, ^consumer_pid, {:shutdown, :suspend}}, 0 + + # Wait for shape_suspend_after (150ms from hibernate) to complete + # We're at ~50ms, need to wait another ~150ms to be past 160ms + Process.sleep(180) + + # Now should be suspended + assert_receive {:DOWN, ^ref, :process, ^consumer_pid, {:shutdown, :suspend}} + + refute Consumer.whereis(ctx.stack_id, shape_handle) + end + + @tag hibernate_after: 10, shape_suspend_after: 200, with_pure_file_storage_opts: [flush_period: 1] + @tag suspend: true + test "activity during hibernation cancels pending suspend", ctx do + register_as_replication_client(ctx.stack_id) + + {shape_handle, _} = ShapeCache.get_or_create_shape_handle(@shape1, ctx.stack_id) + lsn1 = Lsn.from_integer(300) + lsn2 = Lsn.from_integer(301) + + :started = ShapeCache.await_snapshot_start(shape_handle, ctx.stack_id) + + consumer_pid = Consumer.whereis(ctx.stack_id, shape_handle) + assert is_pid(consumer_pid) + ref = Process.monitor(consumer_pid) + + txn1 = + complete_txn_fragment(2, lsn1, [ + %Changes.NewRecord{ + relation: {"public", "test_table"}, + record: %{"id" => "21"}, + log_offset: LogOffset.new(lsn1, 0) + } + ]) + + assert :ok = ShapeLogCollector.handle_event(txn1, ctx.stack_id) + assert_receive {:flush_boundary_updated, 300}, 1_000 + + # Wait for hibernate (hibernate_after=10ms + buffer) + Process.sleep(30) + + # Should be hibernated + assert {:current_function, {:gen_server, :loop_hibernate, 4}} = + Process.info(consumer_pid, :current_function) + + # Wait ~50ms so suspend timer has been running but not fired yet + # (shape_suspend_after=200ms, so we're at ~80ms total, well before 200ms) + Process.sleep(50) + + # Send another transaction - this should cancel the suspend timer + txn2 = + complete_txn_fragment(3, lsn2, [ + %Changes.NewRecord{ + relation: {"public", "test_table"}, + record: %{"id" => "22"}, + log_offset: LogOffset.new(lsn2, 0) + } + ]) + + assert :ok = ShapeLogCollector.handle_event(txn2, ctx.stack_id) + assert_receive {:flush_boundary_updated, 301}, 1_000 + + # Wait past what would have been the original shape_suspend_after window + # Original timer started at ~30ms, would fire at ~230ms + # We're now at ~80ms, wait 160ms to reach ~240ms + # But new timer started at ~80ms, would fire at ~280ms + # So at ~240ms the process should still be alive + Process.sleep(160) + + # Should NOT have suspended because activity reset the timer + refute_receive {:DOWN, ^ref, :process, ^consumer_pid, {:shutdown, :suspend}}, 0 + + # Process should still be alive (hibernated again) + assert Process.alive?(consumer_pid) + end + @tag with_pure_file_storage_opts: [compaction_period: 5, keep_complete_chunks: 133] test "compaction is scheduled and invoked for a shape that has compaction enabled", ctx do parent = self() diff --git a/packages/sync-service/test/support/component_setup.ex b/packages/sync-service/test/support/component_setup.ex index bcb4ee0949..c4d27dc09d 100644 --- a/packages/sync-service/test/support/component_setup.ex +++ b/packages/sync-service/test/support/component_setup.ex @@ -110,6 +110,7 @@ defmodule Support.ComponentSetup do shape_changes_registry: Map.get(ctx, :registry, Electric.StackSupervisor.registry_name(stack_id)), shape_hibernate_after: Map.get(ctx, :shape_hibernate_after, 1_000), + shape_suspend_after: Map.get(ctx, :shape_suspend_after, 1_000), shape_enable_suspend?: Map.get(ctx, :suspend, false), feature_flags: Electric.Config.get_env(:feature_flags), process_spawn_opts: Map.get(ctx, :process_spawn_opts, %{})