Skip to content
Draft
5 changes: 5 additions & 0 deletions .changeset/hibernate-then-suspend.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
'@core/sync-service': minor
---

Add hibernate-then-suspend behavior for Consumer processes. When suspend is enabled, consumers now hibernate first (triggering GC) before suspending. Adds `shape_suspend_after` config (default 60s) to control the delay between hibernation and suspension. Any activity cancels the pending suspend timer, restarting the cycle.
2 changes: 1 addition & 1 deletion integration-tests/tests/shape-suspension-resumption.lux
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@

# Start Electric and wait for it to finish initialization.
# Set a very small hibernation timeout so consumers will shutdown quickly
[invoke setup_electric_with_env "ELECTRIC_SHAPE_HIBERNATE_AFTER=200ms ELECTRIC_SHAPE_SUSPEND_CONSUMER=true"]
[invoke setup_electric_with_env "ELECTRIC_SHAPE_HIBERNATE_AFTER=200ms ELECTRIC_SHAPE_SUSPEND_AFTER=200ms ELECTRIC_SHAPE_SUSPEND_CONSUMER=true"]
[shell electric]
[timeout 10]
??[debug] Replication client started streaming
Expand Down
4 changes: 4 additions & 0 deletions packages/sync-service/config/runtime.exs
Original file line number Diff line number Diff line change
Expand Up @@ -164,6 +164,9 @@ shape_hibernate_after =

shape_enable_suspend? = env!("ELECTRIC_SHAPE_SUSPEND_CONSUMER", :boolean, nil)

shape_suspend_after =
env!("ELECTRIC_SHAPE_SUSPEND_AFTER", &Electric.Config.parse_human_readable_time!/1, nil)

