ConnectionFactory.java
package redis.clients.jedis;
import org.apache.commons.pool2.PooledObject;
import org.apache.commons.pool2.PooledObjectFactory;
import org.apache.commons.pool2.impl.DefaultPooledObject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.function.Supplier;
import redis.clients.jedis.annots.Experimental;
import redis.clients.jedis.authentication.AuthXManager;
import redis.clients.jedis.authentication.JedisAuthenticationException;
import redis.clients.jedis.authentication.AuthXEventListener;
import redis.clients.jedis.csc.Cache;
import redis.clients.jedis.csc.CacheConnection;
import redis.clients.jedis.exceptions.JedisException;
/**
* PoolableObjectFactory custom impl.
*/
public class ConnectionFactory implements PooledObjectFactory<Connection> {
public static class Builder {
private JedisClientConfig clientConfig;
private Connection.Builder connectionBuilder;
private JedisSocketFactory jedisSocketFactory;
private Cache cache;
private HostAndPort hostAndPort;
// Fluent API methods (preferred)
public Builder clientConfig(JedisClientConfig clientConfig) {
this.clientConfig = clientConfig;
return this;
}
public Builder connectionBuilder(Connection.Builder connectionBuilder) {
this.connectionBuilder = connectionBuilder;
return this;
}
public Builder socketFactory(JedisSocketFactory jedisSocketFactory) {
this.jedisSocketFactory = jedisSocketFactory;
return this;
}
public Builder cache(Cache cache) {
this.cache = cache;
return this;
}
public Builder hostAndPort(HostAndPort hostAndPort) {
this.hostAndPort = hostAndPort;
return this;
}
public Connection.Builder getConnectionBuilder() {
return connectionBuilder;
}
public JedisSocketFactory getJedisSocketFactory() {
return jedisSocketFactory;
}
public JedisClientConfig getClientConfig() {
return clientConfig;
}
public Cache getCache() {
return cache;
}
public ConnectionFactory build() {
withDefaults();
return new ConnectionFactory(this);
}
private Builder withDefaults() {
if (jedisSocketFactory == null) {
this.jedisSocketFactory = createDefaultSocketFactory();
}
if (connectionBuilder == null) {
this.connectionBuilder = createDefaultConnectionBuilder();
}
return this;
}
private JedisSocketFactory createDefaultSocketFactory() {
if (clientConfig == null) {
clientConfig = DefaultJedisClientConfig.builder().build();
}
if (hostAndPort == null) {
throw new IllegalStateException("HostAndPort is required when no socketFactory is provided");
}
return new DefaultJedisSocketFactory(hostAndPort, clientConfig);
}
private Connection.Builder createDefaultConnectionBuilder() {
Connection.Builder connBuilder = cache == null ? Connection.builder() : CacheConnection.builder(cache);
connBuilder.socketFactory(jedisSocketFactory).clientConfig(clientConfig);
return connBuilder;
}
}
public static Builder builder() {
return new Builder();
}
private static final Logger logger = LoggerFactory.getLogger(ConnectionFactory.class);
private final JedisClientConfig clientConfig;
private Supplier<Connection> objectMaker;
private Connection.Builder connectionBuilder;
private AuthXEventListener authXEventListener;
public ConnectionFactory(final HostAndPort hostAndPort) {
this(builder().hostAndPort(hostAndPort).withDefaults());
}
public ConnectionFactory(final HostAndPort hostAndPort, final JedisClientConfig clientConfig) {
this(builder().hostAndPort(hostAndPort).clientConfig(clientConfig).withDefaults());
}
@Experimental
public ConnectionFactory(final HostAndPort hostAndPort, final JedisClientConfig clientConfig, Cache csCache) {
this(builder().hostAndPort(hostAndPort).clientConfig(clientConfig).cache(csCache).withDefaults());
}
public ConnectionFactory(final JedisSocketFactory jedisSocketFactory, final JedisClientConfig clientConfig) {
this(builder().socketFactory(jedisSocketFactory).clientConfig(clientConfig).withDefaults());
}
public ConnectionFactory(Builder builder) {
this.clientConfig = builder.getClientConfig();
this.connectionBuilder = builder.getConnectionBuilder();
initAuthXManager();
}
private void initAuthXManager() {
AuthXManager authXManager = clientConfig.getAuthXManager();
if (authXManager == null) {
this.objectMaker = () -> build();
this.authXEventListener = AuthXEventListener.NOOP_LISTENER;
} else {
this.objectMaker = () -> (Connection) authXManager.addConnection(build());
this.authXEventListener = authXManager.getListener();
authXManager.start();
}
}
private Connection build() {
return connectionBuilder.build();
}
@Override
public void activateObject(PooledObject<Connection> pooledConnection) throws Exception {
// what to do ??
}
@Override
public void destroyObject(PooledObject<Connection> pooledConnection) throws Exception {
final Connection jedis = pooledConnection.getObject();
if (jedis.isConnected()) {
try {
jedis.close();
} catch (RuntimeException e) {
logger.debug("Error while close", e);
}
}
}
@Override
public PooledObject<Connection> makeObject() throws Exception {
try {
Connection jedis = objectMaker.get();
return new DefaultPooledObject<>(jedis);
} catch (JedisException je) {
logger.debug("Error while makeObject", je);
throw je;
}
}
@Override
public void passivateObject(PooledObject<Connection> pooledConnection) throws Exception {
// TODO maybe should select db 0? Not sure right now.
Connection jedis = pooledConnection.getObject();
reAuthenticate(jedis);
}
@Override
public boolean validateObject(PooledObject<Connection> pooledConnection) {
final Connection jedis = pooledConnection.getObject();
try {
// check HostAndPort ??
if (!jedis.isConnected()) {
return false;
}
reAuthenticate(jedis);
return jedis.ping();
} catch (final Exception e) {
logger.warn("Error while validating pooled Connection object.", e);
return false;
}
}
private void reAuthenticate(Connection jedis) throws Exception {
try {
String result = jedis.reAuthenticate();
if (result != null && !result.equals("OK")) {
String msg = "Re-authentication failed with server response: " + result;
Exception failedAuth = new JedisAuthenticationException(msg);
logger.error(failedAuth.getMessage(), failedAuth);
authXEventListener.onConnectionAuthenticationError(failedAuth);
return;
}
} catch (Exception e) {
logger.error("Error while re-authenticating connection", e);
authXEventListener.onConnectionAuthenticationError(e);
throw e;
}
}
}