Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
204 changes: 204 additions & 0 deletions examples/oauth_oidc_ccloud_aws_iam.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,204 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
#
# Copyright 2026 Confluent Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""End-to-end example for AWS IAM OAUTHBEARER authentication.

Activation is config-only: setting
``sasl.oauthbearer.metadata.authentication.type=aws_iam`` is enough.

Install:
pip install 'confluent-kafka[oauthbearer-aws]'

Runs on AWS compute (EC2 / EKS / ECS / Fargate / Lambda) with an IAM role
attached — boto3's default credential chain resolves it, no static keys.

To run:
python oauth_oidc_ccloud_aws_iam.py \\
-b pkc-xxxx.aws.confluent.cloud:9092 \\
--region us-east-1 \\
--audience https://confluent.cloud/oidc \\
--extensions logicalCluster=lkc-abc,identityPoolId=pool-xyz
"""

import argparse
import logging
import time
import uuid

from confluent_kafka import Consumer, Producer
from confluent_kafka.admin import AdminClient, NewTopic
from confluent_kafka.serialization import StringSerializer


def common_config(args):
"""SASL config shared by Producer, Consumer, and AdminClient."""
conf = {
'bootstrap.servers': args.bootstrap_servers,
'security.protocol': 'SASL_SSL',
'sasl.mechanisms': 'OAUTHBEARER',
'sasl.oauthbearer.method': 'oidc',
'sasl.oauthbearer.metadata.authentication.type': 'aws_iam',
'sasl.oauthbearer.config': f'region={args.region} '
f'audience={args.audience} '
f'duration_seconds={args.duration_seconds}',
'debug': 'security',
}

if args.extensions:
conf['sasl.oauthbearer.extensions'] = args.extensions

return conf


def consumer_config(args, group_id):
cfg = common_config(args)
cfg['group.id'] = group_id
cfg['auto.offset.reset'] = 'earliest'
cfg['enable.auto.offset.store'] = False # commit offsets manually
return cfg


def create_topic(admin_conf, topic_name, num_partitions=1, replication_factor=3):
admin = AdminClient(admin_conf)
futures = admin.create_topics(
[
NewTopic(topic_name, num_partitions=num_partitions, replication_factor=replication_factor),
]
)
for topic, future in futures.items():
try:
future.result()
print(f"[admin] Topic '{topic}' created " f"({num_partitions} partition(s), RF={replication_factor})")
except Exception as exc:
print(f"[admin] Failed to create topic '{topic}': {exc}")
raise


def delivery_report(err, msg):
if err is not None:
print(f"[producer] Delivery failed: {err}")
return
print(
f"[producer] Produced to {msg.topic()} [{msg.partition()}] "
f"at offset {msg.offset()}: {msg.value().decode('utf-8')}"
)


def main(args):
# Unique topic + group per run so the example is self-contained.
topic_name = f"aws-iam-{uuid.uuid4()}"
group_id = f"aws-iam-consumer-{uuid.uuid4()}"

p_conf = common_config(args)
c_conf = consumer_config(args, group_id)
a_conf = common_config(args)

logging.basicConfig(level=logging.INFO)

print("\n=== AWS IAM OAUTHBEARER end-to-end example ===")
print(f"bootstrap.servers: {args.bootstrap_servers}")
print(f"region: {args.region}")
print(f"audience: {args.audience}")
print(f"duration_seconds: {args.duration_seconds} " f"(auto-refresh at ~{int(args.duration_seconds * 0.8)}s)")
print(f"run-for: {args.run_for}s")
print(f"topic (generated): {topic_name}")
print(f"group.id (generated): {group_id}\n")

create_topic(a_conf, topic_name)

producer = Producer(p_conf)
consumer = Consumer(c_conf)
consumer.subscribe([topic_name])
serializer = StringSerializer('utf_8')

start = time.time()
end_at = start + args.run_for
produced = 0
consumed = 0

print(
f"[loop] Producing/consuming for {args.run_for}s — "
f"watch the debug=security logs for token-refresh events.\n"
)

try:
while time.time() < end_at:
elapsed = time.time() - start
msg = f"hello-from-aws-iam T+{elapsed:.1f}s"

producer.produce(
topic_name,
value=serializer(msg),
on_delivery=delivery_report,
)
producer.poll(0)
produced += 1

received = consumer.poll(1.0)
if received is None:
pass # poll timeout, no message yet
elif received.error() is not None:
print(f"[consumer] error: {received.error()}")
else:
consumer.store_offsets(received)
consumed += 1
print(
f"[consumer] Received from "
f"{received.topic()} [{received.partition()}] "
f"at offset {received.offset()}: "
f"{received.value().decode('utf-8')}"
)

time.sleep(args.interval)
except KeyboardInterrupt:
print("\n[main] Interrupted — flushing.")
finally:
print(f"\n[summary] Produced {produced}, consumed {consumed} " f"in {time.time() - start:.1f}s. Flushing...")
producer.flush(timeout=10)
consumer.close()
print("[summary] Done.")


if __name__ == '__main__':
parser = argparse.ArgumentParser(
description='End-to-end OAUTHBEARER example via AWS IAM autowire ' '(produce + consume + admin).',
)
parser.add_argument('-b', dest='bootstrap_servers', required=True, help='Bootstrap broker(s) (host[:port])')
parser.add_argument('--region', required=True, help='AWS region (e.g. us-east-1)')
parser.add_argument(
'--audience',
required=True,
help='OIDC audience claim the broker expects ' '(e.g. https://confluent.cloud/oidc)',
)
parser.add_argument(
'--extensions',
default=None,
help='Optional sasl.oauthbearer.extensions value ' '(comma-separated key=value pairs)',
)
parser.add_argument(
'--duration-seconds',
dest='duration_seconds',
type=int,
default=60,
help='STS DurationSeconds (default 60 = AWS minimum); ' 'librdkafka auto-refreshes at ~80%% of it.',
)
parser.add_argument(
'--run-for', dest='run_for', type=int, default=120, help='Run duration in seconds (default 120).'
)
parser.add_argument('--interval', type=float, default=5.0, help='Seconds between produce calls (default 5).')

main(parser.parse_args())
10 changes: 7 additions & 3 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,7 @@ optional-dependencies.rules = { file = ["requirements/requirements-rules.txt", "
optional-dependencies.avro = { file = ["requirements/requirements-avro.txt", "requirements/requirements-schemaregistry.txt"] }
optional-dependencies.json = { file = ["requirements/requirements-json.txt", "requirements/requirements-schemaregistry.txt"] }
optional-dependencies.protobuf = { file = ["requirements/requirements-protobuf.txt", "requirements/requirements-schemaregistry.txt"] }
optional-dependencies.oauthbearer-aws = { file = ["requirements/requirements-oauthbearer-aws.txt"] }
optional-dependencies.dev = { file = [
"requirements/requirements-docs.txt",
"requirements/requirements-examples.txt",
Expand All @@ -114,7 +115,8 @@ optional-dependencies.dev = { file = [
"requirements/requirements-rules.txt",
"requirements/requirements-avro.txt",
"requirements/requirements-json.txt",
"requirements/requirements-protobuf.txt"] }
"requirements/requirements-protobuf.txt",
"requirements/requirements-oauthbearer-aws.txt"] }
optional-dependencies.docs = { file = [
"requirements/requirements-docs.txt",
"requirements/requirements-schemaregistry.txt",
Expand All @@ -128,7 +130,8 @@ optional-dependencies.tests = { file = [
"requirements/requirements-rules.txt",
"requirements/requirements-avro.txt",
"requirements/requirements-json.txt",
"requirements/requirements-protobuf.txt"] }
"requirements/requirements-protobuf.txt",
"requirements/requirements-oauthbearer-aws.txt"] }
optional-dependencies.examples = { file = ["requirements/requirements-examples.txt"] }
optional-dependencies.soaktest = { file = ["requirements/requirements-soaktest.txt"] }
optional-dependencies.all = { file = [
Expand All @@ -140,7 +143,8 @@ optional-dependencies.all = { file = [
"requirements/requirements-rules.txt",
"requirements/requirements-avro.txt",
"requirements/requirements-json.txt",
"requirements/requirements-protobuf.txt"] }
"requirements/requirements-protobuf.txt",
"requirements/requirements-oauthbearer-aws.txt"] }

[tool.pytest.ini_options]
asyncio_mode = "auto"
Expand Down
3 changes: 2 additions & 1 deletion requirements/requirements-all.txt
Original file line number Diff line number Diff line change
Expand Up @@ -7,4 +7,5 @@
-r requirements-examples.txt
-r requirements-tests.txt
-r requirements-docs.txt
-r requirements-soaktest.txt
-r requirements-soaktest.txt
-r requirements-oauthbearer-aws.txt
1 change: 1 addition & 0 deletions requirements/requirements-oauthbearer-aws.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
boto3>=1.42.25
1 change: 1 addition & 0 deletions requirements/requirements-tests-install.txt
Original file line number Diff line number Diff line change
Expand Up @@ -4,4 +4,5 @@
-r requirements-avro.txt
-r requirements-protobuf.txt
-r requirements-json.txt
-r requirements-oauthbearer-aws.txt
tests/trivup/trivup-0.14.0.tar.gz
79 changes: 79 additions & 0 deletions src/confluent_kafka/_util/kv_string_parser.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
# Copyright 2026 Confluent Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""Shared utility for parsing ``key=value`` strings."""

