diff --git a/framework-client-api/src/main/java/io/axoniq/platform/framework/api/Routes.kt b/framework-client-api/src/main/java/io/axoniq/platform/framework/api/Routes.kt index 558359c..1ef61bf 100644 --- a/framework-client-api/src/main/java/io/axoniq/platform/framework/api/Routes.kt +++ b/framework-client-api/src/main/java/io/axoniq/platform/framework/api/Routes.kt @@ -57,6 +57,9 @@ object Routes { object DeadLetter { const val LETTERS = "dlq-query-dead-letters" const val SEQUENCE_SIZE = "dlq-query-dead-letter-sequence-size" + // Paginated lookup of letters within a single sequence — added in AF5 framework-client + // 5.1.0 so the platform UI can browse very long sequences without loading them all. + const val SEQUENCE_LETTERS = "dlq-query-dead-letter-sequence-letters" const val DELETE_SEQUENCE = "dlq-command-delete-sequence" const val DELETE_ALL_SEQUENCES = "dlq-command-delete-all-sequences" diff --git a/framework-client-api/src/main/java/io/axoniq/platform/framework/api/deadLetterApi.kt b/framework-client-api/src/main/java/io/axoniq/platform/framework/api/deadLetterApi.kt index 49f1bbd..b9f7786 100644 --- a/framework-client-api/src/main/java/io/axoniq/platform/framework/api/deadLetterApi.kt +++ b/framework-client-api/src/main/java/io/axoniq/platform/framework/api/deadLetterApi.kt @@ -71,3 +71,28 @@ data class DeadLetterProcessRequest( val processingGroup: String, val messageIdentifier: String ) + +/** + * Request paginated letters belonging to a single sequence inside the DLQ. Used by the platform UI + * detail modal to browse long sequences without loading them all up-front. + * + * @param processingGroup The processing group / DLQ identifier. + * @param sequenceIdentifier Synthetic sequence id as previously returned by [DeadLetter.sequenceIdentifier]. + * @param offset Zero-based offset into the sequence. + * @param size Number of letters to return (capped server-side). + */ +data class FetchSequenceLettersRequest( + val processingGroup: String, + val sequenceIdentifier: String, + val offset: Int, + val size: Int, +) + +/** + * Response payload for [FetchSequenceLettersRequest]. Carries the requested slice of letters along + * with the total number of letters in the sequence so the UI can render full pagination. + */ +data class SequenceLettersResponse( + val letters: List, + val totalCount: Long = letters.size.toLong(), +) diff --git a/framework-client/pom.xml b/framework-client/pom.xml index fe44c45..5288079 100644 --- a/framework-client/pom.xml +++ b/framework-client/pom.xml @@ -83,6 +83,12 @@ provided true + + io.axoniq.framework + axoniq-dead-letter + provided + true + tools.jackson.core diff --git a/framework-client/src/main/java/io/axoniq/platform/framework/eventprocessor/AxoniqPlatformDeadLetterConfigurerEnhancer.java b/framework-client/src/main/java/io/axoniq/platform/framework/eventprocessor/AxoniqPlatformDeadLetterConfigurerEnhancer.java new file mode 100644 index 0000000..de2c489 --- /dev/null +++ b/framework-client/src/main/java/io/axoniq/platform/framework/eventprocessor/AxoniqPlatformDeadLetterConfigurerEnhancer.java @@ -0,0 +1,103 @@ +/* + * Copyright (c) 2022-2026. AxonIQ B.V. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.axoniq.platform.framework.eventprocessor; + +import io.axoniq.platform.framework.AxoniqPlatformConfiguration; +import io.axoniq.platform.framework.client.RSocketHandlerRegistrar; +import org.axonframework.common.configuration.ComponentDefinition; +import org.axonframework.common.configuration.ComponentRegistry; +import org.axonframework.common.configuration.ConfigurationEnhancer; +import org.axonframework.common.lifecycle.Phase; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import static io.axoniq.platform.framework.AxoniqPlatformConfigurerEnhancer.PLATFORM_ENHANCER_ORDER; + +/** + * Service-loaded enhancer that registers the dead-letter queue inspection components only when the + * {@code axoniq-dead-letter} module is present on the classpath. Kept free of direct references to + * {@link DeadLetterManager} or {@link RSocketDlqResponder} (which import optional types) so the class can be + * loaded even when the addon is absent. + */ +public class AxoniqPlatformDeadLetterConfigurerEnhancer implements ConfigurationEnhancer { + + private static final Logger LOGGER = + LoggerFactory.getLogger(AxoniqPlatformDeadLetterConfigurerEnhancer.class); + private static final String DEAD_LETTER_PROBE_CLASS = + "io.axoniq.framework.messaging.deadletter.SequencedDeadLetterQueue"; + + @Override + public void enhance(ComponentRegistry registry) { + if (!registry.hasComponent(AxoniqPlatformConfiguration.class)) { + return; + } + // Enhancers can be invoked more than once during context refresh — bail out if the DLQ + // components are already registered to avoid ComponentOverrideException. + if (registry.hasComponent(DeadLetterManager.class) + || registry.hasComponent(ProcessingGroupInfoSource.class)) { + return; + } + if (!isClasspathAvailable()) { + LOGGER.debug("axoniq-dead-letter not on classpath; skipping dead-letter queue inspection wiring."); + return; + } + register(registry); + } + + @Override + public int order() { + // Run after the main platform enhancer so the RSocketHandlerRegistrar component is already declared. + return PLATFORM_ENHANCER_ORDER + 1; + } + + private static void register(ComponentRegistry registry) { + registry.registerComponent(ComponentDefinition + .ofType(DeadLetterManager.class) + .withBuilder(DeadLetterManager::new) + // Discover DLQs after event processors have started, by which point the + // EventHandlingComponent decorator chain has materialised every DLQ. + .onStart(Phase.INSTRUCTION_COMPONENTS, DeadLetterManager::start)); + + // The Spring-backed ComponentRegistry exposes a registered component under all of its + // implemented interfaces automatically, so registering DeadLetterManager already makes + // ProcessingGroupInfoSource available. The plain AF5 ComponentRegistry is exact-typed + // though, so only register the seam there to keep ProcessorReportCreator's lookup + // (`getOptionalComponent(ProcessingGroupInfoSource.class)`) working in both worlds. + if (!registry.hasComponent(ProcessingGroupInfoSource.class)) { + registry.registerComponent(ComponentDefinition + .ofType(ProcessingGroupInfoSource.class) + .withBuilder(c -> c.getComponent(DeadLetterManager.class))); + } + + registry.registerComponent(ComponentDefinition + .ofType(RSocketDlqResponder.class) + .withBuilder(c -> new RSocketDlqResponder( + c.getComponent(DeadLetterManager.class), + c.getComponent(RSocketHandlerRegistrar.class))) + .onStart(Phase.EXTERNAL_CONNECTIONS, RSocketDlqResponder::start)); + } + + private static boolean isClasspathAvailable() { + try { + Class.forName(DEAD_LETTER_PROBE_CLASS, false, + AxoniqPlatformDeadLetterConfigurerEnhancer.class.getClassLoader()); + return true; + } catch (ClassNotFoundException e) { + return false; + } + } +} diff --git a/framework-client/src/main/java/io/axoniq/platform/framework/eventprocessor/DeadLetterManager.kt b/framework-client/src/main/java/io/axoniq/platform/framework/eventprocessor/DeadLetterManager.kt new file mode 100644 index 0000000..3b7b14c --- /dev/null +++ b/framework-client/src/main/java/io/axoniq/platform/framework/eventprocessor/DeadLetterManager.kt @@ -0,0 +1,358 @@ +/* + * Copyright (c) 2022-2026. AxonIQ B.V. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.axoniq.platform.framework.eventprocessor + +import io.axoniq.framework.messaging.deadletter.DeadLetter +import io.axoniq.framework.messaging.deadletter.SequencedDeadLetterProcessor +import io.axoniq.framework.messaging.deadletter.SequencedDeadLetterQueue +import io.axoniq.platform.framework.api.DeadLetterResponse +import io.axoniq.platform.framework.api.SequenceLettersResponse +import org.axonframework.common.configuration.Configuration +import org.axonframework.messaging.eventhandling.EventMessage +import org.slf4j.LoggerFactory +import java.util.concurrent.TimeUnit +import io.axoniq.platform.framework.api.DeadLetter as ApiDeadLetter + +private const val LETTER_PAYLOAD_SIZE_LIMIT = 1024 +private val logger = LoggerFactory.getLogger(DeadLetterManager::class.java) + +/** + * Inspects and operates on the dead-letter queues belonging to event handling components configured on this + * application. + * + * In AF5 each event handling component within a Pooled Streaming processor may have its own dead-letter queue. + * Queues are registered in the [Configuration] under names of the form + * `DeadLetterQueue[EventHandlingComponent[][]]`. + * + * To stay compatible with the platform's AF4-based DLQ API (which expects a single "processing group" identifier per + * DLQ) this manager exposes each DLQ under a synthesised identifier: + * - if a processor has a single DLQ the identifier equals the processor name (matches the issue requirement); + * - if a processor has multiple DLQs each is exposed as `::` so they remain + * addressable individually. + */ +class DeadLetterManager( + private val configuration: Configuration, +) : ProcessingGroupInfoSource { + + @Volatile + private var entries: List? = null + + /** + * Discovers the DLQs configured on this application by walking each event-processor module. + * Called once via the lifecycle; subsequent invocations refresh the cached view. + */ + fun start() { + entries = discoverEntries() + } + + override fun infoFor(processorName: String): List = + dlqInfoForProcessor(processorName).map { + ProcessingGroupInfoSource.ProcessingGroupInfo(it.processingGroup, it.sequenceCount) + } + + + /** + * Internal view of a discovered DLQ together with all metadata required to address it through the public API. + */ + private data class DlqEntry( + val processingGroup: String, + val processorName: String, + val componentName: String, + val dlq: SequencedDeadLetterQueue, + val processor: SequencedDeadLetterProcessor, + ) + + private val dlqNamePattern = + Regex("""^DeadLetterQueue\[EventHandlingComponent\[([^]]+)]\[(.+)]]$""") + + fun deadLetters( + processingGroup: String, + offset: Int = 0, + size: Int = 25, + // Per-sequence cap intentionally small. The list query is meant to give the platform UI + // a *page* of sequences with enough preview letters to seed the detail modal — not to + // ship every letter every refresh. Mitchell observed 7-second refresh cycles on a local + // DLQ when this defaulted to 1000 (page-size 25 sequences x up to 1000 letters each = + // ~25k letter records serialised on every poll). 10 matches the historical "10+" + // placeholder behaviour the platform UI already displays for capped sequences and keeps + // the per-letter payload off the hot path. The detail modal pulls full pages lazily + // through `sequenceLetters(...)` (FetchDeadLettersForSequence) so long sequences are + // still browsable end-to-end without inflating the list query. + maxSequenceLetters: Int = 10, + ): DeadLetterResponse { + val entry = dlqFor(processingGroup) + val sequences = entry.dlq.deadLetters(null).join() + val pageOfSequences = sequences + .drop(offset) + .take(size) + .map { sequence -> + val letters = sequence.toList() + // The AF5 SequencedDeadLetterQueue does not expose the underlying sequence + // identifier (the Object passed to enqueue()) on a DeadLetter, so we synthesise + // a stable identifier from the first letter's message id and apply it to every + // letter in the sequence. Operations look up sequences by walking deadLetters() + // and matching this synthetic id — see findSequence(...). + val syntheticSequenceId = letters.firstOrNull()?.message()?.identifier() ?: "" + letters + .take(maxSequenceLetters) + .map { it.toApiLetter(syntheticSequenceId) } + } + val total = entry.dlq.amountOfSequences(null).join() + return DeadLetterResponse(pageOfSequences, total) + } + + fun sequenceSize(processingGroup: String, sequenceIdentifier: String): Long { + val dlq = dlqFor(processingGroup).dlq + return findSequence(dlq, sequenceIdentifier)?.count()?.toLong() ?: 0L + } + + /** + * Returns a paginated slice of letters belonging to the sequence identified by [sequenceIdentifier]. + * Used by the platform UI's detail modal so very long sequences can be browsed without loading + * them all up-front through the [deadLetters] batch query. + */ + fun lettersForSequence( + processingGroup: String, + sequenceIdentifier: String, + offset: Int, + size: Int, + ): SequenceLettersResponse { + val sequence = findSequence(dlqFor(processingGroup).dlq, sequenceIdentifier) + ?: return SequenceLettersResponse(emptyList(), 0) + val total = sequence.size.toLong() + val safeOffset = offset.coerceAtLeast(0) + val safeSize = size.coerceAtLeast(1) + val slice = sequence + .drop(safeOffset) + .take(safeSize) + .map { it.toApiLetter(sequenceIdentifier) } + return SequenceLettersResponse(slice, total) + } + + /** + * Evicts every letter belonging to the sequence identified by [sequenceIdentifier]. + * + * @return the number of letters that were actually evicted (0 when the synthetic id no longer + * resolves — e.g. the operator's view was stale). + */ + fun delete(processingGroup: String, sequenceIdentifier: String): Int { + val dlq = dlqFor(processingGroup).dlq + val sequence = findSequence(dlq, sequenceIdentifier) + if (sequence == null) { + logger.warn( + "DLQ delete-sequence: no sequence in [{}] matches synthetic id [{}] — nothing to evict", + processingGroup, sequenceIdentifier, + ) + return 0 + } + logger.info( + "DLQ delete-sequence: evicting {} letters from sequence [{}] in [{}]", + sequence.size, sequenceIdentifier, processingGroup, + ) + var evicted = 0 + sequence.forEach { + dlq.evict(it, null).join() + evicted++ + } + return evicted + } + + /** + * Evicts a single letter identified by [messageIdentifier] from the sequence identified by + * [sequenceIdentifier]. Returns `true` when an eviction was performed; `false` indicates the + * synthetic id or message id no longer resolves (typically because the caller's view was stale). + */ + fun delete(processingGroup: String, sequenceIdentifier: String, messageIdentifier: String): Boolean { + val dlq = dlqFor(processingGroup).dlq + val sequence = findSequence(dlq, sequenceIdentifier) + if (sequence == null) { + logger.warn( + "DLQ delete-letter: no sequence in [{}] matches synthetic id [{}] (message id was [{}]) — caller view likely stale", + processingGroup, sequenceIdentifier, messageIdentifier, + ) + return false + } + val target = sequence.firstOrNull { it.message().identifier() == messageIdentifier } + if (target == null) { + logger.warn( + "DLQ delete-letter: sequence [{}] in [{}] (size={}) does not contain message id [{}] — already evicted?", + sequenceIdentifier, processingGroup, sequence.size, messageIdentifier, + ) + return false + } + logger.info( + "DLQ delete-letter: evicting message [{}] from sequence [{}] in [{}]", + messageIdentifier, sequenceIdentifier, processingGroup, + ) + dlq.evict(target, null).join() + return true + } + + /** + * Resolves a DLQ sequence by the synthetic identifier this manager exposes through the API + * (the message id of the sequence's first letter). Walks all sequences once and matches. + */ + private fun findSequence( + dlq: SequencedDeadLetterQueue, + syntheticSequenceId: String, + ): List>? { + val sequences = dlq.deadLetters(null).join() + for (sequence in sequences) { + val letters = sequence.toList() + if (letters.firstOrNull()?.message()?.identifier() == syntheticSequenceId) { + return letters + } + } + return null + } + + fun process(processingGroup: String, messageIdentifier: String): Boolean { + val processor = dlqFor(processingGroup).processor + return processor.process { it.message().identifier() == messageIdentifier } + .get(60, TimeUnit.SECONDS) + } + + fun processAll( + processingGroup: String, + maxMessages: Int? = null, + timeoutSeconds: Long = 600, + ): Int { + val processor = dlqFor(processingGroup).processor + var processed = 0 + val deadline = System.nanoTime() + TimeUnit.SECONDS.toNanos(timeoutSeconds) + while (maxMessages == null || processed < maxMessages) { + if (System.nanoTime() > deadline) break + val didProcess = processor.process { true }.get(timeoutSeconds, TimeUnit.SECONDS) + if (!didProcess) break + processed++ + } + return processed + } + + fun deleteAll(processingGroup: String, timeoutSeconds: Long = 600): Int { + val dlq = dlqFor(processingGroup).dlq + val totalCount = dlq.size(null).get(timeoutSeconds, TimeUnit.SECONDS).toInt() + dlq.clear(null).get(timeoutSeconds, TimeUnit.SECONDS) + return totalCount + } + + /** + * Returns the DLQ entries belonging to the given processor — used by [ProcessorReportCreator] to surface DLQ size + * per processing group in the processor report. + */ + fun dlqInfoForProcessor(processorName: String): List = + discover() + .filter { it.processorName == processorName } + .map { DlqInfo(it.processingGroup, it.dlq.amountOfSequences(null).join()) } + + private fun dlqFor(processingGroup: String): DlqEntry = + discover().firstOrNull { it.processingGroup == processingGroup } + ?: throw IllegalArgumentException( + "There is no dead-letter queue for processing group [$processingGroup]") + + @Suppress("UNCHECKED_CAST") + private fun discoverEntries(): List { + data class Parsed( + val module: Configuration, + val processor: String, + val component: String, + val dlq: SequencedDeadLetterQueue, + ) + + val parsed = configuration.moduleConfigurations.flatMap { module -> + module.getComponents(SequencedDeadLetterQueue::class.java) + .mapNotNull { (name, dlq) -> + val match = dlqNamePattern.find(name) ?: return@mapNotNull null + Parsed( + module = module, + processor = match.groupValues[1], + component = match.groupValues[2], + dlq = dlq as SequencedDeadLetterQueue, + ) + } + } + val perProcessor = parsed.groupingBy { it.processor }.eachCount() + return parsed.map { + val ehcName = "EventHandlingComponent[${it.processor}][${it.component}]" + val processor = it.module + .getOptionalComponent(SequencedDeadLetterProcessor::class.java, ehcName) + .orElseThrow { + IllegalStateException( + "Component [$ehcName] is not wrapped with dead-letter processing") + } as SequencedDeadLetterProcessor + DlqEntry( + processingGroup = if (perProcessor[it.processor] == 1) it.processor else "${it.processor}::${it.component}", + processorName = it.processor, + componentName = it.component, + dlq = it.dlq, + processor = processor, + ) + } + } + + private fun discover(): List = entries ?: discoverEntries().also { entries = it } + + private fun DeadLetter.toApiLetter(sequenceIdentifier: String): ApiDeadLetter { + val message = this.message() + return ApiDeadLetter( + messageIdentifier = message.identifier(), + message = serializePayload(message), + messageType = messageTypeOf(message), + causeType = this.cause().map { it.type() }.orElse(null), + causeMessage = this.cause().map { it.message() }.orElse(null), + enqueuedAt = this.enqueuedAt(), + lastTouched = this.lastTouched(), + diagnostics = this.diagnostics(), + sequenceIdentifier = sequenceIdentifier, + ) + } + + /** + * Best-effort human-readable type name for the payload. When the DLQ has the message in its + * still-serialised form the JVM type is `byte[]`, which is useless to display, so we fall back + * to the qualified name carried on the message's [org.axonframework.messaging.core.MessageType]. + */ + private fun messageTypeOf(message: EventMessage): String { + val payloadClass = message.payloadType() + if (payloadClass == ByteArray::class.java) { + return runCatching { message.type().name() }.getOrDefault("byte[]") + } + return payloadClass.simpleName ?: payloadClass.name + } + + private fun serializePayload(message: EventMessage): String { + val raw: String = try { + when (val payload = message.payload()) { + null -> "" + is ByteArray -> String(payload, Charsets.UTF_8) + is String -> payload + else -> payload.toString() + } + } catch (_: Exception) { + "" + } + // UTF-8-safe truncation so multi-byte characters can't get split mid-codepoint. + return raw.toByteArray(Charsets.UTF_8) + .let { if (it.size <= LETTER_PAYLOAD_SIZE_LIMIT) raw else String(it, 0, LETTER_PAYLOAD_SIZE_LIMIT, Charsets.UTF_8) } + } + + /** + * Lightweight DTO returned to [ProcessorReportCreator] so it can populate per-processor DLQ size information + * without exposing the full dead-letter API. + */ + data class DlqInfo(val processingGroup: String, val sequenceCount: Long) +} diff --git a/framework-client/src/main/java/io/axoniq/platform/framework/eventprocessor/ProcessingGroupInfoSource.kt b/framework-client/src/main/java/io/axoniq/platform/framework/eventprocessor/ProcessingGroupInfoSource.kt new file mode 100644 index 0000000..13ec7c8 --- /dev/null +++ b/framework-client/src/main/java/io/axoniq/platform/framework/eventprocessor/ProcessingGroupInfoSource.kt @@ -0,0 +1,34 @@ +/* + * Copyright (c) 2022-2026. AxonIQ B.V. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.axoniq.platform.framework.eventprocessor + +/** + * Always-loadable seam used by [ProcessorReportCreator] to learn about processing groups (and their DLQ size, if + * any) belonging to a processor. + * + * Carrying this contract in a class with no references to the optional `axoniq-dead-letter` types lets + * [ProcessorReportCreator] stay free of those types so it can run on classpaths where the addon is absent. + */ +interface ProcessingGroupInfoSource { + + fun infoFor(processorName: String): List + + data class ProcessingGroupInfo( + val processingGroup: String, + val dlqSize: Long?, + ) +} diff --git a/framework-client/src/main/java/io/axoniq/platform/framework/eventprocessor/ProcessorReportCreator.kt b/framework-client/src/main/java/io/axoniq/platform/framework/eventprocessor/ProcessorReportCreator.kt index 494ebf2..9c88c68 100644 --- a/framework-client/src/main/java/io/axoniq/platform/framework/eventprocessor/ProcessorReportCreator.kt +++ b/framework-client/src/main/java/io/axoniq/platform/framework/eventprocessor/ProcessorReportCreator.kt @@ -16,6 +16,7 @@ package io.axoniq.platform.framework.eventprocessor +import io.axoniq.platform.framework.api.ProcessingGroupStatus import io.axoniq.platform.framework.api.ProcessorMode import io.axoniq.platform.framework.api.ProcessorStatus import io.axoniq.platform.framework.api.ProcessorStatusReport @@ -30,9 +31,14 @@ import org.axonframework.messaging.eventhandling.processing.streaming.pooled.Poo import org.axonframework.messaging.eventhandling.processing.streaming.segmenting.EventTrackerStatus import org.axonframework.messaging.eventhandling.processing.streaming.token.store.TokenStore import org.axonframework.messaging.eventhandling.processing.subscribing.SubscribingEventProcessor +import org.slf4j.LoggerFactory class ProcessorReportCreator(private val processingConfig: Configuration) { + private val logger = LoggerFactory.getLogger(ProcessorReportCreator::class.java) private val metricsRegistry = processingConfig.getComponent(ProcessorMetricsRegistry::class.java) + // Optional — only present when an addon (currently only axoniq-dead-letter) registers a source. + private val processingGroupInfoSource: ProcessingGroupInfoSource? = + processingConfig.getOptionalComponent(ProcessingGroupInfoSource::class.java).orElse(null) companion object { const val MULTI_TENANT_PROCESSOR_CLASS = "org.axonframework.extensions.multitenancy.components.eventhandeling.MultiTenantEventProcessor" } @@ -51,7 +57,7 @@ class ProcessorReportCreator(private val processingConfig: Configuration) { private fun streamingStatus(name: String, processor: StreamingEventProcessor) = ProcessorStatus( name, - emptyList(), + processingGroupsFor(name), processor.tokenStoreIdentifier, processor.toType(), processor.isRunning, @@ -61,6 +67,18 @@ class ProcessorReportCreator(private val processingConfig: Configuration) { processor.processingStatus().map { (_, segment) -> segment.toStatus(name) }, ) + private fun processingGroupsFor(processorName: String): List { + val source = processingGroupInfoSource ?: return emptyList() + return try { + source.infoFor(processorName) + .map { ProcessingGroupStatus(it.processingGroup, it.dlqSize) } + } catch (e: Exception) { + // A failing probe must not break processor reporting. + logger.warn("Failed to collect processing group information for processor [{}]", processorName, e) + emptyList() + } + } + private fun subscribingStatus(name: String, processor: SubscribingEventProcessor) = ProcessorStatus( name, diff --git a/framework-client/src/main/java/io/axoniq/platform/framework/eventprocessor/RSocketDlqResponder.kt b/framework-client/src/main/java/io/axoniq/platform/framework/eventprocessor/RSocketDlqResponder.kt new file mode 100644 index 0000000..c0f3117 --- /dev/null +++ b/framework-client/src/main/java/io/axoniq/platform/framework/eventprocessor/RSocketDlqResponder.kt @@ -0,0 +1,168 @@ +/* + * Copyright (c) 2022-2026. AxonIQ B.V. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.axoniq.platform.framework.eventprocessor + +import io.axoniq.platform.framework.api.DeadLetterProcessRequest +import io.axoniq.platform.framework.api.DeadLetterRequest +import io.axoniq.platform.framework.api.DeadLetterResponse +import io.axoniq.platform.framework.api.DeadLetterSequenceDeleteRequest +import io.axoniq.platform.framework.api.DeadLetterSequenceSize +import io.axoniq.platform.framework.api.DeadLetterSingleDeleteRequest +import io.axoniq.platform.framework.api.DeleteAllDeadLetterSequencesRequest +import io.axoniq.platform.framework.api.FetchSequenceLettersRequest +import io.axoniq.platform.framework.api.ProcessAllDeadLetterSequencesRequest +import io.axoniq.platform.framework.api.Routes +import io.axoniq.platform.framework.api.SequenceLettersResponse +import io.axoniq.platform.framework.client.RSocketHandlerRegistrar +import org.slf4j.LoggerFactory + +open class RSocketDlqResponder( + private val deadLetterManager: DeadLetterManager, + private val registrar: RSocketHandlerRegistrar, +) { + private val logger = LoggerFactory.getLogger(this::class.java) + + fun start() { + registrar.registerHandlerWithPayload( + Routes.ProcessingGroup.DeadLetter.LETTERS, + DeadLetterRequest::class.java, + this::handleDeadLetterQuery, + ) + registrar.registerHandlerWithPayload( + Routes.ProcessingGroup.DeadLetter.SEQUENCE_SIZE, + DeadLetterSequenceSize::class.java, + this::handleSequenceSizeQuery, + ) + registrar.registerHandlerWithPayload( + Routes.ProcessingGroup.DeadLetter.SEQUENCE_LETTERS, + FetchSequenceLettersRequest::class.java, + this::handleSequenceLettersQuery, + ) + registrar.registerHandlerWithPayload( + Routes.ProcessingGroup.DeadLetter.DELETE_SEQUENCE, + DeadLetterSequenceDeleteRequest::class.java, + this::handleDeleteSequenceCommand, + ) + registrar.registerHandlerWithPayload( + Routes.ProcessingGroup.DeadLetter.DELETE_LETTER, + DeadLetterSingleDeleteRequest::class.java, + this::handleDeleteLetterCommand, + ) + registrar.registerHandlerWithPayload( + Routes.ProcessingGroup.DeadLetter.PROCESS, + DeadLetterProcessRequest::class.java, + this::handleProcessCommand, + ) + registrar.registerHandlerWithPayload( + Routes.ProcessingGroup.DeadLetter.PROCESS_ALL_SEQUENCES, + ProcessAllDeadLetterSequencesRequest::class.java, + this::handleProcessAllSequencesCommand, + ) + registrar.registerHandlerWithPayload( + Routes.ProcessingGroup.DeadLetter.DELETE_ALL_SEQUENCES, + DeleteAllDeadLetterSequencesRequest::class.java, + this::handleDeleteAllSequencesCommand, + ) + } + + private fun handleDeadLetterQuery(request: DeadLetterRequest): DeadLetterResponse { + logger.debug("Handling Axoniq Platform DEAD_LETTERS query [{}]", request) + return deadLetterManager.deadLetters( + request.processingGroup, + request.offset, + request.size, + request.maxSequenceLetters, + ) + } + + private fun handleSequenceSizeQuery(request: DeadLetterSequenceSize): Long { + logger.debug( + "Handling Axoniq Platform DEAD_LETTER_SEQUENCE_SIZE query for processing group [{}]", + request.processingGroup, + ) + return deadLetterManager.sequenceSize(request.processingGroup, request.sequenceIdentifier) + } + + private fun handleSequenceLettersQuery(request: FetchSequenceLettersRequest): SequenceLettersResponse { + logger.debug( + "Handling Axoniq Platform DEAD_LETTER_SEQUENCE_LETTERS query for processing group [{}], sequence [{}], offset={}, size={}", + request.processingGroup, + request.sequenceIdentifier, + request.offset, + request.size, + ) + return deadLetterManager.lettersForSequence( + request.processingGroup, + request.sequenceIdentifier, + request.offset, + request.size, + ) + } + + private fun handleDeleteSequenceCommand(request: DeadLetterSequenceDeleteRequest) { + logger.debug( + "Handling Axoniq Platform DELETE_SEQUENCE command for processing group [{}], sequence [{}]", + request.processingGroup, request.sequenceIdentifier, + ) + val evicted = deadLetterManager.delete(request.processingGroup, request.sequenceIdentifier) + logger.info( + "DELETE_SEQUENCE for [{}] sequence [{}] → evicted {} letter(s)", + request.processingGroup, request.sequenceIdentifier, evicted, + ) + } + + private fun handleDeleteLetterCommand(request: DeadLetterSingleDeleteRequest) { + logger.debug( + "Handling Axoniq Platform DELETE_LETTER command for processing group [{}], sequence [{}], message [{}]", + request.processingGroup, request.sequenceIdentifier, request.messageIdentifier, + ) + val evicted = deadLetterManager.delete( + request.processingGroup, + request.sequenceIdentifier, + request.messageIdentifier, + ) + logger.info( + "DELETE_LETTER for [{}] sequence [{}] message [{}] → {}", + request.processingGroup, request.sequenceIdentifier, request.messageIdentifier, + if (evicted) "evicted" else "no-op (id no longer resolves)", + ) + } + + private fun handleProcessCommand(request: DeadLetterProcessRequest): Boolean { + logger.debug( + "Handling Axoniq Platform PROCESS command for processing group [{}]", + request.processingGroup, + ) + return deadLetterManager.process(request.processingGroup, request.messageIdentifier) + } + + private fun handleProcessAllSequencesCommand(request: ProcessAllDeadLetterSequencesRequest): Int { + logger.debug( + "Handling Axoniq Platform PROCESS_ALL_SEQUENCES command for processing group [{}]", + request.processingGroup, + ) + return deadLetterManager.processAll(request.processingGroup, request.maxMessages) + } + + private fun handleDeleteAllSequencesCommand(request: DeleteAllDeadLetterSequencesRequest): Int { + logger.debug( + "Handling Axoniq Platform DELETE_ALL_SEQUENCES command for processing group [{}]", + request.processingGroup, + ) + return deadLetterManager.deleteAll(request.processingGroup) + } +} diff --git a/framework-client/src/main/resources/META-INF/services/org.axonframework.common.configuration.ConfigurationEnhancer b/framework-client/src/main/resources/META-INF/services/org.axonframework.common.configuration.ConfigurationEnhancer index d69912f..4ec0605 100644 --- a/framework-client/src/main/resources/META-INF/services/org.axonframework.common.configuration.ConfigurationEnhancer +++ b/framework-client/src/main/resources/META-INF/services/org.axonframework.common.configuration.ConfigurationEnhancer @@ -17,4 +17,5 @@ io.axoniq.platform.framework.AxoniqPlatformConfigurerEnhancer io.axoniq.platform.framework.messaging.distributed.AxoniqPlatformDistributedMessagingConfigurerEnhancer io.axoniq.platform.framework.modelling.AxoniqPlatformModellingConfigurationEnhancer -io.axoniq.platform.framework.eventsourcing.AxoniqPlatformEventsourcingConfigurerEnhancer \ No newline at end of file +io.axoniq.platform.framework.eventsourcing.AxoniqPlatformEventsourcingConfigurerEnhancer +io.axoniq.platform.framework.eventprocessor.AxoniqPlatformDeadLetterConfigurerEnhancer \ No newline at end of file diff --git a/framework-client/src/test/kotlin/io/axoniq/platform/framework/eventprocessor/AxoniqPlatformDeadLetterConfigurerEnhancerTest.kt b/framework-client/src/test/kotlin/io/axoniq/platform/framework/eventprocessor/AxoniqPlatformDeadLetterConfigurerEnhancerTest.kt new file mode 100644 index 0000000..a2305d1 --- /dev/null +++ b/framework-client/src/test/kotlin/io/axoniq/platform/framework/eventprocessor/AxoniqPlatformDeadLetterConfigurerEnhancerTest.kt @@ -0,0 +1,102 @@ +/* + * Copyright (c) 2022-2026. AxonIQ B.V. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.axoniq.platform.framework.eventprocessor + +import io.axoniq.platform.framework.AxoniqPlatformConfiguration +import io.axoniq.platform.framework.AxoniqPlatformConfigurerEnhancer +import io.mockk.every +import io.mockk.mockk +import io.mockk.verify +import org.axonframework.common.configuration.ComponentDefinition +import org.axonframework.common.configuration.ComponentRegistry +import org.junit.jupiter.api.Assertions.assertEquals +import org.junit.jupiter.api.Test + +/** + * Verifies the guards in [AxoniqPlatformDeadLetterConfigurerEnhancer]: the DLQ components must + * register only when the host application is wired with the platform client + * ([AxoniqPlatformConfiguration] present) and the addon hasn't already been registered. + * + * The classpath probe is intentionally NOT covered here — `axoniq-dead-letter` is on the test + * classpath via `provided` scope, so [Class.forName] always succeeds in tests. Exercising the + * absent branch would require classloader trickery that adds more risk than coverage. + */ +class AxoniqPlatformDeadLetterConfigurerEnhancerTest { + + private val enhancer = AxoniqPlatformDeadLetterConfigurerEnhancer() + + @Test + fun `registers all three components when AxoniqPlatformConfiguration is present and neither DLQ component is registered yet`() { + val registry = mockk(relaxed = true) + every { registry.hasComponent(AxoniqPlatformConfiguration::class.java) } returns true + every { registry.hasComponent(DeadLetterManager::class.java) } returns false + every { registry.hasComponent(ProcessingGroupInfoSource::class.java) } returns false + + enhancer.enhance(registry) + + // DeadLetterManager + ProcessingGroupInfoSource + RSocketDlqResponder + verify(exactly = 3) { registry.registerComponent(any>()) } + } + + @Test + fun `no-op when AxoniqPlatformConfiguration is absent — host is not a platform client`() { + val registry = mockk(relaxed = true) + every { registry.hasComponent(AxoniqPlatformConfiguration::class.java) } returns false + + enhancer.enhance(registry) + + verify(exactly = 0) { registry.registerComponent(any>()) } + } + + @Test + fun `idempotent — no registrations when DeadLetterManager is already registered`() { + val registry = mockk(relaxed = true) + every { registry.hasComponent(AxoniqPlatformConfiguration::class.java) } returns true + every { registry.hasComponent(DeadLetterManager::class.java) } returns true + every { registry.hasComponent(ProcessingGroupInfoSource::class.java) } returns false + + enhancer.enhance(registry) + + verify(exactly = 0) { registry.registerComponent(any>()) } + } + + @Test + fun `idempotent — no registrations when ProcessingGroupInfoSource is already registered`() { + val registry = mockk(relaxed = true) + every { registry.hasComponent(AxoniqPlatformConfiguration::class.java) } returns true + every { registry.hasComponent(DeadLetterManager::class.java) } returns false + every { registry.hasComponent(ProcessingGroupInfoSource::class.java) } returns true + + enhancer.enhance(registry) + + verify(exactly = 0) { registry.registerComponent(any>()) } + } + + // Note: the "Spring-path" branch in `register(...)` that skips re-registering + // ProcessingGroupInfoSource when it's already exposed by the Spring-backed registry is not + // reachable on the current code path — the top-level guard above bails out if EITHER + // DeadLetterManager or ProcessingGroupInfoSource is already registered. The pair of + // idempotency tests above therefore cover both flags of that combined guard. If the + // top-level guard is ever loosened, this branch would need its own dedicated test. + + @Test + fun `order is PLATFORM_ENHANCER_ORDER + 1 so the platform client components are visible`() { + // RSocketDlqResponder needs RSocketHandlerRegistrar at start-time; that component is + // registered by the main platform enhancer, so this enhancer must run after it. + assertEquals(AxoniqPlatformConfigurerEnhancer.PLATFORM_ENHANCER_ORDER + 1, enhancer.order()) + } +} diff --git a/framework-client/src/test/kotlin/io/axoniq/platform/framework/eventprocessor/DeadLetterManagerTest.kt b/framework-client/src/test/kotlin/io/axoniq/platform/framework/eventprocessor/DeadLetterManagerTest.kt new file mode 100644 index 0000000..1dc1107 --- /dev/null +++ b/framework-client/src/test/kotlin/io/axoniq/platform/framework/eventprocessor/DeadLetterManagerTest.kt @@ -0,0 +1,520 @@ +/* + * Copyright (c) 2022-2026. AxonIQ B.V. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.axoniq.platform.framework.eventprocessor + +import io.axoniq.framework.messaging.deadletter.Cause +import io.axoniq.framework.messaging.deadletter.DeadLetter +import io.axoniq.framework.messaging.deadletter.SequencedDeadLetterProcessor +import io.axoniq.framework.messaging.deadletter.SequencedDeadLetterQueue +import io.mockk.every +import io.mockk.mockk +import io.mockk.verify +import org.axonframework.common.configuration.Configuration +import org.axonframework.messaging.core.Metadata +import org.axonframework.messaging.core.MessageType +import org.axonframework.messaging.eventhandling.EventMessage +import org.junit.jupiter.api.Assertions.assertEquals +import org.junit.jupiter.api.Assertions.assertFalse +import org.junit.jupiter.api.Assertions.assertThrows +import org.junit.jupiter.api.Assertions.assertTrue +import org.junit.jupiter.api.Test +import java.time.Instant +import java.util.Optional +import java.util.concurrent.CompletableFuture + +/** + * Unit tests for [DeadLetterManager] — discovery, synthetic-id mapping, pagination, payload + * truncation, and delegation to the underlying [SequencedDeadLetterQueue]. Builds the queue + * fakes with mockk; the real DLQ implementation isn't on the classpath we want to exercise. + * + * Intentionally out of scope: + * - `findDeadLetterProcessor` reflective walk over `EventHandlingComponent` decorators — that + * needs the AF5 module wiring to materialise, which is integration territory. + * - `process(...)` / `processAll(...)` — these delegate to the resolved + * `SequencedDeadLetterProcessor` whose `process(Predicate)` future-form is asymmetric to + * construct from the test side; the colleague explicitly said no integration tests. + */ +class DeadLetterManagerTest { + + // --------------------------------------------------------------------------------------- + // Discovery / processing group naming + // --------------------------------------------------------------------------------------- + + @Test + fun `exposes processor name as processing group when the processor has a single DLQ`() { + val dlq = fakeDlq() + val configuration = configurationWith( + "DeadLetterQueue[EventHandlingComponent[orders][OrderProjector]]" to dlq, + ) + val manager = DeadLetterManager(configuration).also { it.start() } + + val infos = manager.infoFor("orders") + + assertEquals(listOf("orders"), infos.map { it.processingGroup }) + } + + @Test + fun `exposes processor__component identifier when a processor has multiple DLQs`() { + val configuration = configurationWith( + "DeadLetterQueue[EventHandlingComponent[orders][OrderProjector]]" to fakeDlq(), + "DeadLetterQueue[EventHandlingComponent[orders][AuditProjector]]" to fakeDlq(), + ) + val manager = DeadLetterManager(configuration).also { it.start() } + + val infos = manager.infoFor("orders") + + assertEquals( + setOf("orders::OrderProjector", "orders::AuditProjector"), + infos.map { it.processingGroup }.toSet(), + ) + } + + @Test + fun `ignores components whose names do not match the DLQ pattern`() { + val configuration = configurationWith( + "DeadLetterQueue[EventHandlingComponent[orders][OrderProjector]]" to fakeDlq(), + "SomeOtherComponent" to fakeDlq(), + "DeadLetterQueue[Other][format]" to fakeDlq(), + ) + val manager = DeadLetterManager(configuration).also { it.start() } + + assertEquals(listOf("orders"), manager.infoFor("orders").map { it.processingGroup }) + } + + @Test + fun `infoFor returns only DLQs belonging to the requested processor`() { + val configuration = configurationWith( + "DeadLetterQueue[EventHandlingComponent[orders][OrderProjector]]" to fakeDlq(sequenceCount = 3), + "DeadLetterQueue[EventHandlingComponent[shipping][ShipmentProjector]]" to fakeDlq(sequenceCount = 7), + ) + val manager = DeadLetterManager(configuration).also { it.start() } + + val ordersInfo = manager.infoFor("orders") + val shippingInfo = manager.infoFor("shipping") + + assertEquals(listOf("orders" to 3L), ordersInfo.map { it.processingGroup to it.dlqSize }) + assertEquals(listOf("shipping" to 7L), shippingInfo.map { it.processingGroup to it.dlqSize }) + } + + // --------------------------------------------------------------------------------------- + // Synthetic sequence id + // --------------------------------------------------------------------------------------- + + @Test + fun `deadLetters stamps every letter in a sequence with the first letter's message id`() { + val sequence = listOf( + fakeLetter(messageId = "first"), + fakeLetter(messageId = "second"), + fakeLetter(messageId = "third"), + ) + val dlq = fakeDlq(sequences = listOf(sequence)) + val configuration = configurationWith( + "DeadLetterQueue[EventHandlingComponent[orders][OrderProjector]]" to dlq, + ) + val manager = DeadLetterManager(configuration).also { it.start() } + + val response = manager.deadLetters("orders") + + assertEquals(1, response.sequences.size) + assertEquals(listOf("first", "first", "first"), response.sequences[0].map { it.sequenceIdentifier }) + } + + @Test + fun `empty sequence gets an empty-string synthetic id without crashing`() { + // Degenerate but documented: if a sequence iterator yields no letters, the synthetic id + // is the empty string. The mapped list is empty, so there's nothing to inspect on it — + // we just need this not to throw. + val dlq = fakeDlq(sequences = listOf(emptyList())) + val configuration = configurationWith( + "DeadLetterQueue[EventHandlingComponent[orders][OrderProjector]]" to dlq, + ) + val manager = DeadLetterManager(configuration).also { it.start() } + + val response = manager.deadLetters("orders") + + assertEquals(1, response.sequences.size) + assertTrue(response.sequences[0].isEmpty()) + } + + // --------------------------------------------------------------------------------------- + // lettersForSequence pagination + // --------------------------------------------------------------------------------------- + + @Test + fun `lettersForSequence returns the requested slice in order with the correct total`() { + val sequence = (1..5).map { fakeLetter(messageId = "m$it", payload = "p$it") } + val dlq = fakeDlq(sequences = listOf(sequence)) + val manager = DeadLetterManager(configurationWith( + "DeadLetterQueue[EventHandlingComponent[orders][OrderProjector]]" to dlq, + )).also { it.start() } + + val response = manager.lettersForSequence("orders", "m1", offset = 1, size = 2) + + assertEquals(5L, response.totalCount) + assertEquals(listOf("m2", "m3"), response.letters.map { it.messageIdentifier }) + } + + @Test + fun `lettersForSequence coerces a negative offset to zero`() { + val sequence = (1..3).map { fakeLetter(messageId = "m$it") } + val dlq = fakeDlq(sequences = listOf(sequence)) + val manager = DeadLetterManager(configurationWith( + "DeadLetterQueue[EventHandlingComponent[orders][OrderProjector]]" to dlq, + )).also { it.start() } + + val response = manager.lettersForSequence("orders", "m1", offset = -5, size = 2) + + assertEquals(listOf("m1", "m2"), response.letters.map { it.messageIdentifier }) + } + + @Test + fun `lettersForSequence coerces a non-positive size to one`() { + val sequence = (1..3).map { fakeLetter(messageId = "m$it") } + val dlq = fakeDlq(sequences = listOf(sequence)) + val manager = DeadLetterManager(configurationWith( + "DeadLetterQueue[EventHandlingComponent[orders][OrderProjector]]" to dlq, + )).also { it.start() } + + val response = manager.lettersForSequence("orders", "m1", offset = 0, size = 0) + + assertEquals(1, response.letters.size) + assertEquals("m1", response.letters[0].messageIdentifier) + } + + @Test + fun `lettersForSequence returns an empty response when no sequence matches the synthetic id`() { + val sequence = listOf(fakeLetter(messageId = "real")) + val dlq = fakeDlq(sequences = listOf(sequence)) + val manager = DeadLetterManager(configurationWith( + "DeadLetterQueue[EventHandlingComponent[orders][OrderProjector]]" to dlq, + )).also { it.start() } + + val response = manager.lettersForSequence("orders", "stale-id", 0, 10) + + assertTrue(response.letters.isEmpty()) + assertEquals(0L, response.totalCount) + } + + @Test + fun `lettersForSequence caps the slice at the size argument even when the sequence is larger`() { + val sequence = (1..10).map { fakeLetter(messageId = "m$it") } + val dlq = fakeDlq(sequences = listOf(sequence)) + val manager = DeadLetterManager(configurationWith( + "DeadLetterQueue[EventHandlingComponent[orders][OrderProjector]]" to dlq, + )).also { it.start() } + + val response = manager.lettersForSequence("orders", "m1", offset = 0, size = 3) + + assertEquals(3, response.letters.size) + assertEquals(10L, response.totalCount) + } + + // --------------------------------------------------------------------------------------- + // delete / deleteAll delegation + // --------------------------------------------------------------------------------------- + + @Test + fun `delete by sequence evicts every letter in that sequence`() { + val letters = listOf(fakeLetter("m1"), fakeLetter("m2"), fakeLetter("m3")) + val dlq = fakeDlq(sequences = listOf(letters)) + val manager = DeadLetterManager(configurationWith( + "DeadLetterQueue[EventHandlingComponent[orders][OrderProjector]]" to dlq, + )).also { it.start() } + + val evicted = manager.delete("orders", "m1") + + assertEquals(3, evicted) + letters.forEach { verify(exactly = 1) { dlq.evict(it, null) } } + } + + @Test + fun `delete by sequence is a no-op when the sequence does not exist`() { + val dlq = fakeDlq(sequences = listOf(listOf(fakeLetter("real")))) + val manager = DeadLetterManager(configurationWith( + "DeadLetterQueue[EventHandlingComponent[orders][OrderProjector]]" to dlq, + )).also { it.start() } + + val evicted = manager.delete("orders", "ghost") + + assertEquals(0, evicted) + verify(exactly = 0) { dlq.evict(any>(), any()) } + } + + @Test + fun `delete by message evicts only the matching letter`() { + val letter1 = fakeLetter("m1") + val letter2 = fakeLetter("m2") + val dlq = fakeDlq(sequences = listOf(listOf(letter1, letter2))) + val manager = DeadLetterManager(configurationWith( + "DeadLetterQueue[EventHandlingComponent[orders][OrderProjector]]" to dlq, + )).also { it.start() } + + val evicted = manager.delete("orders", "m1", "m2") + + assertTrue(evicted) + verify(exactly = 1) { dlq.evict(letter2, null) } + verify(exactly = 0) { dlq.evict(letter1, null) } + } + + @Test + fun `delete by message is a no-op when the message id is unknown in the sequence`() { + val letter1 = fakeLetter("m1") + val dlq = fakeDlq(sequences = listOf(listOf(letter1))) + val manager = DeadLetterManager(configurationWith( + "DeadLetterQueue[EventHandlingComponent[orders][OrderProjector]]" to dlq, + )).also { it.start() } + + val evicted = manager.delete("orders", "m1", "missing") + + assertFalse(evicted) + verify(exactly = 0) { dlq.evict(any>(), any()) } + } + + @Test + fun `delete by message is a no-op when the sequence does not resolve`() { + val dlq = fakeDlq(sequences = listOf(listOf(fakeLetter("real")))) + val manager = DeadLetterManager(configurationWith( + "DeadLetterQueue[EventHandlingComponent[orders][OrderProjector]]" to dlq, + )).also { it.start() } + + val evicted = manager.delete("orders", "ghost", "anything") + + assertFalse(evicted) + verify(exactly = 0) { dlq.evict(any>(), any()) } + } + + @Test + fun `deleteAll returns the queue size and clears the queue`() { + val dlq = fakeDlq(totalSize = 42L) + val manager = DeadLetterManager(configurationWith( + "DeadLetterQueue[EventHandlingComponent[orders][OrderProjector]]" to dlq, + )).also { it.start() } + + val deleted = manager.deleteAll("orders") + + assertEquals(42, deleted) + verify(exactly = 1) { dlq.clear(null) } + } + + @Test + fun `sequenceSize returns the count of letters for the matching synthetic id`() { + val sequence = (1..4).map { fakeLetter("m$it") } + val dlq = fakeDlq(sequences = listOf(sequence)) + val manager = DeadLetterManager(configurationWith( + "DeadLetterQueue[EventHandlingComponent[orders][OrderProjector]]" to dlq, + )).also { it.start() } + + assertEquals(4L, manager.sequenceSize("orders", "m1")) + } + + @Test + fun `sequenceSize returns zero when the synthetic id does not resolve`() { + val sequence = listOf(fakeLetter("real")) + val dlq = fakeDlq(sequences = listOf(sequence)) + val manager = DeadLetterManager(configurationWith( + "DeadLetterQueue[EventHandlingComponent[orders][OrderProjector]]" to dlq, + )).also { it.start() } + + assertEquals(0L, manager.sequenceSize("orders", "ghost")) + } + + // --------------------------------------------------------------------------------------- + // Payload truncation + messageType fallback + // --------------------------------------------------------------------------------------- + + @Test + fun `payload at or below 1024 UTF-8 bytes is returned untouched`() { + val payload = "x".repeat(1024) + val sequence = listOf(fakeLetter("m1", payload = payload)) + val dlq = fakeDlq(sequences = listOf(sequence)) + val manager = DeadLetterManager(configurationWith( + "DeadLetterQueue[EventHandlingComponent[orders][OrderProjector]]" to dlq, + )).also { it.start() } + + val response = manager.deadLetters("orders") + + assertEquals(payload, response.sequences[0][0].message) + } + + @Test + fun `payload over 1024 UTF-8 bytes is truncated without splitting a multi-byte codepoint`() { + // "č" is U+010D, two bytes in UTF-8. Filling beyond 1024 bytes guarantees the cutoff + // lands inside a multi-byte sequence — a naive byte slice would yield a malformed + // codepoint there. The implementation must round down to the previous valid boundary. + val char = "č" + val payload = char.repeat(600) // 600 * 2 = 1200 bytes + val sequence = listOf(fakeLetter("m1", payload = payload)) + val dlq = fakeDlq(sequences = listOf(sequence)) + val manager = DeadLetterManager(configurationWith( + "DeadLetterQueue[EventHandlingComponent[orders][OrderProjector]]" to dlq, + )).also { it.start() } + + val result = manager.deadLetters("orders").sequences[0][0].message + + // Truncated string fits within the limit ... + assertTrue(result.toByteArray(Charsets.UTF_8).size <= 1024) + // ... and contains no replacement characters from a mid-codepoint split. + assertFalse(result.contains('�')) + // The resulting string is composed entirely of valid "č" codepoints. + assertTrue(result.all { it == 'č' }) + } + + @Test + fun `messageType falls back to message type name when payload class is ByteArray`() { + val message = fakeEventMessage( + id = "m1", + payload = "still serialised".toByteArray(), + payloadType = ByteArray::class.java, + typeName = "com.example.OrderPlaced", + ) + val letter = fakeLetterFromMessage(message) + val dlq = fakeDlq(sequences = listOf(listOf(letter))) + val manager = DeadLetterManager(configurationWith( + "DeadLetterQueue[EventHandlingComponent[orders][OrderProjector]]" to dlq, + )).also { it.start() } + + val apiLetter = manager.deadLetters("orders").sequences[0][0] + + assertEquals("com.example.OrderPlaced", apiLetter.messageType) + } + + @Test + fun `messageType uses payload class simple name for non-ByteArray payloads`() { + val sequence = listOf(fakeLetter("m1", payload = "hello", payloadType = String::class.java)) + val dlq = fakeDlq(sequences = listOf(sequence)) + val manager = DeadLetterManager(configurationWith( + "DeadLetterQueue[EventHandlingComponent[orders][OrderProjector]]" to dlq, + )).also { it.start() } + + val apiLetter = manager.deadLetters("orders").sequences[0][0] + + assertEquals("String", apiLetter.messageType) + } + + // --------------------------------------------------------------------------------------- + // dlqFor error + // --------------------------------------------------------------------------------------- + + @Test + fun `unknown processing group throws IllegalArgumentException`() { + val manager = DeadLetterManager(configurationWith( + "DeadLetterQueue[EventHandlingComponent[orders][OrderProjector]]" to fakeDlq(), + )).also { it.start() } + + assertThrows(IllegalArgumentException::class.java) { + manager.sequenceSize("unknown-group", "whatever") + } + } + + // --------------------------------------------------------------------------------------- + // Helpers + // --------------------------------------------------------------------------------------- + + /** + * Builds a [Configuration] backed by a single module that exposes the given DLQs and a + * matching [SequencedDeadLetterProcessor] for each — the manager looks up the latter by the + * `EventHandlingComponent[][]` name to materialise its + * `DlqEntry.processor` field. + */ + private fun configurationWith( + vararg dlqsByName: Pair>, + ): Configuration { + val module = mockk(relaxed = true) + every { module.getComponents(SequencedDeadLetterQueue::class.java) } returns + dlqsByName.toMap().mapValues { it.value as SequencedDeadLetterQueue<*> } + // Every DLQ name carries the processor + component segment used to address its processor. + dlqsByName.forEach { (name, _) -> + val match = Regex("""^DeadLetterQueue\[EventHandlingComponent\[([^]]+)]\[(.+)]]$""").find(name) + if (match != null) { + val ehcName = "EventHandlingComponent[${match.groupValues[1]}][${match.groupValues[2]}]" + val processor = mockk>(relaxed = true) + every { + module.getOptionalComponent(SequencedDeadLetterProcessor::class.java, ehcName) + } returns Optional.of(processor) + } + } + val root = mockk(relaxed = true) + every { root.moduleConfigurations } returns listOf(module) + return root + } + + private fun fakeDlq( + sequences: List>> = emptyList(), + sequenceCount: Long = sequences.size.toLong(), + totalSize: Long = sequences.sumOf { it.size.toLong() }, + ): SequencedDeadLetterQueue { + val dlq = mockk>(relaxed = true) + // `deadLetters(null)` returns a CompletableFuture>>; the + // manager calls `.join()` and iterates with `.toList()` on each inner sequence, so any + // Iterable shape works here. + every { dlq.deadLetters(null) } returns CompletableFuture.completedFuture( + sequences as Iterable>>, + ) + every { dlq.amountOfSequences(null) } returns CompletableFuture.completedFuture(sequenceCount) + every { dlq.size(null) } returns CompletableFuture.completedFuture(totalSize) + every { dlq.clear(null) } returns CompletableFuture.completedFuture(null) + every { dlq.evict(any>(), null) } returns CompletableFuture.completedFuture(null) + return dlq + } + + private fun fakeLetter( + messageId: String, + payload: Any? = "payload-$messageId", + payloadType: Class<*> = (payload?.javaClass ?: String::class.java), + causeType: String? = "java.lang.RuntimeException", + causeMessage: String? = "boom", + ): DeadLetter { + val message = fakeEventMessage(messageId, payload, payloadType) + return fakeLetterFromMessage(message, causeType, causeMessage) + } + + private fun fakeLetterFromMessage( + message: EventMessage, + causeType: String? = "java.lang.RuntimeException", + causeMessage: String? = "boom", + ): DeadLetter { + val letter = mockk>(relaxed = true) + every { letter.message() } returns message + val cause: Optional = if (causeType == null) Optional.empty() else { + val c = mockk() + every { c.type() } returns causeType + every { c.message() } returns (causeMessage ?: "") + Optional.of(c) + } + every { letter.cause() } returns cause + every { letter.enqueuedAt() } returns Instant.EPOCH + every { letter.lastTouched() } returns Instant.EPOCH + every { letter.diagnostics() } returns Metadata.emptyInstance() + return letter + } + + private fun fakeEventMessage( + id: String, + payload: Any?, + payloadType: Class<*>, + typeName: String = "com.example.${payloadType.simpleName ?: "Anonymous"}", + ): EventMessage { + val message = mockk(relaxed = true) + every { message.identifier() } returns id + every { message.payload() } returns payload + every { message.payloadType() } returns payloadType + val type = mockk() + every { type.name() } returns typeName + every { message.type() } returns type + return message + } +} diff --git a/framework-client/src/test/kotlin/io/axoniq/platform/framework/eventprocessor/ProcessorReportCreatorTest.kt b/framework-client/src/test/kotlin/io/axoniq/platform/framework/eventprocessor/ProcessorReportCreatorTest.kt new file mode 100644 index 0000000..9f6ecf2 --- /dev/null +++ b/framework-client/src/test/kotlin/io/axoniq/platform/framework/eventprocessor/ProcessorReportCreatorTest.kt @@ -0,0 +1,104 @@ +/* + * Copyright (c) 2022-2026. AxonIQ B.V. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.axoniq.platform.framework.eventprocessor + +import io.axoniq.platform.framework.api.ProcessingGroupStatus +import io.mockk.every +import io.mockk.mockk +import org.axonframework.common.configuration.Configuration +import org.junit.jupiter.api.Assertions.assertEquals +import org.junit.jupiter.api.Test +import java.util.Optional + +/** + * Focused unit tests for the [ProcessingGroupInfoSource] integration in [ProcessorReportCreator]. + * The full `createReport()` flow needs a live AF5 configuration with processors and segment + * tracker statuses — that's integration territory and not what we're testing here. Instead we + * reach into the private `processingGroupsFor` via reflection so we can exercise the optional + * source seam in isolation. + * + * Reflection is the right tool here because the production class deliberately keeps this + * method private (it's a callee of `streamingStatus(...)` only), and widening visibility just + * for tests would leak the seam into the public surface. + */ +class ProcessorReportCreatorTest { + + @Test + fun `processingGroupsFor returns empty when no ProcessingGroupInfoSource is registered`() { + val config = baseConfigurationWith(infoSource = Optional.empty()) + val creator = ProcessorReportCreator(config) + + val result = invokeProcessingGroupsFor(creator, "orders") + + assertEquals(emptyList(), result) + } + + @Test + fun `processingGroupsFor maps source infos to ProcessingGroupStatus entries`() { + val source = mockk() + every { source.infoFor("orders") } returns listOf( + ProcessingGroupInfoSource.ProcessingGroupInfo("orders", 3L), + ProcessingGroupInfoSource.ProcessingGroupInfo("orders::audit", 0L), + ) + val config = baseConfigurationWith(infoSource = Optional.of(source)) + val creator = ProcessorReportCreator(config) + + val result = invokeProcessingGroupsFor(creator, "orders") + + assertEquals( + listOf( + ProcessingGroupStatus("orders", 3L), + ProcessingGroupStatus("orders::audit", 0L), + ), + result, + ) + } + + @Test + fun `processingGroupsFor swallows source exceptions and returns empty list`() { + // A failing probe must not break processor reporting — the warning is logged but the + // caller receives an empty list and the rest of the report still renders. + val source = mockk() + every { source.infoFor("orders") } throws RuntimeException("boom") + val config = baseConfigurationWith(infoSource = Optional.of(source)) + val creator = ProcessorReportCreator(config) + + val result = invokeProcessingGroupsFor(creator, "orders") + + assertEquals(emptyList(), result) + } + + /** + * The constructor calls `getComponent(ProcessorMetricsRegistry::class.java)` and + * `getOptionalComponent(ProcessingGroupInfoSource::class.java)`. We provide just enough of + * each so construction succeeds; the metrics registry isn't exercised by the test path + * (no segments => no metrics lookups), so a relaxed mock is fine. + */ + private fun baseConfigurationWith(infoSource: Optional): Configuration { + val config = mockk(relaxed = true) + every { config.getComponent(ProcessorMetricsRegistry::class.java) } returns ProcessorMetricsRegistry() + every { config.getOptionalComponent(ProcessingGroupInfoSource::class.java) } returns infoSource + return config + } + + @Suppress("UNCHECKED_CAST") + private fun invokeProcessingGroupsFor(creator: ProcessorReportCreator, processorName: String): List { + val method = ProcessorReportCreator::class.java.getDeclaredMethod("processingGroupsFor", String::class.java) + method.isAccessible = true + return method.invoke(creator, processorName) as List + } +} diff --git a/framework-client/src/test/kotlin/io/axoniq/platform/framework/eventprocessor/RSocketDlqResponderTest.kt b/framework-client/src/test/kotlin/io/axoniq/platform/framework/eventprocessor/RSocketDlqResponderTest.kt new file mode 100644 index 0000000..e31b402 --- /dev/null +++ b/framework-client/src/test/kotlin/io/axoniq/platform/framework/eventprocessor/RSocketDlqResponderTest.kt @@ -0,0 +1,181 @@ +/* + * Copyright (c) 2022-2026. AxonIQ B.V. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.axoniq.platform.framework.eventprocessor + +import io.axoniq.platform.framework.api.DeadLetterProcessRequest +import io.axoniq.platform.framework.api.DeadLetterRequest +import io.axoniq.platform.framework.api.DeadLetterResponse +import io.axoniq.platform.framework.api.DeadLetterSequenceDeleteRequest +import io.axoniq.platform.framework.api.DeadLetterSequenceSize +import io.axoniq.platform.framework.api.DeadLetterSingleDeleteRequest +import io.axoniq.platform.framework.api.DeleteAllDeadLetterSequencesRequest +import io.axoniq.platform.framework.api.FetchSequenceLettersRequest +import io.axoniq.platform.framework.api.ProcessAllDeadLetterSequencesRequest +import io.axoniq.platform.framework.api.Routes +import io.axoniq.platform.framework.api.SequenceLettersResponse +import io.axoniq.platform.framework.client.RSocketHandlerRegistrar +import io.mockk.CapturingSlot +import io.mockk.every +import io.mockk.just +import io.mockk.mockk +import io.mockk.runs +import io.mockk.slot +import io.mockk.verify +import org.junit.jupiter.api.Assertions.assertEquals +import org.junit.jupiter.api.BeforeEach +import org.junit.jupiter.api.Test + +/** + * Verifies that [RSocketDlqResponder] registers a handler for every DLQ route on + * [RSocketHandlerRegistrar.registerHandlerWithPayload] and that each captured handler delegates + * to the corresponding [DeadLetterManager] method. We capture the lambda handed to the registrar + * so we exercise both registration AND the handler's body in a single test per route. + * + * The slots are typed as `(T) -> Any` because that is the exact functional shape + * `registerHandlerWithPayload` declares; the production handlers happen to return more specific + * types but the registrar erases them down to `Any`. + */ +class RSocketDlqResponderTest { + + private lateinit var manager: DeadLetterManager + private lateinit var registrar: RSocketHandlerRegistrar + private lateinit var responder: RSocketDlqResponder + + private val letterHandler = slot<(DeadLetterRequest) -> Any>() + private val sequenceSizeHandler = slot<(DeadLetterSequenceSize) -> Any>() + private val sequenceLettersHandler = slot<(FetchSequenceLettersRequest) -> Any>() + private val deleteSequenceHandler = slot<(DeadLetterSequenceDeleteRequest) -> Any>() + private val deleteLetterHandler = slot<(DeadLetterSingleDeleteRequest) -> Any>() + private val processHandler = slot<(DeadLetterProcessRequest) -> Any>() + private val processAllHandler = slot<(ProcessAllDeadLetterSequencesRequest) -> Any>() + private val deleteAllHandler = slot<(DeleteAllDeadLetterSequencesRequest) -> Any>() + + @BeforeEach + fun setUp() { + manager = mockk(relaxed = true) + registrar = mockk(relaxed = true) + captureHandler(Routes.ProcessingGroup.DeadLetter.LETTERS, DeadLetterRequest::class.java, letterHandler) + captureHandler(Routes.ProcessingGroup.DeadLetter.SEQUENCE_SIZE, DeadLetterSequenceSize::class.java, sequenceSizeHandler) + captureHandler(Routes.ProcessingGroup.DeadLetter.SEQUENCE_LETTERS, FetchSequenceLettersRequest::class.java, sequenceLettersHandler) + captureHandler(Routes.ProcessingGroup.DeadLetter.DELETE_SEQUENCE, DeadLetterSequenceDeleteRequest::class.java, deleteSequenceHandler) + captureHandler(Routes.ProcessingGroup.DeadLetter.DELETE_LETTER, DeadLetterSingleDeleteRequest::class.java, deleteLetterHandler) + captureHandler(Routes.ProcessingGroup.DeadLetter.PROCESS, DeadLetterProcessRequest::class.java, processHandler) + captureHandler(Routes.ProcessingGroup.DeadLetter.PROCESS_ALL_SEQUENCES, ProcessAllDeadLetterSequencesRequest::class.java, processAllHandler) + captureHandler(Routes.ProcessingGroup.DeadLetter.DELETE_ALL_SEQUENCES, DeleteAllDeadLetterSequencesRequest::class.java, deleteAllHandler) + + responder = RSocketDlqResponder(manager, registrar) + responder.start() + } + + @Test + fun `start registers a handler for each of the eight DLQ routes`() { + verify(exactly = 1) { registrar.registerHandlerWithPayload(Routes.ProcessingGroup.DeadLetter.LETTERS, DeadLetterRequest::class.java, any()) } + verify(exactly = 1) { registrar.registerHandlerWithPayload(Routes.ProcessingGroup.DeadLetter.SEQUENCE_SIZE, DeadLetterSequenceSize::class.java, any()) } + verify(exactly = 1) { registrar.registerHandlerWithPayload(Routes.ProcessingGroup.DeadLetter.SEQUENCE_LETTERS, FetchSequenceLettersRequest::class.java, any()) } + verify(exactly = 1) { registrar.registerHandlerWithPayload(Routes.ProcessingGroup.DeadLetter.DELETE_SEQUENCE, DeadLetterSequenceDeleteRequest::class.java, any()) } + verify(exactly = 1) { registrar.registerHandlerWithPayload(Routes.ProcessingGroup.DeadLetter.DELETE_LETTER, DeadLetterSingleDeleteRequest::class.java, any()) } + verify(exactly = 1) { registrar.registerHandlerWithPayload(Routes.ProcessingGroup.DeadLetter.PROCESS, DeadLetterProcessRequest::class.java, any()) } + verify(exactly = 1) { registrar.registerHandlerWithPayload(Routes.ProcessingGroup.DeadLetter.PROCESS_ALL_SEQUENCES, ProcessAllDeadLetterSequencesRequest::class.java, any()) } + verify(exactly = 1) { registrar.registerHandlerWithPayload(Routes.ProcessingGroup.DeadLetter.DELETE_ALL_SEQUENCES, DeleteAllDeadLetterSequencesRequest::class.java, any()) } + } + + @Test + fun `LETTERS handler forwards all request parameters to DeadLetterManager deadLetters`() { + val expected = DeadLetterResponse(emptyList(), 7) + every { manager.deadLetters("g", 1, 10, 5) } returns expected + + val result = letterHandler.captured(DeadLetterRequest("g", offset = 1, size = 10, maxSequenceLetters = 5)) + + assertEquals(expected, result) + verify(exactly = 1) { manager.deadLetters("g", 1, 10, 5) } + } + + @Test + fun `SEQUENCE_SIZE handler forwards processing group and sequence id`() { + every { manager.sequenceSize("g", "seq-1") } returns 42L + + val result = sequenceSizeHandler.captured(DeadLetterSequenceSize("g", "seq-1")) + + assertEquals(42L, result) + verify(exactly = 1) { manager.sequenceSize("g", "seq-1") } + } + + @Test + fun `SEQUENCE_LETTERS handler forwards pagination arguments`() { + val expected = SequenceLettersResponse(emptyList(), 0) + every { manager.lettersForSequence("g", "seq-1", 5, 25) } returns expected + + val result = sequenceLettersHandler.captured(FetchSequenceLettersRequest("g", "seq-1", offset = 5, size = 25)) + + assertEquals(expected, result) + verify(exactly = 1) { manager.lettersForSequence("g", "seq-1", 5, 25) } + } + + @Test + fun `DELETE_SEQUENCE handler delegates to DeadLetterManager delete-by-sequence`() { + every { manager.delete("g", "seq-1") } returns 3 + + deleteSequenceHandler.captured(DeadLetterSequenceDeleteRequest("g", "seq-1")) + + verify(exactly = 1) { manager.delete("g", "seq-1") } + } + + @Test + fun `DELETE_LETTER handler delegates to DeadLetterManager delete-by-message`() { + every { manager.delete("g", "seq-1", "msg-1") } returns true + + deleteLetterHandler.captured(DeadLetterSingleDeleteRequest("g", "seq-1", "msg-1")) + + verify(exactly = 1) { manager.delete("g", "seq-1", "msg-1") } + } + + @Test + fun `PROCESS handler returns the manager's result`() { + every { manager.process("g", "msg-1") } returns true + + val result = processHandler.captured(DeadLetterProcessRequest("g", "msg-1")) + + assertEquals(true, result) + verify(exactly = 1) { manager.process("g", "msg-1") } + } + + @Test + fun `PROCESS_ALL_SEQUENCES handler forwards maxMessages`() { + every { manager.processAll("g", 9) } returns 7 + + val result = processAllHandler.captured(ProcessAllDeadLetterSequencesRequest("g", maxMessages = 9)) + + assertEquals(7, result) + verify(exactly = 1) { manager.processAll("g", 9) } + } + + @Test + fun `DELETE_ALL_SEQUENCES handler returns the cleared count`() { + every { manager.deleteAll("g") } returns 11 + + val result = deleteAllHandler.captured(DeleteAllDeadLetterSequencesRequest("g")) + + assertEquals(11, result) + verify(exactly = 1) { manager.deleteAll("g") } + } + + private fun captureHandler(route: String, payloadType: Class, handler: CapturingSlot<(T) -> Any>) { + every { + registrar.registerHandlerWithPayload(route, payloadType, capture(handler)) + } just runs + } +}