AbfsLease.java
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.azurebfs.services;
import java.io.IOException;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.classification.VisibleForTesting;
import org.apache.hadoop.fs.azurebfs.constants.FSOperationType;
import org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations;
import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AzureBlobFileSystemException;
import org.apache.hadoop.fs.azurebfs.utils.TracingContext;
import org.apache.hadoop.io.retry.RetryPolicies;
import org.apache.hadoop.io.retry.RetryPolicy;
import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.FutureCallback;
import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.ListenableScheduledFuture;
import org.apache.hadoop.thirdparty.org.checkerframework.checker.nullness.qual.Nullable;
import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.INFINITE_LEASE_DURATION;
import static org.apache.hadoop.fs.azurebfs.services.AbfsErrors.ERR_ACQUIRING_LEASE;
import static org.apache.hadoop.fs.azurebfs.services.AbfsErrors.ERR_LEASE_FUTURE_EXISTS;
import static org.apache.hadoop.fs.azurebfs.services.AbfsErrors.ERR_NO_LEASE_THREADS;
import static org.apache.hadoop.fs.azurebfs.services.AbfsHttpOperation.ONE_THOUSAND;
/**
 * AbfsLease manages an Azure blob lease. It acquires a lease on instantiation and releases the
 * lease when free() is called. Use it to prevent writes to the blob by other processes that
 * don't have the lease.
 *
 * When created in asynchronous mode the lease is requested with
 * {@code INFINITE_LEASE_DURATION} on the client's lease thread pool. When created in synchronous
 * mode the lease is requested on the calling thread with the configured refresh duration
 * (converted to seconds). In both modes, if the refresh duration is not
 * {@code INFINITE_LEASE_DURATION}, a timer task renews the lease every half refresh duration.
 *
 * Creating a new Lease object blocks the caller until the Azure blob lease is acquired. It will
 * retry a fixed number of times before failing if there is a problem acquiring the lease.
 *
 * Call free() to release the Lease. If the holder process dies, AzureBlobFileSystem breakLease
 * will need to be called before another client will be able to write to the file.
 */
public final class AbfsLease {
  private static final Logger LOG = LoggerFactory.getLogger(AbfsLease.class);

  // Number of retries for acquiring lease
  static final int DEFAULT_LEASE_ACQUIRE_MAX_RETRIES = 7;
  // Retry interval for acquiring lease in secs
  static final int DEFAULT_LEASE_ACQUIRE_RETRY_INTERVAL = 10;

  private final AbfsClient client;
  private final String path;
  private final TracingContext tracingContext;

  // Lease status variables. They are written from acquire callbacks and read by the
  // constructing thread, hence volatile.
  private volatile boolean leaseFreed;
  private volatile String leaseID = null;
  private volatile Throwable exception = null;
  private volatile int acquireRetryCount = 0;
  private volatile ListenableScheduledFuture<AbfsRestOperation> future = null;

  // Lease renewal configuration and state
  private final long leaseRefreshDuration;
  private final int leaseRefreshDurationInSeconds;
  private final Timer timer;
  private LeaseTimerTask leaseTimerTask;
  private final boolean isAsync;

  /**
   * Exception raised when a lease cannot be acquired.
   */
  public static class LeaseException extends AzureBlobFileSystemException {
    public LeaseException(Throwable t) {
      super(ERR_ACQUIRING_LEASE + ": " + t, t);
    }

    public LeaseException(String s) {
      super(s);
    }
  }

  /**
   * Create a new lease object and acquire a lease on the given path, using the default
   * number of acquire retries and the default retry interval.
   *
   * @param client AbfsClient
   * @param path Path to acquire lease on
   * @param isAsync Whether to acquire lease asynchronously
   * @param leaseRefreshDuration Duration in milliseconds to renew the lease
   * @param eTag ETag of the file
   * @param tracingContext Tracing context
   * @throws AzureBlobFileSystemException if the lease cannot be acquired
   */
  public AbfsLease(AbfsClient client, String path,
      final boolean isAsync, final long leaseRefreshDuration,
      final String eTag, TracingContext tracingContext) throws AzureBlobFileSystemException {
    this(client, path, isAsync, DEFAULT_LEASE_ACQUIRE_MAX_RETRIES,
        DEFAULT_LEASE_ACQUIRE_RETRY_INTERVAL, leaseRefreshDuration, eTag, tracingContext);
  }

