Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .changeset/redact-otel-query-params.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"@core/sync-service": patch
---

Redact sensitive query parameters from request telemetry and debug logs before export.
Original file line number Diff line number Diff line change
Expand Up @@ -198,7 +198,7 @@ defmodule Electric.Plug.ServeShapePlug do
@subset_keys ~w(where order_by limit offset params where_expr order_by_expr)

defp validate_request(%Conn{assigns: %{config: config, body_params: body_params}} = conn, _) do
Logger.debug("Query String: #{conn.query_string}")
Logger.debug("Query String: #{Electric.Plug.Utils.redact_query_string(conn.query_string)}")

query_params = Utils.extract_prefixed_keys_into_map(conn.query_params, "subset", "__")
merged_params = merge_body_params(query_params, body_params)
Expand Down
53 changes: 50 additions & 3 deletions packages/sync-service/lib/electric/plug/utils.ex
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,11 @@ defmodule Electric.Plug.Utils do
path and query parameters.
"""

@redacted_query_value "[REDACTED]"
@sensitive_query_param_names MapSet.new(
~w(secret api_secret token access_token api_key password)
)

@doc """
Parse columns parameter from a string consisting of a comma separated list
of potentially quoted column names into a sorted list of strings.
Expand Down Expand Up @@ -79,18 +84,32 @@ defmodule Electric.Plug.Utils do

alias OpenTelemetry.SemConv, as: SC

@doc false
def redact_query_string(nil), do: nil
def redact_query_string(""), do: ""

def redact_query_string(query_string) when is_binary(query_string) do
query_string
|> String.split("&", trim: false)
|> Enum.map_join("&", &redact_query_segment/1)
end

def common_open_telemetry_attrs(%Plug.Conn{assigns: assigns} = conn) do
sanitized_query_string = redact_query_string(conn.query_string)

query_params_map =
if is_struct(conn.query_params, Plug.Conn.Unfetched) do
%{}
else
Map.new(conn.query_params, fn {k, v} -> {"http.query_param.#{k}", v} end)
Map.new(conn.query_params, fn {k, v} ->
{"http.query_param.#{k}", redact_query_param_value(k, v)}
end)
end

%{
"error.type" => assigns[:error_str],
"http.request_id" => assigns[:plug_request_id],
"http.query_string" => conn.query_string,
"http.query_string" => sanitized_query_string,
SC.ClientAttributes.client_address() => conn.remote_ip,
SC.ServerAttributes.server_address() => conn.host,
SC.ServerAttributes.server_port() => conn.port,
Expand All @@ -108,7 +127,7 @@ defmodule Electric.Plug.Utils do
host: conn.host,
port: conn.port,
path: conn.request_path,
query: conn.query_string
query: sanitized_query_string
}
|> to_string()
}
Expand All @@ -126,6 +145,34 @@ defmodule Electric.Plug.Utils do
end
end

defp redact_query_segment(segment) do
case String.split(segment, "=", parts: 2) do
[key, _value] ->
if sensitive_query_param?(decode_query_key(key)) do
key <> "=" <> @redacted_query_value
else
segment
end

[_key] ->
segment
end
end

defp redact_query_param_value(key, value) do
if sensitive_query_param?(key), do: @redacted_query_value, else: value
end

defp sensitive_query_param?(key) when is_binary(key) do
MapSet.member?(@sensitive_query_param_names, String.downcase(key))
end

defp decode_query_key(key) do
URI.decode_www_form(key)
rescue
ArgumentError -> key
end

defmodule CORSHeaderPlug do
@behaviour Plug
import Plug.Conn
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
defmodule Electric.Plug.ServeShapePlugLoggingTest do
use ExUnit.Case, async: false

import ExUnit.CaptureLog

alias Electric.Plug.ServeShapePlug
alias Electric.Shapes.Api

import Support.ComponentSetup
import Support.TestUtils, only: [set_status_to_active: 1]

@inspector Support.StubInspector.new(
tables: ["users"],
columns: [
%{name: "id", type: "int8", pk_position: 0, type_id: {20, 1}},
%{name: "value", type: "text", pk_position: nil, type_id: {28, 1}}
]
)
@moduletag :tmp_dir

