KafkaBinderConfigurationPropertiesTest.java
/*
* Copyright 2018-present the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.cloud.stream.binder.kafka.properties;
import java.io.IOException;
import java.io.OutputStream;
import java.net.InetSocketAddress;
import java.nio.file.Paths;
import java.util.Collections;
import java.util.Map;
import com.sun.net.httpserver.HttpServer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.assertj.core.util.Files;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.ObjectProvider;
import org.springframework.boot.kafka.autoconfigure.KafkaProperties;
import org.springframework.core.io.ClassPathResource;
import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.Mockito.mock;
class KafkaBinderConfigurationPropertiesTest {
@Test
@SuppressWarnings("unchecked")
void defaultRequiredAcksIsAll() {
KafkaProperties kafkaProperties = new KafkaProperties();
KafkaBinderConfigurationProperties kafkaBinderConfigurationProperties =
new KafkaBinderConfigurationProperties(kafkaProperties, mock(ObjectProvider.class));
assertThat(kafkaBinderConfigurationProperties.getRequiredAcks()).isEqualTo("all");
}
@Test
@SuppressWarnings("unchecked")
void mergedConsumerConfigurationFiltersGroupIdFromKafkaProperties() {
KafkaProperties kafkaProperties = new KafkaProperties();
kafkaProperties.getConsumer().setGroupId("group1");
KafkaBinderConfigurationProperties kafkaBinderConfigurationProperties =
new KafkaBinderConfigurationProperties(kafkaProperties, mock(ObjectProvider.class));
Map<String, Object> mergedConsumerConfiguration =
kafkaBinderConfigurationProperties.mergedConsumerConfiguration();
assertThat(mergedConsumerConfiguration).doesNotContainKeys(ConsumerConfig.GROUP_ID_CONFIG);
}
@Test
@SuppressWarnings("unchecked")
void mergedConsumerConfigurationFiltersEnableAutoCommitFromKafkaProperties() {
KafkaProperties kafkaProperties = new KafkaProperties();
kafkaProperties.getConsumer().setEnableAutoCommit(true);
KafkaBinderConfigurationProperties kafkaBinderConfigurationProperties =
new KafkaBinderConfigurationProperties(kafkaProperties, mock(ObjectProvider.class));
Map<String, Object> mergedConsumerConfiguration =
kafkaBinderConfigurationProperties.mergedConsumerConfiguration();
assertThat(mergedConsumerConfiguration).doesNotContainKeys(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG);
}
@Test
@SuppressWarnings("unchecked")
void mergedConsumerConfigurationFiltersGroupIdFromKafkaBinderConfigurationPropertiesConfiguration() {
KafkaProperties kafkaProperties = new KafkaProperties();
KafkaBinderConfigurationProperties kafkaBinderConfigurationProperties =
new KafkaBinderConfigurationProperties(kafkaProperties, mock(ObjectProvider.class));
kafkaBinderConfigurationProperties
.setConfiguration(Collections.singletonMap(ConsumerConfig.GROUP_ID_CONFIG, "group1"));
Map<String, Object> mergedConsumerConfiguration = kafkaBinderConfigurationProperties.mergedConsumerConfiguration();
assertThat(mergedConsumerConfiguration).doesNotContainKeys(ConsumerConfig.GROUP_ID_CONFIG);
}
@Test
@SuppressWarnings("unchecked")
void mergedConsumerConfigurationFiltersEnableAutoCommitFromKafkaBinderConfigurationPropertiesConfiguration() {
KafkaProperties kafkaProperties = new KafkaProperties();
KafkaBinderConfigurationProperties kafkaBinderConfigurationProperties =
new KafkaBinderConfigurationProperties(kafkaProperties, mock(ObjectProvider.class));
kafkaBinderConfigurationProperties
.setConfiguration(Collections.singletonMap(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true"));
Map<String, Object> mergedConsumerConfiguration = kafkaBinderConfigurationProperties.mergedConsumerConfiguration();
assertThat(mergedConsumerConfiguration).doesNotContainKeys(ConsumerConfig.GROUP_ID_CONFIG);
}
@Test
@SuppressWarnings("unchecked")
void mergedConsumerConfigurationFiltersGroupIdFromKafkaBinderConfigurationPropertiesConsumerProperties() {
KafkaProperties kafkaProperties = new KafkaProperties();
KafkaBinderConfigurationProperties kafkaBinderConfigurationProperties =
new KafkaBinderConfigurationProperties(kafkaProperties, mock(ObjectProvider.class));
kafkaBinderConfigurationProperties
.setConsumerProperties(Collections.singletonMap(ConsumerConfig.GROUP_ID_CONFIG, "group1"));
Map<String, Object> mergedConsumerConfiguration = kafkaBinderConfigurationProperties.mergedConsumerConfiguration();
assertThat(mergedConsumerConfiguration).doesNotContainKeys(ConsumerConfig.GROUP_ID_CONFIG);
}
@Test
@SuppressWarnings("unchecked")
void mergedConsumerConfigurationFiltersEnableAutoCommitFromKafkaBinderConfigurationPropertiesConsumerProps() {
KafkaProperties kafkaProperties = new KafkaProperties();
KafkaBinderConfigurationProperties kafkaBinderConfigurationProperties =
new KafkaBinderConfigurationProperties(kafkaProperties, mock(ObjectProvider.class));
kafkaBinderConfigurationProperties
.setConsumerProperties(Collections.singletonMap(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true"));
Map<String, Object> mergedConsumerConfiguration = kafkaBinderConfigurationProperties.mergedConsumerConfiguration();
assertThat(mergedConsumerConfiguration).doesNotContainKeys(ConsumerConfig.GROUP_ID_CONFIG);
}
@Test
@SuppressWarnings("unchecked")
void certificateFilesAreConvertedToAbsolutePathsFromClassPathResources() {
KafkaProperties kafkaProperties = new KafkaProperties();
KafkaBinderConfigurationProperties kafkaBinderConfigurationProperties =
new KafkaBinderConfigurationProperties(kafkaProperties, mock(ObjectProvider.class));
final Map<String, String> configuration = kafkaBinderConfigurationProperties.getConfiguration();
configuration.put("ssl.truststore.location", "classpath:testclient.truststore");
configuration.put("ssl.keystore.location", "classpath:testclient.keystore");
kafkaBinderConfigurationProperties.getKafkaConnectionString();
assertThat(configuration.get("ssl.truststore.location"))
.isEqualTo(Paths.get(System.getProperty("java.io.tmpdir"), "testclient.truststore").toString());
assertThat(configuration.get("ssl.keystore.location"))
.isEqualTo(Paths.get(System.getProperty("java.io.tmpdir"), "testclient.keystore").toString());
deleteTempCertFiles();
}
@Test
@SuppressWarnings("unchecked")
void certificateFilesAreConvertedToAbsolutePathsFromHttpResources() throws IOException {
HttpServer server = HttpServer.create(new InetSocketAddress("localhost", 5869), 0);
createContextWithCertFileHandler(server, "testclient.truststore");
createContextWithCertFileHandler(server, "testclient.keystore");
server.setExecutor(null); // creates a default executor
server.start();
KafkaProperties kafkaProperties = new KafkaProperties();
KafkaBinderConfigurationProperties kafkaBinderConfigurationProperties =
new KafkaBinderConfigurationProperties(kafkaProperties, mock(ObjectProvider.class));
final Map<String, String> configuration = kafkaBinderConfigurationProperties.getConfiguration();
configuration.put("ssl.truststore.location", "http://localhost:5869/testclient.truststore");
configuration.put("ssl.keystore.location", "http://localhost:5869/testclient.keystore");
configuration.put("schema.registry.ssl.truststore.location", "http://localhost:5869/testclient.truststore");
configuration.put("schema.registry.ssl.keystore.location", "http://localhost:5869/testclient.keystore");
kafkaBinderConfigurationProperties.getKafkaConnectionString();
assertThat(configuration.get("ssl.truststore.location"))
.isEqualTo(Paths.get(System.getProperty("java.io.tmpdir"), "testclient.truststore").toString());
assertThat(configuration.get("ssl.keystore.location"))
.isEqualTo(Paths.get(System.getProperty("java.io.tmpdir"), "testclient.keystore").toString());
assertThat(configuration.get("schema.registry.ssl.truststore.location"))
.isEqualTo(Paths.get(System.getProperty("java.io.tmpdir"), "testclient.truststore").toString());
assertThat(configuration.get("schema.registry.ssl.keystore.location"))
.isEqualTo(Paths.get(System.getProperty("java.io.tmpdir"), "testclient.keystore").toString());
deleteTempCertFiles();
server.stop(0);
}
@Test
@SuppressWarnings("unchecked")
void certificateFilesAreConvertedToGivenAbsolutePathsFromClassPathResources() {
KafkaProperties kafkaProperties = new KafkaProperties();
KafkaBinderConfigurationProperties kafkaBinderConfigurationProperties =
new KafkaBinderConfigurationProperties(kafkaProperties, mock(ObjectProvider.class));
final Map<String, String> configuration = kafkaBinderConfigurationProperties.getConfiguration();
configuration.put("ssl.truststore.location", "classpath:testclient.truststore");
configuration.put("ssl.keystore.location", "classpath:testclient.keystore");
kafkaBinderConfigurationProperties.setCertificateStoreDirectory("target");
kafkaBinderConfigurationProperties.getKafkaConnectionString();
assertThat(configuration.get("ssl.truststore.location")).isEqualTo(
Paths.get(Files.currentFolder().toString(), "target", "testclient.truststore").toString());
assertThat(configuration.get("ssl.keystore.location")).isEqualTo(
Paths.get(Files.currentFolder().toString(), "target", "testclient.keystore").toString());
}
@Test
@SuppressWarnings("unchecked")
void certificateFilesAreMovedForSchemaRegistryConfiguration() {
KafkaProperties kafkaProperties = new KafkaProperties();
KafkaBinderConfigurationProperties kafkaBinderConfigurationProperties =
new KafkaBinderConfigurationProperties(kafkaProperties, mock(ObjectProvider.class));
final Map<String, String> configuration = kafkaBinderConfigurationProperties.getConfiguration();
configuration.put("schema.registry.ssl.truststore.location", "classpath:testclient.truststore");
configuration.put("schema.registry.ssl.keystore.location", "classpath:testclient.keystore");
kafkaBinderConfigurationProperties.setCertificateStoreDirectory("target");
kafkaBinderConfigurationProperties.getKafkaConnectionString();
assertThat(configuration.get("schema.registry.ssl.truststore.location")).isEqualTo(
Paths.get(Files.currentFolder().toString(), "target", "testclient.truststore").toString());
assertThat(configuration.get("schema.registry.ssl.keystore.location")).isEqualTo(
Paths.get(Files.currentFolder().toString(), "target", "testclient.keystore").toString());
final Map<String, Object> mergedProducerConfiguration = kafkaBinderConfigurationProperties.mergedProducerConfiguration();
assertThat(mergedProducerConfiguration.get("schema.registry.ssl.truststore.location")).isEqualTo(
Paths.get(Files.currentFolder().toString(), "target", "testclient.truststore").toString());
assertThat(mergedProducerConfiguration.get("schema.registry.ssl.keystore.location")).isEqualTo(
Paths.get(Files.currentFolder().toString(), "target", "testclient.keystore").toString());
final Map<String, Object> mergedConsumerConfiguration = kafkaBinderConfigurationProperties.mergedConsumerConfiguration();
assertThat(mergedConsumerConfiguration.get("schema.registry.ssl.truststore.location")).isEqualTo(
Paths.get(Files.currentFolder().toString(), "target", "testclient.truststore").toString());
assertThat(mergedConsumerConfiguration.get("schema.registry.ssl.keystore.location")).isEqualTo(
Paths.get(Files.currentFolder().toString(), "target", "testclient.keystore").toString());
}
@Test
@SuppressWarnings("unchecked")
void schemaRegistryPropertiesPropagatedToMergedProducerProperties() {
KafkaProperties kafkaProperties = new KafkaProperties();
KafkaBinderConfigurationProperties kafkaBinderConfigurationProperties =
new KafkaBinderConfigurationProperties(kafkaProperties, mock(ObjectProvider.class));
final Map<String, String> configuration = kafkaBinderConfigurationProperties.getConfiguration();
configuration.put("schema.registry.url", "https://localhost:8081,https://localhost:8082");
configuration.put("schema.registry.ssl.truststore.location", "classpath:testclient.truststore");
configuration.put("schema.registry.ssl.keystore.location", "classpath:testclient.keystore");
configuration.put("schema.registry.ssl.keystore.password", "generated");
configuration.put("schema.registry.ssl.truststore.password", "generated");
configuration.put("schema.registry.ssl.key.password", "generated");
kafkaBinderConfigurationProperties.setCertificateStoreDirectory("target");
kafkaBinderConfigurationProperties.getKafkaConnectionString();
final Map<String, Object> mergedProducerConfiguration = kafkaBinderConfigurationProperties.mergedProducerConfiguration();
assertThat(mergedProducerConfiguration.get("schema.registry.url")).isEqualTo("https://localhost:8081,https://localhost:8082");
assertThat(mergedProducerConfiguration.get("schema.registry.ssl.keystore.location")).isEqualTo(
Paths.get(Files.currentFolder().toString(), "target", "testclient.keystore").toString());
assertThat(mergedProducerConfiguration.get("schema.registry.ssl.truststore.location")).isEqualTo(
Paths.get(Files.currentFolder().toString(), "target", "testclient.truststore").toString());
assertThat(mergedProducerConfiguration.get("schema.registry.ssl.keystore.password")).isEqualTo("generated");
assertThat(mergedProducerConfiguration.get("schema.registry.ssl.truststore.password")).isEqualTo("generated");
assertThat(mergedProducerConfiguration.get("schema.registry.ssl.key.password")).isEqualTo("generated");
final Map<String, Object> mergedConsumerConfiguration = kafkaBinderConfigurationProperties.mergedConsumerConfiguration();
assertThat(mergedConsumerConfiguration.get("schema.registry.url")).isEqualTo("https://localhost:8081,https://localhost:8082");
assertThat(mergedConsumerConfiguration.get("schema.registry.ssl.keystore.location")).isEqualTo(
Paths.get(Files.currentFolder().toString(), "target", "testclient.keystore").toString());
assertThat(mergedConsumerConfiguration.get("schema.registry.ssl.truststore.location")).isEqualTo(
Paths.get(Files.currentFolder().toString(), "target", "testclient.truststore").toString());
assertThat(mergedConsumerConfiguration.get("schema.registry.ssl.keystore.password")).isEqualTo("generated");
assertThat(mergedConsumerConfiguration.get("schema.registry.ssl.truststore.password")).isEqualTo("generated");
assertThat(mergedConsumerConfiguration.get("schema.registry.ssl.key.password")).isEqualTo("generated");
}
@Test
@SuppressWarnings("unchecked")
public void testEmptyLocationsAreIgnored() {
KafkaProperties kafkaProperties = new KafkaProperties();
KafkaBinderConfigurationProperties kafkaBinderConfigurationProperties =
new KafkaBinderConfigurationProperties(kafkaProperties, mock(ObjectProvider.class));
final Map<String, String> configuration = kafkaBinderConfigurationProperties.getConfiguration();
configuration.put("schema.registry.ssl.truststore.location", "");
configuration.put("schema.registry.ssl.keystore.location", "");
kafkaBinderConfigurationProperties.setCertificateStoreDirectory("target");
kafkaBinderConfigurationProperties.getKafkaConnectionString();
assertThat(configuration.get("schema.registry.ssl.truststore.location")).isEmpty();
assertThat(configuration.get("schema.registry.ssl.keystore.location")).isEmpty();
}
private void createContextWithCertFileHandler(HttpServer server, String path) {
server.createContext("/" + path, exchange -> {
ClassPathResource ts = new ClassPathResource(path);
byte[] response = ts.getContentAsByteArray();
exchange.sendResponseHeaders(200, response.length);
OutputStream os = exchange.getResponseBody();
os.write(response);
os.close();
});
}
private void deleteTempCertFiles() {
Paths.get(System.getProperty("java.io.tmpdir"), "testclient.truststore").toFile().delete();
Paths.get(System.getProperty("java.io.tmpdir"), "testclient.keystore").toFile().delete();
}
}