FunctionBatchingConversionTests.java
/*
* Copyright 2019-present the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.cloud.stream.binder.rabbit;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;
import org.junit.jupiter.api.Test;
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
import org.springframework.cloud.function.context.config.MessageConverterHelper;
import org.springframework.cloud.function.json.JacksonMapper;
import org.springframework.cloud.stream.binder.test.InputDestination;
import org.springframework.cloud.stream.binder.test.OutputDestination;
import org.springframework.cloud.stream.binder.test.TestChannelBinder;
import org.springframework.cloud.stream.binder.test.TestChannelBinderConfiguration;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.support.MessageBuilder;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHandlingException;
import org.springframework.messaging.converter.MessageConversionException;
import static org.assertj.core.api.Assertions.assertThat;
/**
*
*/
public class FunctionBatchingConversionTests {
@SuppressWarnings("unchecked")
@Test
void testBatchHeadersMatchingPayload() {
TestChannelBinderConfiguration.applicationContextRunner(BatchFunctionConfiguration.class)
.withPropertyValues("spring.cloud.stream.function.definition=func",
"spring.cloud.stream.bindings.func-in-0.consumer.batch-mode=true",
"spring.cloud.stream.rabbit.bindings.func-in-0.consumer.enable-batching=true")
.run(context -> {
InputDestination inputDestination = context.getBean(InputDestination.class);
OutputDestination outputDestination = context.getBean(OutputDestination.class);
List<byte[]> payloads = List.of("hello".getBytes(StandardCharsets.UTF_8),
"{\"name\":\"Ricky\"}".getBytes(StandardCharsets.UTF_8),
"{\"name\":\"Julien\"}".getBytes(StandardCharsets.UTF_8),
"{\"name\":\"Bubbles\"}".getBytes(StandardCharsets.UTF_8),
"hello".getBytes(StandardCharsets.UTF_8));
List<Map<String, String>> amqpBatchHeaders = new ArrayList<>();
for (int i = 0; i < 5; i++) {
Map<String, String> batchHeaders = new LinkedHashMap<>();
batchHeaders.put("amqp_receivedDeliveryMode", "PERSISTENT");
batchHeaders.put("index", String.valueOf(i));
amqpBatchHeaders.add(batchHeaders);
}
var message = MessageBuilder.withPayload(payloads)
.setHeader("amqp_batchedHeaders", amqpBatchHeaders)
.setHeader("deliveryAttempt", new AtomicInteger(1)).build();
inputDestination.send(message);
Message<byte[]> resultMessage = outputDestination.receive();
JacksonMapper mapper = context.getBean(JacksonMapper.class);
List<?> resultPayloads = mapper.fromJson(resultMessage.getPayload(), List.class);
assertThat(resultPayloads).hasSize(3);
List<Map<String, String>> amqpBatchedHeaders = (List<Map<String, String>>) resultMessage
.getHeaders().get("amqp_batchedHeaders");
assertThat(amqpBatchedHeaders).hasSize(resultPayloads.size());
assertThat(amqpBatchedHeaders.get(0).get("index")).isEqualTo("1");
assertThat(amqpBatchedHeaders.get(1).get("index")).isEqualTo("2");
assertThat(amqpBatchedHeaders.get(2).get("index")).isEqualTo("3");
context.stop();
});
}
@Test
void testBatchHeadersForcingFatalFailureOnConversiionException() {
TestChannelBinderConfiguration
.applicationContextRunner(BatchFunctionConfigurationWithAdditionalConversionHelper.class)
.withPropertyValues("spring.cloud.stream.function.definition=func",
"spring.cloud.stream.bindings.func-in-0.consumer.batch-mode=true",
"spring.cloud.stream.bindings.func-in-0.consumer.max-attempts=1",
"spring.cloud.stream.rabbit.bindings.func-in-0.consumer.enable-batching=true")
.run(context -> {
InputDestination inputDestination = context.getBean(InputDestination.class);
List<byte[]> payloads = List.of("hello".getBytes(StandardCharsets.UTF_8),
"{\"name\":\"Ricky\"}".getBytes(StandardCharsets.UTF_8),
"{\"name\":\"Julien\"}".getBytes(StandardCharsets.UTF_8),
"{\"name\":\"Bubbles\"}".getBytes(StandardCharsets.UTF_8),
"hello".getBytes(StandardCharsets.UTF_8));
List<Map<String, String>> amqpBatchHeaders = new ArrayList<>();
for (int i = 0; i < 5; i++) {
Map<String, String> batchHeaders = new LinkedHashMap<>();
batchHeaders.put("amqp_receivedDeliveryMode", "PERSISTENT");
batchHeaders.put("index", String.valueOf(i));
amqpBatchHeaders.add(batchHeaders);
}
var message = MessageBuilder.withPayload(payloads)
.setHeader("amqp_batchedHeaders", amqpBatchHeaders)
.setHeader("deliveryAttempt", new AtomicInteger(1)).build();
inputDestination.send(message);
TestChannelBinder binder = context.getBean(TestChannelBinder.class);
assertThat(binder.getLastError().getPayload()).isInstanceOf(MessageHandlingException.class);
MessageHandlingException exception = (MessageHandlingException) binder.getLastError().getPayload();
assertThat(exception.getCause()).isInstanceOf(MessageConversionException.class);
context.stop();
});
}
@Configuration
@EnableAutoConfiguration
public static class BatchFunctionConfiguration {
@Bean
public Function<Message<List<Person>>, Message<List<Person>>> func() {
return x -> {
return x;
};
}
}
@Configuration
@EnableAutoConfiguration
public static class BatchFunctionConfigurationWithAdditionalConversionHelper {
@Bean
public MessageConverterHelper helper() {
return new MessageConverterHelper() {
public boolean shouldFailIfCantConvert(Message<?> message) {
return true;
}
};
}
@Bean
public Function<Message<List<Person>>, Message<List<Person>>> func() {
return x -> {
return x;
};
}
}
static class Person {
private String name;
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public String toString() {
return "name: " + name;
}
}
}