diff --git a/framework-client-api/src/main/java/io/axoniq/platform/framework/api/Routes.kt b/framework-client-api/src/main/java/io/axoniq/platform/framework/api/Routes.kt index 558359c..8b92dcd 100644 --- a/framework-client-api/src/main/java/io/axoniq/platform/framework/api/Routes.kt +++ b/framework-client-api/src/main/java/io/axoniq/platform/framework/api/Routes.kt @@ -75,6 +75,13 @@ object Routes { const val ENTITY_STATE_AT_SEQUENCE = "entity-state-at-sequence" } + object Model { + const val REGISTERED_ENTITIES = "model-registered-entities" + const val DOMAIN_EVENTS = "model-domain-events" + const val ENTITY_STATE_AT_SEQUENCE = "model-entity-state-at-sequence" + const val REPLAY_TIMELINE = "model-replay-timeline" + } + object MessageFlow { const val STATS = "message-flow-stats" } diff --git a/framework-client-api/src/main/java/io/axoniq/platform/framework/api/clientIdentification.kt b/framework-client-api/src/main/java/io/axoniq/platform/framework/api/clientIdentification.kt index e108977..92adc2a 100644 --- a/framework-client-api/src/main/java/io/axoniq/platform/framework/api/clientIdentification.kt +++ b/framework-client-api/src/main/java/io/axoniq/platform/framework/api/clientIdentification.kt @@ -112,6 +112,8 @@ data class SupportedFeatures( val clientStatusUpdates: Boolean? = false, /* Whether the application has the entitlement manager configured, allowing it to receive licenses */ val licenseEntitlement: Boolean? = false, + /* Whether the client supports model inspection (AF5 StateManager-based entity inspection). */ + val modelInspection: Boolean? = false, ) data class Versions( diff --git a/framework-client-api/src/main/java/io/axoniq/platform/framework/api/modelApi.kt b/framework-client-api/src/main/java/io/axoniq/platform/framework/api/modelApi.kt new file mode 100644 index 0000000..4f57b60 --- /dev/null +++ b/framework-client-api/src/main/java/io/axoniq/platform/framework/api/modelApi.kt @@ -0,0 +1,104 @@ +/* + * Copyright (c) 2022-2026. AxonIQ B.V. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.axoniq.platform.framework.api + +data class RegisteredEntitiesResult( + val entities: List +) + +data class RegisteredEntityInfo( + val entityType: String, + /** + * All id types registered for this entity. AF5 entities can be addressed by multiple + * id types (e.g. one per command in a build agent), each producing different criteria. + * The frontend should let the user pick which id type to query against. + */ + val idTypes: List, +) + +data class IdType( + /** Fully qualified Java type name of the id class. */ + val type: String, + /** + * Structural descriptors of the id class's properties. Empty for "simple" types + * (String, primitives, UUID, etc.) — frontend renders a single text input. Populated + * for compound types (records / data classes / plain objects) — frontend renders one + * input per descriptor and sends the entityId as a JSON object keyed by descriptor names. + * Only 1-deep properties are described; nested objects are exposed as type "object" and + * left for the user to provide as raw JSON. + */ + val idFields: List = emptyList(), +) + +data class IdFieldDescriptor( + val name: String, + /** Normalized form-friendly type: "string", "number", "boolean", "uuid", or "object". */ + val type: String, + /** Fully qualified Java type name, useful for diagnostics / future extensions. */ + val javaType: String, +) + +data class ModelDomainEventsQuery( + val entityType: String, + val entityId: String, + /** FQ Java type name of the id type the user selected (must match one of [RegisteredEntityInfo.idTypes].type). */ + val idType: String, + val page: Int = 0, + val pageSize: Int = 10, +) + +data class ModelEntityStateAtSequenceQuery( + val entityType: String, + val entityId: String, + /** FQ Java type name of the id type the user selected. */ + val idType: String, + val maxSequenceNumber: Long = 0, +) + +data class ModelTimelineQuery( + val entityType: String, + val entityId: String, + /** FQ Java type name of the id type the user selected. */ + val idType: String, + val offset: Int = 0, + val limit: Int = 100, +) + +data class ModelTimelineResult( + val entityType: String, + val entityId: String, + val entries: List, + val offset: Int = 0, + val totalEvents: Int, + val truncated: Boolean, +) + +data class ModelTimelineEntry( + val sequenceNumber: Long, + /** + * ISO-8601 formatted timestamp (from [java.time.Instant.toString]). + * String is used here — instead of [java.time.Instant] — to avoid ambiguity in how + * the different serializers (CBOR on the RSocket leg, Jackson on the query-handler leg) + * encode the Instant: some emit an epoch-seconds number, which the frontend would + * then incorrectly treat as milliseconds. + */ + val timestamp: String, + val eventType: String, + val eventPayload: String?, + val stateBefore: String?, + val stateAfter: String?, +) diff --git a/framework-client/src/main/java/io/axoniq/platform/framework/AxoniqPlatformConfigurerEnhancer.java b/framework-client/src/main/java/io/axoniq/platform/framework/AxoniqPlatformConfigurerEnhancer.java index 3591be7..f80e9fa 100644 --- a/framework-client/src/main/java/io/axoniq/platform/framework/AxoniqPlatformConfigurerEnhancer.java +++ b/framework-client/src/main/java/io/axoniq/platform/framework/AxoniqPlatformConfigurerEnhancer.java @@ -170,6 +170,14 @@ public void enhance(ComponentRegistry registry) { UtilsKt.doOnSubModules(registry, (componentRegistry, module) -> { + // Only event processor modules expose a processorName; the doOnSubModules walker + // visits every sub-module (event-sourced entities, command/query modules, ...), so + // skipping non-processor modules here keeps AxoniqPlatformEventHandlingComponent + // from being constructed with a null processor name. + if (!(module instanceof PooledStreamingEventProcessorModule) + && !(module instanceof SubscribingEventProcessorModule)) { + return null; + } componentRegistry .registerDecorator(DecoratorDefinition.forType(EventHandlingComponent.class) .with((cc, name, delegate) -> @@ -181,7 +189,7 @@ public void enhance(ComponentRegistry registry) { .order(0)); return null; - }); + }, true); } /** diff --git a/framework-client/src/main/java/io/axoniq/platform/framework/client/SetupPayloadCreator.kt b/framework-client/src/main/java/io/axoniq/platform/framework/client/SetupPayloadCreator.kt index d026996..34163cc 100644 --- a/framework-client/src/main/java/io/axoniq/platform/framework/client/SetupPayloadCreator.kt +++ b/framework-client/src/main/java/io/axoniq/platform/framework/client/SetupPayloadCreator.kt @@ -81,7 +81,8 @@ class SetupPayloadCreator( heartbeat = true, threadDump = true, clientStatusUpdates = true, - licenseEntitlement = hasEntitlementManager() + licenseEntitlement = hasEntitlementManager(), + modelInspection = hasStateManager(), ) ) } @@ -346,6 +347,18 @@ class SetupPayloadCreator( } + /** + * Checks whether a StateManager has been registered, indicating AF5 model inspection support. + */ + private fun hasStateManager(): Boolean { + try { + val stateManagerClass = Class.forName("org.axonframework.modelling.StateManager") + return configuration.hasComponent(stateManagerClass) + } catch (_: ClassNotFoundException) { + return false + } + } + /** * Checks whether the PlatformLicenseSource have been configured, in which case we want updates of licenses from Platform. */ diff --git a/framework-client/src/main/java/io/axoniq/platform/framework/eventsourcing/AxoniqPlatformEntityEvolver.kt b/framework-client/src/main/java/io/axoniq/platform/framework/eventsourcing/AxoniqPlatformEntityEvolver.kt new file mode 100644 index 0000000..757a3bf --- /dev/null +++ b/framework-client/src/main/java/io/axoniq/platform/framework/eventsourcing/AxoniqPlatformEntityEvolver.kt @@ -0,0 +1,80 @@ +/* + * Copyright (c) 2022-2026. AxonIQ B.V. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.axoniq.platform.framework.eventsourcing + +import org.axonframework.messaging.core.Context +import org.axonframework.messaging.core.unitofwork.ProcessingContext +import org.axonframework.messaging.eventhandling.EventMessage +import org.axonframework.modelling.EntityEvolver +import java.util.function.BiConsumer + +/** + * Wraps the underlying [EntityEvolver] of an [org.axonframework.eventsourcing.EventSourcingRepository] + * so that inspection-time replay can fire BEFORE/AFTER hooks per event without reimplementing + * AF5's dispatch. + * + * The hooks are no-ops unless the matching [Context.ResourceKey] resources are present on the + * [ProcessingContext], so command handling and normal event sourcing go through unchanged. + * + * Constructed by [AxoniqPlatformModelInspectionEnhancer], which detects the inner + * [org.axonframework.eventsourcing.EventSourcingRepository] in the decorator chain and reconstructs + * it with this wrapper substituted for its `entityEvolver` argument. + */ +class AxoniqPlatformEntityEvolver( + private val delegate: EntityEvolver, +) : EntityEvolver { + + companion object { + /** Called before [delegate.evolve]. Receives the event and the pre-evolve entity state. */ + val BEFORE_CONSUMER: Context.ResourceKey> = + Context.ResourceKey.withLabel("AxoniqPlatformEntityEvolver.BEFORE_CONSUMER") + + /** Called after [delegate.evolve]. Receives the event and the post-evolve entity state. */ + val AFTER_CONSUMER: Context.ResourceKey> = + Context.ResourceKey.withLabel("AxoniqPlatformEntityEvolver.AFTER_CONSUMER") + + /** + * Zero-based event index — when present, [evolve] returns the current entity unchanged + * once it has applied this many events. Lets inspection reconstruct state up to a given + * sequence without doing the bookkeeping outside the framework. + */ + val MAX_INDEX: Context.ResourceKey = + Context.ResourceKey.withLabel("AxoniqPlatformEntityEvolver.MAX_INDEX") + + /** Internal counter advanced by [evolve] when [MAX_INDEX] is set. */ + val INDEX_COUNTER: Context.ResourceKey = + Context.ResourceKey.withLabel("AxoniqPlatformEntityEvolver.INDEX_COUNTER") + } + + /** Tiny mutable holder so we can advance the index without re-putting a Long resource each call. */ + class LongCounter(var value: Long = 0) + + override fun evolve(entity: E, event: EventMessage, context: ProcessingContext): E { + val maxIndex = context.getResource(MAX_INDEX) + if (maxIndex != null) { + val counter = context.computeResourceIfAbsent(INDEX_COUNTER) { LongCounter() } + if (counter.value > maxIndex) { + return entity + } + counter.value++ + } + context.getResource(BEFORE_CONSUMER)?.accept(event, entity) + val result = delegate.evolve(entity, event, context) + context.getResource(AFTER_CONSUMER)?.accept(event, result) + return result + } +} diff --git a/framework-client/src/main/java/io/axoniq/platform/framework/eventsourcing/AxoniqPlatformModelInspectionEnhancer.java b/framework-client/src/main/java/io/axoniq/platform/framework/eventsourcing/AxoniqPlatformModelInspectionEnhancer.java new file mode 100644 index 0000000..07eb91f --- /dev/null +++ b/framework-client/src/main/java/io/axoniq/platform/framework/eventsourcing/AxoniqPlatformModelInspectionEnhancer.java @@ -0,0 +1,71 @@ +/* + * Copyright (c) 2022-2026. AxonIQ B.V. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.axoniq.platform.framework.eventsourcing; + +import io.axoniq.platform.framework.AxoniqPlatformConfigurerEnhancer; +import io.axoniq.platform.framework.client.RSocketHandlerRegistrar; +import org.axonframework.common.configuration.ComponentRegistry; +import org.axonframework.common.configuration.ConfigurationEnhancer; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Service-loaded enhancer that wires the {@link RSocketModelInspectionResponder} when the + * application has the platform client connected ({@link RSocketHandlerRegistrar} present) and + * {@code axon-eventsourcing} is on the classpath. + * + *

This class is deliberately free of direct references to {@code axon-eventsourcing} types + * so it can be loaded even when event sourcing is absent from the classpath. The actual + * decorator wiring lives in {@link ModelInspectionDecorators} and is only touched after the + * runtime classpath probe succeeds.

+ * + *

We deliberately do not probe for {@code StateManager} either: it's registered by + * {@code ModellingConfigurationDefaults} at {@link Integer#MAX_VALUE}, after this enhancer's + * order, so the probe would falsely return {@code false} during boot.

+ */ +public class AxoniqPlatformModelInspectionEnhancer implements ConfigurationEnhancer { + + private static final Logger logger = LoggerFactory.getLogger(AxoniqPlatformModelInspectionEnhancer.class); + private static final String EVENTSOURCING_PROBE_CLASS = "org.axonframework.eventsourcing.eventstore.EventStorageEngine"; + + @Override + public void enhance(ComponentRegistry registry) { + if (!registry.hasComponent(RSocketHandlerRegistrar.class)) { + return; + } + if (!isClasspathAvailable()) { + logger.debug("axon-eventsourcing not on classpath; skipping model inspection wiring."); + return; + } + ModelInspectionDecorators.apply(registry); + } + + @Override + public int order() { + return AxoniqPlatformConfigurerEnhancer.PLATFORM_ENHANCER_ORDER + 1; + } + + private static boolean isClasspathAvailable() { + try { + Class.forName(EVENTSOURCING_PROBE_CLASS, false, + AxoniqPlatformModelInspectionEnhancer.class.getClassLoader()); + return true; + } catch (ClassNotFoundException e) { + return false; + } + } +} diff --git a/framework-client/src/main/java/io/axoniq/platform/framework/eventsourcing/ModelInspectionDecorators.java b/framework-client/src/main/java/io/axoniq/platform/framework/eventsourcing/ModelInspectionDecorators.java new file mode 100644 index 0000000..c1c8d2e --- /dev/null +++ b/framework-client/src/main/java/io/axoniq/platform/framework/eventsourcing/ModelInspectionDecorators.java @@ -0,0 +1,155 @@ +/* + * Copyright (c) 2022-2026. AxonIQ B.V. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.axoniq.platform.framework.eventsourcing; + +import io.axoniq.platform.framework.ReflectionKt; +import io.axoniq.platform.framework.client.RSocketHandlerRegistrar; +import org.axonframework.common.configuration.ComponentDefinition; +import org.axonframework.common.configuration.ComponentRegistry; +import org.axonframework.common.configuration.DecoratorDefinition; +import org.axonframework.common.lifecycle.Phase; +import org.axonframework.eventsourcing.EventSourcedEntityFactory; +import org.axonframework.eventsourcing.EventSourcingRepository; +import org.axonframework.eventsourcing.eventstore.EventStorageEngine; +import org.axonframework.eventsourcing.eventstore.EventStore; +import org.axonframework.eventsourcing.handler.SourcingHandler; +import org.axonframework.modelling.EntityEvolver; +import org.axonframework.modelling.repository.Repository; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.concurrent.CompletableFuture; + +/** + * Holds the actual decorator and component wiring for model inspection. Kept separate from + * {@link AxoniqPlatformModelInspectionEnhancer} so the enhancer class can be loaded even when + * {@code axon-eventsourcing} is not on the classpath — this class is only touched after a + * {@code Class.forName} probe confirms the module is present. + * + *

We do not walk submodules: AF5's nested module structure shares a single + * {@link org.axonframework.common.configuration.DefaultComponentRegistry} (each {@code BaseModule} + * resolves the parent's {@code ComponentRegistry} component instead of creating its own), so a + * single {@code Repository} decorator at the top covers every event-sourced entity in the + * application — top-level or arbitrarily nested.

+ */ +final class ModelInspectionDecorators { + + private static final Logger logger = LoggerFactory.getLogger(ModelInspectionDecorators.class); + + private ModelInspectionDecorators() { + } + + static void apply(ComponentRegistry registry) { + if (!registry.hasComponent(EventStorageEngine.class)) { + return; + } + // The enhancer pipeline can fire multiple times against the same registry as nested + // module configurations build. Idempotency guard: once the responder is in place, the + // decorator and its lifecycle hook are already registered, so re-running would + // duplicate the wrapping and double-evolve every event. + if (registry.hasComponent(RSocketModelInspectionResponder.class)) { + return; + } + + registry.registerComponent(ComponentDefinition + .ofType(RSocketModelInspectionResponder.class) + .withBuilder(c -> new RSocketModelInspectionResponder( + c.getComponent(EventStorageEngine.class), + c.getComponent(RSocketHandlerRegistrar.class), + c)) + .onStart(Phase.EXTERNAL_CONNECTIONS, RSocketModelInspectionResponder::start)); + + // Single decorator at the top covers every Repository registered in the application, + // top-level or nested — AF5's nested modules share the same component registry. + // + // The decorator reconstructs the underlying EventSourcingRepository with entityEvolver + // wrapped in AxoniqPlatformEntityEvolver. We deliberately do NOT decorate the registered + // EntityMetamodel — AnnotatedEventSourcedEntityModule casts the registered metamodel to + // AnnotatedEntityMetamodel inside its EntityIdResolver builder, and a wrapper would + // make that cast fail at startup. + // + // The .onStart hook then registers the rebuilt repository with the responder so it + // knows about this entity for the registered-entities query. + registry.registerDecorator(DecoratorDefinition + .forType(Repository.class) + .with((config, name, delegate) -> rebuildIfEventSourcingRepository(delegate)) + .onStart(Phase.LOCAL_MESSAGE_HANDLER_REGISTRATIONS, (configuration, component) -> { + configuration.getComponent(RSocketModelInspectionResponder.class) + .registerRepository(component); + return CompletableFuture.completedFuture(null); + })); + } + + /** + * Walks the wrapper chain from {@code delegate} downward to find an + * {@link EventSourcingRepository}, reconstructs that ESR with {@code entityEvolver} wrapped + * in {@link AxoniqPlatformEntityEvolver}, and swaps the wrapping component's {@code delegate} + * field to point at the new ESR. The outer wrapper(s) are kept intact so any platform-side + * decoration (e.g. {@code AxoniqPlatformRepository} for metrics) still applies. + * + *

Why peel rather than match {@code instanceof EventSourcingRepository} on the input: + * by the time this decorator runs, lower-order decorators (notably the metrics-adding + * {@code AxoniqPlatformRepository} from the modelling layer at {@code Integer.MIN_VALUE}) + * have already wrapped the ESR. Matching directly would miss every real configuration.

+ * + *

Logged-and-passthrough on reflection failure: if the field layout shifts in a future + * AF release we don't want to break command handling, just lose inspection hooks.

+ */ + @SuppressWarnings({"rawtypes", "unchecked"}) + private static Repository rebuildIfEventSourcingRepository(Repository delegate) { + Object current = delegate; + Object parent = null; + while (current != null && !(current instanceof EventSourcingRepository)) { + parent = current; + current = ReflectionKt.getPropertyValue(current, "delegate"); + } + if (!(current instanceof EventSourcingRepository esr)) { + return delegate; + } + try { + Class idType = ReflectionKt.getPropertyValue(esr, "idType"); + Class entityType = ReflectionKt.getPropertyValue(esr, "entityType"); + EventStore eventStore = ReflectionKt.getPropertyValue(esr, "eventStore"); + EventSourcedEntityFactory factory = ReflectionKt.getPropertyValue(esr, "entityFactory"); + EntityEvolver evolver = ReflectionKt.getPropertyValue(esr, "entityEvolver"); + SourcingHandler sourcingHandler = ReflectionKt.getPropertyValue(esr, "sourcingHandler"); + + EntityEvolver wrappedEvolver = new AxoniqPlatformEntityEvolver(evolver); + EventSourcingRepository rebuilt = new EventSourcingRepository( + idType, + entityType, + eventStore, + factory, + wrappedEvolver, + sourcingHandler + ); + if (parent == null) { + // No wrapper between us and the ESR — return the rebuilt ESR directly. + return rebuilt; + } + // Swap the parent wrapper's delegate to point at the rebuilt ESR. Keeps any outer + // wrappers (metrics, etc.) intact, just rewires the bottom of the chain. + ReflectionKt.setPropertyValue(parent, "delegate", rebuilt); + return delegate; + } catch (Exception e) { + logger.warn("[ModelInspection] Could not reconstruct EventSourcingRepository for [{}] — " + + "inspection hooks will be unavailable for this entity, but command handling is unaffected: {}", + esr.entityType().getName(), e.getMessage()); + return delegate; + } + } +} diff --git a/framework-client/src/main/java/io/axoniq/platform/framework/eventsourcing/RSocketModelInspectionResponder.kt b/framework-client/src/main/java/io/axoniq/platform/framework/eventsourcing/RSocketModelInspectionResponder.kt new file mode 100644 index 0000000..8ce31b3 --- /dev/null +++ b/framework-client/src/main/java/io/axoniq/platform/framework/eventsourcing/RSocketModelInspectionResponder.kt @@ -0,0 +1,524 @@ +/* + * Copyright (c) 2022-2026. AxonIQ B.V. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.axoniq.platform.framework.eventsourcing + +import com.fasterxml.jackson.annotation.JsonAutoDetect +import com.fasterxml.jackson.annotation.PropertyAccessor +import com.fasterxml.jackson.databind.ObjectMapper +import com.fasterxml.jackson.databind.SerializationFeature +import io.axoniq.platform.framework.api.* +import io.axoniq.platform.framework.client.RSocketHandlerRegistrar +import io.axoniq.platform.framework.truncateToBytes +import org.axonframework.common.configuration.Configuration +import org.axonframework.eventsourcing.eventstore.EventStorageEngine +import org.axonframework.messaging.core.unitofwork.ProcessingContext +import org.axonframework.messaging.core.unitofwork.UnitOfWorkFactory +import org.axonframework.messaging.eventhandling.EventMessage +import org.axonframework.modelling.repository.Repository +import org.slf4j.LoggerFactory +import java.util.* +import java.util.concurrent.CompletableFuture +import java.util.concurrent.ConcurrentHashMap +import java.util.function.BiConsumer + +open class RSocketModelInspectionResponder( + @Suppress("unused") private val eventStorageEngine: EventStorageEngine, + private val registrar: RSocketHandlerRegistrar, + private val configuration: Configuration, +) { + private val logger = LoggerFactory.getLogger(this::class.java) + + private val objectMapper = ObjectMapper().apply { + findAndRegisterModules() + disable(SerializationFeature.FAIL_ON_EMPTY_BEANS) + // AF5 entities are typically Kotlin data classes / Java records with private fields and + // no public bean-style getters. Field access lets Jackson surface the actual state + // instead of `{}`. + setVisibility(PropertyAccessor.FIELD, JsonAutoDetect.Visibility.ANY) + setVisibility(PropertyAccessor.GETTER, JsonAutoDetect.Visibility.NONE) + setVisibility(PropertyAccessor.IS_GETTER, JsonAutoDetect.Visibility.NONE) + } + + private val unitOfWorkFactory: UnitOfWorkFactory by lazy { + configuration.getComponent(UnitOfWorkFactory::class.java) + } + + /** + * Repositories collected at boot from each event-sourced submodule via the decorator hook + * in [AxoniqPlatformModelInspectionEnhancer]. Replaces walking the top-level state manager, + * which only sees the top-level state manager and misses everything registered in submodules. + */ + private val repositories = ConcurrentHashMap, Class<*>>, Repository>() + + @Suppress("UNCHECKED_CAST") + fun registerRepository(repository: Repository<*, *>) { + val key = repository.entityType() to repository.idType() + repositories[key] = repository as Repository + logger.info("[ModelInspection] Registered repository for entity=[{}] id=[{}]", + repository.entityType().name, repository.idType().name) + } + + fun start() { + registrar.registerHandlerWithoutPayload( + Routes.Model.REGISTERED_ENTITIES, + this::handleRegisteredEntities, + ) + registrar.registerHandlerWithPayload( + Routes.Model.DOMAIN_EVENTS, + ModelDomainEventsQuery::class.java, + this::handleDomainEvents, + ) + registrar.registerHandlerWithPayload( + Routes.Model.ENTITY_STATE_AT_SEQUENCE, + ModelEntityStateAtSequenceQuery::class.java, + this::handleEntityStateAtSequence, + ) + registrar.registerHandlerWithPayload( + Routes.Model.REPLAY_TIMELINE, + ModelTimelineQuery::class.java, + this::handleTimelineReplay, + ) + } + + // ------------------------------------------------------------------------------------------ + // Registered entities introspection + // ------------------------------------------------------------------------------------------ + + internal fun handleRegisteredEntities(): RegisteredEntitiesResult { + logger.debug("Handling Axoniq Platform MODEL_REGISTERED_ENTITIES query") + val grouped: Map, List>> = repositories.keys + .groupBy({ it.first }, { it.second }) + + val entities = grouped.map { (entityType, idClasses) -> + RegisteredEntityInfo( + entityType = entityType.name, + idTypes = idClasses.map { idClass -> + IdType( + type = idClass.name, + idFields = describeIdFields(idClass), + ) + }, + ) + } + logger.debug("Found entities: {}", entities) + return RegisteredEntitiesResult(entities = entities) + } + + /** + * Returns structural descriptors for the given id class. Empty for "simple" types (single + * text input on the frontend); populated for compound types (one input per descriptor). + */ + internal fun describeIdFields(idClass: Class<*>): List { + if (isSimpleIdType(idClass)) { + return emptyList() + } + if (idClass.isRecord) { + return idClass.recordComponents.map { component -> + IdFieldDescriptor( + name = component.name, + type = normalizedType(component.type), + javaType = component.type.name, + ) + } + } + return idClass.declaredFields + .filter { field -> + !java.lang.reflect.Modifier.isStatic(field.modifiers) && !field.isSynthetic + } + .map { field -> + IdFieldDescriptor( + name = field.name, + type = normalizedType(field.type), + javaType = field.type.name, + ) + } + } + + internal fun isSimpleIdType(idClass: Class<*>): Boolean { + if (idClass.isPrimitive) return true + if (idClass.isEnum) return true + if (isKotlinValueClass(idClass)) { + val underlying = kotlinValueClassUnderlying(idClass) + return underlying != null && isSimpleIdType(underlying) + } + return when (idClass) { + String::class.java, + java.lang.Long::class.java, + java.lang.Integer::class.java, + java.lang.Short::class.java, + java.lang.Byte::class.java, + java.lang.Double::class.java, + java.lang.Float::class.java, + java.lang.Boolean::class.java, + java.lang.Character::class.java, + UUID::class.java, + java.math.BigInteger::class.java, + java.math.BigDecimal::class.java -> true + else -> false + } + } + + private fun isKotlinValueClass(idClass: Class<*>): Boolean = + idClass.isAnnotationPresent(JvmInline::class.java) + + private fun kotlinValueClassUnderlying(idClass: Class<*>): Class<*>? { + val instanceFields = idClass.declaredFields.filter { f -> + !java.lang.reflect.Modifier.isStatic(f.modifiers) && !f.isSynthetic + } + return instanceFields.singleOrNull()?.type + } + + internal fun normalizedType(type: Class<*>): String { + if (type == UUID::class.java) return "uuid" + if (type == String::class.java) return "string" + if (type == java.lang.Boolean::class.java || type == java.lang.Boolean.TYPE) return "boolean" + if (type == java.lang.Character::class.java || type == java.lang.Character.TYPE) return "string" + if (Number::class.java.isAssignableFrom(type) || type.isPrimitive) return "number" + return "object" + } + + // ------------------------------------------------------------------------------------------ + // Id deserialization + // ------------------------------------------------------------------------------------------ + + private fun deserializeEntityId(entityId: String, idClass: Class<*>): Any? { + val trimmed = entityId.trim() + return when { + idClass == String::class.java -> trimmed + idClass == UUID::class.java -> UUID.fromString(trimmed) + idClass == java.lang.Long::class.java || idClass == java.lang.Long.TYPE -> trimmed.toLong() + idClass == java.lang.Integer::class.java || idClass == java.lang.Integer.TYPE -> trimmed.toInt() + idClass == java.lang.Short::class.java || idClass == java.lang.Short.TYPE -> trimmed.toShort() + idClass == java.lang.Byte::class.java || idClass == java.lang.Byte.TYPE -> trimmed.toByte() + idClass == java.lang.Double::class.java || idClass == java.lang.Double.TYPE -> trimmed.toDouble() + idClass == java.lang.Float::class.java || idClass == java.lang.Float.TYPE -> trimmed.toFloat() + idClass == java.lang.Boolean::class.java || idClass == java.lang.Boolean.TYPE -> trimmed.toBoolean() + idClass == java.math.BigInteger::class.java -> java.math.BigInteger(trimmed) + idClass == java.math.BigDecimal::class.java -> java.math.BigDecimal(trimmed) + idClass.isEnum -> { + @Suppress("UNCHECKED_CAST") + java.lang.Enum.valueOf(idClass as Class>, trimmed) + } + isKotlinValueClass(idClass) -> deserializeValueClass(trimmed, idClass) + else -> objectMapper.readValue(trimmed, idClass) + } + } + + private fun deserializeValueClass(raw: String, idClass: Class<*>): Any? { + val underlying = kotlinValueClassUnderlying(idClass) ?: return null + val underlyingValue = deserializeEntityId(raw, underlying) ?: return null + return try { + idClass.getDeclaredConstructor(underlying) + .apply { isAccessible = true } + .newInstance(underlyingValue) + } catch (e: NoSuchMethodException) { + logger.debug("No public constructor({}) on Kotlin value class [{}]; falling back to Jackson", + underlying.name, idClass.name) + objectMapper.readValue(raw, idClass) + } + } + + // ------------------------------------------------------------------------------------------ + // UoW-driven inspection load + // ------------------------------------------------------------------------------------------ + + /** + * Resolves the (entityType, idType) pair to the registered repository. Returns null if no + * matching repository was registered at boot — e.g. the entity exists in a non-event-sourced + * module, or the user-supplied class names don't resolve. + */ + private fun lookupRepository(entityType: Class<*>, idType: Class<*>): Repository? { + return repositories[entityType to idType] + } + + /** + * Runs [block] inside a real [org.axonframework.messaging.core.unitofwork.UnitOfWork], wiring + * the supplied hooks onto the [ProcessingContext] so the framework's own event-sourcing + * pipeline drives state replay. The repository's load is invoked through the framework path — + * criteria resolution, payload conversion, and `@EventSourcingHandler` dispatch all happen as + * they would in a real command flow. We never append events, so commit is a no-op for storage. + */ + private fun withInspectionUoW( + repository: Repository, + typedId: Any, + beforeConsumer: BiConsumer? = null, + afterConsumer: BiConsumer? = null, + maxIndex: Long? = null, + extract: (entity: Any?) -> R, + ): R { + return unitOfWorkFactory.create().executeWithResult { ctx: ProcessingContext -> + beforeConsumer?.let { ctx.putResource(AxoniqPlatformEntityEvolver.BEFORE_CONSUMER, it) } + afterConsumer?.let { ctx.putResource(AxoniqPlatformEntityEvolver.AFTER_CONSUMER, it) } + maxIndex?.let { ctx.putResource(AxoniqPlatformEntityEvolver.MAX_INDEX, it) } + + repository.load(typedId, ctx).thenApply { managed -> + extract(managed?.entity()) + } + }.get() + } + + // ------------------------------------------------------------------------------------------ + // Event payload utilities + // ------------------------------------------------------------------------------------------ + + /** + * Extracts a human-readable type name. Events read from [EventStorageEngine] often have a + * raw `byte[]` payload whose `payloadType()` returns `[B`; the proper event type is in + * `message.type().name()`. + */ + private fun extractPayloadTypeName(message: EventMessage): String { + return try { + message.type()?.name() ?: message.payloadType().name + } catch (e: Exception) { + message.payloadType().name + } + } + + /** + * Converts the event payload to a String. Payloads sourced from the event store are + * usually raw `byte[]` containing JSON or CBOR. UTF-8 first (works for JSON) and Jackson as + * fallback for typed payloads. + */ + private fun extractPayloadAsString(message: EventMessage): String? { + val payload = message.payload() ?: return null + return when (payload) { + is ByteArray -> try { + String(payload, Charsets.UTF_8) + } catch (e: Exception) { + payload.toString() + } + is String -> payload + else -> try { + objectMapper.writeValueAsString(payload) + } catch (e: Exception) { + payload.toString() + } + } + } + + private fun stateAsJson(state: Any?): String? { + if (state == null) return null + return try { + objectMapper.writeValueAsString(state) + } catch (e: Exception) { + logger.debug("Could not serialize entity state of type [{}]: {}", + state::class.java.name, e.message) + null + } + } + + // ------------------------------------------------------------------------------------------ + // Query handlers + // ------------------------------------------------------------------------------------------ + + internal fun handleDomainEvents(query: ModelDomainEventsQuery): DomainEventsResult { + logger.info("Handling Axoniq Platform MODEL_DOMAIN_EVENTS query for entity [{}] id [{}] idType [{}]", + query.entityType, query.entityId, query.idType) + + val entityClass = Class.forName(query.entityType) + val idClass = Class.forName(query.idType) + val repository = lookupRepository(entityClass, idClass) + ?: return DomainEventsResult( + entityId = query.entityId, + entityType = query.entityType, + domainEvents = emptyList(), + page = query.page, + pageSize = query.pageSize, + totalCount = 0L, + ) + val typedId = deserializeEntityId(query.entityId, idClass) + ?: throw IllegalArgumentException("Could not deserialize id [${query.entityId}] as [${query.idType}]") + + val collected = mutableListOf() + try { + withInspectionUoW( + repository = repository, + typedId = typedId, + beforeConsumer = { event, _ -> + collected += DomainEvent( + sequenceNumber = collected.size.toLong(), + timestamp = event.timestamp(), + payloadType = extractPayloadTypeName(event), + payload = extractPayloadAsString(event), + ) + }, + extract = {}, + ) + } catch (e: Exception) { + logger.error("Error while sourcing events for entity [{}] id [{}]", + query.entityType, query.entityId, e) + } + + val totalCount = collected.size.toLong() + val start = query.page * query.pageSize + val end = minOf(start + query.pageSize, collected.size) + val pagedEvents = if (start < collected.size) collected.subList(start, end) else emptyList() + + return DomainEventsResult( + entityId = query.entityId, + entityType = query.entityType, + domainEvents = pagedEvents, + page = query.page, + pageSize = query.pageSize, + totalCount = totalCount, + ) + } + + internal fun handleEntityStateAtSequence(query: ModelEntityStateAtSequenceQuery): EntityStateResult { + logger.info("Handling Axoniq Platform MODEL_ENTITY_STATE_AT_SEQUENCE query for entity [{}] id [{}] idType [{}] seq [{}]", + query.entityType, query.entityId, query.idType, query.maxSequenceNumber) + + val entityClass = Class.forName(query.entityType) + val idClass = Class.forName(query.idType) + val repository = lookupRepository(entityClass, idClass) + ?: return EntityStateResult( + type = query.entityType, + entityId = query.entityId, + maxSequenceNumber = query.maxSequenceNumber, + state = null, + ) + val typedId = deserializeEntityId(query.entityId, idClass) + ?: throw IllegalArgumentException("Could not deserialize id [${query.entityId}] as [${query.idType}]") + + val finalState = try { + withInspectionUoW( + repository = repository, + typedId = typedId, + // Negative sequence = "all events" — don't set MAX_INDEX so nothing is skipped. + maxIndex = if (query.maxSequenceNumber < 0) null else query.maxSequenceNumber, + extract = { entity -> entity }, + ) + } catch (e: Exception) { + logger.error("Error while reconstructing state for entity [{}] id [{}]", + query.entityType, query.entityId, e) + null + } + + return EntityStateResult( + type = query.entityType, + entityId = query.entityId, + maxSequenceNumber = query.maxSequenceNumber, + state = stateAsJson(finalState), + ) + } + + internal fun handleTimelineReplay(query: ModelTimelineQuery): ModelTimelineResult { + logger.info("Handling Axoniq Platform MODEL_REPLAY_TIMELINE query for entity [{}] id [{}] idType [{}] offset [{}] limit [{}]", + query.entityType, query.entityId, query.idType, query.offset, query.limit) + + val entityClass = Class.forName(query.entityType) + val idClass = Class.forName(query.idType) + val repository = lookupRepository(entityClass, idClass) + ?: return ModelTimelineResult( + entityType = query.entityType, + entityId = query.entityId, + entries = emptyList(), + offset = query.offset, + totalEvents = 0, + truncated = false, + ) + val typedId = deserializeEntityId(query.entityId, idClass) + ?: throw IllegalArgumentException("Could not deserialize id [${query.entityId}] as [${query.idType}]") + + val offset = maxOf(0, query.offset) + val limit = if (query.limit <= 0) 100 else query.limit + // Per-entry truncation budgets sized so a page-of-100 timeline response stays under + // ~2.5 MB pre-gzip (≈ 750 KB post-gzip with the RSocket gzip extension enabled). + // Each entry holds an event payload + before + after state — keep events tight (5 KB) + // and snapshots conservative (20 KB) since most entities serialize well below either + // bound. Larger payloads still come through truncated with a "[truncated]" marker. + val maxStateSizeBytes = 20 * 1024 + val maxEventSizeBytes = 5 * 1024 + + val entries = mutableListOf() + val totalEvents = intArrayOf(0) + // BEFORE/AFTER snapshots have to be paired — capture stateBefore in BEFORE_CONSUMER so it + // reflects the pre-evolve state even when the entity mutates in place. + val pending = arrayOfNulls(2) // [eventMessage, stateBeforeJson] + + try { + withInspectionUoW( + repository = repository, + typedId = typedId, + beforeConsumer = { event, stateBefore -> + pending[0] = event + pending[1] = stateAsJson(stateBefore).truncateToBytes(maxStateSizeBytes) + }, + afterConsumer = { event, stateAfter -> + val seq = totalEvents[0].toLong() + totalEvents[0]++ + if (seq >= offset && entries.size < limit) { + entries += ModelTimelineEntry( + sequenceNumber = seq, + timestamp = event.timestamp().toString(), + eventType = extractPayloadTypeName(event), + eventPayload = extractPayloadAsString(event).truncateToBytes(maxEventSizeBytes), + stateBefore = pending[1] as String?, + stateAfter = stateAsJson(stateAfter).truncateToBytes(maxStateSizeBytes), + ) + } + pending[0] = null + pending[1] = null + }, + extract = {}, + ) + } catch (e: Exception) { + logger.error("Error while sourcing events for timeline of entity [{}] id [{}]", + query.entityType, query.entityId, e) + } + + val remainingAfterWindow = maxOf(0, totalEvents[0] - offset - entries.size) + val truncated = remainingAfterWindow > 0 + // Drop stateBefore on every entry except the first in the page: for i > 0, + // entries[i].stateBefore is exactly entries[i-1].stateAfter, so sending both is + // redundant. The FE rehydrates the field by looking back one position. On a + // page-of-100 with 20 KB snapshots this saves ~99 × 20 KB ≈ 1.9 MB raw per + // response (~45% of the pre-gzip envelope). + trimRedundantStateBefore(entries) + logger.info("Sourced [{}] events for timeline of [{}] id [{}] (returning [{}] from offset [{}], truncated={})", + totalEvents[0], query.entityType, query.entityId, entries.size, offset, truncated) + + return ModelTimelineResult( + entityType = query.entityType, + entityId = query.entityId, + entries = entries, + offset = offset, + totalEvents = totalEvents[0], + truncated = truncated, + ) + } + + /** + * Blanks the `stateBefore` field on every entry past the first. The FE rehydrates these + * positions from the previous entry's `stateAfter` since the two are equal by definition + * of event sourcing — so transmitting both is wasted bytes (a 20 KB string per entry). + * + * The first entry of each page keeps its `stateBefore` because the FE has no in-band + * lookback at the page boundary; transmitting it preserves the "show pre-event state" UX + * without requiring a separate request for the previous page's tail. + * + * Visible for tests so the post-processing logic can be exercised without standing up + * a full event-sourcing context. + */ + internal fun trimRedundantStateBefore(entries: MutableList) { + for (i in 1 until entries.size) { + if (entries[i].stateBefore != null) { + entries[i] = entries[i].copy(stateBefore = null) + } + } + } +} diff --git a/framework-client/src/main/java/io/axoniq/platform/framework/messaging/HandlerMeasurement.kt b/framework-client/src/main/java/io/axoniq/platform/framework/messaging/HandlerMeasurement.kt index 4d3241e..60d5a25 100644 --- a/framework-client/src/main/java/io/axoniq/platform/framework/messaging/HandlerMeasurement.kt +++ b/framework-client/src/main/java/io/axoniq/platform/framework/messaging/HandlerMeasurement.kt @@ -58,10 +58,6 @@ class HandlerMeasurement( } fun registerMetricValue(metric: Metric, timeInNs: Long) { - if (completedTime != null) { - logger.warn { "HandlerMeasurement for handler [${message.type()}] is already completed. Can not register metric [$metric] with value [$timeInNs]. Ignoring." } - return - } registeredMetrics.compute(metric) { _, it -> // Sum the metric if it was already registered (it ?: 0L) + timeInNs @@ -69,10 +65,6 @@ class HandlerMeasurement( } fun reportMessageDispatched(messageIdentifier: MessageIdentifier) { - if (completedTime != null) { - logger.warn { "HandlerMeasurement for handler [${message.type()}] is already completed. Can not report dispatched message [$messageIdentifier]. Ignoring." } - return - } dispatchedMessages.add(messageIdentifier) } diff --git a/framework-client/src/main/java/io/axoniq/platform/framework/modelling/AxoniqPlatformStateManager.kt b/framework-client/src/main/java/io/axoniq/platform/framework/modelling/AxoniqPlatformStateManager.kt index 8ac2f69..f029c5b 100644 --- a/framework-client/src/main/java/io/axoniq/platform/framework/modelling/AxoniqPlatformStateManager.kt +++ b/framework-client/src/main/java/io/axoniq/platform/framework/modelling/AxoniqPlatformStateManager.kt @@ -1,5 +1,5 @@ /* - * Copyright (c) 2022-2025. AxonIQ B.V. + * Copyright (c) 2022-2026. AxonIQ B.V. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -26,6 +26,10 @@ class AxoniqPlatformStateManager( private val delegate: StateManager ): StateManager { override fun register(repository: Repository): StateManager { + if(repository is AxoniqPlatformRepository) { + delegate.register(repository) + return this + } delegate.register(AxoniqPlatformRepository(repository)) return this } diff --git a/framework-client/src/main/java/io/axoniq/platform/framework/modelling/ModellingDecorators.java b/framework-client/src/main/java/io/axoniq/platform/framework/modelling/ModellingDecorators.java index 6c80d20..3a7c4e5 100644 --- a/framework-client/src/main/java/io/axoniq/platform/framework/modelling/ModellingDecorators.java +++ b/framework-client/src/main/java/io/axoniq/platform/framework/modelling/ModellingDecorators.java @@ -35,21 +35,27 @@ private ModellingDecorators() { static void apply(ComponentRegistry registry) { registry.registerDecorator(DecoratorDefinition.forType(StateManager.class) - .with((cc, name, delegate) -> - new AxoniqPlatformStateManager(delegate)) + .with((cc, name, delegate) -> { + if(delegate instanceof AxoniqPlatformStateManager) { + return delegate; + } + return new AxoniqPlatformStateManager(delegate); + }) .order(Integer.MAX_VALUE)); UtilsKt.doOnSubModules(registry, (componentRegistry, module) -> { componentRegistry .registerDecorator(DecoratorDefinition.forType(Repository.class) .with((cc, name, delegate) -> + delegate instanceof AxoniqPlatformRepository ? delegate : new AxoniqPlatformRepository<>(delegate)) .order(Integer.MIN_VALUE)) .registerDecorator(DecoratorDefinition.forType(StateManager.class) .with((cc, name, delegate) -> + delegate instanceof AxoniqPlatformStateManager ? delegate : new AxoniqPlatformStateManager(delegate)) .order(Integer.MAX_VALUE)); return null; - }); + }, true); } } diff --git a/framework-client/src/main/java/io/axoniq/platform/framework/utils.kt b/framework-client/src/main/java/io/axoniq/platform/framework/utils.kt index 6eefda3..2fcfbbc 100644 --- a/framework-client/src/main/java/io/axoniq/platform/framework/utils.kt +++ b/framework-client/src/main/java/io/axoniq/platform/framework/utils.kt @@ -18,6 +18,7 @@ package io.axoniq.platform.framework import io.micrometer.core.instrument.MeterRegistry import io.micrometer.core.instrument.Timer import org.axonframework.common.ReflectionUtils +import org.axonframework.common.configuration.BaseModule import org.axonframework.common.configuration.ComponentRegistry import org.axonframework.common.configuration.Module import java.lang.reflect.Field @@ -188,13 +189,19 @@ fun String?.truncateToBytes(maxBytes: Int): String? { return truncatedContent + truncationMessage } -fun ComponentRegistry.doOnSubModules(block: (ComponentRegistry, Module?) -> Unit) { +fun ComponentRegistry.doOnSubModules(block: (ComponentRegistry, Module?) -> Unit, recursive: Boolean = true) { val modules = this.getPropertyValue>("modules") modules?.forEach { entry -> val module = entry.value - module.getPropertyValue("componentRegistry")?.let { cr -> - block(cr, module) - cr.doOnSubModules(block) + block(this, module) + if (recursive && module is BaseModule<*>) { + // BaseModule's nested registry only materialises when the module is built. Defer + // the inner walk via the public componentRegistry(action) API; the action runs on + // the module's own registry at build time, with arbitrary-depth nesting visible to + // recursive doOnSubModules calls inside. + module.componentRegistry { innerRegistry -> + innerRegistry.doOnSubModules(block, recursive) + } } } } \ No newline at end of file diff --git a/framework-client/src/main/resources/META-INF/services/org.axonframework.common.configuration.ConfigurationEnhancer b/framework-client/src/main/resources/META-INF/services/org.axonframework.common.configuration.ConfigurationEnhancer index d69912f..7dd9a2c 100644 --- a/framework-client/src/main/resources/META-INF/services/org.axonframework.common.configuration.ConfigurationEnhancer +++ b/framework-client/src/main/resources/META-INF/services/org.axonframework.common.configuration.ConfigurationEnhancer @@ -17,4 +17,5 @@ io.axoniq.platform.framework.AxoniqPlatformConfigurerEnhancer io.axoniq.platform.framework.messaging.distributed.AxoniqPlatformDistributedMessagingConfigurerEnhancer io.axoniq.platform.framework.modelling.AxoniqPlatformModellingConfigurationEnhancer -io.axoniq.platform.framework.eventsourcing.AxoniqPlatformEventsourcingConfigurerEnhancer \ No newline at end of file +io.axoniq.platform.framework.eventsourcing.AxoniqPlatformEventsourcingConfigurerEnhancer +io.axoniq.platform.framework.eventsourcing.AxoniqPlatformModelInspectionEnhancer \ No newline at end of file diff --git a/framework-client/src/test/kotlin/io/axoniq/platform/framework/eventsourcing/AxoniqPlatformModelInspectionEnhancerTest.kt b/framework-client/src/test/kotlin/io/axoniq/platform/framework/eventsourcing/AxoniqPlatformModelInspectionEnhancerTest.kt new file mode 100644 index 0000000..dca2a61 --- /dev/null +++ b/framework-client/src/test/kotlin/io/axoniq/platform/framework/eventsourcing/AxoniqPlatformModelInspectionEnhancerTest.kt @@ -0,0 +1,89 @@ +/* + * Copyright (c) 2022-2026. AxonIQ B.V. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.axoniq.platform.framework.eventsourcing + +import io.axoniq.platform.framework.client.RSocketHandlerRegistrar +import io.mockk.every +import io.mockk.mockk +import io.mockk.verify +import org.axonframework.common.configuration.ComponentDefinition +import org.axonframework.common.configuration.ComponentRegistry +import org.axonframework.eventsourcing.eventstore.EventStorageEngine +import org.junit.jupiter.api.Assertions.assertEquals +import org.junit.jupiter.api.Test + +/** + * Verifies the guard in [AxoniqPlatformModelInspectionEnhancer]: the responder must register + * only when the platform client is wired ([RSocketHandlerRegistrar] present) and an + * [EventStorageEngine] is available. Missing either is the no-event-sourcing / no-platform-client + * case where registering would have nothing to act on. + * + * The enhancer itself does not probe for [EventStorageEngine] directly — it first probes for + * {@code axon-eventsourcing} on the classpath and delegates the rest of the wiring to + * [ModelInspectionDecorators], which is where the [EventStorageEngine] check lives. In tests + * the classpath probe always succeeds (axon-eventsourcing is a test dependency), so we exercise + * both branches via the registry mocks. + * + * StateManager is intentionally NOT probed: ModellingConfigurationDefaults registers it at + * Integer.MAX_VALUE, after this enhancer's order, so the probe would falsely return false + * during a real boot. + */ +class AxoniqPlatformModelInspectionEnhancerTest { + + private val enhancer = AxoniqPlatformModelInspectionEnhancer() + + @Test + fun `registers the responder when EventStorageEngine and RSocketHandlerRegistrar are both present`() { + val registry = mockk(relaxed = true) + every { registry.hasComponent(EventStorageEngine::class.java) } returns true + every { registry.hasComponent(RSocketHandlerRegistrar::class.java) } returns true + + enhancer.enhance(registry) + + verify(exactly = 1) { registry.registerComponent(any>()) } + } + + @Test + fun `skips registration when EventStorageEngine is missing — typical AF4 application`() { + val registry = mockk(relaxed = true) + every { registry.hasComponent(EventStorageEngine::class.java) } returns false + every { registry.hasComponent(RSocketHandlerRegistrar::class.java) } returns true + + enhancer.enhance(registry) + + verify(exactly = 0) { registry.registerComponent(any>()) } + } + + @Test + fun `skips registration when RSocketHandlerRegistrar is missing — console client not wired`() { + val registry = mockk(relaxed = true) + every { registry.hasComponent(RSocketHandlerRegistrar::class.java) } returns false + + enhancer.enhance(registry) + + verify(exactly = 0) { registry.registerComponent(any>()) } + } + + @Test + fun `runs after the platform configurer enhancer so its components are visible`() { + // The responder builder reads multiple components from the configuration during + // start; this enhancer must run AFTER the platform enhancer that registers them. + // Concretely: order > PLATFORM_ENHANCER_ORDER (currently +1 above it). + val platformOrder = io.axoniq.platform.framework.AxoniqPlatformConfigurerEnhancer.PLATFORM_ENHANCER_ORDER + assertEquals(platformOrder + 1, enhancer.order()) + } +} diff --git a/framework-client/src/test/kotlin/io/axoniq/platform/framework/eventsourcing/RSocketModelInspectionResponderHelpersTest.kt b/framework-client/src/test/kotlin/io/axoniq/platform/framework/eventsourcing/RSocketModelInspectionResponderHelpersTest.kt new file mode 100644 index 0000000..df31e58 --- /dev/null +++ b/framework-client/src/test/kotlin/io/axoniq/platform/framework/eventsourcing/RSocketModelInspectionResponderHelpersTest.kt @@ -0,0 +1,235 @@ +/* + * Copyright (c) 2022-2026. AxonIQ B.V. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.axoniq.platform.framework.eventsourcing + +import io.axoniq.platform.framework.api.ModelTimelineEntry +import io.axoniq.platform.framework.client.RSocketHandlerRegistrar +import io.mockk.mockk +import org.axonframework.common.configuration.Configuration +import org.axonframework.eventsourcing.eventstore.EventStorageEngine +import org.junit.jupiter.api.Assertions.assertEquals +import org.junit.jupiter.api.Assertions.assertFalse +import org.junit.jupiter.api.Assertions.assertNull +import org.junit.jupiter.api.Assertions.assertTrue +import org.junit.jupiter.api.BeforeEach +import org.junit.jupiter.api.Test +import java.math.BigDecimal +import java.math.BigInteger +import java.util.UUID + +/** + * Unit tests for the pure-logic helpers on [RSocketModelInspectionResponder] that don't + * require a live AF5 configuration / event store. These helpers govern the FE id-type form, + * so regressing them silently breaks the inspection UI. + */ +class RSocketModelInspectionResponderHelpersTest { + + private lateinit var responder: RSocketModelInspectionResponder + + @BeforeEach + fun setUp() { + // The helpers under test never reach into these dependencies, so simple unrecorded + // mocks are enough — we don't need MockK relaxed mocks elsewhere. + responder = RSocketModelInspectionResponder( + eventStorageEngine = mockk(), + registrar = mockk(), + configuration = mockk(), + ) + } + + // --------------------------------------------------------------------------------------- + // isSimpleIdType + // + // Drives whether the FE renders a single text input or a multi-field form. False + // negatives produce a wrong UI for compound ids. + // --------------------------------------------------------------------------------------- + + @Test + fun `isSimpleIdType is true for String UUID and primitives`() { + assertTrue(responder.isSimpleIdType(String::class.java)) + assertTrue(responder.isSimpleIdType(UUID::class.java)) + assertTrue(responder.isSimpleIdType(java.lang.Long::class.java)) + assertTrue(responder.isSimpleIdType(java.lang.Integer::class.java)) + assertTrue(responder.isSimpleIdType(java.lang.Long.TYPE)) // primitive long + assertTrue(responder.isSimpleIdType(java.lang.Integer.TYPE)) // primitive int + assertTrue(responder.isSimpleIdType(BigDecimal::class.java)) + assertTrue(responder.isSimpleIdType(BigInteger::class.java)) + } + + @Test + fun `isSimpleIdType is true for enums`() { + assertTrue(responder.isSimpleIdType(SampleEnumId::class.java)) + } + + @Test + fun `isSimpleIdType is false for record-style compound ids`() { + assertFalse(responder.isSimpleIdType(SampleCompoundId::class.java)) + } + + @Test + fun `isSimpleIdType unwraps Kotlin value classes and tests their underlying type`() { + // SampleValueId wraps a String, so it should be classified as simple. + assertTrue(responder.isSimpleIdType(SampleValueId::class.java)) + } + + // --------------------------------------------------------------------------------------- + // normalizedType + // + // Maps Java types to FE-friendly strings consumed by EntityIdForm.vue field renderer. + // --------------------------------------------------------------------------------------- + + @Test + fun `normalizedType maps common Java types to FE strings`() { + assertEquals("string", responder.normalizedType(String::class.java)) + assertEquals("uuid", responder.normalizedType(UUID::class.java)) + assertEquals("number", responder.normalizedType(java.lang.Long::class.java)) + assertEquals("number", responder.normalizedType(java.lang.Integer.TYPE)) // primitive + assertEquals("number", responder.normalizedType(BigDecimal::class.java)) + assertEquals("boolean", responder.normalizedType(java.lang.Boolean::class.java)) + assertEquals("boolean", responder.normalizedType(java.lang.Boolean.TYPE)) + assertEquals("string", responder.normalizedType(java.lang.Character::class.java)) + // Anything we don't have a special case for falls through to "object". + assertEquals("object", responder.normalizedType(SampleCompoundId::class.java)) + } + + // --------------------------------------------------------------------------------------- + // describeIdFields + // + // This shape directly drives the FE multi-field form. Records expose recordComponents, + // POJOs expose declared fields, and simple types collapse to an empty list (single + // text input on the FE). + // --------------------------------------------------------------------------------------- + + @Test + fun `describeIdFields returns empty for simple id types`() { + assertTrue(responder.describeIdFields(String::class.java).isEmpty()) + assertTrue(responder.describeIdFields(UUID::class.java).isEmpty()) + assertTrue(responder.describeIdFields(java.lang.Long::class.java).isEmpty()) + } + + @Test + fun `describeIdFields exposes record components in declaration order with normalized types`() { + val descriptors = responder.describeIdFields(SampleCompoundId::class.java) + assertEquals(2, descriptors.size) + + assertEquals("showId", descriptors[0].name) + assertEquals("string", descriptors[0].type) + assertEquals(String::class.java.name, descriptors[0].javaType) + + assertEquals("seatNumber", descriptors[1].name) + assertEquals("number", descriptors[1].type) + assertEquals(java.lang.Integer.TYPE.name, descriptors[1].javaType) + } + + @Test + fun `describeIdFields exposes plain POJO declared fields and skips static synthetic`() { + val descriptors = responder.describeIdFields(SamplePojoId::class.java) + // STATIC_FIELD must not appear; only `tenant` and `code`. + assertEquals(listOf("tenant", "code"), descriptors.map { it.name }) + assertEquals(listOf("string", "number"), descriptors.map { it.type }) + } + + // --------------------------------------------------------------------------------------- + // trimRedundantStateBefore + // + // Trims stateBefore from every entry past the first in a page. The FE rehydrates these + // positions from the previous entry's stateAfter, so transmitting both is wasted bytes. + // Regressing this silently doubles a page-of-100 timeline response (~1.9 MB pre-gzip). + // --------------------------------------------------------------------------------------- + + @Test + fun `trimRedundantStateBefore keeps stateBefore on the first entry and nulls the rest`() { + val entries = mutableListOf( + entry(seq = 0, before = "{\"v\":\"initial\"}", after = "{\"v\":\"a\"}"), + entry(seq = 1, before = "{\"v\":\"a\"}", after = "{\"v\":\"b\"}"), + entry(seq = 2, before = "{\"v\":\"b\"}", after = "{\"v\":\"c\"}"), + ) + + responder.trimRedundantStateBefore(entries) + + assertEquals("{\"v\":\"initial\"}", entries[0].stateBefore) + assertNull(entries[1].stateBefore) + assertNull(entries[2].stateBefore) + // stateAfter and the other fields must survive untouched — the FE relies on the after + // chain for its lookback rehydration. + assertEquals("{\"v\":\"a\"}", entries[0].stateAfter) + assertEquals("{\"v\":\"b\"}", entries[1].stateAfter) + assertEquals("{\"v\":\"c\"}", entries[2].stateAfter) + } + + @Test + fun `trimRedundantStateBefore is a no-op on an empty page`() { + val entries = mutableListOf() + responder.trimRedundantStateBefore(entries) + assertTrue(entries.isEmpty()) + } + + @Test + fun `trimRedundantStateBefore is a no-op on a single-entry page (no later entries to trim)`() { + val entries = mutableListOf(entry(seq = 0, before = "{\"v\":\"only\"}", after = "{\"v\":\"a\"}")) + responder.trimRedundantStateBefore(entries) + assertEquals("{\"v\":\"only\"}", entries[0].stateBefore) + } + + @Test + fun `trimRedundantStateBefore leaves an already-null stateBefore alone`() { + // The very first event of an entity has no prior state, so the upstream collector + // may already emit stateBefore = null. The trim must not throw on that path. + val entries = mutableListOf( + entry(seq = 0, before = null, after = "{\"v\":\"a\"}"), + entry(seq = 1, before = "{\"v\":\"a\"}", after = "{\"v\":\"b\"}"), + ) + + responder.trimRedundantStateBefore(entries) + + assertNull(entries[0].stateBefore) + assertNull(entries[1].stateBefore) + } + + private fun entry(seq: Long, before: String?, after: String?): ModelTimelineEntry = + ModelTimelineEntry( + sequenceNumber = seq, + timestamp = "2026-01-01T00:00:00Z", + eventType = "SampleEvent", + eventPayload = "{}", + stateBefore = before, + stateAfter = after, + ) + + // --------------------------------------------------------------------------------------- + // Test fixtures + // --------------------------------------------------------------------------------------- + + enum class SampleEnumId { A, B } + + /** A typical AF5 compound entity id (record). Mirrors `ReservationId(showId, seatNumber)`. */ + @JvmRecord + data class SampleCompoundId(val showId: String, val seatNumber: Int) + + /** A plain POJO id with a static field that must be ignored. */ + @Suppress("unused") + class SamplePojoId(val tenant: String, val code: Int) { + companion object { + @JvmStatic + val STATIC_FIELD: String = "ignore-me" + } + } + + /** A Kotlin inline value class wrapping a String — should classify as simple. */ + @JvmInline + value class SampleValueId(val raw: String) +} diff --git a/framework-client/src/test/kotlin/io/axoniq/platform/framework/eventsourcing/RSocketModelInspectionResponderIntegrationTest.kt b/framework-client/src/test/kotlin/io/axoniq/platform/framework/eventsourcing/RSocketModelInspectionResponderIntegrationTest.kt new file mode 100644 index 0000000..646f541 --- /dev/null +++ b/framework-client/src/test/kotlin/io/axoniq/platform/framework/eventsourcing/RSocketModelInspectionResponderIntegrationTest.kt @@ -0,0 +1,333 @@ +/* + * Copyright (c) 2022-2026. AxonIQ B.V. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.axoniq.platform.framework.eventsourcing + +import io.axoniq.platform.framework.api.ModelDomainEventsQuery +import io.axoniq.platform.framework.api.ModelEntityStateAtSequenceQuery +import io.axoniq.platform.framework.api.ModelTimelineQuery +import io.axoniq.platform.framework.client.RSocketHandlerRegistrar +import io.axoniq.platform.framework.client.strategy.CborJackson3EncodingStrategy +import org.axonframework.common.configuration.AxonConfiguration +import org.axonframework.common.configuration.BaseModule +import org.axonframework.common.configuration.ComponentDefinition +import org.axonframework.common.configuration.Configuration +import org.axonframework.common.configuration.LifecycleRegistry +import org.axonframework.common.configuration.Module +import org.axonframework.eventsourcing.annotation.EventSourcedEntity +import org.axonframework.eventsourcing.annotation.EventTag +import org.axonframework.eventsourcing.annotation.reflection.EntityCreator +import org.axonframework.eventsourcing.annotation.reflection.InjectEntityId +import org.axonframework.eventsourcing.configuration.EventSourcedEntityModule +import org.axonframework.eventsourcing.configuration.EventSourcingConfigurer +import org.axonframework.eventsourcing.annotation.EventSourcingHandler +import org.axonframework.messaging.commandhandling.configuration.CommandHandlingModule +import org.axonframework.messaging.eventhandling.EventSink +import org.axonframework.messaging.eventhandling.GenericEventMessage +import org.axonframework.messaging.core.MessageType +import org.axonframework.modelling.SimpleStateManager +import org.axonframework.modelling.StateManager +import org.junit.jupiter.api.AfterEach +import org.junit.jupiter.api.Assertions.assertEquals +import org.junit.jupiter.api.Assertions.assertNotNull +import org.junit.jupiter.api.Assertions.assertNull +import org.junit.jupiter.api.Assertions.assertTrue +import org.junit.jupiter.api.BeforeEach +import org.junit.jupiter.api.Test + +/** + * End-to-end test for the model inspection feature: spins up a real + * [EventSourcingConfigurer] with the inspection enhancer wired in, registers an + * event-sourced entity in a sub-module, publishes events through the in-memory event + * store, then drives the responder's query handlers and asserts on their outputs. + * + * The submodule structure is what makes this interesting: the entity is registered as a + * nested module (via [EventSourcedEntityModule.autodetected]) so the enhancer's + * `doOnSubModules` walker has to drill in to find it. A bug there would cause the + * registered-entities query to come back empty. + */ +class RSocketModelInspectionResponderIntegrationTest { + + private lateinit var configuration: AxonConfiguration + private lateinit var responder: RSocketModelInspectionResponder + + @BeforeEach + fun setUp() { + // Build a minimal AF5 application: + // - InMemoryEventStorageEngine (default) + // - one annotated event-sourced entity registered as a sub-module + // - a stub RSocketHandlerRegistrar (the inspection enhancer requires its presence, + // but we don't exercise the RSocket transport — we call responder methods directly) + // - the AxoniqPlatformModelInspectionEnhancer registered manually + // The AxoniqPlatformModelInspectionEnhancer is auto-discovered via the + // META-INF/services SPI registration — no need to register it manually. + configuration = EventSourcingConfigurer.create() + .registerEntity(EventSourcedEntityModule.autodetected(String::class.java, Reservation::class.java)) + .componentRegistry { cr -> + cr.registerComponent(ComponentDefinition.ofType(RSocketHandlerRegistrar::class.java) + .withBuilder { RSocketHandlerRegistrar(CborJackson3EncodingStrategy()) }) + } + .start() + + responder = configuration.getComponent(RSocketModelInspectionResponder::class.java) + + // Publish a known sequence of events for entity "RES-1". + val sink = configuration.getComponent(EventSink::class.java) + sink.publish( + null, + event(ReservationCreated("RES-1", "alice")), + event(ReservationConfirmed("RES-1")), + event(ReservationCancelled("RES-1", "double-booked")), + ).get() + } + + @AfterEach + fun tearDown() { + configuration.shutdown() + } + + private fun event(payload: Any) = GenericEventMessage( + MessageType(payload.javaClass), + payload, + ) + + // ------------------------------------------------------------------------------------------ + // Registered entities + // ------------------------------------------------------------------------------------------ + + @Test + fun `registered entities query discovers entities defined in nested modules`() { + val result = invokeRegisteredEntities() + assertEquals(1, result.entities.size, "expected the Reservation entity to be visible") + + val entity = result.entities.first() + assertEquals(Reservation::class.java.name, entity.entityType) + assertEquals(1, entity.idTypes.size) + assertEquals(String::class.java.name, entity.idTypes.first().type) + // String is a simple id type — no sub-fields surface to the FE. + assertTrue(entity.idTypes.first().idFields.isEmpty()) + } + + // ------------------------------------------------------------------------------------------ + // Domain events listing + // ------------------------------------------------------------------------------------------ + + @Test + fun `domain events query returns the published events in publication order with typed names`() { + val result = responder.handleDomainEvents(domainEventsQuery("RES-1")) + + assertEquals(3, result.totalCount, "all three events should be returned") + assertEquals(3, result.domainEvents.size) + + val payloadTypes = result.domainEvents.map { it.payloadType } + assertEquals( + listOf( + ReservationCreated::class.java.name, + ReservationConfirmed::class.java.name, + ReservationCancelled::class.java.name, + ), + payloadTypes, + ) + + // Sequence numbers are 0-indexed and dense across the listed events. + assertEquals(listOf(0L, 1L, 2L), result.domainEvents.map { it.sequenceNumber }) + } + + @Test + fun `domain events query returns empty result for an unknown entity id without throwing`() { + val result = responder.handleDomainEvents(domainEventsQuery("RES-DOES-NOT-EXIST")) + assertEquals(0, result.totalCount) + assertTrue(result.domainEvents.isEmpty()) + } + + // ------------------------------------------------------------------------------------------ + // Entity state at sequence + // ------------------------------------------------------------------------------------------ + + @Test + fun `entity state at sequence reconstructs intermediate state by replaying through the metamodel`() { + // After event 0 (created): status = CREATED, eventCount = 1 + val afterCreated = responder.handleEntityStateAtSequence(stateQuery("RES-1", 0)) + assertNotNull(afterCreated.state) + assertTrue(afterCreated.state!!.contains("\"status\":\"CREATED\""), afterCreated.state) + assertTrue(afterCreated.state!!.contains("\"eventCount\":1"), afterCreated.state) + + // After event 1 (confirmed): status = CONFIRMED, eventCount = 2 + val afterConfirmed = responder.handleEntityStateAtSequence(stateQuery("RES-1", 1)) + assertNotNull(afterConfirmed.state) + assertTrue(afterConfirmed.state!!.contains("\"status\":\"CONFIRMED\""), afterConfirmed.state) + assertTrue(afterConfirmed.state!!.contains("\"eventCount\":2"), afterConfirmed.state) + + // After event 2 (cancelled): status = CANCELLED, eventCount = 3, reason captured + val afterCancelled = responder.handleEntityStateAtSequence(stateQuery("RES-1", 2)) + assertNotNull(afterCancelled.state) + assertTrue(afterCancelled.state!!.contains("\"status\":\"CANCELLED\""), afterCancelled.state) + assertTrue(afterCancelled.state!!.contains("\"eventCount\":3"), afterCancelled.state) + assertTrue(afterCancelled.state!!.contains("\"cancelReason\":\"double-booked\""), afterCancelled.state) + } + + @Test + fun `entity state at negative sequence replays all events`() { + val result = responder.handleEntityStateAtSequence(stateQuery("RES-1", -1)) + assertNotNull(result.state) + assertTrue(result.state!!.contains("\"status\":\"CANCELLED\"")) + assertTrue(result.state!!.contains("\"eventCount\":3")) + } + + @Test + fun `entity state for unknown id returns null state`() { + val result = responder.handleEntityStateAtSequence(stateQuery("RES-MISSING", -1)) + assertNull(result.state) + } + + // ------------------------------------------------------------------------------------------ + // Timeline replay + // ------------------------------------------------------------------------------------------ + + @Test + fun `timeline replay produces stateBefore and stateAfter pairs for every event`() { + val result = responder.handleTimelineReplay(timelineQuery("RES-1")) + + assertEquals(3, result.totalEvents) + assertEquals(3, result.entries.size) + + // Event 0 — AF5's factory has just created the entity (defaults applied: status=CREATED, + // eventCount=0, customerId=null) BEFORE the @EventSourcingHandler runs. So stateBefore + // captures that just-constructed shape; stateAfter captures the post-handler state with + // customerId set and eventCount=1. + val first = result.entries[0] + assertEquals(0L, first.sequenceNumber) + assertEquals(ReservationCreated::class.java.name, first.eventType) + assertNotNull(first.stateBefore) + assertTrue(first.stateBefore!!.contains("\"eventCount\":0")) + assertTrue(first.stateBefore!!.contains("\"customerId\":null")) + assertNotNull(first.stateAfter) + assertTrue(first.stateAfter!!.contains("\"eventCount\":1")) + assertTrue(first.stateAfter!!.contains("\"customerId\":\"alice\"")) + + // Event 1 — stateBefore = CREATED, stateAfter = CONFIRMED. + val second = result.entries[1] + assertEquals(1L, second.sequenceNumber) + assertTrue(second.stateBefore!!.contains("\"status\":\"CREATED\"")) + assertTrue(second.stateAfter!!.contains("\"status\":\"CONFIRMED\"")) + + // Event 2 — stateBefore = CONFIRMED, stateAfter = CANCELLED. + val third = result.entries[2] + assertEquals(2L, third.sequenceNumber) + assertTrue(third.stateBefore!!.contains("\"status\":\"CONFIRMED\"")) + assertTrue(third.stateAfter!!.contains("\"status\":\"CANCELLED\"")) + } + + @Test + fun `timeline replay honours offset and limit`() { + val result = responder.handleTimelineReplay(timelineQuery("RES-1", offset = 1, limit = 1)) + + // Total still reflects the full stream so the FE can drive pagination. + assertEquals(3, result.totalEvents) + assertEquals(1, result.entries.size) + assertEquals(1L, result.entries.first().sequenceNumber) + assertEquals(ReservationConfirmed::class.java.name, result.entries.first().eventType) + assertTrue(result.truncated, "events remain after the requested window") + } + + // ------------------------------------------------------------------------------------------ + // Helpers + // ------------------------------------------------------------------------------------------ + + private fun invokeRegisteredEntities() = responder.handleRegisteredEntities() + + private fun domainEventsQuery(id: String) = ModelDomainEventsQuery( + entityType = Reservation::class.java.name, + entityId = id, + idType = String::class.java.name, + ) + + private fun stateQuery(id: String, maxSeq: Long) = ModelEntityStateAtSequenceQuery( + entityType = Reservation::class.java.name, + entityId = id, + idType = String::class.java.name, + maxSequenceNumber = maxSeq, + ) + + private fun timelineQuery(id: String, offset: Int = 0, limit: Int = 100) = ModelTimelineQuery( + entityType = Reservation::class.java.name, + entityId = id, + idType = String::class.java.name, + offset = offset, + limit = limit, + ) + + // ------------------------------------------------------------------------------------------ + // Test fixture: entity + events + // ------------------------------------------------------------------------------------------ + + /** + * Status enum (declared as a top-level type within the test file) — using an enum gives us + * a state field whose JSON representation is a clean string we can assert against. + */ + enum class Status { CREATED, CONFIRMED, CANCELLED } + + /** + * Mutable event-sourced entity with `@EventSourcingHandler` methods. We mutate in place + * (the AF5 default for annotated entities) so the test exercises the same dispatch path + * a real application uses. + */ + @EventSourcedEntity(tagKey = "reservationId") + class Reservation @EntityCreator constructor( + // @InjectEntityId disambiguates the id from the payload parameter — without it, + // AF5 treats the first ctor arg as the event payload type and no match exists. + @Suppress("unused") @InjectEntityId val reservationId: String, + ) { + var status: Status = Status.CREATED + var customerId: String? = null + var cancelReason: String? = null + var eventCount: Int = 0 + + @EventSourcingHandler + fun on(event: ReservationCreated) { + customerId = event.customerId + status = Status.CREATED + eventCount++ + } + + @EventSourcingHandler + fun on(@Suppress("unused") event: ReservationConfirmed) { + status = Status.CONFIRMED + eventCount++ + } + + @EventSourcingHandler + fun on(event: ReservationCancelled) { + status = Status.CANCELLED + cancelReason = event.reason + eventCount++ + } + } + + data class ReservationCreated( + @field:EventTag(key = "reservationId") val reservationId: String, + val customerId: String, + ) + + data class ReservationConfirmed( + @field:EventTag(key = "reservationId") val reservationId: String, + ) + + data class ReservationCancelled( + @field:EventTag(key = "reservationId") val reservationId: String, + val reason: String, + ) +} diff --git a/framework-client/src/test/kotlin/io/axoniq/platform/framework/eventsourcing/RSocketModelInspectionResponderNestedModuleIntegrationTest.kt b/framework-client/src/test/kotlin/io/axoniq/platform/framework/eventsourcing/RSocketModelInspectionResponderNestedModuleIntegrationTest.kt new file mode 100644 index 0000000..363d409 --- /dev/null +++ b/framework-client/src/test/kotlin/io/axoniq/platform/framework/eventsourcing/RSocketModelInspectionResponderNestedModuleIntegrationTest.kt @@ -0,0 +1,195 @@ +/* + * Copyright (c) 2022-2026. AxonIQ B.V. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.axoniq.platform.framework.eventsourcing + +import io.axoniq.platform.framework.api.ModelEntityStateAtSequenceQuery +import io.axoniq.platform.framework.client.RSocketHandlerRegistrar +import io.axoniq.platform.framework.client.strategy.CborJackson3EncodingStrategy +import org.axonframework.common.configuration.AxonConfiguration +import org.axonframework.common.configuration.BaseModule +import org.axonframework.common.configuration.ComponentDefinition +import org.axonframework.eventsourcing.annotation.EventSourcedEntity +import org.axonframework.eventsourcing.annotation.EventTag +import org.axonframework.eventsourcing.annotation.reflection.EntityCreator +import org.axonframework.eventsourcing.annotation.reflection.InjectEntityId +import org.axonframework.eventsourcing.annotation.EventSourcingHandler +import org.axonframework.eventsourcing.configuration.EventSourcedEntityModule +import org.axonframework.eventsourcing.configuration.EventSourcingConfigurer +import org.axonframework.messaging.core.MessageType +import org.axonframework.messaging.eventhandling.EventSink +import org.axonframework.messaging.eventhandling.GenericEventMessage +import org.axonframework.modelling.SimpleStateManager +import org.axonframework.modelling.StateManager +import org.junit.jupiter.api.AfterEach +import org.junit.jupiter.api.Assertions.assertEquals +import org.junit.jupiter.api.Assertions.assertNotNull +import org.junit.jupiter.api.Assertions.assertTrue +import org.junit.jupiter.api.BeforeEach +import org.junit.jupiter.api.Test + +/** + * End-to-end test for the case the existing integration test doesn't cover: an event-sourced + * entity buried inside a custom user-defined [BaseModule]. Without this, you can't tell whether + * inspection works only for the conveniently top-level entity registration that + * `registerEntity(...)` produces, or also for arbitrary user nesting. + * + * The setup: + * - {@link OuterEntity} registered at the top level via the usual `registerEntity(...)` path. + * - [MySubModule] — a hand-rolled [BaseModule] that registers its own [StateManager] and an + * {@link InnerEntity} inside it via `registerModule(EventSourcedEntityModule.autodetected(...))`. + * + * Both entities must surface in the registered-entities query, and queries against the inner one + * must reconstruct state correctly — proving the model-inspection enhancer's submodule walk + * reaches arbitrary depth, not just one level. + */ +class RSocketModelInspectionResponderNestedModuleIntegrationTest { + + private lateinit var configuration: AxonConfiguration + private lateinit var responder: RSocketModelInspectionResponder + + @BeforeEach + fun setUp() { + configuration = EventSourcingConfigurer.create() + .registerEntity(EventSourcedEntityModule.autodetected(String::class.java, OuterEntity::class.java)) + .componentRegistry { cr -> + cr.registerComponent(ComponentDefinition.ofType(RSocketHandlerRegistrar::class.java) + .withBuilder { RSocketHandlerRegistrar(CborJackson3EncodingStrategy()) }) + // The custom BaseModule lives directly under the root component registry. + // Inside it, a further sub-module registers InnerEntity — two levels deep. + cr.registerModule(MySubModule()) + } + .start() + + responder = configuration.getComponent(RSocketModelInspectionResponder::class.java) + + val sink = configuration.getComponent(EventSink::class.java) + sink.publish( + null, + event(OuterCreated("OUTER-1", "blue")), + event(InnerOpened("INNER-1", 42)), + event(InnerClosed("INNER-1")), + ).get() + } + + @AfterEach + fun tearDown() { + configuration.shutdown() + } + + private fun event(payload: Any) = GenericEventMessage(MessageType(payload.javaClass), payload) + + @Test + fun `registered entities query surfaces both top-level and deeply nested entities`() { + val result = responder.handleRegisteredEntities() + val typeNames = result.entities.map { it.entityType }.toSet() + + assertTrue(typeNames.contains(OuterEntity::class.java.name), + "expected OuterEntity (top-level) to be registered") + assertTrue(typeNames.contains(InnerEntity::class.java.name), + "expected InnerEntity (nested inside MySubModule) to be registered — submodule walker must reach it") + } + + @Test + fun `state at sequence reconstructs the inner entity in the nested module`() { + val result = responder.handleEntityStateAtSequence(ModelEntityStateAtSequenceQuery( + entityType = InnerEntity::class.java.name, + entityId = "INNER-1", + idType = String::class.java.name, + maxSequenceNumber = -1, + )) + + assertNotNull(result.state, "state must be reconstructed for the inner entity") + assertTrue(result.state!!.contains("\"open\":false"), result.state) + assertTrue(result.state!!.contains("\"value\":42"), result.state) + } + + @Test + fun `state at sequence reconstructs the outer entity registered at the root`() { + val result = responder.handleEntityStateAtSequence(ModelEntityStateAtSequenceQuery( + entityType = OuterEntity::class.java.name, + entityId = "OUTER-1", + idType = String::class.java.name, + maxSequenceNumber = -1, + )) + + assertNotNull(result.state) + assertTrue(result.state!!.contains("\"colour\":\"blue\""), result.state) + } + + // ------------------------------------------------------------------------------------------ + // Test fixtures + // ------------------------------------------------------------------------------------------ + + /** + * Custom user module that owns its own [StateManager] and registers an event-sourced entity + * as a sub-module. Mirrors how a real application might package a bounded context. + */ + class MySubModule : BaseModule("MySubModule") { + init { + componentRegistry { cr -> + cr.registerComponent(ComponentDefinition.ofType(StateManager::class.java) + .withBuilder { SimpleStateManager.named("MySubModuleStateManager") }) + cr.registerModule(EventSourcedEntityModule.autodetected(String::class.java, InnerEntity::class.java)) + } + } + } + + @EventSourcedEntity(tagKey = "outerId") + class OuterEntity @EntityCreator constructor( + @Suppress("unused") @InjectEntityId val outerId: String, + ) { + var colour: String = "" + + @EventSourcingHandler + fun on(event: OuterCreated) { + colour = event.colour + } + } + + data class OuterCreated( + @field:EventTag(key = "outerId") val outerId: String, + val colour: String, + ) + + @EventSourcedEntity(tagKey = "innerId") + class InnerEntity @EntityCreator constructor( + @Suppress("unused") @InjectEntityId val innerId: String, + ) { + var open: Boolean = false + var value: Int = 0 + + @EventSourcingHandler + fun on(event: InnerOpened) { + open = true + value = event.value + } + + @EventSourcingHandler + fun on(@Suppress("unused") event: InnerClosed) { + open = false + } + } + + data class InnerOpened( + @field:EventTag(key = "innerId") val innerId: String, + val value: Int, + ) + + data class InnerClosed( + @field:EventTag(key = "innerId") val innerId: String, + ) +}