StreamBridgeCloseChannelsTests.java
/*
* Copyright 2023-present the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.cloud.stream.binder.kafka;
import java.lang.reflect.Field;
import java.lang.reflect.Method;
import java.util.Map;
import org.junit.jupiter.api.Test;
import org.springframework.boot.WebApplicationType;
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
import org.springframework.boot.builder.SpringApplicationBuilder;
import org.springframework.cloud.stream.function.StreamBridge;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.test.context.EmbeddedKafka;
import org.springframework.messaging.MessageChannel;
import org.springframework.test.annotation.DirtiesContext;
import static org.assertj.core.api.Assertions.assertThat;
/**
* Test demonstrates that closeChannelsGracefully() closes producers
*/
@EmbeddedKafka
@DirtiesContext
class StreamBridgeCloseChannelsTests {
@Test
void testCloseChannelsGracefullyClosesProducersButKeepsCache() throws Exception {
try (ConfigurableApplicationContext context = new SpringApplicationBuilder(TestConfig.class)
.web(WebApplicationType.NONE)
.run(
"--server.port=0",
"--spring.jmx.enabled=false",
"--spring.cloud.stream.kafka.binder.brokers=${spring.embedded.kafka.brokers}"
)) {
StreamBridge streamBridge = context.getBean(StreamBridge.class);
// 1. Get the method
Method closeMethod = StreamBridge.class.getDeclaredMethod("closeChannelsGracefully");
closeMethod.setAccessible(true);
// 2. Get the cache
Field cacheField = StreamBridge.class.getDeclaredField("channelCache");
cacheField.setAccessible(true);
Map<String, MessageChannel> cache = (Map<String, MessageChannel>) cacheField.get(streamBridge);
streamBridge.send("test-topic", "message-1");
assertThat(cache).hasSize(1);
MessageChannel originalChannel = cache.get("test-topic");
closeMethod.invoke(streamBridge);
assertThat(cache).hasSize(1);
MessageChannel channelAfterClose = cache.get("test-topic");
assertThat(channelAfterClose).isSameAs(originalChannel);
boolean sent = streamBridge.send("test-topic", "message-2");
assertThat(sent).isTrue();
MessageChannel channelAfterResend = cache.get("test-topic");
assertThat(channelAfterResend).isSameAs(originalChannel);
streamBridge.send("test-topic-2", "message-3");
assertThat(cache).hasSize(2); // Two channels
closeMethod.invoke(streamBridge);
assertThat(cache).hasSize(2);
assertThat(cache).containsKeys("test-topic", "test-topic-2");
}
}
@Test
void testOnApplicationEventClearsCache() throws Exception {
try (ConfigurableApplicationContext context = new SpringApplicationBuilder(TestConfig.class)
.web(WebApplicationType.NONE)
.run(
"--server.port=0",
"--spring.jmx.enabled=false",
"--spring.cloud.stream.kafka.binder.brokers=${spring.embedded.kafka.brokers}"
)) {
StreamBridge streamBridge = context.getBean(StreamBridge.class);
// Get the cache
Field cacheField = StreamBridge.class.getDeclaredField("channelCache");
cacheField.setAccessible(true);
Map<String, MessageChannel> cache = (Map<String, MessageChannel>) cacheField.get(streamBridge);
// Create channels
streamBridge.send("topic-1", "data");
streamBridge.send("topic-2", "data");
streamBridge.send("topic-3", "data");
assertThat(cache).hasSize(3);
// Simulate what onApplicationEvent does
// 1. Call closeChannelsGracefully()
Method closeMethod = StreamBridge.class.getDeclaredMethod("closeChannelsGracefully");
closeMethod.setAccessible(true);
closeMethod.invoke(streamBridge);
// 2. Clear cache (what onApplicationEvent does after)
cache.clear();
assertThat(cache).isEmpty();
// Can create new channels
streamBridge.send("new-topic", "data");
assertThat(cache).hasSize(1);
}
}
@Test
void testTheActualProblemAndSolution() throws Exception {
try (ConfigurableApplicationContext context = new SpringApplicationBuilder(TestConfig.class)
.web(WebApplicationType.NONE)
.run(
"--server.port=0",
"--spring.jmx.enabled=false",
"--spring.cloud.stream.kafka.binder.brokers=${spring.embedded.kafka.brokers}"
)) {
StreamBridge streamBridge = context.getBean(StreamBridge.class);
Field cacheField = StreamBridge.class.getDeclaredField("channelCache");
cacheField.setAccessible(true);
Map<String, MessageChannel> cache = (Map<String, MessageChannel>) cacheField.get(streamBridge);
// Example first usage
streamBridge.send("daily-metrics", "data");
streamBridge.send("user-events", "data");
assertThat(cache).hasSize(2);
// Example second usage
streamBridge.send("payment-events", "data");
assertThat(cache).hasSize(3);
}
}
@EnableAutoConfiguration
@Configuration
public static class TestConfig {
}
}