KafkaSaslConfig.java
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.facebook.presto.kafka.security;
import com.facebook.airlift.configuration.Config;
import com.facebook.airlift.configuration.ConfigDescription;
import com.facebook.airlift.configuration.ConfigSecuritySensitive;
import com.google.common.collect.ImmutableMap;
import javax.validation.constraints.AssertTrue;
import java.util.Map;
import java.util.Optional;
import static com.facebook.presto.kafka.security.KafkaKeystoreTruststoreType.JKS;
import static org.apache.kafka.clients.CommonClientConfigs.SECURITY_PROTOCOL_CONFIG;
import static org.apache.kafka.common.config.SaslConfigs.SASL_JAAS_CONFIG;
import static org.apache.kafka.common.config.SaslConfigs.SASL_MECHANISM;
import static org.apache.kafka.common.config.SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG;
import static org.apache.kafka.common.config.SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG;
import static org.apache.kafka.common.config.SslConfigs.SSL_TRUSTSTORE_TYPE_CONFIG;
import static org.apache.kafka.common.security.auth.SecurityProtocol.SASL_SSL;
/**
* Manages Kafka SASL authentication and encryption between clients and brokers.
*/
public class KafkaSaslConfig
{
private String saslJaasConfig;
private String saslMechanism;
private String truststorePath;
private String truststorePassword;
private KafkaKeystoreTruststoreType truststoreType = JKS;
public Optional<String> getSaslJaasConfig()
{
return Optional.ofNullable(saslJaasConfig);
}
@Config("kafka.sasl.jaas.config")
@ConfigDescription("The JAAS config of the SASL authentication")
public KafkaSaslConfig setSaslJaasConfig(String saslJaasConfig)
{
this.saslJaasConfig = saslJaasConfig;
return this;
}
public Optional<String> getSaslMechanism()
{
return Optional.ofNullable(saslMechanism);
}
@Config("kafka.sasl.mechanism")
@ConfigDescription("The authentication mechanism of the SASL")
@ConfigSecuritySensitive
public KafkaSaslConfig setSaslMechanism(String saslMechanism)
{
this.saslMechanism = saslMechanism;
return this;
}
public Optional<String> getTruststorePath()
{
return Optional.ofNullable(truststorePath);
}
@Config("kafka.truststore.path")
@ConfigDescription("The path of the trust store file")
public KafkaSaslConfig setTruststorePath(String truststorePath)
{
this.truststorePath = truststorePath;
return this;
}
public Optional<String> getTruststorePassword()
{
return Optional.ofNullable(truststorePassword);
}
@Config("kafka.truststore.password")
@ConfigSecuritySensitive
@ConfigDescription("The password for the trust store file")
public KafkaSaslConfig setTruststorePassword(String truststorePassword)
{
this.truststorePassword = truststorePassword;
return this;
}
public Optional<KafkaKeystoreTruststoreType> getTruststoreType()
{
return Optional.ofNullable(truststoreType);
}
@Config("kafka.truststore.type")
@ConfigDescription("The file format of the trust store file")
public KafkaSaslConfig setTruststoreType(KafkaKeystoreTruststoreType truststoreType)
{
this.truststoreType = truststoreType;
return this;
}
public Map<String, Object> getKafkaSaslProperties()
{
ImmutableMap.Builder<String, Object> properties = ImmutableMap.builder();
getSaslMechanism().ifPresent(v -> properties.put(SASL_MECHANISM, v));
getSaslJaasConfig().ifPresent(v -> properties.put(SASL_JAAS_CONFIG, v));
getTruststorePath().ifPresent(v -> properties.put(SSL_TRUSTSTORE_LOCATION_CONFIG, v));
getTruststorePassword().ifPresent(v -> properties.put(SSL_TRUSTSTORE_PASSWORD_CONFIG, v));
getTruststoreType().ifPresent(v -> properties.put(SSL_TRUSTSTORE_TYPE_CONFIG, v.name()));
properties.put(SECURITY_PROTOCOL_CONFIG, SASL_SSL.name());
return properties.buildOrThrow();
}
@AssertTrue(message = "kafka.sasl.jaas.config must be set when kafka.sasl.mechanism is given")
public boolean isSaslJaasConfigValid()
{
return !getSaslMechanism().isPresent() || getSaslJaasConfig().isPresent();
}
@AssertTrue(message = "kafka.ssl.truststore.password must be set when kafka.ssl.truststore.location is given")
public boolean isTruststorePasswordValid()
{
return !getTruststorePath().isPresent() || getTruststorePassword().isPresent();
}
}