diff --git a/pom.xml b/pom.xml index 6919fb34..7cb5917c 100644 --- a/pom.xml +++ b/pom.xml @@ -20,7 +20,7 @@ io.axoniq axonserver-connector-java - 2025.2.2-SNAPSHOT + 2026.0.0-SNAPSHOT AxonServer Connector @@ -422,6 +422,8 @@ UTF-8 + false + false apiNote diff --git a/src/main/java/io/axoniq/axonserver/connector/AxonServerConnectionFactory.java b/src/main/java/io/axoniq/axonserver/connector/AxonServerConnectionFactory.java index 81a3d329..ab77d3b2 100644 --- a/src/main/java/io/axoniq/axonserver/connector/AxonServerConnectionFactory.java +++ b/src/main/java/io/axoniq/axonserver/connector/AxonServerConnectionFactory.java @@ -90,13 +90,14 @@ public class AxonServerConnectionFactory { private volatile boolean shutdown; /** - * Instantiates an {@link AxonServerConnectionFactory} with the given {@code builder}. + * Instantiates an {@link AxonServerConnectionFactory} with the given {@code builder}. The clientInstanceId is + * postfixed by a random hex string to ensure uniqueness between different instances of the application. * * @param builder the {@link Builder} used to set all the specifics of an {@link AxonServerConnectionFactory} */ protected AxonServerConnectionFactory(Builder builder) { this.componentName = builder.componentName; - this.clientInstanceId = builder.clientInstanceId; + this.clientInstanceId = builder.clientInstanceId + "-" + randomHex(8); this.token = builder.token; this.tags.putAll(builder.tags); this.executorService = builder.executorService; @@ -121,7 +122,6 @@ protected AxonServerConnectionFactory(Builder builder) { * information and should be the same only for instances of the same application or component. * * @param componentName The name of the component connecting to AxonServer - * * @return a builder instance for further configuration of the connector * @see #forClient(String, String) */ @@ -131,8 +131,8 @@ public static Builder forClient(String componentName) { /** - * Returns a builder to configure a ConnectionFactory instance for the given {@code componentName} and {@code - * clientInstanceId}. + * Returns a builder to configure a ConnectionFactory instance for the given {@code componentName} and + * {@code clientInstanceId}. *

* The clientInstanceId MUST be a unique value across all instances that connect to AxonServer. The componentName is * used in monitoring information and should be the same only for instances of the same application or component. @@ -142,7 +142,6 @@ public static Builder forClient(String componentName) { * * @param componentName The name of the component connecting to AxonServer * @param clientInstanceId The unique instance identifier for this instance of the component - * * @return a builder instance for further configuration of the connector * @see #forClient(String) */ @@ -154,7 +153,6 @@ public static Builder forClient(String componentName, String clientInstanceId) { * Connects to the given {@code context} using the settings defined in this ConnectionFactory. * * @param context The name of the context to connect to - * * @return a Connection allowing interaction with the mentioned context */ public AxonServerConnection connect(String context) { @@ -206,7 +204,8 @@ private ManagedChannel createChannel(ServerAddress address, String context) { return builder.intercept( new GrpcBufferingInterceptor(50), new HeaderAttachingInterceptor<>(Headers.CONTEXT, context), - new HeaderAttachingInterceptor<>(Headers.ACCESS_TOKEN, token) + new HeaderAttachingInterceptor<>(Headers.ACCESS_TOKEN, token), + new HeaderAttachingInterceptor<>(Headers.CLIENT_ID, clientInstanceId) ).build(); } @@ -234,12 +233,24 @@ public void shutdown() { } } + /** + * Returns the unique client instance identifier used by this ConnectionFactory. This is the value configured as + * {@code clientInstanceId} on the Builder, postfixed with a random hex string to ensure uniqueness between + * different instances of the application. + * + * @return the unique client instance identifier used by this ConnectionFactory + */ + public String getClientInstanceId() { + return clientInstanceId; + } + /** * Builder for AxonServerConnectionFactory instances. The methods on this class allow for configuration of the * {@link AxonServerConnectionFactory} instance used to connect to an AxonServer (cluster). *

- * This class is not intended to be instantiated directly, but rather through {@link - * AxonServerConnectionFactory#forClient(String)} or {@link AxonServerConnectionFactory#forClient(String, String)}. + * This class is not intended to be instantiated directly, but rather through + * {@link AxonServerConnectionFactory#forClient(String)} or + * {@link AxonServerConnectionFactory#forClient(String, String)}. */ public static class Builder { @@ -281,7 +292,6 @@ protected Builder(String componentName, String clientInstanceId) { * Defaults to "localhost:8024". * * @param serverAddresses The addresses to try to set up the initial connection with. - * * @return this builder for further configuration */ public Builder routingServers(ServerAddress... serverAddresses) { @@ -297,7 +307,6 @@ public Builder routingServers(ServerAddress... serverAddresses) { * * @param interval The amount of time to wait in between connection attempts * @param timeUnit The unit in which the interval is expressed - * * @return this builder for further configuration */ public Builder reconnectInterval(long interval, TimeUnit timeUnit) { @@ -312,7 +321,6 @@ public Builder reconnectInterval(long interval, TimeUnit timeUnit) { * * @param timeout The amount of time to wait for a connection to be established * @param timeUnit The unit in which the timout is expressed - * * @return this builder for further configuration */ public Builder connectTimeout(long timeout, TimeUnit timeUnit) { @@ -330,7 +338,6 @@ public Builder connectTimeout(long timeout, TimeUnit timeUnit) { * By default, no tags are defined. * * @param additionalClientTags additional tags that define this client component - * * @return this builder for further configuration */ public Builder clientTags(Map additionalClientTags) { @@ -349,7 +356,6 @@ public Builder clientTags(Map additionalClientTags) { * * @param key the key of the Tag to configure * @param value the value of the Tag to configure - * * @return this builder for further configuration */ public Builder clientTag(String key, String value) { @@ -364,7 +370,6 @@ public Builder clientTag(String key, String value) { * AxonServer. * * @param token The token to which the required authorizations have been assigned. - * * @return this builder for further configuration */ public Builder token(String token) { @@ -391,7 +396,6 @@ public Builder useTransportSecurity() { * Defaults to not using TLS. * * @param sslContext The context defining TLS parameters - * * @return this builder for further configuration * @see SslContextBuilder#forClient() */ @@ -403,14 +407,13 @@ public Builder useTransportSecurity(SslContext sslContext) { /** * Indicates whether the connector should always reconnect via the Routing Servers. When {@code true} (default), * the connector will contact the Routing Servers for a new destination each time a connection is dropped. When - * {@code false}, the connector will first attempt to re-establish a connection to the node is was - * previously connected to. When that fails, only then will it contact the Routing Servers. + * {@code false}, the connector will first attempt to re-establish a connection to the node is was previously + * connected to. When that fails, only then will it contact the Routing Servers. *

* Default to {@code true}, forcing the failed connection to be abandoned and a new one to be requested via the * routing servers. * * @param forceReconnectViaRoutingServers whether to force a reconnect to the Cluster via the RoutingServers. - * * @return this builder for further configuration */ public Builder forceReconnectViaRoutingServers(boolean forceReconnectViaRoutingServers) { @@ -426,7 +429,6 @@ public Builder forceReconnectViaRoutingServers(boolean forceReconnectViaRoutingS * Defaults to 2. * * @param poolSize The number of threads to assign to Connection related activities. - * * @return this builder for further configuration */ public Builder threadPoolSize(int poolSize) { @@ -449,7 +451,6 @@ public Builder threadPoolSize(int poolSize) { * @param interval time without read activity before sending a keepalive ping * @param timeout the time waiting for read activity after sending a keepalive ping * @param timeUnit the unit in which the interval and timeout are expressed - * * @return this builder for further configuration */ public Builder usingKeepAlive(long interval, long timeout, TimeUnit timeUnit, boolean keepAliveWithoutCalls) { @@ -466,7 +467,6 @@ public Builder usingKeepAlive(long interval, long timeout, TimeUnit timeUnit, bo * Default to 4 MiB. * * @param bytes The number of bytes to limit inbound message to - * * @return this builder for further configuration */ public Builder maxInboundMessageSize(int bytes) { @@ -482,7 +482,6 @@ public Builder maxInboundMessageSize(int bytes) { * feature. * * @param customization A function defining the customization to make on the ManagedChannelBuilder - * * @return this builder for further configuration */ public Builder customize(UnaryOperator> customization) { @@ -496,7 +495,6 @@ public Builder customize(UnaryOperator> customization) * * @param interval The interval in which to send status updates * @param unit The unit of time in which the interval is expressed - * * @return this builder for further configuration */ public Builder processorInfoUpdateFrequency(long interval, TimeUnit unit) { @@ -511,7 +509,6 @@ public Builder processorInfoUpdateFrequency(long interval, TimeUnit unit) { * Values lower than 16 will be replaced with 16. * * @param permits The number of initial permits - * * @return this builder for further configuration */ public Builder queryPermits(int permits) { @@ -526,7 +523,6 @@ public Builder queryPermits(int permits) { * Values lower than 16 will be replaced with 16. * * @param permits The number of initial permits - * * @return this builder for further configuration */ public Builder commandPermits(int permits) { diff --git a/src/main/java/io/axoniq/axonserver/connector/impl/ControlChannelImpl.java b/src/main/java/io/axoniq/axonserver/connector/impl/ControlChannelImpl.java index eb5a36e1..26407014 100644 --- a/src/main/java/io/axoniq/axonserver/connector/impl/ControlChannelImpl.java +++ b/src/main/java/io/axoniq/axonserver/connector/impl/ControlChannelImpl.java @@ -193,7 +193,7 @@ public synchronized void connect() { responseObserver.getInstructionsForPlatform(); try { - logger.info("Connected instruction stream for context '{}'. Sending client identification", context); + logger.info("Connected instruction stream for context '{}'. Sending client identification with clientId {}", context, clientIdentification.getClientId()); instructionsForPlatform.onNext(PlatformInboundInstruction.newBuilder() .setRegister(clientIdentification) .build()); diff --git a/src/main/java/io/axoniq/axonserver/connector/impl/Headers.java b/src/main/java/io/axoniq/axonserver/connector/impl/Headers.java index 9d79e045..64598a24 100644 --- a/src/main/java/io/axoniq/axonserver/connector/impl/Headers.java +++ b/src/main/java/io/axoniq/axonserver/connector/impl/Headers.java @@ -35,6 +35,12 @@ public abstract class Headers { public static final Metadata.Key ACCESS_TOKEN = Metadata.Key.of("AxonIQ-Access-Token", Metadata.ASCII_STRING_MARSHALLER); + /** + * A {@link Metadata.Key} defining the client ID of the application sending this message. + */ + public static final Metadata.Key CLIENT_ID = + Metadata.Key.of("AxonIQ-ClientId", Metadata.ASCII_STRING_MARSHALLER); + private Headers() { // Utility class } diff --git a/src/test/java/io/axoniq/axonserver/connector/impl/ControlChannelIntegrationTest.java b/src/test/java/io/axoniq/axonserver/connector/impl/ControlChannelIntegrationTest.java index 20dba825..dca6bd08 100644 --- a/src/test/java/io/axoniq/axonserver/connector/impl/ControlChannelIntegrationTest.java +++ b/src/test/java/io/axoniq/axonserver/connector/impl/ControlChannelIntegrationTest.java @@ -284,7 +284,7 @@ void moveSegmentInstructionIsPickedUpByHandler() throws Exception { String segmentToMove = "0"; String segmentsPath = "/v1/components/" + getClass().getSimpleName() + "/processors/testProcessor/segments/" + - segmentToMove + "/move?tokenStoreIdentifier=TokenStoreId&context=default&target=foo"; + segmentToMove + "/move?tokenStoreIdentifier=TokenStoreId&context=default&target=" + clientToMoveTo.getClientInstanceId(); assertWithin(2, TimeUnit.SECONDS, () -> sendToAxonServer(HttpPatch::new, segmentsPath)); assertWithin(2, TimeUnit.SECONDS, () -> assertTrue(instructionHandler.instructions.contains("release" + segmentToMove)));