Skip to content

Commit d764864

Browse files
authored
Merge pull request #5289 from getsentry/feat/queue-instrumentation-kafka-console-sample
feat(samples): [Queue Instrumentation 15] Add opt-in Kafka console e2e coverage
2 parents 592a210 + 2844be7 commit d764864

10 files changed

Lines changed: 339 additions & 14 deletions

File tree

sentry-kafka/api/sentry-kafka.api

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ public final class io/sentry/kafka/BuildConfig {
55

66
public final class io/sentry/kafka/SentryKafkaConsumerInterceptor : org/apache/kafka/clients/consumer/ConsumerInterceptor {
77
public static final field TRACE_ORIGIN Ljava/lang/String;
8+
public fun <init> ()V
89
public fun <init> (Lio/sentry/IScopes;)V
910
public fun close ()V
1011
public fun configure (Ljava/util/Map;)V
@@ -15,6 +16,7 @@ public final class io/sentry/kafka/SentryKafkaConsumerInterceptor : org/apache/k
1516
public final class io/sentry/kafka/SentryKafkaProducerInterceptor : org/apache/kafka/clients/producer/ProducerInterceptor {
1617
public static final field SENTRY_ENQUEUED_TIME_HEADER Ljava/lang/String;
1718
public static final field TRACE_ORIGIN Ljava/lang/String;
19+
public fun <init> ()V
1820
public fun <init> (Lio/sentry/IScopes;)V
1921
public fun <init> (Lio/sentry/IScopes;Ljava/lang/String;)V
2022
public fun close ()V

sentry-kafka/src/main/java/io/sentry/kafka/SentryKafkaConsumerInterceptor.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
import io.sentry.BaggageHeader;
44
import io.sentry.IScopes;
55
import io.sentry.ITransaction;
6+
import io.sentry.ScopesAdapter;
67
import io.sentry.SentryTraceHeader;
78
import io.sentry.SpanDataConvention;
89
import io.sentry.SpanStatus;
@@ -29,6 +30,10 @@ public final class SentryKafkaConsumerInterceptor<K, V> implements ConsumerInter
2930

3031
private final @NotNull IScopes scopes;
3132

33+
public SentryKafkaConsumerInterceptor() {
34+
this(ScopesAdapter.getInstance());
35+
}
36+
3237
public SentryKafkaConsumerInterceptor(final @NotNull IScopes scopes) {
3338
this.scopes = scopes;
3439
}

sentry-kafka/src/main/java/io/sentry/kafka/SentryKafkaProducerInterceptor.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
import io.sentry.DateUtils;
55
import io.sentry.IScopes;
66
import io.sentry.ISpan;
7+
import io.sentry.ScopesAdapter;
78
import io.sentry.SentryTraceHeader;
89
import io.sentry.SpanDataConvention;
910
import io.sentry.SpanOptions;
@@ -28,6 +29,10 @@ public final class SentryKafkaProducerInterceptor<K, V> implements ProducerInter
2829
private final @NotNull IScopes scopes;
2930
private final @NotNull String traceOrigin;
3031

32+
public SentryKafkaProducerInterceptor() {
33+
this(ScopesAdapter.getInstance(), TRACE_ORIGIN);
34+
}
35+
3136
public SentryKafkaProducerInterceptor(final @NotNull IScopes scopes) {
3237
this(scopes, TRACE_ORIGIN);
3338
}

sentry-kafka/src/test/kotlin/io/sentry/kafka/SentryKafkaConsumerInterceptorTest.kt

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,9 +2,13 @@ package io.sentry.kafka
22

33
import io.sentry.IScopes
44
import io.sentry.ITransaction
5+
import io.sentry.Sentry
56
import io.sentry.SentryOptions
67
import io.sentry.TransactionContext
78
import io.sentry.TransactionOptions
9+
import io.sentry.test.initForTest
10+
import kotlin.test.AfterTest
11+
import kotlin.test.BeforeTest
812
import kotlin.test.Test
913
import kotlin.test.assertSame
1014
import org.apache.kafka.clients.consumer.ConsumerRecord
@@ -19,6 +23,20 @@ import org.mockito.kotlin.whenever
1923

2024
class SentryKafkaConsumerInterceptorTest {
2125

26+
@BeforeTest
27+
fun setup() {
28+
initForTest {
29+
it.dsn = "https://key@sentry.io/proj"
30+
it.isEnableQueueTracing = true
31+
it.tracesSampleRate = 1.0
32+
}
33+
}
34+
35+
@AfterTest
36+
fun teardown() {
37+
Sentry.close()
38+
}
39+
2240
@Test
2341
fun `does nothing when queue tracing is disabled`() {
2442
val scopes = mock<IScopes>()
@@ -64,6 +82,16 @@ class SentryKafkaConsumerInterceptorTest {
6482
interceptor.onCommit(mapOf(TopicPartition("my-topic", 0) to OffsetAndMetadata(1)))
6583
}
6684

85+
@Test
86+
fun `no-arg constructor uses current scopes`() {
87+
val interceptor = SentryKafkaConsumerInterceptor<String, String>()
88+
val records = singleRecordBatch()
89+
90+
val result = interceptor.onConsume(records)
91+
92+
assertSame(records, result)
93+
}
94+
6795
private fun singleRecordBatch(): ConsumerRecords<String, String> {
6896
val partition = TopicPartition("my-topic", 0)
6997
val record = ConsumerRecord("my-topic", 0, 0L, "key", "value")

sentry-kafka/src/test/kotlin/io/sentry/kafka/SentryKafkaProducerInterceptorTest.kt

Lines changed: 29 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package io.sentry.kafka
22

33
import io.sentry.IScopes
4+
import io.sentry.ISentryLifecycleToken
45
import io.sentry.Sentry
56
import io.sentry.SentryOptions
67
import io.sentry.SentryTraceHeader
@@ -26,7 +27,11 @@ class SentryKafkaProducerInterceptorTest {
2627

2728
@BeforeTest
2829
fun setup() {
29-
initForTest { it.dsn = "https://key@sentry.io/proj" }
30+
initForTest {
31+
it.dsn = "https://key@sentry.io/proj"
32+
it.isEnableQueueTracing = true
33+
it.tracesSampleRate = 1.0
34+
}
3035
scopes = mock()
3136
options =
3237
SentryOptions().apply {
@@ -95,4 +100,27 @@ class SentryKafkaProducerInterceptorTest {
95100

96101
assertSame(record, result)
97102
}
103+
104+
@Test
105+
fun `no-arg constructor uses current scopes`() {
106+
val transaction = Sentry.startTransaction("tx", "op")
107+
val record = ProducerRecord("my-topic", "key", "value")
108+
109+
try {
110+
val token: ISentryLifecycleToken = transaction.makeCurrent()
111+
try {
112+
val interceptor = SentryKafkaProducerInterceptor<String, String>()
113+
interceptor.onSend(record)
114+
} finally {
115+
token.close()
116+
}
117+
} finally {
118+
transaction.finish()
119+
}
120+
121+
assertNotNull(record.headers().lastHeader(SentryTraceHeader.SENTRY_TRACE_HEADER))
122+
assertNotNull(
123+
record.headers().lastHeader(SentryKafkaProducerInterceptor.SENTRY_ENQUEUED_TIME_HEADER)
124+
)
125+
}
98126
}

sentry-samples/sentry-samples-console/build.gradle.kts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,8 +36,10 @@ dependencies {
3636
implementation(projects.sentry)
3737
implementation(projects.sentryAsyncProfiler)
3838
implementation(projects.sentryJcache)
39+
implementation(projects.sentryKafka)
3940
implementation(libs.jcache)
4041
implementation(libs.caffeine.jcache)
42+
implementation(libs.kafka.clients)
4143

4244
testImplementation(kotlin(Config.kotlinStdLib))
4345
testImplementation(projects.sentry)

sentry-samples/sentry-samples-console/src/main/java/io/sentry/samples/console/Main.java

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
import io.sentry.jcache.SentryJCacheWrapper;
66
import io.sentry.protocol.Message;
77
import io.sentry.protocol.User;
8+
import io.sentry.samples.console.kafka.KafkaShowcase;
89
import java.util.Collections;
910
import javax.cache.Cache;
1011
import javax.cache.CacheManager;
@@ -16,6 +17,10 @@ public class Main {
1617
private static long numberOfDiscardedSpansDueToOverflow = 0;
1718

1819
public static void main(String[] args) throws InterruptedException {
20+
final String kafkaBootstrapServers = System.getenv("SENTRY_SAMPLE_KAFKA_BOOTSTRAP_SERVERS");
21+
final boolean kafkaEnabled =
22+
kafkaBootstrapServers != null && !kafkaBootstrapServers.trim().isEmpty();
23+
1924
Sentry.init(
2025
options -> {
2126
// NOTE: Replace the test DSN below with YOUR OWN DSN to see the events from this app in
@@ -95,6 +100,7 @@ public static void main(String[] args) throws InterruptedException {
95100

96101
// Enable cache tracing to create spans for cache operations
97102
options.setEnableCacheTracing(true);
103+
options.setEnableQueueTracing(kafkaEnabled);
98104

99105
// Determine traces sample rate based on the sampling context
100106
// options.setTracesSampler(
@@ -178,6 +184,13 @@ public static void main(String[] args) throws InterruptedException {
178184
// cache.remove, and cache.flush spans as children of the active transaction.
179185
demonstrateCacheTracing();
180186

187+
// Kafka queue tracing with kafka-clients interceptors.
188+
//
189+
// Enable with: SENTRY_SAMPLE_KAFKA_BOOTSTRAP_SERVERS=localhost:9092
190+
if (kafkaEnabled) {
191+
KafkaShowcase.runKafkaWithSentryInterceptors(kafkaBootstrapServers);
192+
}
193+
181194
// Performance feature
182195
//
183196
// Transactions collect execution time of the piece of code that's executed between the start
Lines changed: 132 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,132 @@
1+
package io.sentry.samples.console.kafka;
2+
3+
import io.sentry.ISentryLifecycleToken;
4+
import io.sentry.ITransaction;
5+
import io.sentry.Sentry;
6+
import io.sentry.kafka.SentryKafkaConsumerInterceptor;
7+
import io.sentry.kafka.SentryKafkaProducerInterceptor;
8+
import java.time.Duration;
9+
import java.util.Collections;
10+
import java.util.Properties;
11+
import java.util.UUID;
12+
import java.util.concurrent.CountDownLatch;
13+
import java.util.concurrent.TimeUnit;
14+
import org.apache.kafka.clients.consumer.ConsumerConfig;
15+
import org.apache.kafka.clients.consumer.ConsumerRecords;
16+
import org.apache.kafka.clients.consumer.KafkaConsumer;
17+
import org.apache.kafka.clients.producer.KafkaProducer;
18+
import org.apache.kafka.clients.producer.ProducerConfig;
19+
import org.apache.kafka.clients.producer.ProducerRecord;
20+
import org.apache.kafka.common.serialization.StringDeserializer;
21+
import org.apache.kafka.common.serialization.StringSerializer;
22+
23+
public final class KafkaShowcase {
24+
25+
public static final String TOPIC = "sentry-topic-console-sample";
26+
27+
private KafkaShowcase() {}
28+
29+
public static void runKafkaWithSentryInterceptors(final String bootstrapServers) {
30+
final CountDownLatch consumedLatch = new CountDownLatch(1);
31+
final Thread consumerThread =
32+
startConsumerWithSentryInterceptor(bootstrapServers, consumedLatch);
33+
final Properties producerProperties = createProducerPropertiesWithSentry(bootstrapServers);
34+
35+
final ITransaction transaction = Sentry.startTransaction("kafka-demo", "demo");
36+
try (ISentryLifecycleToken ignored = transaction.makeCurrent()) {
37+
try (KafkaProducer<String, String> producer = new KafkaProducer<>(producerProperties)) {
38+
Thread.sleep(500);
39+
producer.send(new ProducerRecord<>(TOPIC, "sentry-kafka sample message")).get();
40+
} catch (InterruptedException e) {
41+
Thread.currentThread().interrupt();
42+
} catch (Exception ignoredException) {
43+
// local broker may not be available when running the sample
44+
}
45+
46+
try {
47+
consumedLatch.await(5, TimeUnit.SECONDS);
48+
} catch (InterruptedException e) {
49+
Thread.currentThread().interrupt();
50+
}
51+
} finally {
52+
consumerThread.interrupt();
53+
try {
54+
consumerThread.join(1000);
55+
} catch (InterruptedException e) {
56+
Thread.currentThread().interrupt();
57+
}
58+
transaction.finish();
59+
}
60+
}
61+
62+
public static Properties createProducerPropertiesWithSentry(final String bootstrapServers) {
63+
final Properties producerProperties = new Properties();
64+
producerProperties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
65+
producerProperties.put(
66+
ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
67+
producerProperties.put(
68+
ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
69+
70+
// Required for Sentry queue tracing in kafka-clients producer setup.
71+
producerProperties.put(
72+
ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, SentryKafkaProducerInterceptor.class.getName());
73+
74+
// Optional tuning for sample stability in CI/local runs.
75+
producerProperties.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 2000);
76+
producerProperties.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, 2000);
77+
producerProperties.put(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, 3000);
78+
79+
return producerProperties;
80+
}
81+
82+
public static Properties createConsumerPropertiesWithSentry(final String bootstrapServers) {
83+
final Properties consumerProperties = new Properties();
84+
consumerProperties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
85+
consumerProperties.put(
86+
ConsumerConfig.GROUP_ID_CONFIG, "sentry-console-sample-" + UUID.randomUUID());
87+
consumerProperties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
88+
consumerProperties.put(
89+
ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
90+
consumerProperties.put(
91+
ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
92+
93+
// Required for Sentry queue tracing in kafka-clients consumer setup.
94+
consumerProperties.put(
95+
ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG, SentryKafkaConsumerInterceptor.class.getName());
96+
97+
// Optional tuning for sample stability in CI/local runs.
98+
consumerProperties.put(ConsumerConfig.DEFAULT_API_TIMEOUT_MS_CONFIG, 2000);
99+
consumerProperties.put(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG, 2000);
100+
101+
return consumerProperties;
102+
}
103+
104+
private static Thread startConsumerWithSentryInterceptor(
105+
final String bootstrapServers, final CountDownLatch consumedLatch) {
106+
final Thread consumerThread =
107+
new Thread(
108+
() -> {
109+
final Properties consumerProperties =
110+
createConsumerPropertiesWithSentry(bootstrapServers);
111+
112+
try (KafkaConsumer<String, String> consumer =
113+
new KafkaConsumer<>(consumerProperties)) {
114+
consumer.subscribe(Collections.singletonList(TOPIC));
115+
116+
while (!Thread.currentThread().isInterrupted() && consumedLatch.getCount() > 0) {
117+
final ConsumerRecords<String, String> records =
118+
consumer.poll(Duration.ofMillis(500));
119+
if (!records.isEmpty()) {
120+
consumedLatch.countDown();
121+
break;
122+
}
123+
}
124+
} catch (Exception ignored) {
125+
// local broker may not be available when running the sample
126+
}
127+
},
128+
"sentry-kafka-sample-consumer");
129+
consumerThread.start();
130+
return consumerThread;
131+
}
132+
}

0 commit comments

Comments
 (0)