ClientManagerImpl.java
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.s3a.impl;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.URI;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Future;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import software.amazon.awssdk.services.s3.S3AsyncClient;
import software.amazon.awssdk.services.s3.S3Client;
import software.amazon.awssdk.transfer.s3.S3TransferManager;
import org.apache.hadoop.fs.s3a.S3ClientFactory;
import org.apache.hadoop.fs.statistics.DurationTrackerFactory;
import org.apache.hadoop.service.AbstractService;
import org.apache.hadoop.util.functional.CallableRaisingIOE;
import org.apache.hadoop.util.functional.LazyAutoCloseableReference;
import static java.util.Objects.requireNonNull;
import static java.util.concurrent.CompletableFuture.completedFuture;
import static java.util.concurrent.CompletableFuture.supplyAsync;
import static org.apache.hadoop.fs.s3a.Statistic.STORE_CLIENT_CREATION;
import static org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding.trackDuration;
import static org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding.trackDurationOfOperation;
import static org.apache.hadoop.util.Preconditions.checkState;
import static org.apache.hadoop.util.functional.FutureIO.awaitAllFutures;
/**
* Client manager for on-demand creation of S3 clients,
* with parallelized close of them in {@link #serviceStop()}.
* Updates {@link org.apache.hadoop.fs.s3a.Statistic#STORE_CLIENT_CREATION}
* to track count and duration of client creation.
*/
public class ClientManagerImpl
extends AbstractService
implements ClientManager {
public static final Logger LOG = LoggerFactory.getLogger(ClientManagerImpl.class);
/**
* Client factory to invoke.
*/
private final S3ClientFactory clientFactory;
/**
* Client factory to invoke for unencrypted client.
*/
private final S3ClientFactory unencryptedClientFactory;
/**
* Parameters to create sync/async clients.
*/
private final S3ClientFactory.S3ClientCreationParameters clientCreationParameters;
/**
* Duration tracker factory for creation.
*/
private final DurationTrackerFactory durationTrackerFactory;
/**
* Core S3 client.
*/
private final LazyAutoCloseableReference<S3Client> s3Client;
/** Async client is used for transfer manager. */
private final LazyAutoCloseableReference<S3AsyncClient> s3AsyncClient;
/**
* Unencrypted S3 client.
* This is used for unencrypted operations when CSE is enabled with V1 compatibility.
*/
private final LazyAutoCloseableReference<S3Client> unencryptedS3Client;
/** Transfer manager. */
private final LazyAutoCloseableReference<S3TransferManager> transferManager;
/**
* Constructor.
* <p>
* This does not create any clients.
* <p>
* It does disable noisy logging from the S3 Transfer Manager.
* @param clientFactory client factory to invoke
* @param unencryptedClientFactory client factory to invoke
* @param clientCreationParameters creation parameters.
* @param durationTrackerFactory duration tracker.
*/
public ClientManagerImpl(
final S3ClientFactory clientFactory,
final S3ClientFactory unencryptedClientFactory,
final S3ClientFactory.S3ClientCreationParameters clientCreationParameters,
final DurationTrackerFactory durationTrackerFactory) {
super("ClientManager");
this.clientFactory = requireNonNull(clientFactory);
this.unencryptedClientFactory = unencryptedClientFactory;
this.clientCreationParameters = requireNonNull(clientCreationParameters);
this.durationTrackerFactory = requireNonNull(durationTrackerFactory);
this.s3Client = new LazyAutoCloseableReference<>(createS3Client());
this.s3AsyncClient = new LazyAutoCloseableReference<>(createAsyncClient());
this.unencryptedS3Client = new LazyAutoCloseableReference<>(createUnencryptedS3Client());
this.transferManager = new LazyAutoCloseableReference<>(createTransferManager());
// fix up SDK logging.
AwsSdkWorkarounds.prepareLogging();
}
/**
* Create the function to create the S3 client.
* @return a callable which will create the client.
*/
private CallableRaisingIOE<S3Client> createS3Client() {
return trackDurationOfOperation(
durationTrackerFactory,
STORE_CLIENT_CREATION.getSymbol(),
() -> clientFactory.createS3Client(getUri(), clientCreationParameters));
}
/**
* Create the function to create the S3 Async client.
* @return a callable which will create the client.
*/
private CallableRaisingIOE<S3AsyncClient> createAsyncClient() {
return trackDurationOfOperation(
durationTrackerFactory,
STORE_CLIENT_CREATION.getSymbol(),
() -> clientFactory.createS3AsyncClient(getUri(), clientCreationParameters));
}
/**
* Create the function to create the unencrypted S3 client.
* @return a callable which will create the client.
*/
private CallableRaisingIOE<S3Client> createUnencryptedS3Client() {
return trackDurationOfOperation(
durationTrackerFactory,
STORE_CLIENT_CREATION.getSymbol(),
() -> unencryptedClientFactory.createS3Client(getUri(), clientCreationParameters));
}
/**
* Create the function to create the Transfer Manager.
* @return a callable which will create the component.
*/
private CallableRaisingIOE<S3TransferManager> createTransferManager() {
return () -> {
final S3AsyncClient asyncClient = s3AsyncClient.eval();
return trackDuration(durationTrackerFactory,
STORE_CLIENT_CREATION.getSymbol(), () ->
clientFactory.createS3TransferManager(asyncClient));
};
}
@Override
public synchronized S3Client getOrCreateS3Client() throws IOException {
checkNotClosed();
return s3Client.eval();
}
/**
* Get the S3Client, raising a failure to create as an UncheckedIOException.
* @return the S3 client
* @throws UncheckedIOException failure to create the client.
*/
@Override
public synchronized S3Client getOrCreateS3ClientUnchecked() throws UncheckedIOException {
checkNotClosed();
return s3Client.get();
}
@Override
public synchronized S3AsyncClient getOrCreateAsyncClient() throws IOException {
checkNotClosed();
return s3AsyncClient.eval();
}
/**
* Get the AsyncS3Client, raising a failure to create as an UncheckedIOException.
* @return the S3 client
* @throws UncheckedIOException failure to create the client.
*/
@Override
public synchronized S3Client getOrCreateAsyncS3ClientUnchecked() throws UncheckedIOException {
checkNotClosed();
return s3Client.get();
}
/**
* Get or create an unencrypted S3 client.
* This is used for unencrypted operations when CSE is enabled with V1 compatibility.
* @return unencrypted S3 client
* @throws IOException on any failure
*/
@Override
public synchronized S3Client getOrCreateUnencryptedS3Client() throws IOException {
checkNotClosed();
return unencryptedS3Client.eval();
}
@Override
public synchronized S3TransferManager getOrCreateTransferManager() throws IOException {
checkNotClosed();
return transferManager.eval();
}
@Override
protected void serviceStop() throws Exception {
// queue the closures.
List<Future<Object>> l = new ArrayList<>();
l.add(closeAsync(transferManager));
l.add(closeAsync(s3AsyncClient));
l.add(closeAsync(s3Client));
l.add(closeAsync(unencryptedS3Client));
// once all are queued, await their completion;
// exceptions will be swallowed.
awaitAllFutures(l);
super.serviceStop();
}
/**
* Check that the client manager is not closed.
* @throws IllegalStateException if it is closed.
*/
private void checkNotClosed() {
checkState(!isInState(STATE.STOPPED), "Client manager is closed");
}
/**
* Get the URI of the filesystem.
* @return URI to use when creating clients.
*/
public URI getUri() {
return clientCreationParameters.getPathUri();
}
/**
* Queue closing a closeable, logging any exception, and returning null
* to use in when awaiting a result.
* @param reference closeable.
* @param <T> type of closeable
* @return null
*/
private <T extends AutoCloseable> CompletableFuture<Object> closeAsync(
LazyAutoCloseableReference<T> reference) {
if (!reference.isSet()) {
// no-op
return completedFuture(null);
}
return supplyAsync(() -> {
try {
reference.close();
} catch (Exception e) {
LOG.warn("Failed to close {}", reference, e);
}
return null;
});
}
@Override
public String toString() {
return "ClientManagerImpl{" +
"state=" + getServiceState() +
", s3Client=" + s3Client +
", s3AsyncClient=" + s3AsyncClient +
", unencryptedS3Client=" + unencryptedS3Client +
", transferManager=" + transferManager +
'}';
}
}