system_metrics_poll_interval =
env!(
"ELECTRIC_SYSTEM_METRICS_POLL_INTERVAL",
Expand Down Expand Up @@ -264,6 +267,7 @@ config :electric,
env!("ELECTRIC_SUBQUERY_BUFFER_MAX_TRANSACTIONS", :integer, nil),
shape_hibernate_after: shape_hibernate_after,
shape_enable_suspend?: shape_enable_suspend?,
shape_suspend_after: shape_suspend_after,
storage_dir: storage_dir,
storage: storage_spec,
cleanup_interval_ms:
Expand Down
1 change: 1 addition & 0 deletions packages/sync-service/lib/electric/application.ex
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,7 @@ defmodule Electric.Application do
cleanup_interval_ms: get_env(opts, :cleanup_interval_ms),
shape_hibernate_after: get_env(opts, :shape_hibernate_after),
shape_enable_suspend?: get_env(opts, :shape_enable_suspend?),
shape_suspend_after: get_env(opts, :shape_suspend_after),
conn_max_requests: get_env(opts, :conn_max_requests),
handler_fullsweep_after: get_env(opts, :handler_fullsweep_after),
process_spawn_opts: get_env(opts, :process_spawn_opts)
Expand Down
9 changes: 7 additions & 2 deletions packages/sync-service/lib/electric/config.ex
Original file line number Diff line number Diff line change
Expand Up @@ -85,10 +85,15 @@ defmodule Electric.Config do
otel_sampling_ratio: 0.01,
metrics_sampling_ratio: 1,
## Memory
# After this duration of inactivity, consumer processes will hibernate
# to allow garbage collection
shape_hibernate_after: :timer.seconds(30),
# Should we terminate consumer processes after `shape_hibernate_after` ms
# or just hibernate them?
# If enabled, terminate (suspend) consumer processes after hibernating.
# This frees memory more aggressively than hibernation alone.
shape_enable_suspend?: false,
# After hibernating, wait this duration before suspending (terminating).
# Only applies when shape_enable_suspend? is true.
shape_suspend_after: :timer.minutes(10),
# Sets max_requests for Bandit handler processes:
# https://hexdocs.pm/bandit/Bandit.html#t:http_1_options/0
# "The maximum number of requests to serve in a single HTTP/{1,2}
Expand Down
60 changes: 37 additions & 23 deletions packages/sync-service/lib/electric/shapes/consumer.ex
Original file line number Diff line number Diff line change
Expand Up @@ -186,8 +186,8 @@ defmodule Electric.Shapes.Consumer do

def handle_call(:await_snapshot_start, from, state) do
Logger.debug("Starting a wait on the snapshot #{state.shape_handle} for #{inspect(from)}}")

{:noreply, State.add_waiter(state, from), state.hibernate_after}
state = State.add_waiter(state, from)
{:noreply, state, state.hibernate_after}
end

def handle_call({:handle_event, event, trace_context}, _from, state) do
Expand All @@ -205,9 +205,8 @@ defmodule Electric.Shapes.Consumer do
def handle_call({:subscribe_materializer, pid}, _from, state) do
Logger.debug("Subscribing materializer for #{state.shape_handle}")
Process.monitor(pid, tag: :materializer_down)

{:reply, {:ok, state.latest_offset}, %{state | materializer_subscribed?: true},
state.hibernate_after}
state = %{state | materializer_subscribed?: true}
{:reply, {:ok, state.latest_offset}, state, state.hibernate_after}
end

def handle_call({:stop, reason}, _from, state) do
Expand Down Expand Up @@ -386,30 +385,37 @@ defmodule Electric.Shapes.Consumer do
{:stop, reason, state}
end

# Set a new value for hibernate after and set a timeout between
# hibernate_after and max_timeout in order to spread
# consumer suspend events.
def handle_info({:configure_suspend, hibernate_after, jitter_period}, state) do
{:noreply, %{state | hibernate_after: hibernate_after},
Enum.random(hibernate_after..jitter_period)}
# Set new values for hibernate_after and suspend_after, and set a jittered
# timeout between hibernate_after and jitter_period to spread hibernation
# events. Each consumer will hibernate at the jittered timeout, then schedule
# suspension for suspend_after ms later.
def handle_info({:configure_suspend, hibernate_after, suspend_after, jitter_period}, state) do
state = %{state | hibernate_after: hibernate_after, suspend_after: suspend_after}
{:noreply, state, Enum.random(hibernate_after..jitter_period)}
end

def handle_info(:timeout, state) do
# we can only suspend (terminate) the consumer process if
#
# 1. Consumer suspend has been enabled in the stack config
# 2. we're not waiting for snapshot information
# 3. we are not part of a subquery dependency tree, that is either
# a. we have no dependent shapes
# b. we don't have a materializer subscribed

if consumer_suspend_enabled?(state) and consumer_can_suspend?(state) do
state = %{state | writer: ShapeCache.Storage.hibernate(state.writer)}

state =
if consumer_suspend_enabled?(state) and consumer_can_suspend?(state),
do: schedule_suspend_timer(state),
else: state

{:noreply, state, :hibernate}
end

# Suspend timer uses a generation number to handle stale timers. If activity
# occurred after the timer was scheduled, a new timer with a higher generation
# will have been scheduled, making this one stale (generation mismatch).
def handle_info({:suspend_timeout, generation}, state) do
if generation == state.suspend_generation and
consumer_suspend_enabled?(state) and consumer_can_suspend?(state) do
Logger.debug(fn -> ["Suspending consumer ", to_string(state.shape_handle)] end)
{:stop, ShapeCleaner.consumer_suspend_reason(), state}
else
state = %{state | writer: ShapeCache.Storage.hibernate(state.writer)}

{:noreply, state, :hibernate}
# Stale timer or conditions changed - just restart the hibernate timeout
{:noreply, state, state.hibernate_after}
end
end

Expand All @@ -422,6 +428,14 @@ defmodule Electric.Shapes.Consumer do
not state.materializer_subscribed?
end

defp schedule_suspend_timer(%{suspend_after: nil} = state), do: state

defp schedule_suspend_timer(%{suspend_after: suspend_after, suspend_generation: gen} = state) do
next_gen = gen + 1
:erlang.send_after(suspend_after, self(), {:suspend_timeout, next_gen})
%{state | suspend_generation: next_gen}
end

@impl GenServer
def terminate(reason, state) do
Logger.debug(fn ->
Expand Down
14 changes: 13 additions & 1 deletion packages/sync-service/lib/electric/shapes/consumer/state.ex
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,13 @@ defmodule Electric.Shapes.Consumer.State do
# When a {Storage, :flushed, offset} message arrives during a pending
# transaction, we defer the notification and store the max flushed offset
# here. Multiple deferred notifications are collapsed into a single most recent offset.
pending_flush_offset: nil
pending_flush_offset: nil,
# Generation counter for suspend timers - incremented each time we schedule
# a new suspend timer. When a timer fires, it checks if its generation matches
# the current one; if not, activity occurred and the timer is stale (ignored).
suspend_generation: 0,
# How long after hibernation to suspend (in ms)
suspend_after: nil
]

@type pg_snapshot() :: SnapshotQuery.pg_snapshot()
Expand Down Expand Up @@ -95,6 +101,12 @@ defmodule Electric.Shapes.Consumer.State do
:shape_hibernate_after,
Electric.Config.default(:shape_hibernate_after)
),
suspend_after:
Electric.StackConfig.lookup(
stack_id,
:shape_suspend_after,
Electric.Config.default(:shape_suspend_after)
),
buffering?: true
}
end
Expand Down
24 changes: 13 additions & 11 deletions packages/sync-service/lib/electric/shapes/consumer_registry.ex
Original file line number Diff line number Diff line change
Expand Up @@ -217,33 +217,35 @@ defmodule Electric.Shapes.ConsumerRegistry do
disabled, because the configuration message will have the side-effect of
waking all consumers from hibernation.

