diff --git a/framework-client-api/pom.xml b/framework-client-api/pom.xml index 2d2be66e..1aa0ef8e 100644 --- a/framework-client-api/pom.xml +++ b/framework-client-api/pom.xml @@ -21,7 +21,7 @@ io.axoniq.platform framework-client-parent - 5.1.0 + 5.1.0-SNAPSHOT 4.0.0 diff --git a/framework-client-api/src/main/java/io/axoniq/platform/framework/api/clientApi.kt b/framework-client-api/src/main/java/io/axoniq/platform/framework/api/clientApi.kt index 1f27651c..db54dc7f 100644 --- a/framework-client-api/src/main/java/io/axoniq/platform/framework/api/clientApi.kt +++ b/framework-client-api/src/main/java/io/axoniq/platform/framework/api/clientApi.kt @@ -1,5 +1,5 @@ /* - * Copyright (c) 2022-2025. AxonIQ B.V. + * Copyright (c) 2022-2026. AxonIQ B.V. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -35,4 +35,10 @@ data class ClientSettingsV2( val processorReportInterval: Long, val handlerReportInterval: Long, val applicationReportInterval: Long, + /** + * Codecs the server can decode on inbound frames and produce on outbound. Null when the server is + * older and predates compression negotiation; clients must treat null as "no compression". + * Values are [CompressionCodec.wireName] tokens, e.g. "gzip". + */ + val supportedCompressionCodecs: List? = null, ) \ No newline at end of file diff --git a/framework-client-api/src/main/java/io/axoniq/platform/framework/api/compressionApi.kt b/framework-client-api/src/main/java/io/axoniq/platform/framework/api/compressionApi.kt new file mode 100644 index 00000000..b047ba01 --- /dev/null +++ b/framework-client-api/src/main/java/io/axoniq/platform/framework/api/compressionApi.kt @@ -0,0 +1,38 @@ +/* + * Copyright (c) 2022-2026. AxonIQ B.V. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.axoniq.platform.framework.api + +/** + * Custom RSocket composite-metadata MIME type used to flag a payload's data as compressed with a specific codec. + * The metadata content is a single byte equal to the [CompressionCodec.id] used. + */ +const val COMPRESSION_METADATA_MIME: String = "application/vnd.axoniq.platform.compression.v0" + +/** + * Compression codecs supported on the Axoniq Console wire protocol. + * + * The codec is identified on the wire by [id]; the [wireName] is the lowercase token used in + * [ClientSettingsV2.supportedCompressionCodecs] for capability advertisement. + */ +enum class CompressionCodec(val id: Byte, val wireName: String) { + GZIP(1, "gzip"); + + companion object { + fun fromId(id: Byte): CompressionCodec? = entries.firstOrNull { it.id == id } + fun fromWireName(name: String): CompressionCodec? = entries.firstOrNull { it.wireName.equals(name, ignoreCase = true) } + } +} diff --git a/framework-client/pom.xml b/framework-client/pom.xml index 215bdd72..fe44c45e 100644 --- a/framework-client/pom.xml +++ b/framework-client/pom.xml @@ -21,7 +21,7 @@ io.axoniq.platform framework-client-parent - 5.1.0 + 5.1.0-SNAPSHOT 4.0.0 diff --git a/framework-client/src/main/java/io/axoniq/platform/framework/AxoniqPlatformConfigurerEnhancer.java b/framework-client/src/main/java/io/axoniq/platform/framework/AxoniqPlatformConfigurerEnhancer.java index b3846e25..3591be7e 100644 --- a/framework-client/src/main/java/io/axoniq/platform/framework/AxoniqPlatformConfigurerEnhancer.java +++ b/framework-client/src/main/java/io/axoniq/platform/framework/AxoniqPlatformConfigurerEnhancer.java @@ -27,6 +27,7 @@ import io.axoniq.platform.framework.client.ServerProcessorReporter; import io.axoniq.platform.framework.client.SetupPayloadCreator; import io.axoniq.platform.framework.client.strategy.CborJackson3EncodingStrategy; +import io.axoniq.platform.framework.client.strategy.CompressingEncodingStrategy; import io.axoniq.platform.framework.client.strategy.RSocketPayloadEncodingStrategy; import io.axoniq.platform.framework.eventprocessor.AxoniqPlatformEventHandlingComponent; import io.axoniq.platform.framework.eventprocessor.EventProcessorManager; @@ -88,6 +89,10 @@ public void enhance(ComponentRegistry registry) { .registerComponent(ComponentDefinition .ofType(RSocketPayloadEncodingStrategy.class) .withBuilder(c -> createEncodingStrategy())) + .registerDecorator(DecoratorDefinition.forType(RSocketPayloadEncodingStrategy.class) + .with((c, name, delegate) -> + new CompressingEncodingStrategy(delegate, c.getComponent(PlatformClientConnectionService.class))) + .order(Integer.MAX_VALUE)) .registerComponent(ComponentDefinition .ofType(RSocketHandlerRegistrar.class) .withBuilder(c -> new RSocketHandlerRegistrar(c.getComponent( diff --git a/framework-client/src/main/java/io/axoniq/platform/framework/client/strategy/CompressingEncodingStrategy.kt b/framework-client/src/main/java/io/axoniq/platform/framework/client/strategy/CompressingEncodingStrategy.kt new file mode 100644 index 00000000..9925c497 --- /dev/null +++ b/framework-client/src/main/java/io/axoniq/platform/framework/client/strategy/CompressingEncodingStrategy.kt @@ -0,0 +1,110 @@ +/* + * Copyright (c) 2022-2026. AxonIQ B.V. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.axoniq.platform.framework.client.strategy + +import io.axoniq.platform.framework.api.ClientSettingsV2 +import io.axoniq.platform.framework.api.ClientStatus +import io.axoniq.platform.framework.api.CompressionCodec +import io.axoniq.platform.framework.client.PlatformClientConnectionObserver +import io.axoniq.platform.framework.client.PlatformClientConnectionService +import io.axoniq.platform.framework.client.addCompressionMetadata +import io.axoniq.platform.framework.client.readCompressionCodec +import io.netty.buffer.ByteBuf +import io.netty.buffer.ByteBufAllocator +import io.netty.buffer.CompositeByteBuf +import io.rsocket.Payload +import io.rsocket.metadata.WellKnownMimeType +import io.rsocket.util.DefaultPayload +import java.util.concurrent.atomic.AtomicReference + +/** + * Decorator [RSocketPayloadEncodingStrategy] that gzips outbound payloads above a fixed size threshold and + * transparently decompresses inbound frames flagged with the compression metadata entry. Mime type, payload + * shape, and the rest of the encoding contract delegate straight through to [delegate]. + * + * The decorator subscribes itself to [PlatformClientConnectionService] so the active codec follows the + * server's settings handshake — a server that doesn't advertise compression keeps the connection in raw mode. + */ +class CompressingEncodingStrategy( + private val delegate: RSocketPayloadEncodingStrategy, + platformClientConnectionService: PlatformClientConnectionService, +) : RSocketPayloadEncodingStrategy, PlatformClientConnectionObserver { + + init { + platformClientConnectionService.subscribeToSettings(this) + } + + private val outboundCodec = AtomicReference(null) + + override fun getMimeType(): WellKnownMimeType = delegate.getMimeType() + + fun enableCompression(codec: CompressionCodec) = outboundCodec.set(codec) + + fun disableCompression() = outboundCodec.set(null) + + /** + * Picks the first wire-name from [ClientSettingsV2.supportedCompressionCodecs] that this client knows + * how to speak and enables it; if the server advertises nothing (older server) compression stays off. + */ + override fun onConnected(clientStatus: ClientStatus, settings: ClientSettingsV2) { + val codec = settings.supportedCompressionCodecs + ?.asSequence() + ?.mapNotNull { CompressionCodec.fromWireName(it) } + ?.firstOrNull() + if (codec != null) enableCompression(codec) else disableCompression() + } + + override fun onDisconnected() = disableCompression() + + override fun encode(payload: Any, metadata: ByteBuf?): Payload { + val inner = delegate.encode(payload, metadata) + val codec = outboundCodec.get() ?: return inner + if (inner.data.remaining() < COMPRESSION_THRESHOLD) return inner + + val raw = ByteArray(inner.data.remaining()).also { inner.data.duplicate().get(it) } + val compressed = Compression.compress(codec, raw) + if (compressed.size >= raw.size) return inner + + val newData = ByteBufAllocator.DEFAULT.buffer(compressed.size).writeBytes(compressed) + val newMetadata = appendCompressionFlag(inner.sliceMetadata(), codec) + return DefaultPayload.create(newData, newMetadata) + } + + override fun decode(payload: Payload, expectedType: Class): T { + val codec = payload.sliceMetadata().readCompressionCodec() + ?: return delegate.decode(payload, expectedType) + + val raw = ByteArray(payload.data.remaining()).also { payload.data.duplicate().get(it) } + val decompressed = Compression.decompress(codec, raw) + val rebuilt = DefaultPayload.create( + ByteBufAllocator.DEFAULT.buffer(decompressed.size).writeBytes(decompressed), + payload.sliceMetadata().copy() + ) + return delegate.decode(rebuilt, expectedType) + } + + private fun appendCompressionFlag(existing: ByteBuf, codec: CompressionCodec): CompositeByteBuf { + val out = ByteBufAllocator.DEFAULT.compositeBuffer() + if (existing.readableBytes() > 0) out.addComponent(true, existing.copy()) + out.addCompressionMetadata(codec) + return out + } + + companion object { + const val COMPRESSION_THRESHOLD: Int = 256 + } +} diff --git a/framework-client/src/main/java/io/axoniq/platform/framework/client/strategy/Compression.kt b/framework-client/src/main/java/io/axoniq/platform/framework/client/strategy/Compression.kt new file mode 100644 index 00000000..4c2af018 --- /dev/null +++ b/framework-client/src/main/java/io/axoniq/platform/framework/client/strategy/Compression.kt @@ -0,0 +1,42 @@ +/* + * Copyright (c) 2022-2026. AxonIQ B.V. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.axoniq.platform.framework.client.strategy + +import io.axoniq.platform.framework.api.CompressionCodec +import java.io.ByteArrayOutputStream +import java.util.zip.GZIPInputStream +import java.util.zip.GZIPOutputStream + +internal object Compression { + fun compress(codec: CompressionCodec, data: ByteArray): ByteArray = when (codec) { + CompressionCodec.GZIP -> gzip(data) + } + + fun decompress(codec: CompressionCodec, data: ByteArray): ByteArray = when (codec) { + CompressionCodec.GZIP -> gunzip(data) + } + + private fun gzip(data: ByteArray): ByteArray { + val out = ByteArrayOutputStream(data.size / 2) + GZIPOutputStream(out).use { it.write(data) } + return out.toByteArray() + } + + private fun gunzip(data: ByteArray): ByteArray { + return GZIPInputStream(data.inputStream()).use { it.readBytes() } + } +} diff --git a/framework-client/src/main/java/io/axoniq/platform/framework/client/utils.kt b/framework-client/src/main/java/io/axoniq/platform/framework/client/utils.kt index 8c203370..2abfd13a 100644 --- a/framework-client/src/main/java/io/axoniq/platform/framework/client/utils.kt +++ b/framework-client/src/main/java/io/axoniq/platform/framework/client/utils.kt @@ -16,9 +16,13 @@ package io.axoniq.platform.framework.client +import io.axoniq.platform.framework.api.COMPRESSION_METADATA_MIME +import io.axoniq.platform.framework.api.CompressionCodec import io.axoniq.platform.framework.api.PlatformClientAuthentication +import io.netty.buffer.ByteBuf import io.netty.buffer.ByteBufAllocator import io.netty.buffer.CompositeByteBuf +import io.rsocket.metadata.CompositeMetadata import io.rsocket.metadata.CompositeMetadataCodec import io.rsocket.metadata.TaggingMetadataCodec import io.rsocket.metadata.WellKnownMimeType @@ -44,3 +48,34 @@ fun CompositeByteBuf.addAuthMetadata(auth: PlatformClientAuthentication) { authMetadata ) } + +/** + * Appends a compression-flag entry to this composite metadata. The peer will read [codec] as a single byte + * and decompress the data buffer accordingly. + */ +fun CompositeByteBuf.addCompressionMetadata(codec: CompressionCodec) { + val content = ByteBufAllocator.DEFAULT.buffer(1).writeByte(codec.id.toInt()) + CompositeMetadataCodec.encodeAndAddMetadata( + this, + ByteBufAllocator.DEFAULT, + COMPRESSION_METADATA_MIME, + content + ) +} + +/** + * Reads the compression flag from RSocket composite metadata, if present. Returns null when the peer did not + * compress this frame or when the metadata buffer is null. + */ +fun ByteBuf?.readCompressionCodec(): CompressionCodec? { + if (this == null || this.readableBytes() == 0) return null + val composite = CompositeMetadata(this, false) + for (entry in composite) { + if (entry.mimeType == COMPRESSION_METADATA_MIME) { + val content = entry.content + if (content.readableBytes() < 1) return null + return CompressionCodec.fromId(content.getByte(content.readerIndex())) + } + } + return null +} diff --git a/framework-client/src/test/kotlin/io/axoniq/platform/framework/client/strategy/CompressingEncodingStrategyTest.kt b/framework-client/src/test/kotlin/io/axoniq/platform/framework/client/strategy/CompressingEncodingStrategyTest.kt new file mode 100644 index 00000000..52bd2597 --- /dev/null +++ b/framework-client/src/test/kotlin/io/axoniq/platform/framework/client/strategy/CompressingEncodingStrategyTest.kt @@ -0,0 +1,117 @@ +/* + * Copyright (c) 2022-2026. AxonIQ B.V. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.axoniq.platform.framework.client.strategy + +import io.axoniq.platform.framework.api.COMPRESSION_METADATA_MIME +import io.axoniq.platform.framework.api.CompressionCodec +import io.axoniq.platform.framework.client.PlatformClientConnectionService +import io.axoniq.platform.framework.client.addRouteMetadata +import io.netty.buffer.ByteBufAllocator +import io.rsocket.metadata.CompositeMetadata +import io.rsocket.util.DefaultPayload +import org.junit.jupiter.api.Assertions.assertEquals +import org.junit.jupiter.api.Assertions.assertFalse +import org.junit.jupiter.api.Assertions.assertTrue +import org.junit.jupiter.api.Test +import tools.jackson.dataformat.cbor.CBORMapper + +class CompressingEncodingStrategyTest { + + private val strategy = CompressingEncodingStrategy( + delegate = CborJackson3EncodingStrategy(), + platformClientConnectionService = PlatformClientConnectionService(), + ) + + private data class Sample(val a: String, val b: List) + + @Test + fun `encoded payload is not compressed when codec is not enabled`() { + val payload = strategy.encode(largeSample(), routingMetadata()) + + assertFalse(compressionEntryPresent(payload.sliceMetadata())) + // Round-trip works without enabling compression. + val decoded = strategy.decode(payload, Sample::class.java) + assertEquals(largeSample(), decoded) + } + + @Test + fun `small payload is not compressed even when codec is enabled`() { + strategy.enableCompression(CompressionCodec.GZIP) + val payload = strategy.encode(Sample("a", listOf("b")), routingMetadata()) + + assertFalse(compressionEntryPresent(payload.sliceMetadata())) + } + + @Test + fun `large payload is compressed when codec is enabled and shrunk on the wire`() { + strategy.enableCompression(CompressionCodec.GZIP) + val sample = largeSample() + + val rawSize = DefaultPayload.create( + CBORMapper.builder().build().writeValueAsBytes(sample) + ).data.remaining() + + val payload = strategy.encode(sample, routingMetadata()) + + assertTrue(compressionEntryPresent(payload.sliceMetadata())) + assertTrue(payload.data.remaining() < rawSize, "compressed=${payload.data.remaining()} raw=$rawSize") + } + + @Test + fun `compressed payload round-trips through encode then decode`() { + strategy.enableCompression(CompressionCodec.GZIP) + val sample = largeSample() + + val payload = strategy.encode(sample, routingMetadata()) + val decoded = strategy.decode(payload, Sample::class.java) + + assertEquals(sample, decoded) + } + + @Test + fun `disableCompression turns off outbound but inbound still decompresses`() { + strategy.enableCompression(CompressionCodec.GZIP) + val compressedPayload = strategy.encode(largeSample(), routingMetadata()) + assertTrue(compressionEntryPresent(compressedPayload.sliceMetadata())) + + strategy.disableCompression() + val plainPayload = strategy.encode(largeSample(), routingMetadata()) + assertFalse(compressionEntryPresent(plainPayload.sliceMetadata())) + + // Even with outbound off, inbound flagged frames are still decoded. + val decoded = strategy.decode(compressedPayload, Sample::class.java) + assertEquals(largeSample(), decoded) + } + + private fun largeSample(): Sample { + // Repetitive structure so gzip beats CBOR comfortably above the 256-byte threshold. + val tokens = List(50) { "handler-name-$it-with-padding" } + return Sample("the-route-key-is-quite-verbose", tokens) + } + + private fun routingMetadata() = ByteBufAllocator.DEFAULT.compositeBuffer().also { + it.addRouteMetadata("test-route") + } + + private fun compressionEntryPresent(metadata: io.netty.buffer.ByteBuf?): Boolean { + if (metadata == null || metadata.readableBytes() == 0) return false + for (entry in CompositeMetadata(metadata, false)) { + if (entry.mimeType == COMPRESSION_METADATA_MIME) return true + } + return false + } +} diff --git a/pom.xml b/pom.xml index 2b1c5ab0..deea6e63 100644 --- a/pom.xml +++ b/pom.xml @@ -22,7 +22,7 @@ io.axoniq.platform framework-client-parent - 5.1.0 + 5.1.0-SNAPSHOT framework-client-api diff --git a/spring-boot-starter/pom.xml b/spring-boot-starter/pom.xml index 86e42e08..6da1e212 100644 --- a/spring-boot-starter/pom.xml +++ b/spring-boot-starter/pom.xml @@ -21,7 +21,7 @@ io.axoniq.platform framework-client-parent - 5.1.0 + 5.1.0-SNAPSHOT 4.0.0