RedisJedisManager.java

/*
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package com.facebook.presto.redis;

import com.facebook.airlift.log.Logger;
import com.facebook.presto.spi.HostAddress;
import com.facebook.presto.spi.NodeManager;
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.JedisPoolConfig;

import javax.annotation.PreDestroy;
import javax.inject.Inject;
import javax.net.ssl.SSLContext;
import javax.net.ssl.TrustManagerFactory;

import java.io.IOException;
import java.io.InputStream;
import java.nio.file.Files;
import java.security.KeyManagementException;
import java.security.KeyStore;
import java.security.KeyStoreException;
import java.security.NoSuchAlgorithmException;
import java.security.cert.CertificateException;
import java.security.cert.CertificateFactory;
import java.security.cert.X509Certificate;
import java.util.Map;

import static java.lang.Math.toIntExact;
import static java.util.Objects.requireNonNull;
import static javax.net.ssl.TrustManagerFactory.getDefaultAlgorithm;

/**
 * Manages connections to the Redis nodes.
 * <p>
 * A {@link JedisPool} is created the first time a {@link HostAddress} is requested and is
 * served from an in-memory cache afterwards. {@link #tearDown()} destroys every cached pool.
 */
public class RedisJedisManager
{
    private static final Logger log = Logger.get(RedisJedisManager.class);

    private static final String TLS_PROTOCOL = "TLS";

    // Socket timeout handed to every JedisPool
    private static final int JEDIS_SO_TIMEOUT = 2000;
    // Fills the timeout argument that follows the socket timeout in the JedisPool constructor.
    // NOTE(review): presumably Jedis' "infiniteSoTimeout" - confirm against the Jedis version
    // on the classpath. The connect timeout itself is read from
    // RedisConnectorConfig#getRedisConnectTimeout(), not from a constant.
    private static final int JEDIS_INFINITE_SO_TIMEOUT = 2000;

    private static final int JEDIS_MIN_IDLE_CONNECTIONS = 1;
    // Applied through JedisPoolConfig#setMaxTotal.
    // NOTE(review): this constant was previously named as a "max idle" limit; confirm that
    // capping the total number of connections (rather than idle ones) is the intent.
    private static final int JEDIS_MAX_TOTAL_CONNECTIONS = 5;

    private final LoadingCache<HostAddress, JedisPool> jedisPoolCache;

    private final RedisConnectorConfig redisConnectorConfig;
    private final JedisPoolConfig jedisPoolConfig;

    /**
     * Creates the manager. Pools are not built here; they are created on demand by
     * {@link #getJedisPool(HostAddress)}.
     *
     * @param redisConnectorConfig connector settings applied to every pool; must not be null
     * @param nodeManager accepted for injection but not used by this class
     * @throws NullPointerException if {@code redisConnectorConfig} is null
     */
    @Inject
    RedisJedisManager(
            RedisConnectorConfig redisConnectorConfig,
            NodeManager nodeManager)
    {
        this.redisConnectorConfig = requireNonNull(redisConnectorConfig, "redisConnectorConfig is null");
        this.jedisPoolConfig = createJedisPoolConfig();
        this.jedisPoolCache = CacheBuilder.newBuilder().build(CacheLoader.from(this::createJedisPool));
    }

    /**
     * Destroys every pool currently held in the cache.
     * <p>
     * A failure while destroying one pool is logged as a warning and does not prevent the
     * remaining pools from being destroyed.
     */
    @PreDestroy
    public void tearDown()
    {
        for (Map.Entry<HostAddress, JedisPool> entry : jedisPoolCache.asMap().entrySet()) {
            try {
                entry.getValue().destroy();
            }
            catch (Exception e) {
                // Best effort: keep going so the other pools still get released
                log.warn(e, "While destroying JedisPool %s:", entry.getKey());
            }
        }
    }

    /**
     * Returns the connector configuration this manager was created with.
     */
    public RedisConnectorConfig getRedisConnectorConfig()
    {
        return redisConnectorConfig;
    }

    /**
     * Returns the pool for the given host, creating and caching it on first use.
     *
     * @param host address of the Redis node; must not be null
     * @return the cached or newly created pool for {@code host}
     * @throws NullPointerException if {@code host} is null
     */
    public JedisPool getJedisPool(HostAddress host)
    {
        requireNonNull(host, "host is null");
        // Any failure raised while building the pool surfaces here as an unchecked exception
        return jedisPoolCache.getUnchecked(host);
    }