The `jitter_period` value allows for spreading the suspension of existing
consumers over a large time period to avoid a sudden rush of consumer
shutdowns after `hibernate_after` ms.
The `jitter_period` value allows for spreading the hibernation of existing
consumers over a time period to avoid a sudden rush of hibernation events.
Each consumer picks a random timeout between `hibernate_after` and `jitter_period`,
then hibernates and schedules suspension for `suspend_after` ms later.

To re-enable consumer suspend:

# set the hibernation timeout to 1 minute but phase the suspension of
# existing consumers over a 20 minute period
Electric.Shapes.ConsumerRegistry.enable_suspend(stack_id, 60_000, 60_000 * 20)
# hibernation timeout: 1 min, suspend timeout: 4 min, jitter window: 20 min
# Consumers will hibernate between 1-20 min, then suspend 4 min after hibernating
Electric.Shapes.ConsumerRegistry.enable_suspend(stack_id, 60_000, 4 * 60_000, 60_000 * 20)

Disabling suspension is as easy as:

Electric.StackConfig.put(stack_id, :shape_enable_suspend?, false)

"""
@spec enable_suspend(stack_id(), pos_integer(), pos_integer()) ::
@spec enable_suspend(stack_id(), pos_integer(), pos_integer(), pos_integer()) ::
consumer_count :: non_neg_integer()
def enable_suspend(stack_id, hibernate_after, jitter_period)
when is_integer(hibernate_after) and is_integer(jitter_period) and
jitter_period > hibernate_after do
def enable_suspend(stack_id, hibernate_after, suspend_after, jitter_period)
when is_integer(hibernate_after) and is_integer(suspend_after) and
is_integer(jitter_period) and jitter_period > hibernate_after do
Electric.StackConfig.put(stack_id, :shape_hibernate_after, hibernate_after)
Electric.StackConfig.put(stack_id, :shape_suspend_after, suspend_after)
Electric.StackConfig.put(stack_id, :shape_enable_suspend?, true)

:ets.foldl(
fn {_shape_handle, pid}, n ->
if Process.alive?(pid),
do: send(pid, {:configure_suspend, hibernate_after, jitter_period})
do: send(pid, {:configure_suspend, hibernate_after, suspend_after, jitter_period})

n + 1
end,
Expand Down
1 change: 1 addition & 0 deletions packages/sync-service/lib/electric/stack_config.ex
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ defmodule Electric.StackConfig do
snapshot_timeout_to_first_data: :timer.seconds(30),
shape_hibernate_after: Electric.Config.default(:shape_hibernate_after),
shape_enable_suspend?: Electric.Config.default(:shape_enable_suspend?),
shape_suspend_after: Electric.Config.default(:shape_suspend_after),
chunk_bytes_threshold: Electric.ShapeCache.LogChunker.default_chunk_size_threshold(),
feature_flags: [],
process_spawn_opts: %{}
Expand Down
6 changes: 6 additions & 0 deletions packages/sync-service/lib/electric/stack_supervisor.ex
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,10 @@ defmodule Electric.StackSupervisor do
type: :boolean,
default: Electric.Config.default(:shape_enable_suspend?)
],
shape_suspend_after: [
type: :non_neg_integer,
default: Electric.Config.default(:shape_suspend_after)
],
snapshot_timeout_to_first_data: [
type: :pos_integer,
default: Electric.Config.default(:snapshot_timeout_to_first_data)
Expand Down Expand Up @@ -352,6 +356,7 @@ defmodule Electric.StackSupervisor do

shape_hibernate_after = Keyword.fetch!(config.tweaks, :shape_hibernate_after)
shape_enable_suspend? = Keyword.fetch!(config.tweaks, :shape_enable_suspend?)
shape_suspend_after = Keyword.fetch!(config.tweaks, :shape_suspend_after)
process_spawn_opts = Keyword.fetch!(config.tweaks, :process_spawn_opts)

shape_cache_opts = [
Expand Down Expand Up @@ -401,6 +406,7 @@ defmodule Electric.StackSupervisor do
inspector: inspector,
shape_hibernate_after: shape_hibernate_after,
shape_enable_suspend?: shape_enable_suspend?,
shape_suspend_after: shape_suspend_after,
process_spawn_opts: process_spawn_opts,
feature_flags: Map.get(config, :feature_flags, [])
]},
Expand Down
Loading
Loading