import re
from typing import Iterable, Iterator, Optional, Tuple

__all__ = ["parse_kv"]


def parse_kv(
raw: str,
separators: Iterable[str],
context_label: Optional[str] = None,
trim_tokens: bool = True,
) -> Iterator[Tuple[str, str]]:
"""Tokenize ``raw`` and yield each non-empty token as a ``(key, value)`` pair.

Tokens are split on any character in ``separators`` (e.g. ``[',']`` for
comma-separated values or ``[' ', '\\t', '\\r', '\\n']`` for
whitespace-separated). Within each token the split is on the first ``=``
only — values may legitimately contain ``=`` (e.g. URL query strings).

Empty tokens (e.g. consecutive separators, or whitespace-only tokens when
``trim_tokens`` is true) are skipped. Tokens with no ``=`` or with ``=``
at position 0 (empty key) raise :class:`ValueError` with
``context_label`` woven into the message when supplied.

The default trimming behaviour mirrors librdkafka's
``rd_string_split`` (``rdstring.c``).

:param raw: Input string to tokenize.
:param separators: Iterable of single-character separators. Each element
may be a string of any length, but only its first character is used.
:param context_label: Optional label woven into error messages to
identify which config the malformed token came from. When ``None``,
error messages fall back to a generic ``"key=value"`` phrasing.
:param trim_tokens: When true (default), each token is stripped of
leading and trailing whitespace before being split on ``=``. Set to
false to preserve whitespace inside tokens.
:raises TypeError: ``raw`` or ``separators`` is ``None``.
:raises ValueError: A token is malformed (no ``=`` or empty key).
"""
if raw is None:
raise TypeError("raw must not be None")
if separators is None:
raise TypeError("separators must not be None")

