TestKafkaSaslConfig.java

/*
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package com.facebook.presto.kafka.security;

import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Maps;
import org.testng.annotations.Test;

import javax.validation.constraints.AssertTrue;

import java.io.FileWriter;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.AbstractMap;
import java.util.Map;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import static com.facebook.airlift.configuration.testing.ConfigAssertions.assertFullMapping;
import static com.facebook.airlift.configuration.testing.ConfigAssertions.assertRecordedDefaults;
import static com.facebook.airlift.configuration.testing.ConfigAssertions.recordDefaults;
import static com.facebook.airlift.testing.ValidationAssertions.assertFailsValidation;
import static com.facebook.presto.kafka.security.KafkaKeystoreTruststoreType.JKS;
import static com.facebook.presto.kafka.security.KafkaKeystoreTruststoreType.PKCS12;
import static org.apache.kafka.clients.CommonClientConfigs.SECURITY_PROTOCOL_CONFIG;
import static org.apache.kafka.common.config.SaslConfigs.SASL_JAAS_CONFIG;
import static org.apache.kafka.common.config.SaslConfigs.SASL_MECHANISM;
import static org.apache.kafka.common.config.SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG;
import static org.apache.kafka.common.config.SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG;
import static org.apache.kafka.common.config.SslConfigs.SSL_TRUSTSTORE_TYPE_CONFIG;
import static org.apache.kafka.common.security.auth.SecurityProtocol.SASL_SSL;
import static org.testng.Assert.assertTrue;

public class TestKafkaSaslConfig
{
    private static final String KAFKA_SASL_JAAS_CONFIG = "org.apache.kafka.common.security.scram.ScramLoginModule required username=\"testuser\" password=\"testpassword\"";
    private static final String KAFKA_SASL_MECHANISM = "SCRAM-SHA-512";
    private static final String KAFKA_TRUSTSTORE_PATH = "/some/path/to/truststore";
    private static final String KAFKA_TRUSTSTORE_PASSWORD = "my-password";
    private static final String KAFKA_TRUSTSTORE_TYPE = "PKCS12";

    @Test
    public void testDefaults()
    {
        assertRecordedDefaults(recordDefaults(KafkaSaslConfig.class)
                .setSaslJaasConfig(null)
                .setSaslMechanism(null)
                .setTruststorePath(null)
                .setTruststorePassword(null)
                .setTruststoreType(JKS));
    }

    @Test
    public void testExplicitPropertyMappings()
            throws Exception
    {
        String secret = "confluent";
        Path truststorePath = Files.createTempFile("truststore", ".p12");
        writeToFile(truststorePath, secret);

        Map<String, String> properties = new ImmutableMap.Builder<String, String>()
                .put("kafka.sasl.jaas.config", KAFKA_SASL_JAAS_CONFIG)
                .put("kafka.sasl.mechanism", KAFKA_SASL_MECHANISM)
                .put("kafka.truststore.path", truststorePath.toString())
                .put("kafka.truststore.password", KAFKA_TRUSTSTORE_PASSWORD)
                .put("kafka.truststore.type", KAFKA_TRUSTSTORE_TYPE)
                .build();

        KafkaSaslConfig expected = new KafkaSaslConfig()
                .setSaslJaasConfig(KAFKA_SASL_JAAS_CONFIG)
                .setSaslMechanism(KAFKA_SASL_MECHANISM)
                .setTruststorePath(truststorePath.toString())
                .setTruststorePassword(KAFKA_TRUSTSTORE_PASSWORD)
                .setTruststoreType(PKCS12);

        assertFullMapping(properties, expected);
    }

    @Test
    public void testAllConfigPropertiesAreContained()
    {
        KafkaSaslConfig config = new KafkaSaslConfig()
                .setSaslJaasConfig(KAFKA_SASL_JAAS_CONFIG)
                .setSaslMechanism(KAFKA_SASL_MECHANISM)
                .setTruststorePath(KAFKA_TRUSTSTORE_PATH)
                .setTruststorePassword(KAFKA_TRUSTSTORE_PASSWORD)
                .setTruststoreType(JKS);

        Map<String, Object> securityProperties = config.getKafkaSaslProperties();
        // Since security related properties are all passed to the underlying kafka-clients library,
        // the property names must match those expected by kafka-clients
        Map<String, String> map = Stream.of(
                        new AbstractMap.SimpleImmutableEntry<>(SASL_JAAS_CONFIG, KAFKA_SASL_JAAS_CONFIG),
                        new AbstractMap.SimpleImmutableEntry<>(SASL_MECHANISM, KAFKA_SASL_MECHANISM),
                        new AbstractMap.SimpleImmutableEntry<>(SSL_TRUSTSTORE_LOCATION_CONFIG, KAFKA_TRUSTSTORE_PATH),
                        new AbstractMap.SimpleImmutableEntry<>(SSL_TRUSTSTORE_PASSWORD_CONFIG, KAFKA_TRUSTSTORE_PASSWORD),
                        new AbstractMap.SimpleImmutableEntry<>(SSL_TRUSTSTORE_TYPE_CONFIG, JKS.name()),
                        new AbstractMap.SimpleImmutableEntry<>(SECURITY_PROTOCOL_CONFIG, SASL_SSL.name()))
                .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));

        assertTrue(Maps.difference(map, securityProperties).areEqual());
    }

    @Test
    public void testFailOnMissingSaslJaasConfigWithSaslMechanism()
    {
        KafkaSaslConfig config = new KafkaSaslConfig();
        config.setSaslMechanism(KAFKA_SASL_MECHANISM);

        assertFailsValidation(
                config,
                "saslJaasConfigValid",
                "kafka.sasl.jaas.config must be set when kafka.sasl.mechanism is given",
                AssertTrue.class);
    }

    @Test
    public void testFailOnMissingTruststorePasswordWithTruststoreLocationSet()
            throws Exception
    {
        String secret = "confluent";
        Path truststorePath = Files.createTempFile("truststore", ".p12");

        writeToFile(truststorePath, secret);

        KafkaSaslConfig config = new KafkaSaslConfig();
        config.setTruststorePath(truststorePath.toString());

        assertFailsValidation(
                config,
                "truststorePasswordValid",
                "kafka.ssl.truststore.password must be set when kafka.ssl.truststore.location is given",
                AssertTrue.class);
    }

    private void writeToFile(Path filepath, String content)
            throws IOException
    {
        try (FileWriter writer = new FileWriter(filepath.toFile())) {
            writer.write(content);
        }
    }
}