    /**
     * Creates a new JedisPool for the specified host.
     * Chooses between TLS or non-TLS configuration based on redisConnectorConfig.
     * <p>
     * When TLS is enabled the truststore is loaded and a fresh {@link SSLContext} is built
     * for each pool that is created.
     *
     * @throws IllegalStateException if TLS is enabled but no truststore path is configured
     */
    private JedisPool createJedisPool(HostAddress host)
    {
        boolean isTlsEnabled = redisConnectorConfig.isTlsEnabled();
        SSLContext sslContext = null;

        if (isTlsEnabled) {
            KeyStore trustStore = loadTrustStore();
            sslContext = createSslContext(trustStore);
        }

        return buildJedisPool(host, isTlsEnabled, sslContext);
    }

    /**
     * Creates SSLContext initialized with the given truststore.
     * <p>
     * No key managers are installed, so the context carries trust material only.
     *
     * @param trustStore certificates to trust; must not be null
     * @return an initialized context for the {@value #TLS_PROTOCOL} protocol
     * @throws IllegalStateException if {@code trustStore} is null
     * @throws RuntimeException if the trust managers or the context cannot be initialized
     */
    private SSLContext createSslContext(KeyStore trustStore)
    {
        if (trustStore == null) {
            throw new IllegalStateException("Truststore must not be null for TLS connections");
        }

        try {
            TrustManagerFactory tmf = TrustManagerFactory.getInstance(getDefaultAlgorithm());
            tmf.init(trustStore);

            SSLContext sslContext = SSLContext.getInstance(TLS_PROTOCOL);
            sslContext.init(null, tmf.getTrustManagers(), null);

            return sslContext;
        }
        catch (NoSuchAlgorithmException | KeyStoreException | KeyManagementException e) {
            throw new RuntimeException("Failed to initialize SSLContext", e);
        }
    }

    /**
     * Builds the pool settings shared by every JedisPool created by this manager.
     */
    private JedisPoolConfig createJedisPoolConfig()
    {
        JedisPoolConfig config = new JedisPoolConfig();
        config.setMinIdle(JEDIS_MIN_IDLE_CONNECTIONS);
        config.setMaxTotal(JEDIS_MAX_TOTAL_CONNECTIONS);
        return config;
    }

    /**
     * Loads the truststore containing Redis server certificate.
     * Returns null if truststore path is not configured.
     * <p>
     * The configured file is parsed as a single X.509 certificate, which is placed in a new
     * in-memory keystore under the alias {@code redis-server}.
     *
     * @throws RuntimeException if the file cannot be read or parsed, or the keystore cannot
     * be created
     */
    private KeyStore loadTrustStore()
    {
        if (redisConnectorConfig.getTruststorePath() == null) {
            log.info("No truststore path configured, skipping TLS truststore loading");
            return null;
        }

        try {
            KeyStore trustStore = KeyStore.getInstance(KeyStore.getDefaultType());
            try (InputStream in = Files.newInputStream(redisConnectorConfig.getTruststorePath().toPath())) {
                // load(null, null) initializes an empty keystore; the certificate is added by hand
                trustStore.load(null, null);
                CertificateFactory cf = CertificateFactory.getInstance("X.509");
                X509Certificate cert = (X509Certificate) cf.generateCertificate(in);
                trustStore.setCertificateEntry("redis-server", cert);
            }
            log.info("Loaded truststore from %s", redisConnectorConfig.getTruststorePath());
            return trustStore;
        }
        catch (KeyStoreException | IOException | CertificateException | NoSuchAlgorithmException e) {
            throw new RuntimeException("Failed to load truststore", e);
        }
    }

    /**
     * Instantiates the JedisPool for a host from the shared pool settings and the connector
     * configuration (connect timeout, user, password, database index).
     *
     * @param host address of the Redis node
     * @param useTls whether the pool should connect over TLS
     * @param sslContext source of the socket factory; only dereferenced when {@code useTls}
     * is true, so it may be null otherwise
     */
    private JedisPool buildJedisPool(HostAddress host, boolean useTls, SSLContext sslContext)
    {
        log.info("Creating new %s JedisPool for %s", useTls ? "TLS" : "non-TLS", host);

        return new JedisPool(
                jedisPoolConfig,
                host.getHostText(),
                host.getPort(),
                toIntExact(redisConnectorConfig.getRedisConnectTimeout().toMillis()),
                JEDIS_SO_TIMEOUT,
                JEDIS_INFINITE_SO_TIMEOUT,
                redisConnectorConfig.getRedisUser(),
                redisConnectorConfig.getRedisPassword(),
                redisConnectorConfig.getRedisDataBaseIndex(),
                null,
                useTls,
                useTls ? sslContext.getSocketFactory() : null,
                null,
                null);
    }
}