# Build a single-character split pattern from the supplied separators.
chars = "".join(str(s)[0] for s in separators if s)
if not chars:
# Degenerate "no separators" → treat raw as a single token.
raw_tokens = [raw]
else:
raw_tokens = re.split("[" + re.escape(chars) + "]", raw)

for raw_token in raw_tokens:
token = raw_token.strip() if trim_tokens else raw_token
if len(token) == 0:
continue

idx = token.find("=")
if idx <= 0:
what = f"'{context_label}'" if context_label else "key=value"
raise ValueError(f"Malformed {what} entry '{token}' (expected key=value).")

yield token[:idx], token[idx + 1 :]
15 changes: 15 additions & 0 deletions src/confluent_kafka/oauthbearer/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
# Copyright 2026 Confluent Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""Namespace package for OAUTHBEARER provider integrations."""
26 changes: 26 additions & 0 deletions src/confluent_kafka/oauthbearer/aws/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
# Copyright 2026 Confluent Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""AWS IAM OAUTHBEARER autowire subpackage.

The only publicly importable name in this subpackage is
:func:`confluent_kafka.oauthbearer.aws.aws_autowire.create_handler`, loaded by
core's C extension when the user sets
``sasl.oauthbearer.metadata.authentication.type=aws_iam``. All other modules
are private (underscore-prefixed) and not re-exported here.

Install with::

pip install 'confluent-kafka[oauthbearer-aws]'
"""
Loading