KeepAliveCache.java
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.azurebfs.services;
import java.io.Closeable;
import java.io.IOException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.Executors;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingDeque;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.classification.VisibleForTesting;
import org.apache.hadoop.fs.ClosedIOException;
import org.apache.hadoop.fs.azurebfs.AbfsConfiguration;
import org.apache.http.HttpClientConnection;
import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.KEEP_ALIVE_CACHE_CLOSED;
/**
* Connection-pooling heuristics used by {@link AbfsConnectionManager}. Each
* instance of FileSystem has its own KeepAliveCache.
* <p>
* Why this implementation is required in comparison to {@link org.apache.http.impl.conn.PoolingHttpClientConnectionManager}
* connection-pooling:
* <ol>
* <li>PoolingHttpClientConnectionManager heuristic caches all the reusable connections it has created.
* JDK's implementation only caches a limited number of connections. The limit is given by JVM system
* property "http.maxConnections". If there is no system-property, it defaults to 5.</li>
* <li>In PoolingHttpClientConnectionManager, it expects the application to provide setMaxPerRoute and setMaxTotal,
* which the implementation uses as the total number of connections it can create. For application using ABFS, it is not
* feasible to provide a value in the initialisation of the connectionManager. JDK's implementation has no cap on the
* number of connections it can create.</li>
* </ol>
*/
class KeepAliveCache extends LinkedBlockingDeque<HttpClientConnection>
    implements Closeable {

  /**
   * Logger instance.
   */
  private static final Logger LOG = LoggerFactory.getLogger(
      KeepAliveCache.class);

  /**
   * Upper bound on the number of threads in the async connection pool.
   */
  private static final int MAX_ASYNC_THREADS = 5;

  /**
   * Flag to indicate if the cache is closed.
   */
  private final AtomicBoolean isClosed = new AtomicBoolean(false);

  /**
   * Maximum number of connections that can be cached.
   */
  private final int maxCacheConnections;

  /**
   * Account name for which the cache is created. To be used only in exception
   * and log messages.
   */
  private final String accountNamePath;

  /**
   * Executor service to trigger connection refresh from cache manager.
   * {@code null} when cache refresh is disabled.
   */
  private final ExecutorService singleThreadPool;

  /**
   * Executor service to trigger async cache warmup.
   * {@code null} when both cache warmup and cache refresh are disabled.
   */
  private final ExecutorService fixedThreadPool;

  /**
   * Creates an {@link KeepAliveCache} instance using filesystem's configuration.
   * <p>
   * The size of the cache is determined by the configuration
   * {@value org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys#FS_AZURE_APACHE_HTTP_CLIENT_MAX_CACHE_SIZE}.
   * If the configuration is not set, the default value is
   * {@value org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations#DEFAULT_APACHE_HTTP_CLIENT_MAX_CACHE_SIZE}.
   * <p>
   * The refresh executor is created only when the configured refresh count
   * is positive. The async executor is created when either the warmup count
   * or the refresh count is positive; its size is the larger of the two
   * counts, capped at {@value #MAX_ASYNC_THREADS}.
   *
   * @param abfsConfiguration configuration to read the cache settings from
   */
  KeepAliveCache(AbfsConfiguration abfsConfiguration) {
    this.accountNamePath = abfsConfiguration.getAccountName();
    this.maxCacheConnections = abfsConfiguration.getApacheMaxCacheSize();

    final int refreshCount = abfsConfiguration.getApacheCacheRefreshCount();
    final int warmupCount = abfsConfiguration.getApacheCacheWarmupCount();

    // Initialise singleThreadPool only if cache refresh is enabled.
    this.singleThreadPool = refreshCount > 0
        ? Executors.newSingleThreadExecutor(
            r -> newDaemonThread(r, "CacheRefreshThread"))
        : null;

    // Initialise fixedThreadPool if cache warmup or cache refresh is enabled.
    // At least one count is positive here, so the pool size is always >= 1.
    this.fixedThreadPool = (warmupCount > 0 || refreshCount > 0)
        ? Executors.newFixedThreadPool(
            Math.min(MAX_ASYNC_THREADS, Math.max(warmupCount, refreshCount)),
            r -> newDaemonThread(r, "AsyncCacheConnectionThread"))
        : null;
  }

  /**
   * Creates a named daemon thread for the cache's executors.
   *
   * @param task runnable the thread will execute
   * @param name name to assign to the thread
   * @return the new, not yet started, daemon thread
   */
  private static Thread newDaemonThread(final Runnable task,
      final String name) {
    Thread thread = new Thread(task);
    thread.setName(name);
    thread.setDaemon(true);
    return thread;
  }

  /**
   * Safe close of the HttpClientConnection. An {@link IOException} raised by
   * the close is logged at debug level and not propagated.
   *
   * @param hc HttpClientConnection to be closed
   */
  private void closeHttpClientConnection(final HttpClientConnection hc) {
    try {
      hc.close();
    } catch (IOException ex) {
      if (LOG.isDebugEnabled()) {
        LOG.debug("Close failed for connection: {}", hc, ex);
      }
    }
  }

  /**
   * Marks the cache as closed, closes all connections in the cache and shuts
   * down the executors. Only the first call has any effect.
   */
  @Override
  public void close() {
    if (isClosed.getAndSet(true)) {
      return;
    }
    try {
      closeInternal();
    } finally {
      // Always stop the executors, even if draining the cache failed
      // unexpectedly, so their threads are not left running.
      if (singleThreadPool != null && !singleThreadPool.isShutdown()) {
        singleThreadPool.shutdownNow();
      }
      if (fixedThreadPool != null && !fixedThreadPool.isShutdown()) {
        fixedThreadPool.shutdownNow();
      }
    }
  }

  /**
   * @return true if the cache is closed, false otherwise.
   */
  public boolean getIsClosed() {
    return isClosed.get();
  }

  /**
   * @return ExecutorService to trigger connection refresh from cache manager,
   * or null if it was not created.
   */
  public ExecutorService getSingleThreadPool() {
    return singleThreadPool;
  }

  /**
   * @return ExecutorService to trigger async create connections and put in
   * cache, or null if it was not created.
   */
  public ExecutorService getFixedThreadPool() {
    return fixedThreadPool;
  }

  /**
   * Internal close method to close all connections in the cache.
   * This method does not change the state of isClosed.
   * It is expected that the caller of this method has set the isClosed flag.
   */
  @VisibleForTesting
  void closeInternal() {
    // Drain on the result of pollFirst() rather than on size(): another
    // thread may empty the deque between a size() check and the poll, in
    // which case pollFirst() returns null and must not be dereferenced.
    HttpClientConnection connection;
    while ((connection = pollFirst()) != null) {
      closeHttpClientConnection(connection);
    }
  }

  /**
   * Gets the oldest added HttpClientConnection from the cache. The returned connection
   * is open.
   * The cache follows the FIFO strategy. If the connection is not open or is
   * stale, it will be closed and the next connection is checked. Once a valid
   * connection is found, it is returned.
   *
   * @return HttpClientConnection: if a valid connection is found, else null.
   * @throws IOException if the cache is closed.
   */
  public HttpClientConnection get() throws IOException {
    if (getIsClosed()) {
      LOG.debug("Attempt to get connection from closed cache for account: {}",
          accountNamePath);
      throw new ClosedIOException(accountNamePath, KEEP_ALIVE_CACHE_CLOSED);
    }
    HttpClientConnection httpClientConnection;
    while ((httpClientConnection = pollFirst()) != null) {
      if (httpClientConnection.isOpen() && !httpClientConnection.isStale()) {
        return httpClientConnection;
      }
      // Unusable entry: release it and keep looking.
      closeHttpClientConnection(httpClientConnection);
    }
    LOG.debug("No valid connection found in cache for account: {}",
        accountNamePath);
    return null;
  }

  /**
   * Puts the HttpClientConnection in the cache. If the size of cache is equal to
   * maxConn, the oldest connection is closed and removed from the cache, which
   * will make space for the new connection. If the cache is closed or of zero size,
   * or the connection is not open or is stale, the connection is closed and
   * not added to the cache. A null connection is rejected with a warning.
   *
   * @param conn HttpClientConnection to be cached
   * @return true if the HttpClientConnection is added in active cache, false otherwise.
   */
  @Override
  public boolean add(HttpClientConnection conn) {
    if (conn == null) {
      LOG.warn(
          "Attempt to add null HttpClientConnection to the cache for account: {}",
          accountNamePath);
      return false;
    }
    if (getIsClosed() || getMaxCacheConnections() <= 0
        || !conn.isOpen() || conn.isStale()) {
      // Guard the log call: its arguments re-evaluate isOpen()/isStale(),
      // which is wasted work when debug logging is disabled.
      if (LOG.isDebugEnabled()) {
        LOG.debug(
            "Not adding connection to cache. closed: {}, "
                + "maxCacheSize: {}, isOpen: {}, isStale: {} for account: {}",
            getIsClosed(), getMaxCacheConnections(), conn.isOpen(),
            conn.isStale(), accountNamePath);
      }
      closeHttpClientConnection(conn);
      return false;
    }
    // Evict oldest entries until there is room for the new connection.
    while (size() >= getMaxCacheConnections()) {
      HttpClientConnection evicted = pollFirst();
      if (evicted == null) {
        break;
      }
      closeHttpClientConnection(evicted);
    }
    if (!offerLast(conn)) {
      return false;
    }
    // close() may have set the flag and drained the cache after the check
    // above but before the offer. Do not leave an open connection behind in
    // a closed cache: take it back out and close it. If remove() fails, the
    // concurrent close() already polled (and closed) it.
    if (getIsClosed()) {
      if (remove(conn)) {
        closeHttpClientConnection(conn);
      }
      return false;
    }
    return true;
  }

  /**
   * @return maximum number of connections that can be cached.
   */
  @VisibleForTesting
  public int getMaxCacheConnections() {
    return maxCacheConnections;
  }

  /**
   * @return String representation of the KeepAliveCache instance.
   */
  @Override
  public String toString() {
    return String.format("KeepAliveCache[closed=%s, size=%d, max=%d]",
        getIsClosed(), size(), getMaxCacheConnections());
  }
}