Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion framework-client-api/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
<parent>
<groupId>io.axoniq.platform</groupId>
<artifactId>framework-client-parent</artifactId>
<version>5.1.0</version>
<version>5.1.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>

Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2022-2025. AxonIQ B.V.
* Copyright (c) 2022-2026. AxonIQ B.V.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -35,4 +35,10 @@ data class ClientSettingsV2(
val processorReportInterval: Long,
val handlerReportInterval: Long,
val applicationReportInterval: Long,
/**
* Codecs the server can decode on inbound frames and produce on outbound. Null when the server is
* older and predates compression negotiation; clients must treat null as "no compression".
* Values are [CompressionCodec.wireName] tokens, e.g. "gzip".
*/
val supportedCompressionCodecs: List<String>? = null,
)
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
/*
* Copyright (c) 2022-2026. AxonIQ B.V.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.axoniq.platform.framework.api

/**
* Custom RSocket composite-metadata MIME type used to flag a payload's data as compressed with a specific codec.
* The metadata content is a single byte equal to the [CompressionCodec.id] used.
*/
const val COMPRESSION_METADATA_MIME: String = "application/vnd.axoniq.platform.compression.v0"

/**
* Compression codecs supported on the Axoniq Console wire protocol.
*
* The codec is identified on the wire by [id]; the [wireName] is the lowercase token used in
* [ClientSettingsV2.supportedCompressionCodecs] for capability advertisement.
*/
enum class CompressionCodec(val id: Byte, val wireName: String) {
GZIP(1, "gzip");

companion object {
fun fromId(id: Byte): CompressionCodec? = entries.firstOrNull { it.id == id }
fun fromWireName(name: String): CompressionCodec? = entries.firstOrNull { it.wireName.equals(name, ignoreCase = true) }
}
}
2 changes: 1 addition & 1 deletion framework-client/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
<parent>
<groupId>io.axoniq.platform</groupId>
<artifactId>framework-client-parent</artifactId>
<version>5.1.0</version>
<version>5.1.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import io.axoniq.platform.framework.client.ServerProcessorReporter;
import io.axoniq.platform.framework.client.SetupPayloadCreator;
import io.axoniq.platform.framework.client.strategy.CborJackson3EncodingStrategy;
import io.axoniq.platform.framework.client.strategy.CompressingEncodingStrategy;
import io.axoniq.platform.framework.client.strategy.RSocketPayloadEncodingStrategy;
import io.axoniq.platform.framework.eventprocessor.AxoniqPlatformEventHandlingComponent;
import io.axoniq.platform.framework.eventprocessor.EventProcessorManager;
Expand Down Expand Up @@ -88,6 +89,10 @@ public void enhance(ComponentRegistry registry) {
.registerComponent(ComponentDefinition
.ofType(RSocketPayloadEncodingStrategy.class)
.withBuilder(c -> createEncodingStrategy()))
.registerDecorator(DecoratorDefinition.forType(RSocketPayloadEncodingStrategy.class)
.with((c, name, delegate) ->
new CompressingEncodingStrategy(delegate, c.getComponent(PlatformClientConnectionService.class)))
.order(Integer.MAX_VALUE))
.registerComponent(ComponentDefinition
.ofType(RSocketHandlerRegistrar.class)
.withBuilder(c -> new RSocketHandlerRegistrar(c.getComponent(
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
/*
* Copyright (c) 2022-2026. AxonIQ B.V.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.axoniq.platform.framework.client.strategy

import io.axoniq.platform.framework.api.ClientSettingsV2
import io.axoniq.platform.framework.api.ClientStatus
import io.axoniq.platform.framework.api.CompressionCodec
import io.axoniq.platform.framework.client.PlatformClientConnectionObserver
import io.axoniq.platform.framework.client.PlatformClientConnectionService
import io.axoniq.platform.framework.client.addCompressionMetadata
import io.axoniq.platform.framework.client.readCompressionCodec
import io.netty.buffer.ByteBuf
import io.netty.buffer.ByteBufAllocator
import io.netty.buffer.CompositeByteBuf
import io.rsocket.Payload
import io.rsocket.metadata.WellKnownMimeType
import io.rsocket.util.DefaultPayload
import java.util.concurrent.atomic.AtomicReference

/**
* Decorator [RSocketPayloadEncodingStrategy] that gzips outbound payloads above a fixed size threshold and
* transparently decompresses inbound frames flagged with the compression metadata entry. Mime type, payload
* shape, and the rest of the encoding contract delegate straight through to [delegate].
*
* The decorator subscribes itself to [PlatformClientConnectionService] so the active codec follows the
* server's settings handshake — a server that doesn't advertise compression keeps the connection in raw mode.
*/
class CompressingEncodingStrategy(
private val delegate: RSocketPayloadEncodingStrategy,
platformClientConnectionService: PlatformClientConnectionService,
) : RSocketPayloadEncodingStrategy, PlatformClientConnectionObserver {

init {
platformClientConnectionService.subscribeToSettings(this)
}

private val outboundCodec = AtomicReference<CompressionCodec?>(null)

override fun getMimeType(): WellKnownMimeType = delegate.getMimeType()

Check warning on line 53 in framework-client/src/main/java/io/axoniq/platform/framework/client/strategy/CompressingEncodingStrategy.kt

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

Replace with interface delegation using "by" in the class header.

See more on https://sonarcloud.io/project/issues?id=AxonIQ_console-framework-client&issues=AZ4HAv_a_ZHWsTC8ygG9&open=AZ4HAv_a_ZHWsTC8ygG9&pullRequest=154

fun enableCompression(codec: CompressionCodec) = outboundCodec.set(codec)

fun disableCompression() = outboundCodec.set(null)

/**
* Picks the first wire-name from [ClientSettingsV2.supportedCompressionCodecs] that this client knows
* how to speak and enables it; if the server advertises nothing (older server) compression stays off.
*/
override fun onConnected(clientStatus: ClientStatus, settings: ClientSettingsV2) {
val codec = settings.supportedCompressionCodecs
?.asSequence()
?.mapNotNull { CompressionCodec.fromWireName(it) }
?.firstOrNull()
if (codec != null) enableCompression(codec) else disableCompression()
}

override fun onDisconnected() = disableCompression()

override fun encode(payload: Any, metadata: ByteBuf?): Payload {
val inner = delegate.encode(payload, metadata)
val codec = outboundCodec.get() ?: return inner
if (inner.data.remaining() < COMPRESSION_THRESHOLD) return inner

val raw = ByteArray(inner.data.remaining()).also { inner.data.duplicate().get(it) }

Check warning on line 78 in framework-client/src/main/java/io/axoniq/platform/framework/client/strategy/CompressingEncodingStrategy.kt

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

Replace function call with indexed accessor.

See more on https://sonarcloud.io/project/issues?id=AxonIQ_console-framework-client&issues=AZ4HAv_a_ZHWsTC8ygG-&open=AZ4HAv_a_ZHWsTC8ygG-&pullRequest=154
val compressed = Compression.compress(codec, raw)
if (compressed.size >= raw.size) return inner

val newData = ByteBufAllocator.DEFAULT.buffer(compressed.size).writeBytes(compressed)
val newMetadata = appendCompressionFlag(inner.sliceMetadata(), codec)
return DefaultPayload.create(newData, newMetadata)
}

override fun <T> decode(payload: Payload, expectedType: Class<T>): T {
val codec = payload.sliceMetadata().readCompressionCodec()
?: return delegate.decode(payload, expectedType)

val raw = ByteArray(payload.data.remaining()).also { payload.data.duplicate().get(it) }

Check warning on line 91 in framework-client/src/main/java/io/axoniq/platform/framework/client/strategy/CompressingEncodingStrategy.kt

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

Replace function call with indexed accessor.

See more on https://sonarcloud.io/project/issues?id=AxonIQ_console-framework-client&issues=AZ4HAv_a_ZHWsTC8ygG_&open=AZ4HAv_a_ZHWsTC8ygG_&pullRequest=154
val decompressed = Compression.decompress(codec, raw)
val rebuilt = DefaultPayload.create(
ByteBufAllocator.DEFAULT.buffer(decompressed.size).writeBytes(decompressed),
payload.sliceMetadata().copy()
)
return delegate.decode(rebuilt, expectedType)
}

private fun appendCompressionFlag(existing: ByteBuf, codec: CompressionCodec): CompositeByteBuf {
val out = ByteBufAllocator.DEFAULT.compositeBuffer()
if (existing.readableBytes() > 0) out.addComponent(true, existing.copy())
out.addCompressionMetadata(codec)
return out
}

companion object {
const val COMPRESSION_THRESHOLD: Int = 256
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
/*
* Copyright (c) 2022-2026. AxonIQ B.V.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.axoniq.platform.framework.client.strategy

import io.axoniq.platform.framework.api.CompressionCodec
import java.io.ByteArrayOutputStream
import java.util.zip.GZIPInputStream
import java.util.zip.GZIPOutputStream

internal object Compression {
fun compress(codec: CompressionCodec, data: ByteArray): ByteArray = when (codec) {
CompressionCodec.GZIP -> gzip(data)
}

fun decompress(codec: CompressionCodec, data: ByteArray): ByteArray = when (codec) {
CompressionCodec.GZIP -> gunzip(data)
}

private fun gzip(data: ByteArray): ByteArray {
val out = ByteArrayOutputStream(data.size / 2)
GZIPOutputStream(out).use { it.write(data) }
return out.toByteArray()
}

private fun gunzip(data: ByteArray): ByteArray {
return GZIPInputStream(data.inputStream()).use { it.readBytes() }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,13 @@

package io.axoniq.platform.framework.client

import io.axoniq.platform.framework.api.COMPRESSION_METADATA_MIME
import io.axoniq.platform.framework.api.CompressionCodec
import io.axoniq.platform.framework.api.PlatformClientAuthentication
import io.netty.buffer.ByteBuf
import io.netty.buffer.ByteBufAllocator
import io.netty.buffer.CompositeByteBuf
import io.rsocket.metadata.CompositeMetadata
import io.rsocket.metadata.CompositeMetadataCodec
import io.rsocket.metadata.TaggingMetadataCodec
import io.rsocket.metadata.WellKnownMimeType
Expand All @@ -44,3 +48,34 @@ fun CompositeByteBuf.addAuthMetadata(auth: PlatformClientAuthentication) {
authMetadata
)
}

/**
* Appends a compression-flag entry to this composite metadata. The peer will read [codec] as a single byte
* and decompress the data buffer accordingly.
*/
fun CompositeByteBuf.addCompressionMetadata(codec: CompressionCodec) {
val content = ByteBufAllocator.DEFAULT.buffer(1).writeByte(codec.id.toInt())
CompositeMetadataCodec.encodeAndAddMetadata(
this,
ByteBufAllocator.DEFAULT,
COMPRESSION_METADATA_MIME,
content
)
}

/**
* Reads the compression flag from RSocket composite metadata, if present. Returns null when the peer did not
* compress this frame or when the metadata buffer is null.
*/
fun ByteBuf?.readCompressionCodec(): CompressionCodec? {
if (this == null || this.readableBytes() == 0) return null
val composite = CompositeMetadata(this, false)
for (entry in composite) {
if (entry.mimeType == COMPRESSION_METADATA_MIME) {
val content = entry.content
if (content.readableBytes() < 1) return null
return CompressionCodec.fromId(content.getByte(content.readerIndex()))
}
}
return null
}
Loading