KeepAliveCache.java
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.azurebfs.services;
import java.io.Closeable;
import java.io.IOException;
import java.util.Stack;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.classification.VisibleForTesting;
import org.apache.hadoop.fs.ClosedIOException;
import org.apache.hadoop.fs.azurebfs.AbfsConfiguration;
import org.apache.http.HttpClientConnection;
import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.HTTP_MAX_CONN_SYS_PROP;
import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.KEEP_ALIVE_CACHE_CLOSED;
import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_APACHE_HTTP_CLIENT_MAX_CACHE_CONNECTION_SIZE;
import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.DEFAULT_HTTP_CLIENT_CONN_MAX_CACHED_CONNECTIONS;
/**
 * Connection-pooling heuristics used by {@link AbfsConnectionManager}. Each
 * instance of FileSystem has its own KeepAliveCache.
 * <p>
 * Why this implementation is required in comparison to {@link org.apache.http.impl.conn.PoolingHttpClientConnectionManager}
 * connection-pooling:
 * <ol>
 * <li>PoolingHttpClientConnectionManager heuristic caches all the reusable connections it has created.
 * JDK's implementation only caches a limited number of connections. The limit is given by JVM system
 * property "http.maxConnections". If there is no system-property, it defaults to 5.</li>
 * <li>In PoolingHttpClientConnectionManager, it expects the application to provide `setMaxPerRoute` and `setMaxTotal`,
 * which the implementation uses as the total number of connections it can create. For applications using ABFS, it is not
 * feasible to provide a value in the initialisation of the connectionManager. JDK's implementation has no cap on the
 * number of connections it can create.</li>
 * </ol>
 * <p>
 * Entries are kept in insertion order: {@link #put(HttpClientConnection)}
 * pushes on top of the stack, {@link #get()} pops from the top (most recently
 * cached connection first), and eviction removes from the bottom (oldest
 * connection first). All methods that read or modify the stack are
 * {@code synchronized} on this instance.
 */
