diff --git a/framework-client-api/pom.xml b/framework-client-api/pom.xml
index 2d2be66e..1aa0ef8e 100644
--- a/framework-client-api/pom.xml
+++ b/framework-client-api/pom.xml
@@ -21,7 +21,7 @@
io.axoniq.platform
framework-client-parent
- 5.1.0
+ 5.1.0-SNAPSHOT
4.0.0
diff --git a/framework-client-api/src/main/java/io/axoniq/platform/framework/api/clientApi.kt b/framework-client-api/src/main/java/io/axoniq/platform/framework/api/clientApi.kt
index 1f27651c..db54dc7f 100644
--- a/framework-client-api/src/main/java/io/axoniq/platform/framework/api/clientApi.kt
+++ b/framework-client-api/src/main/java/io/axoniq/platform/framework/api/clientApi.kt
@@ -1,5 +1,5 @@
/*
- * Copyright (c) 2022-2025. AxonIQ B.V.
+ * Copyright (c) 2022-2026. AxonIQ B.V.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -35,4 +35,10 @@ data class ClientSettingsV2(
val processorReportInterval: Long,
val handlerReportInterval: Long,
val applicationReportInterval: Long,
+ /**
+ * Codecs the server can decode on inbound frames and produce on outbound. Null when the server is
+ * older and predates compression negotiation; clients must treat null as "no compression".
+ * Values are [CompressionCodec.wireName] tokens, e.g. "gzip".
+ */
+ val supportedCompressionCodecs: List? = null,
)
\ No newline at end of file
diff --git a/framework-client-api/src/main/java/io/axoniq/platform/framework/api/compressionApi.kt b/framework-client-api/src/main/java/io/axoniq/platform/framework/api/compressionApi.kt
new file mode 100644
index 00000000..b047ba01
--- /dev/null
+++ b/framework-client-api/src/main/java/io/axoniq/platform/framework/api/compressionApi.kt
@@ -0,0 +1,38 @@
+/*
+ * Copyright (c) 2022-2026. AxonIQ B.V.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.axoniq.platform.framework.api
+
+/**
+ * Custom RSocket composite-metadata MIME type used to flag a payload's data as compressed with a specific codec.
+ * The metadata content is a single byte equal to the [CompressionCodec.id] used.
+ */
+const val COMPRESSION_METADATA_MIME: String = "application/vnd.axoniq.platform.compression.v0"
+
+/**
+ * Compression codecs supported on the Axoniq Console wire protocol.
+ *
+ * The codec is identified on the wire by [id]; the [wireName] is the lowercase token used in
+ * [ClientSettingsV2.supportedCompressionCodecs] for capability advertisement.
+ */
+enum class CompressionCodec(val id: Byte, val wireName: String) {
+ GZIP(1, "gzip");
+
+ companion object {
+ fun fromId(id: Byte): CompressionCodec? = entries.firstOrNull { it.id == id }
+ fun fromWireName(name: String): CompressionCodec? = entries.firstOrNull { it.wireName.equals(name, ignoreCase = true) }
+ }
+}
diff --git a/framework-client/pom.xml b/framework-client/pom.xml
index 215bdd72..fe44c45e 100644
--- a/framework-client/pom.xml
+++ b/framework-client/pom.xml
@@ -21,7 +21,7 @@
io.axoniq.platform
framework-client-parent
- 5.1.0
+ 5.1.0-SNAPSHOT
4.0.0
diff --git a/framework-client/src/main/java/io/axoniq/platform/framework/AxoniqPlatformConfigurerEnhancer.java b/framework-client/src/main/java/io/axoniq/platform/framework/AxoniqPlatformConfigurerEnhancer.java
index b3846e25..3591be7e 100644
--- a/framework-client/src/main/java/io/axoniq/platform/framework/AxoniqPlatformConfigurerEnhancer.java
+++ b/framework-client/src/main/java/io/axoniq/platform/framework/AxoniqPlatformConfigurerEnhancer.java
@@ -27,6 +27,7 @@
import io.axoniq.platform.framework.client.ServerProcessorReporter;
import io.axoniq.platform.framework.client.SetupPayloadCreator;
import io.axoniq.platform.framework.client.strategy.CborJackson3EncodingStrategy;
+import io.axoniq.platform.framework.client.strategy.CompressingEncodingStrategy;
import io.axoniq.platform.framework.client.strategy.RSocketPayloadEncodingStrategy;
import io.axoniq.platform.framework.eventprocessor.AxoniqPlatformEventHandlingComponent;
import io.axoniq.platform.framework.eventprocessor.EventProcessorManager;
@@ -88,6 +89,10 @@ public void enhance(ComponentRegistry registry) {
.registerComponent(ComponentDefinition
.ofType(RSocketPayloadEncodingStrategy.class)
.withBuilder(c -> createEncodingStrategy()))
+ .registerDecorator(DecoratorDefinition.forType(RSocketPayloadEncodingStrategy.class)
+ .with((c, name, delegate) ->
+ new CompressingEncodingStrategy(delegate, c.getComponent(PlatformClientConnectionService.class)))
+ .order(Integer.MAX_VALUE))
.registerComponent(ComponentDefinition
.ofType(RSocketHandlerRegistrar.class)
.withBuilder(c -> new RSocketHandlerRegistrar(c.getComponent(
diff --git a/framework-client/src/main/java/io/axoniq/platform/framework/client/strategy/CompressingEncodingStrategy.kt b/framework-client/src/main/java/io/axoniq/platform/framework/client/strategy/CompressingEncodingStrategy.kt
new file mode 100644
index 00000000..9925c497
--- /dev/null
+++ b/framework-client/src/main/java/io/axoniq/platform/framework/client/strategy/CompressingEncodingStrategy.kt
@@ -0,0 +1,110 @@
+/*
+ * Copyright (c) 2022-2026. AxonIQ B.V.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.axoniq.platform.framework.client.strategy
+
+import io.axoniq.platform.framework.api.ClientSettingsV2
+import io.axoniq.platform.framework.api.ClientStatus
+import io.axoniq.platform.framework.api.CompressionCodec
+import io.axoniq.platform.framework.client.PlatformClientConnectionObserver
+import io.axoniq.platform.framework.client.PlatformClientConnectionService
+import io.axoniq.platform.framework.client.addCompressionMetadata
+import io.axoniq.platform.framework.client.readCompressionCodec
+import io.netty.buffer.ByteBuf
+import io.netty.buffer.ByteBufAllocator
+import io.netty.buffer.CompositeByteBuf
+import io.rsocket.Payload
+import io.rsocket.metadata.WellKnownMimeType
+import io.rsocket.util.DefaultPayload
+import java.util.concurrent.atomic.AtomicReference
+
+/**
+ * Decorator [RSocketPayloadEncodingStrategy] that gzips outbound payloads above a fixed size threshold and
+ * transparently decompresses inbound frames flagged with the compression metadata entry. Mime type, payload
+ * shape, and the rest of the encoding contract delegate straight through to [delegate].
+ *
+ * The decorator subscribes itself to [PlatformClientConnectionService] so the active codec follows the
+ * server's settings handshake — a server that doesn't advertise compression keeps the connection in raw mode.
+ */
+class CompressingEncodingStrategy(
+ private val delegate: RSocketPayloadEncodingStrategy,
+ platformClientConnectionService: PlatformClientConnectionService,
+) : RSocketPayloadEncodingStrategy, PlatformClientConnectionObserver {
+
+ init {
+ platformClientConnectionService.subscribeToSettings(this)
+ }
+
+ private val outboundCodec = AtomicReference(null)
+
+ override fun getMimeType(): WellKnownMimeType = delegate.getMimeType()
+
+ fun enableCompression(codec: CompressionCodec) = outboundCodec.set(codec)
+
+ fun disableCompression() = outboundCodec.set(null)
+
+ /**
+ * Picks the first wire-name from [ClientSettingsV2.supportedCompressionCodecs] that this client knows
+ * how to speak and enables it; if the server advertises nothing (older server) compression stays off.
+ */
+ override fun onConnected(clientStatus: ClientStatus, settings: ClientSettingsV2) {
+ val codec = settings.supportedCompressionCodecs
+ ?.asSequence()
+ ?.mapNotNull { CompressionCodec.fromWireName(it) }
+ ?.firstOrNull()
+ if (codec != null) enableCompression(codec) else disableCompression()
+ }
+
+ override fun onDisconnected() = disableCompression()
+
+ override fun encode(payload: Any, metadata: ByteBuf?): Payload {
+ val inner = delegate.encode(payload, metadata)
+ val codec = outboundCodec.get() ?: return inner
+ if (inner.data.remaining() < COMPRESSION_THRESHOLD) return inner
+
+ val raw = ByteArray(inner.data.remaining()).also { inner.data.duplicate().get(it) }
+ val compressed = Compression.compress(codec, raw)
+ if (compressed.size >= raw.size) return inner
+
+ val newData = ByteBufAllocator.DEFAULT.buffer(compressed.size).writeBytes(compressed)
+ val newMetadata = appendCompressionFlag(inner.sliceMetadata(), codec)
+ return DefaultPayload.create(newData, newMetadata)
+ }
+
+ override fun decode(payload: Payload, expectedType: Class): T {
+ val codec = payload.sliceMetadata().readCompressionCodec()
+ ?: return delegate.decode(payload, expectedType)
+
+ val raw = ByteArray(payload.data.remaining()).also { payload.data.duplicate().get(it) }
+ val decompressed = Compression.decompress(codec, raw)
+ val rebuilt = DefaultPayload.create(
+ ByteBufAllocator.DEFAULT.buffer(decompressed.size).writeBytes(decompressed),
+ payload.sliceMetadata().copy()
+ )
+ return delegate.decode(rebuilt, expectedType)
+ }
+
+ private fun appendCompressionFlag(existing: ByteBuf, codec: CompressionCodec): CompositeByteBuf {
+ val out = ByteBufAllocator.DEFAULT.compositeBuffer()
+ if (existing.readableBytes() > 0) out.addComponent(true, existing.copy())
+ out.addCompressionMetadata(codec)
+ return out
+ }
+
+ companion object {
+ const val COMPRESSION_THRESHOLD: Int = 256
+ }
+}
diff --git a/framework-client/src/main/java/io/axoniq/platform/framework/client/strategy/Compression.kt b/framework-client/src/main/java/io/axoniq/platform/framework/client/strategy/Compression.kt
new file mode 100644
index 00000000..4c2af018
--- /dev/null
+++ b/framework-client/src/main/java/io/axoniq/platform/framework/client/strategy/Compression.kt
@@ -0,0 +1,42 @@
+/*
+ * Copyright (c) 2022-2026. AxonIQ B.V.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.axoniq.platform.framework.client.strategy
+
+import io.axoniq.platform.framework.api.CompressionCodec
+import java.io.ByteArrayOutputStream
+import java.util.zip.GZIPInputStream
+import java.util.zip.GZIPOutputStream
+
+internal object Compression {
+ fun compress(codec: CompressionCodec, data: ByteArray): ByteArray = when (codec) {
+ CompressionCodec.GZIP -> gzip(data)
+ }
+
+ fun decompress(codec: CompressionCodec, data: ByteArray): ByteArray = when (codec) {
+ CompressionCodec.GZIP -> gunzip(data)
+ }
+
+ private fun gzip(data: ByteArray): ByteArray {
+ val out = ByteArrayOutputStream(data.size / 2)
+ GZIPOutputStream(out).use { it.write(data) }
+ return out.toByteArray()
+ }
+
+ private fun gunzip(data: ByteArray): ByteArray {
+ return GZIPInputStream(data.inputStream()).use { it.readBytes() }
+ }
+}
diff --git a/framework-client/src/main/java/io/axoniq/platform/framework/client/utils.kt b/framework-client/src/main/java/io/axoniq/platform/framework/client/utils.kt
index 8c203370..2abfd13a 100644
--- a/framework-client/src/main/java/io/axoniq/platform/framework/client/utils.kt
+++ b/framework-client/src/main/java/io/axoniq/platform/framework/client/utils.kt
@@ -16,9 +16,13 @@
package io.axoniq.platform.framework.client
+import io.axoniq.platform.framework.api.COMPRESSION_METADATA_MIME
+import io.axoniq.platform.framework.api.CompressionCodec
import io.axoniq.platform.framework.api.PlatformClientAuthentication
+import io.netty.buffer.ByteBuf
import io.netty.buffer.ByteBufAllocator
import io.netty.buffer.CompositeByteBuf
+import io.rsocket.metadata.CompositeMetadata
import io.rsocket.metadata.CompositeMetadataCodec
import io.rsocket.metadata.TaggingMetadataCodec
import io.rsocket.metadata.WellKnownMimeType
@@ -44,3 +48,34 @@ fun CompositeByteBuf.addAuthMetadata(auth: PlatformClientAuthentication) {
authMetadata
)
}
+
+/**
+ * Appends a compression-flag entry to this composite metadata. The peer will read [codec] as a single byte
+ * and decompress the data buffer accordingly.
+ */
+fun CompositeByteBuf.addCompressionMetadata(codec: CompressionCodec) {
+ val content = ByteBufAllocator.DEFAULT.buffer(1).writeByte(codec.id.toInt())
+ CompositeMetadataCodec.encodeAndAddMetadata(
+ this,
+ ByteBufAllocator.DEFAULT,
+ COMPRESSION_METADATA_MIME,
+ content
+ )
+}
+
+/**
+ * Reads the compression flag from RSocket composite metadata, if present. Returns null when the peer did not
+ * compress this frame or when the metadata buffer is null.
+ */
+fun ByteBuf?.readCompressionCodec(): CompressionCodec? {
+ if (this == null || this.readableBytes() == 0) return null
+ val composite = CompositeMetadata(this, false)
+ for (entry in composite) {
+ if (entry.mimeType == COMPRESSION_METADATA_MIME) {
+ val content = entry.content
+ if (content.readableBytes() < 1) return null
+ return CompressionCodec.fromId(content.getByte(content.readerIndex()))
+ }
+ }
+ return null
+}
diff --git a/framework-client/src/test/kotlin/io/axoniq/platform/framework/client/strategy/CompressingEncodingStrategyTest.kt b/framework-client/src/test/kotlin/io/axoniq/platform/framework/client/strategy/CompressingEncodingStrategyTest.kt
new file mode 100644
index 00000000..52bd2597
--- /dev/null
+++ b/framework-client/src/test/kotlin/io/axoniq/platform/framework/client/strategy/CompressingEncodingStrategyTest.kt
@@ -0,0 +1,117 @@
+/*
+ * Copyright (c) 2022-2026. AxonIQ B.V.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.axoniq.platform.framework.client.strategy
+
+import io.axoniq.platform.framework.api.COMPRESSION_METADATA_MIME
+import io.axoniq.platform.framework.api.CompressionCodec
+import io.axoniq.platform.framework.client.PlatformClientConnectionService
+import io.axoniq.platform.framework.client.addRouteMetadata
+import io.netty.buffer.ByteBufAllocator
+import io.rsocket.metadata.CompositeMetadata
+import io.rsocket.util.DefaultPayload
+import org.junit.jupiter.api.Assertions.assertEquals
+import org.junit.jupiter.api.Assertions.assertFalse
+import org.junit.jupiter.api.Assertions.assertTrue
+import org.junit.jupiter.api.Test
+import tools.jackson.dataformat.cbor.CBORMapper
+
+class CompressingEncodingStrategyTest {
+
+ private val strategy = CompressingEncodingStrategy(
+ delegate = CborJackson3EncodingStrategy(),
+ platformClientConnectionService = PlatformClientConnectionService(),
+ )
+
+ private data class Sample(val a: String, val b: List)
+
+ @Test
+ fun `encoded payload is not compressed when codec is not enabled`() {
+ val payload = strategy.encode(largeSample(), routingMetadata())
+
+ assertFalse(compressionEntryPresent(payload.sliceMetadata()))
+ // Round-trip works without enabling compression.
+ val decoded = strategy.decode(payload, Sample::class.java)
+ assertEquals(largeSample(), decoded)
+ }
+
+ @Test
+ fun `small payload is not compressed even when codec is enabled`() {
+ strategy.enableCompression(CompressionCodec.GZIP)
+ val payload = strategy.encode(Sample("a", listOf("b")), routingMetadata())
+
+ assertFalse(compressionEntryPresent(payload.sliceMetadata()))
+ }
+
+ @Test
+ fun `large payload is compressed when codec is enabled and shrunk on the wire`() {
+ strategy.enableCompression(CompressionCodec.GZIP)
+ val sample = largeSample()
+
+ val rawSize = DefaultPayload.create(
+ CBORMapper.builder().build().writeValueAsBytes(sample)
+ ).data.remaining()
+
+ val payload = strategy.encode(sample, routingMetadata())
+
+ assertTrue(compressionEntryPresent(payload.sliceMetadata()))
+ assertTrue(payload.data.remaining() < rawSize, "compressed=${payload.data.remaining()} raw=$rawSize")
+ }
+
+ @Test
+ fun `compressed payload round-trips through encode then decode`() {
+ strategy.enableCompression(CompressionCodec.GZIP)
+ val sample = largeSample()
+
+ val payload = strategy.encode(sample, routingMetadata())
+ val decoded = strategy.decode(payload, Sample::class.java)
+
+ assertEquals(sample, decoded)
+ }
+
+ @Test
+ fun `disableCompression turns off outbound but inbound still decompresses`() {
+ strategy.enableCompression(CompressionCodec.GZIP)
+ val compressedPayload = strategy.encode(largeSample(), routingMetadata())
+ assertTrue(compressionEntryPresent(compressedPayload.sliceMetadata()))
+
+ strategy.disableCompression()
+ val plainPayload = strategy.encode(largeSample(), routingMetadata())
+ assertFalse(compressionEntryPresent(plainPayload.sliceMetadata()))
+
+ // Even with outbound off, inbound flagged frames are still decoded.
+ val decoded = strategy.decode(compressedPayload, Sample::class.java)
+ assertEquals(largeSample(), decoded)
+ }
+
+ private fun largeSample(): Sample {
+ // Repetitive structure so gzip beats CBOR comfortably above the 256-byte threshold.
+ val tokens = List(50) { "handler-name-$it-with-padding" }
+ return Sample("the-route-key-is-quite-verbose", tokens)
+ }
+
+ private fun routingMetadata() = ByteBufAllocator.DEFAULT.compositeBuffer().also {
+ it.addRouteMetadata("test-route")
+ }
+
+ private fun compressionEntryPresent(metadata: io.netty.buffer.ByteBuf?): Boolean {
+ if (metadata == null || metadata.readableBytes() == 0) return false
+ for (entry in CompositeMetadata(metadata, false)) {
+ if (entry.mimeType == COMPRESSION_METADATA_MIME) return true
+ }
+ return false
+ }
+}
diff --git a/pom.xml b/pom.xml
index 2b1c5ab0..deea6e63 100644
--- a/pom.xml
+++ b/pom.xml
@@ -22,7 +22,7 @@
io.axoniq.platform
framework-client-parent
- 5.1.0
+ 5.1.0-SNAPSHOT
framework-client-api
diff --git a/spring-boot-starter/pom.xml b/spring-boot-starter/pom.xml
index 86e42e08..6da1e212 100644
--- a/spring-boot-starter/pom.xml
+++ b/spring-boot-starter/pom.xml
@@ -21,7 +21,7 @@
io.axoniq.platform
framework-client-parent
- 5.1.0
+ 5.1.0-SNAPSHOT
4.0.0