Introduce QueryShutdownManager and named QueryGateway factory#4482
Introduce QueryShutdownManager and named QueryGateway factory#4482abuijze wants to merge 5 commits into
Conversation
QueryShutdownManager tracks locally-initiated subscription and streaming query streams and cancels them on shutdown, using either a close-immediately or grace-period policy. Callers opt in per stream via track(), or configure a named gateway to apply tracking automatically to every dispatched query. MessagingConfigurer.queryGateway(String, Consumer<QueryGatewayConfigurer>) registers an independently configured QueryGateway component under the given name, sharing the main QueryBus and supporting infrastructure. Shutdown tracking is applied when cancellingSubscriptionQueryOnShutdown or cancellingStreamingQueryOnShutdown is configured on the QueryGatewayConfigurer, producing a ShutdownTrackingQueryGateway wrapper. Use withDefaults() to register a plain gateway without decorators. For standalone use, QueryShutdownManager.registerShutdown(LifecycleRegistry) wires shutdown() into Axon's lifecycle at OUTBOUND_QUERY_CONNECTORS. Spring Boot: QueryShutdownAutoConfiguration wires all QueryShutdownManager beans into Spring's SmartLifecycle at the web-server graceful-shutdown phase. Gateway wrapping requires an explicit named @bean QueryGateway; the Spring bridge registers it by bean name automatically. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
smcvb
left a comment
There was a problem hiding this comment.
I think I am mostly unclear on why this QueryShutdownManager (1) does not have a single default instance in the MessagingConfigurer and (2) is not automatically decorated for QueryGateway instances. There is always value in this feature, right?
| completion.complete(null); | ||
| }); | ||
| activeItems.put(key, new TrackedItem(() -> { | ||
| tracked.close(); |
There was a problem hiding this comment.
Do we need to close both the given stream as well as the result from stream#onClose on line 172? Not sure of the need to invoke it on both.
There was a problem hiding this comment.
Both are necessary. tracked is the downstream wrapper returned to the caller: closing it runs the cleanup handler. stream is the upstream connection to the query bus: closing it actually terminates the query. Without stream.close(), the query bus connection stays open; without tracked.close(), the cleanup handler never fires.
| .toArray(CompletableFuture[]::new) | ||
| ); | ||
| return allDone.orTimeout(gracePeriod.toMillis(), TimeUnit.MILLISECONDS) | ||
| .exceptionally(e -> null) |
There was a problem hiding this comment.
Curious: Do we want to swallow all exceptions here?
There was a problem hiding this comment.
The only realistic exception here is TimeoutException from orTimeout. The individual completion futures in activeItems are plain CompletableFuture instances that we only ever complete normally, so no other exception can arrive. Added a comment to the code to make the intent clear.
| * | ||
| * @since 5.2.0 | ||
| */ | ||
| public static class Spec { |
There was a problem hiding this comment.
Not so sure about having this inner Spec class. Isn't all it's replacing a Supplier<QueryShutdownManager> in the QueryGatewayConfigurer?
There was a problem hiding this comment.
Deliberate DSL entry point. The Spec surface (closeImmediately(), withGracePeriod(Duration), withGracePeriod(long)) is what IDE autocomplete surfaces inside the lambda, so users don't need to know QueryShutdownManager.* exists. With a Supplier<QueryShutdownManager> they'd have to discover the static factory methods on their own.
| * .cancellingSubscriptionQueryOnShutdown(c -> c.closeImmediately()) | ||
| * ); | ||
| * }</pre> | ||
| * In a Spring Boot application, declare a named {@code @Bean} of type {@link QueryGateway} instead. |
There was a problem hiding this comment.
Similar as with the QueryShutdownManager: not so sure we should explain Spring-specific in a non-Spring module. I'd rather have generic dependency-injection-speach here.
| * @param configurerTask lambda consuming the {@link QueryGatewayConfigurer} | ||
| * @return the current instance of the {@code Configurer} for a fluent API | ||
| */ | ||
| public MessagingConfigurer queryGateway(String name, Consumer<QueryGatewayConfigurer> configurerTask) { |
There was a problem hiding this comment.
Wouldn't we want to do this by default? The tracking and usage of the QueryShutdownManager, that is. If so, I think it would be nice to have it placed in the MessagingConfigurationDefaults as well.
There was a problem hiding this comment.
Or, did you intend to let the QueryShutdownManager by a customer detail, not set by default?
Given that this feature comes "out of thin air" to me, I am not so sure what's expected of it by our users. I can guess a bit, of course, but still curious what the reasoning was for not making this a default.
There was a problem hiding this comment.
I mean, if the default QueryShutdownManager is present in the MessagingConfigurationDefaults, we would be able to automatically wrap all QueryGateway instances with a DecoratorDefinition there. Furthermore, we can automatically shut the manager down when registering the main QueryShutdownManager Component.
I am mainly triggering on this, as the "feel" is rather different from our other configuration approaches.
There was a problem hiding this comment.
The opt-in design is intentional. The grace period is application-specific: SSE endpoints, reactive streams, and long-polling clients have very different expectations for how long "long enough" is. A fixed default would silently apply shutdown behavior to everyone while only being correct for a subset. Making it explicit keeps the intent visible and the timeout meaningful.
| * | ||
| * @param name the name under which the produced {@link QueryGateway} is registered | ||
| */ | ||
| public QueryGatewayConfigurer(String name) { |
There was a problem hiding this comment.
If it's intended for internal use, we can mark it as @Internal.
| streamingManager = QueryShutdownManager.closeImmediately(); | ||
| } | ||
|
|
||
| private static class StubQueryGateway implements QueryGateway { |
There was a problem hiding this comment.
Curious: Why make a stub instead of using Mockito or something alike? Not that I mind the pattern, it just seems like a little much for a component that mostly delegates calls.
There was a problem hiding this comment.
Overall, the solution with a Stub was easier to read and validate than a version with mocks.
|



QueryShutdownManager tracks locally-initiated subscription and streaming query streams and cancels them on shutdown, using either a close-immediately or grace-period policy. Callers opt in per stream via track(), or configure a named gateway to apply tracking automatically to every dispatched query.
MessagingConfigurer.queryGateway(String, Consumer) registers an independently configured QueryGateway component under the given name, sharing the main QueryBus and supporting infrastructure. Shutdown tracking is applied when cancellingSubscriptionQueryOnShutdown or cancellingStreamingQueryOnShutdown is configured on the QueryGatewayConfigurer, producing a ShutdownTrackingQueryGateway wrapper. Use withDefaults() to register a plain gateway without decorators.
For standalone use, QueryShutdownManager.registerShutdown(LifecycleRegistry) wires shutdown() into Axon's lifecycle at OUTBOUND_QUERY_CONNECTORS.
Spring Boot: QueryShutdownAutoConfiguration wires all QueryShutdownManager beans into Spring's SmartLifecycle at the web-server graceful-shutdown phase. Gateway wrapping requires an explicit named @bean QueryGateway; the Spring bridge registers it by bean name automatically.