TestKafkaPlugin.java

/*
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package com.facebook.presto.kafka;

import com.facebook.presto.spi.connector.Connector;
import com.facebook.presto.spi.connector.ConnectorFactory;
import com.facebook.presto.testing.TestingConnectorContext;
import com.google.common.collect.ImmutableMap;
import org.testng.annotations.Test;

import java.io.FileWriter;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;

import static com.facebook.airlift.testing.Assertions.assertInstanceOf;
import static com.google.common.collect.Iterables.getOnlyElement;
import static org.assertj.core.api.AssertionsForClassTypes.assertThat;
import static org.assertj.core.api.AssertionsForClassTypes.assertThatThrownBy;
import static org.testng.Assert.assertNotNull;

@Test
public class TestKafkaPlugin
{
    @Test
    public void testSpinup()
    {
        KafkaPlugin plugin = new KafkaPlugin();

        ConnectorFactory factory = getOnlyElement(plugin.getConnectorFactories());
        assertInstanceOf(factory, KafkaConnectorFactory.class);

        Connector c = factory.create(
                "test-connector",
                ImmutableMap.<String, String>builder()
                        .put("kafka.table-names", "test")
                        .put("kafka.nodes", "localhost:9092")
                        .build(),
                new TestingConnectorContext());
        assertNotNull(c);
    }

    @Test
    public void testSslSpinup()
            throws IOException
    {
        KafkaPlugin plugin = new KafkaPlugin();

        ConnectorFactory factory = getOnlyElement(plugin.getConnectorFactories());
        assertThat(factory).isInstanceOf(KafkaConnectorFactory.class);

        String secret = "confluent";
        Path truststorePath = Files.createTempFile("truststore", ".jks");

        writeToFile(truststorePath, secret);

        Connector connector = factory.create(
                "test-connector",
                ImmutableMap.<String, String>builder()
                        .put("kafka.table-names", "test")
                        .put("kafka.nodes", "localhost:9092")
                        .put("kafka.security-protocol", "SASL_SSL")
                        .put("kafka.sasl.jaas.config", "org.apache.kafka.common.security.scram.ScramLoginModule required username=\"testuser\" password=\"testpassword\"")
                        .put("kafka.sasl.mechanism", "SCRAM-SHA-512")
                        .put("kafka.truststore.type", "JKS")
                        .put("kafka.truststore.path", truststorePath.toString())
                        .put("kafka.truststore.password", "truststore-password")
                        .buildOrThrow(),
                new TestingConnectorContext());
        assertThat(connector).isNotNull();
        connector.shutdown();
    }

    @Test
    public void testConfigResourceSpinup()
            throws IOException
    {
        KafkaPlugin plugin = new KafkaPlugin();

        ConnectorFactory factory = getOnlyElement(plugin.getConnectorFactories());
        assertThat(factory).isInstanceOf(KafkaConnectorFactory.class);

        String nativeContent = "security.protocol=SSL";
        Path nativeKafkaResourcePath = Files.createTempFile("native_kafka", ".properties");
        writeToFile(nativeKafkaResourcePath, nativeContent);

        Connector connector = factory.create(
                "test-connector",
                ImmutableMap.<String, String>builder()
                        .put("kafka.table-names", "test")
                        .put("kafka.nodes", "localhost:9092")
                        .put("kafka.config.resources", nativeKafkaResourcePath.toString())
                        .buildOrThrow(),
                new TestingConnectorContext());
        assertThat(connector).isNotNull();
        connector.shutdown();
    }

    @Test
    public void testResourceConfigMissingFileSpindown()
    {
        KafkaPlugin plugin = new KafkaPlugin();

        ConnectorFactory factory = getOnlyElement(plugin.getConnectorFactories());
        assertThat(factory).isInstanceOf(KafkaConnectorFactory.class);

        assertThatThrownBy(() -> factory.create(
                "test-connector",
                ImmutableMap.<String, String>builder()
                        .put("kafka.table-names", "test")
                        .put("kafka.nodes", "localhost:9092")
                        .put("kafka.security-protocol", "SASL_PLAINTEXT")
                        .put("kafka.config.resources", "/not/a/real/path")
                        .buildOrThrow(),
                new TestingConnectorContext()))
                .hasMessageContaining("IllegalArgumentException: File does not exist: /not/a/real/path");
    }

    private void writeToFile(Path filepath, String content)
            throws IOException
    {
        try (FileWriter writer = new FileWriter(filepath.toFile())) {
            writer.write(content);
        }
    }
}