setup [:with_stack_id_from_test, :with_status_monitor]

setup ctx do
set_status_to_active(ctx)
:ok
end

def conn(_ctx, method, params, "?" <> _ = query_string) do
Plug.Test.conn(method, "/" <> query_string, params)
end

defp build_plug_opts(ctx) do
Api.plug_opts(stack_id: ctx.stack_id, inspector: @inspector, stack_ready_timeout: 100)
end

defp call_serve_shape_plug(conn, ctx) do
ServeShapePlug.call(conn, build_plug_opts(ctx))
end

test "redacts sensitive query params in debug logs", ctx do
previous_level = Logger.level()
on_exit(fn -> Logger.configure(level: previous_level) end)
Logger.configure(level: :debug)

log =
capture_log([level: :debug], fn ->
conn =
ctx
|> conn(
:get,
%{"table" => ".invalid_shape"},
"?offset=-1&secret=topsecret&api_secret=legacy&token=abc123&table=users"
)
|> call_serve_shape_plug(ctx)

assert conn.status == 400
end)

assert log =~ "Query String:"
assert log =~ "secret=[REDACTED]"
assert log =~ "api_secret=[REDACTED]"
assert log =~ "token=[REDACTED]"
refute log =~ "topsecret"
refute log =~ "legacy"
refute log =~ "abc123"
end
end
46 changes: 46 additions & 0 deletions packages/sync-service/test/electric/plug/utils_test.exs
Original file line number Diff line number Diff line change
@@ -1,8 +1,54 @@
defmodule Electric.Plug.UtilsTest do
alias Electric.Plug.Utils
alias OpenTelemetry.SemConv, as: SC

use ExUnit.Case, async: true
doctest Utils, import: true

describe "redact_query_string/1" do
test "redacts sensitive query params while preserving order and flags" do
assert Utils.redact_query_string(
"table=users&secret=hunter2&flag&Token=abc123&offset=-1&token"
) ==
"table=users&secret=[REDACTED]&flag&Token=[REDACTED]&offset=-1&token"
end

test "redacts empty sensitive values and encoded sensitive keys" do
assert Utils.redact_query_string(
"secret=&api_secret=legacy&access%5Ftoken=abc&api_key=123&password=swordfish"
) ==
"secret=[REDACTED]&api_secret=[REDACTED]&access%5Ftoken=[REDACTED]&api_key=[REDACTED]&password=[REDACTED]"
end
end

describe "common_open_telemetry_attrs/1" do
test "redacts query string, url.full and per-param attributes consistently" do
conn =
Plug.Test.conn(
"GET",
"/v1/shape?table=users&secret=hunter2&api_secret=legacy&Token=abc123&flag&offset=-1&api_key=123"
)
|> Plug.Conn.fetch_query_params()
|> Plug.Conn.assign(:plug_request_id, "req-123")

attrs = Utils.common_open_telemetry_attrs(conn)

assert attrs["http.query_string"] ==
"table=users&secret=[REDACTED]&api_secret=[REDACTED]&Token=[REDACTED]&flag&offset=-1&api_key=[REDACTED]"

assert attrs[SC.URLAttributes.url_full()] ==
"http://www.example.com/v1/shape?table=users&secret=[REDACTED]&api_secret=[REDACTED]&Token=[REDACTED]&flag&offset=-1&api_key=[REDACTED]"

assert attrs["http.query_param.secret"] == "[REDACTED]"
assert attrs["http.query_param.api_secret"] == "[REDACTED]"
assert attrs["http.query_param.Token"] == "[REDACTED]"
assert attrs["http.query_param.api_key"] == "[REDACTED]"
assert attrs["http.query_param.table"] == "users"
assert attrs["http.query_param.offset"] == "-1"
assert attrs["http.query_param.flag"] == ""
end
end

describe "get_next_interval_timestamp/2" do
test "returns expected interval" do
long_poll_timeout_ms = 20000
Expand Down
Loading