Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import java.util.function.Function;
import java.util.function.Supplier;

import org.springframework.data.redis.connection.ReactiveRedisConnectionFactory;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.expression.Expression;
Expand Down Expand Up @@ -238,6 +239,54 @@ public static RedisQueueInboundGatewaySpec queueInboundGateway(String queueName,
return new RedisQueueInboundGatewaySpec(queueName, connectionFactory);
}

/**
* The factory to produce a {@link RedisStreamOutboundChannelAdapterSpec}.
* @param connectionFactory the {@link ReactiveRedisConnectionFactory} to build on
* @param streamKey The streamKey of the Redis stream to build on
* @return the {@link RedisStreamOutboundChannelAdapterSpec} instance
*/
public static RedisStreamOutboundChannelAdapterSpec streamOutboundChannelAdapter(
ReactiveRedisConnectionFactory connectionFactory, String streamKey) {

return new RedisStreamOutboundChannelAdapterSpec(connectionFactory, streamKey);
}

/**
* The factory to produce a {@link RedisStreamOutboundChannelAdapterSpec}.
* @param connectionFactory the {@link ReactiveRedisConnectionFactory} to build on
* @param streamExpression The streamKey expression of the Redis stream to build on
* @return the {@link RedisStreamOutboundChannelAdapterSpec} instance
*/
public static RedisStreamOutboundChannelAdapterSpec streamOutboundChannelAdapter(
ReactiveRedisConnectionFactory connectionFactory, Expression streamExpression) {

return new RedisStreamOutboundChannelAdapterSpec(connectionFactory, streamExpression);
}

/**
* The factory to produce a {@link RedisStreamOutboundChannelAdapterSpec}.
* @param connectionFactory the {@link ReactiveRedisConnectionFactory} to build on
* @param streamFunction The streamKey function of the Redis stream to build on
* @return the {@link RedisStreamOutboundChannelAdapterSpec} instance
*/
public static RedisStreamOutboundChannelAdapterSpec streamOutboundChannelAdapter(
ReactiveRedisConnectionFactory connectionFactory, Function<Message<?>, String> streamFunction) {

return streamOutboundChannelAdapter(connectionFactory, new FunctionExpression<>(streamFunction));
}

/**
* The factory to produce a {@link RedisStreamInboundChannelAdapterSpec}.
* @param connectionFactory the {@link ReactiveRedisConnectionFactory} to build on
* @param streamKey The streamKey of the Redis stream to build on
* @return the {@link RedisStreamInboundChannelAdapterSpec} instance
*/
public static RedisStreamInboundChannelAdapterSpec streamInboundChannelAdapter(
ReactiveRedisConnectionFactory connectionFactory, String streamKey) {

return new RedisStreamInboundChannelAdapterSpec(connectionFactory, streamKey);
}