  /**
   * Create a new lease object and acquire a lease on the given path. The caller is blocked
   * until either a lease ID or a terminal failure has been recorded.
   *
   * @param client AbfsClient
   * @param path Path to acquire lease on
   * @param isAsync Whether to acquire lease asynchronously
   * @param acquireMaxRetries Maximum number of retries to acquire lease
   * @param acquireRetryInterval Retry interval in seconds to acquire lease
   * @param leaseRefreshDuration Duration in milliseconds to renew the lease
   * @param eTag ETag of the file
   * @param tracingContext Tracing context
   * @throws AzureBlobFileSystemException if the lease cannot be acquired
   */
  @VisibleForTesting
  public AbfsLease(AbfsClient client, String path, final boolean isAsync, int acquireMaxRetries,
      int acquireRetryInterval, final long leaseRefreshDuration,
      final String eTag,
      TracingContext tracingContext) throws AzureBlobFileSystemException {
    this.leaseFreed = false;
    this.client = client;
    this.path = path;
    this.tracingContext = tracingContext;
    this.leaseRefreshDuration = leaseRefreshDuration;
    // Divide as long first, then narrow: a cast binds tighter than '/', so the unparenthesised
    // form would truncate the millisecond value to int before converting it to seconds.
    this.leaseRefreshDurationInSeconds = (int) (leaseRefreshDuration / ONE_THOUSAND);
    this.isAsync = isAsync;

    if (isAsync && client.getNumLeaseThreads() < 1) {
      throw new LeaseException(ERR_NO_LEASE_THREADS);
    }

    // Try to get the lease a specified number of times, else throw an error
    RetryPolicy retryPolicy = RetryPolicies.retryUpToMaximumCountWithFixedSleep(
        acquireMaxRetries, acquireRetryInterval, TimeUnit.SECONDS);
    this.timer = new Timer(
        String.format("lease-refresh-timer-%s", path), true);
    acquireLease(retryPolicy, 0, acquireRetryInterval, 0, eTag,
        new TracingContext(tracingContext));

    // Wait until a callback has published either the lease ID or a terminal exception.
    // An interrupt does not abort the wait; it is remembered and the thread's interrupt
    // status is restored afterwards so that callers can still observe it.
    boolean interrupted = false;
    try {
      while (leaseID == null && exception == null) {
        try {
          future.get();
        } catch (InterruptedException e) {
          interrupted = true;
          LOG.debug("Got exception waiting for acquire lease future. Checking if lease ID or "
              + "exception have been set", e);
        } catch (Exception e) {
          LOG.debug("Got exception waiting for acquire lease future. Checking if lease ID or "
              + "exception have been set", e);
        }
      }
    } finally {
      if (interrupted) {
        Thread.currentThread().interrupt();
      }
    }

    if (exception != null) {
      LOG.error("Failed to acquire lease on {}", path);
      throw new LeaseException(exception);
    }

    LOG.debug("Acquired lease {} on {}", leaseID, path);
  }

  /**
   * Make one attempt to acquire a lease on the path, retrying from the failure callback while
   * the retry policy allows it. In synchronous mode the attempt (and any retries) run on the
   * calling thread; in asynchronous mode the attempt is scheduled on the client after
   * {@code delay} seconds. The outcome is published through {@code leaseID} or
   * {@code exception}.
   *
   * @param retryPolicy Retry policy
   * @param numRetries Number of retries already performed
   * @param retryInterval Retry interval in seconds, used as the delay of the next attempt
   * @param delay Delay in seconds before the scheduled (asynchronous) attempt runs
   * @param eTag ETag of the file
   * @param tracingContext Tracing context
   * @throws LeaseException if a previously scheduled acquire future has not completed yet
   */
  private void acquireLease(RetryPolicy retryPolicy, int numRetries,
      int retryInterval, long delay, final String eTag, TracingContext tracingContext)
      throws LeaseException {
    LOG.debug("Attempting to acquire lease on {}, retry {}", path, numRetries);
    if (future != null && !future.isDone()) {
      throw new LeaseException(ERR_LEASE_FUTURE_EXISTS);
    }

    FutureCallback<AbfsRestOperation> acquireCallback = new FutureCallback<AbfsRestOperation>() {
      @Override
      public void onSuccess(@Nullable AbfsRestOperation op) {
        leaseID = op.getResult().getResponseHeader(HttpHeaderConfigurations.X_MS_LEASE_ID);
        // Only leases with a finite refresh duration are renewed; renewal runs every half
        // refresh duration.
        if (leaseRefreshDuration != INFINITE_LEASE_DURATION) {
          leaseTimerTask = new LeaseTimerTask(client, path,
              leaseID, tracingContext);
          timer.scheduleAtFixedRate(leaseTimerTask, leaseRefreshDuration / 2,
              leaseRefreshDuration / 2);
        }
        LOG.debug("Acquired lease {} on {}", leaseID, path);
      }

      @Override
      public void onFailure(Throwable throwable) {
        try {
          if (RetryPolicy.RetryAction.RetryDecision.RETRY
              == retryPolicy.shouldRetry(null, numRetries, 0, true).action) {
            LOG.debug("Failed to acquire lease on {}, retrying: {}", path, throwable);
            // Each retry is started from the failure callback of the previous attempt.
            acquireRetryCount++;
            acquireLease(retryPolicy, numRetries + 1, retryInterval,
                retryInterval, eTag, tracingContext);
          } else {
            exception = throwable;
          }
        } catch (Exception e) {
          // Could not evaluate the policy or start the retry: report the original failure.
          exception = throwable;
        }
      }
    };

    if (!isAsync) {
      try {
        AbfsRestOperation op = client.acquireLease(path,
            leaseRefreshDurationInSeconds, eTag, tracingContext);
        acquireCallback.onSuccess(op);
      } catch (AzureBlobFileSystemException ex) {
        acquireCallback.onFailure(ex);
      }
      // The synchronous attempt (including its retries) has fully resolved at this point.
      // Never fall through to the scheduler: doing so would start an additional background
      // acquire that could obtain a lease after the caller has already been told of failure.
      return;
    }

    future = client.schedule(() -> client.acquireLease(path,
        INFINITE_LEASE_DURATION, eTag, tracingContext),
        delay, TimeUnit.SECONDS);
    client.addCallback(future, acquireCallback);
  }