class KeepAliveCache extends Stack<KeepAliveCache.KeepAliveEntry>
    implements Closeable {

  private static final long serialVersionUID = 1L;

  private static final Logger LOG = LoggerFactory.getLogger(KeepAliveCache.class);

  /**
   * Scheduled timer that evicts idle connections.
   */
  private final transient Timer timer;

  /**
   * Task provided to the timer that owns eviction logic.
   */
  private final transient TimerTask timerTask;

  /**
   * Flag to indicate if the cache is closed.
   */
  private final AtomicBoolean isClosed = new AtomicBoolean(false);

  /**
   * Counter to keep track of the number of KeepAliveCache instances created.
   * Used only to give each eviction timer thread a distinct name.
   */
  private static final AtomicInteger KAC_COUNTER = new AtomicInteger(0);

  /**
   * Maximum number of connections that can be cached.
   */
  private final int maxConn;

  /**
   * Time-to-live for an idle connection, in milliseconds. Also used as the
   * period of the eviction timer.
   */
  private final long connectionIdleTTL;

  /**
   * Flag to indicate if the eviction thread is paused.
   */
  private final AtomicBoolean isPaused = new AtomicBoolean(false);

  /**
   * Account name for which the cache is created. To be used only in exception
   * messages.
   */
  private final String accountNamePath;

  /**
   * Makes the periodic eviction task a no-op until {@link #resumeThread()}
   * is called.
   */
  @VisibleForTesting
  synchronized void pauseThread() {
    isPaused.set(true);
  }

  /**
   * Re-enables the periodic eviction task after {@link #pauseThread()}.
   */
  @VisibleForTesting
  synchronized void resumeThread() {
    isPaused.set(false);
  }

  /**
   * @return connectionIdleTTL.
   */
  @VisibleForTesting
  public long getConnectionIdleTTL() {
    return connectionIdleTTL;
  }

  /**
   * Creates an {@link KeepAliveCache} instance using filesystem's configuration.
   * <p>
   * The size of the cache is determined by the configuration
   * {@value org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys#FS_AZURE_APACHE_HTTP_CLIENT_MAX_CACHE_CONNECTION_SIZE}.
   * If the configuration is not set, the system-property
   * {@value org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants#HTTP_MAX_CONN_SYS_PROP}
   * is used. If the system-property is not set, is not a valid integer, or is
   * not positive, the default value
   * {@value org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations#DEFAULT_HTTP_CLIENT_CONN_MAX_CACHED_CONNECTIONS} is used.
   * <p>
   * This schedules an eviction thread to run every connectionIdleTTL milliseconds
   * given by the configuration {@link AbfsConfiguration#getMaxApacheHttpClientConnectionIdleTime()}.
   * @param abfsConfiguration Configuration of the filesystem.
   */
  KeepAliveCache(AbfsConfiguration abfsConfiguration) {
    accountNamePath = abfsConfiguration.getAccountName();

    // Resolve every configuration value BEFORE creating the Timer: a Timer
    // starts its thread on construction, so a failure while reading the
    // configuration must not leave an orphaned thread behind.
    final int sysPropMaxConn = readMaxConnSysProp();
    final int defaultMaxConn = sysPropMaxConn > 0
        ? sysPropMaxConn
        : DEFAULT_HTTP_CLIENT_CONN_MAX_CACHED_CONNECTIONS;
    this.maxConn = abfsConfiguration.getInt(
        FS_AZURE_APACHE_HTTP_CLIENT_MAX_CACHE_CONNECTION_SIZE,
        defaultMaxConn);
    this.connectionIdleTTL
        = abfsConfiguration.getMaxApacheHttpClientConnectionIdleTime();

    // Daemon timer, one per cache instance, named with a running counter.
    this.timer = new Timer("abfs-kac-" + KAC_COUNTER.getAndIncrement(), true);
    this.timerTask = new TimerTask() {
      @Override
      public void run() {
        if (isPaused.get() || isClosed.get()) {
          return;
        }
        evictIdleConnection();
      }
    };
    timer.schedule(timerTask, 0, connectionIdleTTL);
  }

  /**
   * Reads the JVM-wide maximum-connections system property.
   *
   * @return the parsed value, or 0 if the property is unset or is not a
   * valid integer (0 makes the caller fall back to the built-in default).
   */
  private static int readMaxConnSysProp() {
    final String raw = System.getProperty(HTTP_MAX_CONN_SYS_PROP);
    if (raw == null) {
      return 0;
    }
    try {
      return Integer.parseInt(raw);
    } catch (NumberFormatException ex) {
      // A malformed JVM property should not prevent the cache from being
      // created; report it and use the default instead.
      LOG.warn("Ignoring invalid value '{}' of system property {}",
          raw, HTTP_MAX_CONN_SYS_PROP);
      return 0;
    }
  }

  /**
   * Tells whether a cached entry must be discarded instead of reused.
   *
   * @param entry cache entry to check.
   * @param currentTime reference time in milliseconds.
   * @return true if the entry has been cached for more than connectionIdleTTL
   * milliseconds or its connection reports itself as stale.
   */
  private boolean isExpiredOrStale(final KeepAliveEntry entry,
      final long currentTime) {
    // The age check comes first so isStale() is only consulted for entries
    // that are still within their TTL.
    return (currentTime - entry.idleStartTime) > connectionIdleTTL
        || entry.httpClientConnection.isStale();
  }

  /**
   * Iterate over the cache and evict the idle connections. An idle connection is
   * one that has been in the cache for more than connectionIdleTTL milliseconds.
   * <p>
   * The scan starts at the oldest entry (index 0) and stops at the first entry
   * that is neither expired nor stale; the scanned prefix is closed and removed.
   */
  synchronized void evictIdleConnection() {
    final long currentTime = System.currentTimeMillis();
    int evictCount = 0;
    while (evictCount < size()) {
      final KeepAliveEntry entry = elementAt(evictCount);
      if (!isExpiredOrStale(entry, currentTime)) {
        break;
      }
      closeHttpClientConnection(entry.httpClientConnection);
      evictCount++;
    }
    // Drop the closed prefix in one operation.
    subList(0, evictCount).clear();
  }

  /**
   * Safe close of the HttpClientConnection. A failure to close is logged at
   * debug level and otherwise ignored.
   *
   * @param hc HttpClientConnection to be closed
   */
  private void closeHttpClientConnection(final HttpClientConnection hc) {
    try {
      hc.close();
    } catch (IOException ex) {
      LOG.debug("Close failed for connection: {}", hc, ex);
    }
  }

  /**
   * Close all connections in cache and cancel the eviction timer. Calling
   * this method more than once has no further effect.
   */
  @Override
  public synchronized void close() {
    if (isClosed.getAndSet(true)) {
      return;
    }
    closeInternal();
  }

  /**
   * Stops the eviction timer and closes every cached connection. Does not
   * touch the closed flag; {@link #close()} is the guarded entry point.
   */
  @VisibleForTesting
  void closeInternal() {
    timerTask.cancel();
    // Cancel the Timer itself, not only its task: cancelling the task alone
    // leaves the timer's thread alive for as long as the Timer is reachable.
    timer.cancel();
    while (!empty()) {
      closeHttpClientConnection(pop().httpClientConnection);
    }
  }

  /**
   * Gets the latest added HttpClientConnection from the cache. The returned connection
   * is non-stale and has been in the cache for less than connectionIdleTTL milliseconds.
   * <p>
   * The cache is checked from the top of the stack. If the connection is stale or has been
   * in the cache for more than connectionIdleTTL milliseconds, it is closed and the next
   * connection is checked. Once a valid connection is found, it is returned.
   * @return HttpClientConnection: if a valid connection is found, else null.
   * @throws IOException if the cache is closed.
   */
  public synchronized HttpClientConnection get()
      throws IOException {
    if (isClosed.get()) {
      throw new ClosedIOException(accountNamePath, KEEP_ALIVE_CACHE_CLOSED);
    }
    final long currentTime = System.currentTimeMillis();
    while (!empty()) {
      final KeepAliveEntry entry = pop();
      if (!isExpiredOrStale(entry, currentTime)) {
        return entry.httpClientConnection;
      }
      // Unusable entry: close it and keep looking further down the stack.
      closeHttpClientConnection(entry.httpClientConnection);
    }
    return null;
  }

  /**
   * Puts the HttpClientConnection in the cache. If the size of cache is equal to
   * maxConn, the oldest connection is closed and removed from the cache, which
   * will make space for the new connection. If the cache is closed or of zero size,
   * the connection is closed and not added to the cache.
   *
   * @param httpClientConnection HttpClientConnection to be cached
   * @return true if the HttpClientConnection is added in active cache, false otherwise.
   */
  public synchronized boolean put(HttpClientConnection httpClientConnection) {
    if (isClosed.get() || maxConn == 0) {
      closeHttpClientConnection(httpClientConnection);
      return false;
    }
    if (size() == maxConn) {
      // Cache is full: index 0 holds the oldest entry, evict it.
      closeHttpClientConnection(remove(0).httpClientConnection);
    }
    push(new KeepAliveEntry(httpClientConnection, System.currentTimeMillis()));
    return true;
  }

  /**
   * Delegates to the superclass implementation while holding this instance's
   * lock.
   */
  @Override
  public synchronized boolean equals(final Object o) {
    return super.equals(o);
  }

  /**
   * Delegates to the superclass implementation while holding this instance's
   * lock.
   */
  @Override
  public synchronized int hashCode() {
    return super.hashCode();
  }

  /**
   * Entry data-structure in the cache.
   */
  static class KeepAliveEntry {

    /**HttpClientConnection in the cache entry.*/
    private final HttpClientConnection httpClientConnection;

    /**Time at which the HttpClientConnection was added to the cache.*/
    private final long idleStartTime;

    KeepAliveEntry(HttpClientConnection hc, long idleStartTime) {
      this.httpClientConnection = hc;
      this.idleStartTime = idleStartTime;
    }
  }
}