private Redis() {
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,196 @@
/*
* Copyright 2026-present the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.springframework.integration.redis.dsl;

import java.time.Duration;
import java.util.function.Function;

import org.jspecify.annotations.Nullable;
import org.reactivestreams.Publisher;

import org.springframework.data.redis.connection.ReactiveRedisConnectionFactory;
import org.springframework.data.redis.connection.stream.ReadOffset;
import org.springframework.data.redis.hash.HashMapper;
import org.springframework.data.redis.serializer.RedisSerializationContext;
import org.springframework.data.redis.stream.StreamReceiver;
import org.springframework.integration.dsl.MessageProducerSpec;
import org.springframework.integration.redis.inbound.ReactiveRedisStreamMessageProducer;

/**
* A {@link MessageProducerSpec} for a {@link ReactiveRedisStreamMessageProducer}.
*
* @author Jiandong Ma
*
* @since 7.1
*/
public class RedisStreamInboundChannelAdapterSpec extends
MessageProducerSpec<RedisStreamInboundChannelAdapterSpec, ReactiveRedisStreamMessageProducer> {

protected RedisStreamInboundChannelAdapterSpec(ReactiveRedisConnectionFactory connectionFactory,
String streamKey) {

this.target = new ReactiveRedisStreamMessageProducer(connectionFactory, streamKey);
}

/**
* Specify the offset from which to read message.
* @param readOffset the readOffset
* @return the spec
* @see ReactiveRedisStreamMessageProducer#setReadOffset(ReadOffset)
*/
public RedisStreamInboundChannelAdapterSpec readOffset(ReadOffset readOffset) {
this.target.setReadOffset(readOffset);
return this;
}

/**
* Specify whether extract payload.
* @param extractPayload the extractPayload
* @return the spec
* @see ReactiveRedisStreamMessageProducer#setExtractPayload(boolean)
*/
public RedisStreamInboundChannelAdapterSpec extractPayload(boolean extractPayload) {
this.target.setExtractPayload(extractPayload);
return this;
}

/**
* Specify whether acknowledge message read in the Consumer Group.
* @param autoAck the acknowledge option
* @return the spec
* @see ReactiveRedisStreamMessageProducer#setAutoAck(boolean)
*/
public RedisStreamInboundChannelAdapterSpec autoAck(boolean autoAck) {
this.target.setAutoAck(autoAck);
return this;
}

/**
* Specify the name of the consumer group.
* @param consumerGroup the consumerGroup
* @return the spec
* @see ReactiveRedisStreamMessageProducer#setConsumerGroup(String)
*/
public RedisStreamInboundChannelAdapterSpec consumerGroup(String consumerGroup) {
this.target.setConsumerGroup(consumerGroup);
return this;
}

/**
* Specify the name of the consumer.
* @param consumerName the consumerName
* @return the spec
* @see ReactiveRedisStreamMessageProducer#setConsumerName(String)
*/
public RedisStreamInboundChannelAdapterSpec consumerName(@Nullable String consumerName) {
this.target.setConsumerName(consumerName);
return this;
}

/**
* Specify whether create the consumer group.
* @param createConsumerGroup the createConsumerGroup
* @return the spec
* @see ReactiveRedisStreamMessageProducer#setCreateConsumerGroup(boolean)
*/
public RedisStreamInboundChannelAdapterSpec createConsumerGroup(boolean createConsumerGroup) {
this.target.setCreateConsumerGroup(createConsumerGroup);
return this;
}

/**
* Specify the streamReceiverOptions to customize the {@link StreamReceiver}.
* @param streamReceiverOptions the streamReceiverOptions
* @return the spec
* @see ReactiveRedisStreamMessageProducer#setStreamReceiverOptions(StreamReceiver.StreamReceiverOptions)
*/
public RedisStreamInboundChannelAdapterSpec streamReceiverOptions(
StreamReceiver.@Nullable StreamReceiverOptions<String, ?> streamReceiverOptions) {

this.target.setStreamReceiverOptions(streamReceiverOptions);
return this;
}

/**
* Specify the poll timeout for the BLOCK option.
* @param pollTimeout the pollTimeout
* @return the spec
* @see ReactiveRedisStreamMessageProducer#setPollTimeout(Duration)
*/
public RedisStreamInboundChannelAdapterSpec pollTimeout(Duration pollTimeout) {
this.target.setPollTimeout(pollTimeout);
return this;
}

/**
* Specify the batch size for the COUNT option.
* @param recordsPerPoll the recordsPerPoll
* @return the spec
* @see ReactiveRedisStreamMessageProducer#setBatchSize(int)
*/
public RedisStreamInboundChannelAdapterSpec batchSize(int recordsPerPoll) {
this.target.setBatchSize(recordsPerPoll);
return this;
}

/**
* Specify the resume Function when polling the stream fails.
* @param resumeFunction the resumeFunction
* @return the spec
* @see ReactiveRedisStreamMessageProducer#setOnErrorResume(Function)
*/
public RedisStreamInboundChannelAdapterSpec errorResumeFunction(
Function<? super Throwable, ? extends Publisher<Void>> resumeFunction) {

this.target.setOnErrorResume(resumeFunction);
return this;
}

/**
* Specify the key, hash key and hash value serializer.
* @param pair the pair
* @return the spec
* @see ReactiveRedisStreamMessageProducer#setSerializer(RedisSerializationContext.SerializationPair)
*/
public RedisStreamInboundChannelAdapterSpec serializer(RedisSerializationContext.SerializationPair<?> pair) {
this.target.setSerializer(pair);
return this;
}

/**
* Specify the hash target type.
* @param targetType the targetType
* @return the spec
* @see ReactiveRedisStreamMessageProducer#setTargetType(Class)
*/
public RedisStreamInboundChannelAdapterSpec targetType(Class<?> targetType) {
this.target.setTargetType(targetType);
return this;
}

/**
* Specify the hashMapper.
* @param hashMapper the hashMapper
* @return the spec
* @see ReactiveRedisStreamMessageProducer#setObjectMapper(HashMapper)
*/
public RedisStreamInboundChannelAdapterSpec objectMapper(HashMapper<?, ?, ?> hashMapper) {
this.target.setObjectMapper(hashMapper);
return this;
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
/*
* Copyright 2026-present the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.springframework.integration.redis.dsl;

import java.util.function.Function;

import org.springframework.data.redis.connection.ReactiveRedisConnectionFactory;
import org.springframework.data.redis.connection.RedisStreamCommands;
import org.springframework.data.redis.hash.HashMapper;
import org.springframework.data.redis.serializer.RedisSerializationContext;
import org.springframework.expression.Expression;
import org.springframework.integration.dsl.ReactiveMessageHandlerSpec;
import org.springframework.integration.redis.outbound.ReactiveRedisStreamMessageHandler;
import org.springframework.messaging.Message;

/**
* A {@link ReactiveMessageHandlerSpec} for a {@link ReactiveRedisStreamMessageHandler}.
*
* @author Jiandong Ma
*
* @since 7.1
*/
public class RedisStreamOutboundChannelAdapterSpec extends
ReactiveMessageHandlerSpec<RedisStreamOutboundChannelAdapterSpec, ReactiveRedisStreamMessageHandler> {

protected RedisStreamOutboundChannelAdapterSpec(ReactiveRedisConnectionFactory connectionFactory,
String streamKey) {

super(new ReactiveRedisStreamMessageHandler(connectionFactory, streamKey));
}

protected RedisStreamOutboundChannelAdapterSpec(ReactiveRedisConnectionFactory connectionFactory,
Expression streamExpression) {

super(new ReactiveRedisStreamMessageHandler(connectionFactory, streamExpression));
}

/**
* Specify the serialization context.
* @param serializationContext the serializationContext
* @return the spec
* @see ReactiveRedisStreamMessageHandler#setSerializationContext(RedisSerializationContext)
*/
public RedisStreamOutboundChannelAdapterSpec serializationContext(RedisSerializationContext<String, ?> serializationContext) {
this.reactiveMessageHandler.setSerializationContext(serializationContext);
return this;
}

/**
* Specify the hashMapper for {@link org.springframework.data.redis.core.ReactiveStreamOperations}.
* @param hashMapper the hashMapper
* @return the spec
* @see ReactiveRedisStreamMessageHandler#setHashMapper(HashMapper)
*/
public RedisStreamOutboundChannelAdapterSpec hashMapper(HashMapper<String, ?, ?> hashMapper) {
this.reactiveMessageHandler.setHashMapper(hashMapper);
return this;
}

/**
* Specify whether extract payload.
* @param extractPayload the extractPayload
* @return the spec
* @see ReactiveRedisStreamMessageHandler#setExtractPayload(boolean)
*/
public RedisStreamOutboundChannelAdapterSpec extractPayload(boolean extractPayload) {
this.reactiveMessageHandler.setExtractPayload(extractPayload);
return this;
}

/**
* Specify the function to create a {@link RedisStreamCommands.XAddOptions}.
* @param addOptionsFunction the addOptionsFunction
* @return the spec
* @see ReactiveRedisStreamMessageHandler#setAddOptionsFunction(Function)
*/
public RedisStreamOutboundChannelAdapterSpec addOptionsFunction(Function<Message<?>, RedisStreamCommands.XAddOptions> addOptionsFunction) {
this.reactiveMessageHandler.setAddOptionsFunction(addOptionsFunction);
return this;
}

}
Loading