  /**
   * Cancel future and free the lease. If an exception occurs while releasing the lease, the error
   * will be logged. If the lease cannot be released, AzureBlobFileSystem breakLease will need to
   * be called before another client will be able to write to the file.
   */
  public void free() {
    if (leaseFreed) {
      return;
    }
    try {
      LOG.debug("Freeing lease: path {}, lease id {}", path, leaseID);
      if (future != null && !future.isDone()) {
        future.cancel(true);
      }
      cancelTimer();
      TracingContext tracingContext = new TracingContext(this.tracingContext);
      tracingContext.setOperation(FSOperationType.RELEASE_LEASE);
      client.releaseLease(path, leaseID, tracingContext);
    } catch (IOException e) {
      LOG.warn("Exception when trying to release lease {} on {}. Lease will need to be broken: {}",
          leaseID, path, e.getMessage());
    } finally {
      // Even if releasing the lease fails (e.g. because the file was deleted),
      // make sure to record that we freed the lease
      leaseFreed = true;
      LOG.debug("Freed lease {} on {}", leaseID, path);
    }
  }

  /**
   * Cancel the lease renewal timer task, if one was scheduled.
   * Also purge cancelled tasks from the lease refresh timer.
   */
  public void cancelTimer() {
    if (leaseTimerTask != null) {
      leaseTimerTask.cancel();
    }
    timer.purge();
  }

  /**
   * Check if the lease has been freed.
   *
   * @return true if the lease has been freed
   */
  public boolean isFreed() {
    return leaseFreed;
  }

  /**
   * Get the lease ID.
   *
   * @return lease ID
   */
  public String getLeaseID() {
    return leaseID;
  }

  /**
   * Get the number of times the lease acquisition was retried.
   *
   * @return number of acquire retries
   */
  @VisibleForTesting
  public int getAcquireRetryCount() {
    return acquireRetryCount;
  }

  /**
   * Get Tracing Context.
   *
   * @return TracingContext tracing context
   */
  @VisibleForTesting
  public TracingContext getTracingContext() {
    return tracingContext;
  }

  /**
   * Timer task that renews the lease on each run.
   * If the lease is not renewed, the lease will expire and the file will be available for write.
   */
  private static class LeaseTimerTask extends TimerTask {
    private final AbfsClient client;
    private final String path;
    private final String leaseID;
    private final TracingContext tracingContext;

    LeaseTimerTask(AbfsClient client, String path, String leaseID, TracingContext tracingContext) {
      this.client = client;
      this.path = path;
      this.leaseID = leaseID;
      this.tracingContext = tracingContext;
    }

    @Override
    public void run() {
      try {
        client.renewLease(path, leaseID, tracingContext);
      } catch (Exception e) {
        // Log and carry on: a failed renewal does not stop the timer from trying again.
        LOG.error("Failed to renew lease on {}", path, e);
      }
    }
  }
}