GoogleCloudStorage.java
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.gs;
import static org.apache.hadoop.thirdparty.com.google.common.base.Preconditions.checkArgument;
import static org.apache.hadoop.thirdparty.com.google.common.base.Preconditions.*;
import static org.apache.hadoop.thirdparty.com.google.common.base.Strings.isNullOrEmpty;
import static java.lang.Math.toIntExact;
import static org.apache.hadoop.fs.gs.GoogleCloudStorageExceptions.createFileNotFoundException;
import com.google.api.client.util.BackOff;
import com.google.api.client.util.ExponentialBackOff;
import com.google.api.client.util.Sleeper;
import com.google.api.gax.paging.Page;
import com.google.auth.Credentials;
import com.google.cloud.storage.*;
import org.apache.hadoop.thirdparty.com.google.common.base.Strings;
import org.apache.hadoop.thirdparty.com.google.common.collect.ImmutableList;
import org.apache.hadoop.thirdparty.com.google.common.collect.Maps;
import org.apache.hadoop.thirdparty.com.google.common.io.BaseEncoding;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.annotation.Nullable;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.nio.channels.SeekableByteChannel;
import java.nio.channels.WritableByteChannel;
import java.nio.file.FileAlreadyExistsException;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
/**
* A wrapper around <a href="https://github.com/googleapis/java-storage">Google cloud storage
* client</a>.
*/
class GoogleCloudStorage {
static final Logger LOG = LoggerFactory.getLogger(GoogleCloudStorage.class);
static final List<Storage.BlobField> BLOB_FIELDS =
ImmutableList.of(
Storage.BlobField.BUCKET, Storage.BlobField.CONTENT_ENCODING,
Storage.BlobField.CONTENT_TYPE, Storage.BlobField.CRC32C, Storage.BlobField.GENERATION,
Storage.BlobField.METADATA, Storage.BlobField.MD5HASH, Storage.BlobField.METAGENERATION,
Storage.BlobField.NAME, Storage.BlobField.SIZE, Storage.BlobField.TIME_CREATED,
Storage.BlobField.UPDATED);
static final CreateObjectOptions EMPTY_OBJECT_CREATE_OPTIONS =
CreateObjectOptions.DEFAULT_OVERWRITE.toBuilder()
.setEnsureEmptyObjectsMetadataMatch(false)
.build();
private final Storage storage;
private final GoogleHadoopFileSystemConfiguration configuration;
/**
* Having an instance of gscImpl to redirect calls to Json client while new client implementation
* is in WIP.
*/
GoogleCloudStorage(GoogleHadoopFileSystemConfiguration configuration, Credentials credentials)
throws IOException {
this.storage = createStorage(configuration.getProjectId(), credentials);
this.configuration = configuration;
}
private static Storage createStorage(String projectId, Credentials credentials) {
StorageOptions.Builder builder = StorageOptions.newBuilder();
if (projectId != null) {
builder.setProjectId(projectId);
}
return builder.setCredentials(credentials).build().getService();
}
WritableByteChannel create(final StorageResourceId resourceId, final CreateFileOptions options)
throws IOException {
LOG.trace("create({})", resourceId);
checkArgument(resourceId.isStorageObject(), "Expected full StorageObject id, got %s",
resourceId);
// Update resourceId if generationId is missing
StorageResourceId resourceIdWithGeneration = resourceId;
if (!resourceId.hasGenerationId()) {
resourceIdWithGeneration =
new StorageResourceId(resourceId.getBucketName(), resourceId.getObjectName(),
getWriteGeneration(resourceId, options.isOverwriteExisting()));
}
return new GoogleCloudStorageClientWriteChannel(storage, resourceIdWithGeneration, options);
}
/**
* Gets the object generation for a write operation
*
* <p>making getItemInfo call even if overwrite is disabled to fail fast in case file is existing.
*
* @param resourceId object for which generation info is requested
* @param overwrite whether existing object should be overwritten
* @return the generation of the object
* @throws IOException if the object already exists and cannot be overwritten
*/
private long getWriteGeneration(StorageResourceId resourceId, boolean overwrite)
throws IOException {
LOG.trace("getWriteGeneration({}, {})", resourceId, overwrite);
GoogleCloudStorageItemInfo info = getItemInfo(resourceId);
if (!info.exists()) {
return 0L;
}
if (info.exists() && overwrite) {
long generation = info.getContentGeneration();
checkState(generation != 0, "Generation should not be 0 for an existing item");
return generation;
}
throw new FileAlreadyExistsException(String.format("Object %s already exists.", resourceId));
}
void close() {
try {
storage.close();
} catch (Exception e) {
LOG.warn("Error occurred while closing the storage client", e);
}
}
GoogleCloudStorageItemInfo getItemInfo(StorageResourceId resourceId) throws IOException {
LOG.trace("getItemInfo({})", resourceId);
// Handle ROOT case first.
if (resourceId.isRoot()) {
return GoogleCloudStorageItemInfo.ROOT_INFO;
}
GoogleCloudStorageItemInfo itemInfo = null;
if (resourceId.isBucket()) {
Bucket bucket = getBucket(resourceId.getBucketName());
if (bucket != null) {
itemInfo = createItemInfoForBucket(resourceId, bucket);
} else {
LOG.debug("getBucket({}): not found", resourceId.getBucketName());
}
} else {
Blob blob = getBlob(resourceId);
if (blob != null) {
itemInfo = createItemInfoForBlob(resourceId, blob);
} else {
LOG.debug("getObject({}): not found", resourceId);
}
}
if (itemInfo == null) {
itemInfo = GoogleCloudStorageItemInfo.createNotFound(resourceId);
}
LOG.debug("getItemInfo: {}", itemInfo);
return itemInfo;
}
/**
* Gets the bucket with the given name.
*
* @param bucketName name of the bucket to get
* @return the bucket with the given name or null if bucket not found
* @throws IOException if the bucket exists but cannot be accessed
*/
@Nullable
private Bucket getBucket(String bucketName) throws IOException {
LOG.debug("getBucket({})", bucketName);
checkArgument(!isNullOrEmpty(bucketName), "bucketName must not be null or empty");
try {
return storage.get(bucketName);
} catch (StorageException e) {
if (ErrorTypeExtractor.getErrorType(e) == ErrorTypeExtractor.ErrorType.NOT_FOUND) {
return null;
}
throw new IOException("Error accessing Bucket " + bucketName, e);
}
}
private static GoogleCloudStorageItemInfo createItemInfoForBlob(StorageResourceId resourceId,
Blob blob) {
checkArgument(resourceId != null, "resourceId must not be null");
checkArgument(blob != null, "object must not be null");
checkArgument(resourceId.isStorageObject(),
"resourceId must be a StorageObject. resourceId: %s", resourceId);
checkArgument(resourceId.getBucketName().equals(blob.getBucket()),
"resourceId.getBucketName() must equal object.getBucket(): '%s' vs '%s'",
resourceId.getBucketName(), blob.getBucket());
checkArgument(resourceId.getObjectName().equals(blob.getName()),
"resourceId.getObjectName() must equal object.getName(): '%s' vs '%s'",
resourceId.getObjectName(), blob.getName());
Map<String, byte[]> decodedMetadata =
blob.getMetadata() == null ? null : decodeMetadata(blob.getMetadata());
byte[] md5Hash = null;
byte[] crc32c = null;
if (!isNullOrEmpty(blob.getCrc32c())) {
crc32c = BaseEncoding.base64().decode(blob.getCrc32c());
}
if (!isNullOrEmpty(blob.getMd5())) {
md5Hash = BaseEncoding.base64().decode(blob.getMd5());
}
return GoogleCloudStorageItemInfo.createObject(resourceId,
blob.getCreateTimeOffsetDateTime() == null ?
0 :
blob.getCreateTimeOffsetDateTime().toInstant().toEpochMilli(),
blob.getUpdateTimeOffsetDateTime() == null ?
0 :
blob.getUpdateTimeOffsetDateTime().toInstant().toEpochMilli(),
blob.getSize() == null ? 0 : blob.getSize(), blob.getContentType(),
blob.getContentEncoding(), decodedMetadata,
blob.getGeneration() == null ? 0 : blob.getGeneration(),
blob.getMetageneration() == null ? 0 : blob.getMetageneration(),
new VerificationAttributes(md5Hash, crc32c));
}
static Map<String, byte[]> decodeMetadata(Map<String, String> metadata) {
return Maps.transformValues(metadata, GoogleCloudStorage::decodeMetadataValues);
}
@Nullable
private static byte[] decodeMetadataValues(String value) {
try {
return BaseEncoding.base64().decode(value);
} catch (IllegalArgumentException iae) {
LOG.error("Failed to parse base64 encoded attribute value {}", value, iae);
return null;
}
}
/**
* Gets the object with the given resourceId.
*
* @param resourceId identifies a StorageObject
* @return the object with the given name or null if object not found
* @throws IOException if the object exists but cannot be accessed
*/
@Nullable
Blob getBlob(StorageResourceId resourceId) throws IOException {
checkArgument(resourceId.isStorageObject(), "Expected full StorageObject id, got %s",
resourceId);
String bucketName = resourceId.getBucketName();
String objectName = resourceId.getObjectName();
Blob blob;
try {
blob = storage.get(BlobId.of(bucketName, objectName),
Storage.BlobGetOption.fields(BLOB_FIELDS.toArray(new Storage.BlobField[0])));
} catch (StorageException e) {
throw new IOException("Error accessing " + resourceId, e);
}
return blob;
}
private static GoogleCloudStorageItemInfo createItemInfoForBucket(StorageResourceId resourceId,
Bucket bucket) {
checkArgument(resourceId != null, "resourceId must not be null");
checkArgument(bucket != null, "bucket must not be null");
checkArgument(resourceId.isBucket(), "resourceId must be a Bucket. resourceId: %s", resourceId);
checkArgument(resourceId.getBucketName().equals(bucket.getName()),
"resourceId.getBucketName() must equal bucket.getName(): '%s' vs '%s'",
resourceId.getBucketName(), bucket.getName());
return GoogleCloudStorageItemInfo.createBucket(resourceId,
bucket.asBucketInfo().getCreateTimeOffsetDateTime().toInstant().toEpochMilli(),
bucket.asBucketInfo().getUpdateTimeOffsetDateTime().toInstant().toEpochMilli(),
bucket.getLocation(),
bucket.getStorageClass() == null ? null : bucket.getStorageClass().name());
}
private GoogleCloudStorageItemInfo createItemInfoForBlob(Blob blob) {
long generationId = blob.getGeneration() == null ? 0L : blob.getGeneration();
StorageResourceId resourceId =
new StorageResourceId(blob.getBucket(), blob.getName(), generationId);
return createItemInfoForBlob(resourceId, blob);
}
void createBucket(String bucketName, CreateBucketOptions options) throws IOException {
LOG.trace("createBucket({})", bucketName);
checkArgument(!isNullOrEmpty(bucketName), "bucketName must not be null or empty");
checkNotNull(options, "options must not be null");
BucketInfo.Builder bucketInfoBuilder =
BucketInfo.newBuilder(bucketName).setLocation(options.getLocation());
if (options.getStorageClass() != null) {
bucketInfoBuilder.setStorageClass(
StorageClass.valueOfStrict(options.getStorageClass().toUpperCase()));
}
if (options.getTtl() != null) {
bucketInfoBuilder.setLifecycleRules(
Collections.singletonList(
new BucketInfo.LifecycleRule(
BucketInfo.LifecycleRule.LifecycleAction.newDeleteAction(),
BucketInfo.LifecycleRule.LifecycleCondition.newBuilder()
.setAge(toIntExact(options.getTtl().toDays()))
.build())));
}
try {
storage.create(bucketInfoBuilder.build());
} catch (StorageException e) {
if (ErrorTypeExtractor.bucketAlreadyExists(e)) {
throw (FileAlreadyExistsException)
new FileAlreadyExistsException(String.format("Bucket '%s' already exists.", bucketName))
.initCause(e);
}
throw new IOException(e);
}
}
void createEmptyObject(StorageResourceId resourceId) throws IOException {
LOG.trace("createEmptyObject({})", resourceId);
checkArgument(
resourceId.isStorageObject(), "Expected full StorageObject id, got %s", resourceId);
createEmptyObject(resourceId, EMPTY_OBJECT_CREATE_OPTIONS);
}
void createEmptyObject(StorageResourceId resourceId, CreateObjectOptions options)
throws IOException {
checkArgument(
resourceId.isStorageObject(), "Expected full StorageObject id, got %s", resourceId);
try {
createEmptyObjectInternal(resourceId, options);
} catch (StorageException e) {
if (canIgnoreExceptionForEmptyObject(e, resourceId, options)) {
LOG.info(
"Ignoring exception of type {}; verified object already exists with desired state.",
e.getClass().getSimpleName());
LOG.trace("Ignored exception while creating empty object: {}", resourceId, e);
} else {
if (ErrorTypeExtractor.getErrorType(e) == ErrorTypeExtractor.ErrorType.ALREADY_EXISTS) {
throw (FileAlreadyExistsException)
new FileAlreadyExistsException(
String.format("Object '%s' already exists.", resourceId)
).initCause(e);
}
throw new IOException(e);
}
}
}
GoogleCloudStorageItemInfo composeObjects(
List<StorageResourceId> sources, StorageResourceId destination, CreateObjectOptions options)
throws IOException {
LOG.trace("composeObjects({}, {}, {})", sources, destination, options);
for (StorageResourceId inputId : sources) {
if (!destination.getBucketName().equals(inputId.getBucketName())) {
throw new IOException(
String.format(
"Bucket doesn't match for source '%s' and destination '%s'!",
inputId, destination));
}
}
Storage.ComposeRequest request =
Storage.ComposeRequest.newBuilder()
.addSource(
sources.stream().map(StorageResourceId::getObjectName).collect(Collectors.toList()))
.setTarget(
BlobInfo.newBuilder(destination.getBucketName(), destination.getObjectName())
.setContentType(options.getContentType())
.setContentEncoding(options.getContentEncoding())
.setMetadata(encodeMetadata(options.getMetadata()))
.build())
.setTargetOptions(
Storage.BlobTargetOption.generationMatch(
destination.hasGenerationId()
? destination.getGenerationId()
: getWriteGeneration(destination, true)))
.build();
Blob composedBlob;
try {
composedBlob = storage.compose(request);
} catch (StorageException e) {
throw new IOException(e);
}
GoogleCloudStorageItemInfo compositeInfo = createItemInfoForBlob(destination, composedBlob);
LOG.trace("composeObjects() done, returning: {}", compositeInfo);
return compositeInfo;
}
/**
* Helper to check whether an empty object already exists with the expected metadata specified in
* {@code options}, to be used to determine whether it's safe to ignore an exception that was
* thrown when trying to create the object, {@code exceptionOnCreate}.
*/
private boolean canIgnoreExceptionForEmptyObject(
StorageException exceptionOnCreate, StorageResourceId resourceId, CreateObjectOptions options)
throws IOException {
ErrorTypeExtractor.ErrorType errorType = ErrorTypeExtractor.getErrorType(exceptionOnCreate);
if (shouldBackoff(resourceId, errorType)) {
GoogleCloudStorageItemInfo existingInfo;
Duration maxWaitTime = Duration.ofSeconds(3); // TODO: make this configurable
BackOff backOff =
!maxWaitTime.isZero() && !maxWaitTime.isNegative()
? new ExponentialBackOff.Builder()
.setMaxElapsedTimeMillis(toIntExact(maxWaitTime.toMillis()))
.setMaxIntervalMillis(500)
.setInitialIntervalMillis(100)
.setMultiplier(1.5)
.setRandomizationFactor(0.15)
.build()
: BackOff.STOP_BACKOFF;
long nextSleep = 0L;
do {
if (nextSleep > 0) {
try {
Sleeper.DEFAULT.sleep(nextSleep);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
nextSleep = BackOff.STOP;
}
}
existingInfo = getItemInfo(resourceId);
nextSleep = nextSleep == BackOff.STOP ? BackOff.STOP : backOff.nextBackOffMillis();
} while (!existingInfo.exists() && nextSleep != BackOff.STOP);
// Compare existence, size, and metadata; for 429 errors creating an empty object,
// we don't care about metaGeneration/contentGeneration as long as the metadata
// matches, since we don't know for sure whether our low-level request succeeded
// first or some other client succeeded first.
if (existingInfo.exists() && existingInfo.getSize() == 0) {
if (options.isEnsureEmptyObjectsMetadataMatch()) {
return existingInfo.metadataEquals(options.getMetadata());
}
return true;
}
}
return false;
}
private static boolean shouldBackoff(StorageResourceId resourceId,
ErrorTypeExtractor.ErrorType errorType) {
return errorType == ErrorTypeExtractor.ErrorType.RESOURCE_EXHAUSTED
|| errorType == ErrorTypeExtractor.ErrorType.INTERNAL ||
(resourceId.isDirectory() && errorType == ErrorTypeExtractor.ErrorType.FAILED_PRECONDITION);
}
private void createEmptyObjectInternal(
StorageResourceId resourceId, CreateObjectOptions createObjectOptions) throws IOException {
Map<String, String> rewrittenMetadata = encodeMetadata(createObjectOptions.getMetadata());
List<Storage.BlobTargetOption> blobTargetOptions = new ArrayList<>();
blobTargetOptions.add(Storage.BlobTargetOption.disableGzipContent());
if (resourceId.hasGenerationId()) {
blobTargetOptions.add(Storage.BlobTargetOption.generationMatch(resourceId.getGenerationId()));
} else if (resourceId.isDirectory() || !createObjectOptions.isOverwriteExisting()) {
blobTargetOptions.add(Storage.BlobTargetOption.doesNotExist());
}
// TODO: Set encryption key and related properties
storage.create(
BlobInfo.newBuilder(BlobId.of(resourceId.getBucketName(), resourceId.getObjectName()))
.setMetadata(rewrittenMetadata)
.setContentEncoding(createObjectOptions.getContentEncoding())
.setContentType(createObjectOptions.getContentType())
.build(),
blobTargetOptions.toArray(new Storage.BlobTargetOption[0]));
}
private static Map<String, String> encodeMetadata(Map<String, byte[]> metadata) {
return Maps.transformValues(metadata, GoogleCloudStorage::encodeMetadataValues);
}
private static String encodeMetadataValues(byte[] bytes) {
return bytes == null ? null : BaseEncoding.base64().encode(bytes);
}
List<GoogleCloudStorageItemInfo> listDirectoryRecursive(String bucketName, String objectName)
throws IOException {
// TODO: Take delimiter from config
// TODO: Set specific fields
checkArgument(
objectName == null || objectName.endsWith("/"),
String.format("%s should end with /", objectName));
try {
List<Blob> blobs = new GcsListOperation.Builder(bucketName, objectName, storage)
.forRecursiveListing().build()
.execute();
List<GoogleCloudStorageItemInfo> result = new ArrayList<>();
for (Blob blob : blobs) {
result.add(createItemInfoForBlob(blob));
}
return result;
} catch (StorageException e) {
throw new IOException(
String.format("Listing '%s' failed", BlobId.of(bucketName, objectName)), e);
}
}
void deleteObjects(List<StorageResourceId> fullObjectNames) throws IOException {
LOG.trace("deleteObjects({})", fullObjectNames);
if (fullObjectNames.isEmpty()) {
return;
}
// Validate that all the elements represent StorageObjects.
for (StorageResourceId toDelete : fullObjectNames) {
checkArgument(
toDelete.isStorageObject(),
"Expected full StorageObject names only, got: %s",
toDelete);
}
// TODO: Do this concurrently
// TODO: There is duplication. fix it
for (StorageResourceId toDelete : fullObjectNames) {
try {
LOG.trace("Deleting Object ({})", toDelete);
if (toDelete.hasGenerationId() && toDelete.getGenerationId() != 0) {
storage.delete(
BlobId.of(toDelete.getBucketName(), toDelete.getObjectName()),
Storage.BlobSourceOption.generationMatch(toDelete.getGenerationId()));
} else {
// TODO: Remove delete without generationId
storage.delete(BlobId.of(toDelete.getBucketName(), toDelete.getObjectName()));
LOG.trace("Deleting Object without generationId ({})", toDelete);
}
} catch (StorageException e) {
throw new IOException(String.format("Deleting resource %s failed.", toDelete), e);
}
}
}
List<GoogleCloudStorageItemInfo> listBucketInfo() throws IOException {
List<Bucket> allBuckets = listBucketsInternal();
List<GoogleCloudStorageItemInfo> bucketInfos = new ArrayList<>(allBuckets.size());
for (Bucket bucket : allBuckets) {
bucketInfos.add(createItemInfoForBucket(new StorageResourceId(bucket.getName()), bucket));
}
return bucketInfos;
}
private List<Bucket> listBucketsInternal() throws IOException {
checkNotNull(configuration.getProjectId(), "projectId must not be null");
List<Bucket> allBuckets = new ArrayList<>();
try {
Page<Bucket> buckets =
storage.list(
Storage.BucketListOption.pageSize(configuration.getMaxListItemsPerCall()),
Storage.BucketListOption.fields(
Storage.BucketField.LOCATION,
Storage.BucketField.STORAGE_CLASS,
Storage.BucketField.TIME_CREATED,
Storage.BucketField.UPDATED));
// Loop to fetch all the items.
for (Bucket bucket : buckets.iterateAll()) {
allBuckets.add(bucket);
}
} catch (StorageException e) {
throw new IOException(e);
}
return allBuckets;
}
SeekableByteChannel open(GoogleCloudStorageItemInfo itemInfo,
GoogleHadoopFileSystemConfiguration config) throws IOException {
LOG.trace("open({})", itemInfo);
checkNotNull(itemInfo, "itemInfo should not be null");
StorageResourceId resourceId = itemInfo.getResourceId();
checkArgument(
resourceId.isStorageObject(), "Expected full StorageObject id, got %s", resourceId);
return open(resourceId, itemInfo, config);
}
private SeekableByteChannel open(
StorageResourceId resourceId,
GoogleCloudStorageItemInfo itemInfo,
GoogleHadoopFileSystemConfiguration config)
throws IOException {
return new GoogleCloudStorageClientReadChannel(
storage,
itemInfo == null ? getItemInfo(resourceId) : itemInfo,
config);
}
void move(Map<StorageResourceId, StorageResourceId> sourceToDestinationObjectsMap)
throws IOException {
validateMoveArguments(sourceToDestinationObjectsMap);
if (sourceToDestinationObjectsMap.isEmpty()) {
return;
}
for (Map.Entry<StorageResourceId, StorageResourceId> entry :
sourceToDestinationObjectsMap.entrySet()) {
StorageResourceId srcObject = entry.getKey();
StorageResourceId dstObject = entry.getValue();
// TODO: Do this concurrently
moveInternal(
srcObject.getBucketName(),
srcObject.getGenerationId(),
srcObject.getObjectName(),
dstObject.getGenerationId(),
dstObject.getObjectName());
}
}
private void moveInternal(
String srcBucketName,
long srcContentGeneration,
String srcObjectName,
long dstContentGeneration,
String dstObjectName) throws IOException {
Storage.MoveBlobRequest.Builder moveRequestBuilder =
createMoveRequestBuilder(
srcBucketName,
srcObjectName,
dstObjectName,
srcContentGeneration,
dstContentGeneration);
try {
String srcString = StringPaths.fromComponents(srcBucketName, srcObjectName);
String dstString = StringPaths.fromComponents(srcBucketName, dstObjectName);
Blob movedBlob = storage.moveBlob(moveRequestBuilder.build());
if (movedBlob != null) {
LOG.trace("Successfully moved {} to {}", srcString, dstString);
}
} catch (StorageException e) {
if (ErrorTypeExtractor.getErrorType(e) == ErrorTypeExtractor.ErrorType.NOT_FOUND) {
throw createFileNotFoundException(srcBucketName, srcObjectName, new IOException(e));
} else {
throw
new IOException(
String.format(
"Error moving '%s'",
StringPaths.fromComponents(srcBucketName, srcObjectName)),
e);
}
}
}
/** Creates a builder for a blob move request. */
private Storage.MoveBlobRequest.Builder createMoveRequestBuilder(
String srcBucketName,
String srcObjectName,
String dstObjectName,
long srcContentGeneration,
long dstContentGeneration) {
Storage.MoveBlobRequest.Builder moveRequestBuilder =
Storage.MoveBlobRequest.newBuilder().setSource(BlobId.of(srcBucketName, srcObjectName));
moveRequestBuilder.setTarget(BlobId.of(srcBucketName, dstObjectName));
List<Storage.BlobTargetOption> blobTargetOptions = new ArrayList<>();
List<Storage.BlobSourceOption> blobSourceOptions = new ArrayList<>();
if (srcContentGeneration != StorageResourceId.UNKNOWN_GENERATION_ID) {
blobSourceOptions.add(Storage.BlobSourceOption.generationMatch(srcContentGeneration));
}
if (dstContentGeneration != StorageResourceId.UNKNOWN_GENERATION_ID) {
blobTargetOptions.add(Storage.BlobTargetOption.generationMatch(dstContentGeneration));
}
// TODO: Add encryption support
moveRequestBuilder.setSourceOptions(blobSourceOptions);
moveRequestBuilder.setTargetOptions(blobTargetOptions);
return moveRequestBuilder;
}
/**
* Validates basic argument constraints like non-null, non-empty Strings, using {@code
* Preconditions} in addition to checking for src/dst bucket equality.
*/
static void validateMoveArguments(
Map<StorageResourceId, StorageResourceId> sourceToDestinationObjectsMap) throws IOException {
checkNotNull(sourceToDestinationObjectsMap, "srcObjects must not be null");
if (sourceToDestinationObjectsMap.isEmpty()) {
return;
}
for (Map.Entry<StorageResourceId, StorageResourceId> entry :
sourceToDestinationObjectsMap.entrySet()) {
StorageResourceId source = entry.getKey();
StorageResourceId destination = entry.getValue();
String srcBucketName = source.getBucketName();
String dstBucketName = destination.getBucketName();
// Avoid move across buckets.
if (!srcBucketName.equals(dstBucketName)) {
throw new UnsupportedOperationException(
"This operation is not supported across two different buckets.");
}
checkArgument(
!isNullOrEmpty(source.getObjectName()), "srcObjectName must not be null or empty");
checkArgument(
!isNullOrEmpty(destination.getObjectName()), "dstObjectName must not be null or empty");
if (srcBucketName.equals(dstBucketName)
&& source.getObjectName().equals(destination.getObjectName())) {
throw new IllegalArgumentException(
String.format(
"Move destination must be different from source for %s.",
StringPaths.fromComponents(srcBucketName, source.getObjectName())));
}
}
}
void copy(Map<StorageResourceId, StorageResourceId> sourceToDestinationObjectsMap)
throws IOException {
validateCopyArguments(sourceToDestinationObjectsMap, this);
if (sourceToDestinationObjectsMap.isEmpty()) {
return;
}
for (Map.Entry<StorageResourceId, StorageResourceId> entry :
sourceToDestinationObjectsMap.entrySet()) {
StorageResourceId srcObject = entry.getKey();
StorageResourceId dstObject = entry.getValue();
// TODO: Do this concurrently
copyInternal(
srcObject.getBucketName(),
srcObject.getObjectName(),
dstObject.getGenerationId(),
dstObject.getBucketName(),
dstObject.getObjectName());
}
}
private void copyInternal(
String srcBucketName,
String srcObjectName,
long dstContentGeneration,
String dstBucketName,
String dstObjectName) throws IOException {
Storage.CopyRequest.Builder copyRequestBuilder =
Storage.CopyRequest.newBuilder().setSource(BlobId.of(srcBucketName, srcObjectName));
if (dstContentGeneration != StorageResourceId.UNKNOWN_GENERATION_ID) {
copyRequestBuilder.setTarget(
BlobId.of(dstBucketName, dstObjectName),
Storage.BlobTargetOption.generationMatch(dstContentGeneration));
} else {
copyRequestBuilder.setTarget(BlobId.of(dstBucketName, dstObjectName));
}
// TODO: Add support for encryption key
if (configuration.getMaxRewriteChunkSize() > 0) {
copyRequestBuilder.setMegabytesCopiedPerChunk(
// Convert raw byte size into Mib.
configuration.getMaxRewriteChunkSize() / (1024 * 1024));
}
String srcString = StringPaths.fromComponents(srcBucketName, srcObjectName);
String dstString = StringPaths.fromComponents(dstBucketName, dstObjectName);
try {
CopyWriter copyWriter = storage.copy(copyRequestBuilder.build());
while (!copyWriter.isDone()) {
copyWriter.copyChunk();
LOG.trace(
"Copy ({} to {}) did not complete. Resuming...", srcString, dstString);
}
LOG.trace("Successfully copied {} to {}", srcString, dstString);
} catch (StorageException e) {
if (ErrorTypeExtractor.getErrorType(e) == ErrorTypeExtractor.ErrorType.NOT_FOUND) {
throw createFileNotFoundException(srcBucketName, srcObjectName, new IOException(e));
} else {
throw new IOException(String.format("copy(%s->%s) failed.", srcString, dstString), e);
}
}
}
static void validateCopyArguments(
Map<StorageResourceId, StorageResourceId> sourceToDestinationObjectsMap,
GoogleCloudStorage gcsImpl)
throws IOException {
checkNotNull(sourceToDestinationObjectsMap, "srcObjects must not be null");
if (sourceToDestinationObjectsMap.isEmpty()) {
return;
}
Map<StorageResourceId, GoogleCloudStorageItemInfo> bucketInfoCache = new HashMap<>();
for (Map.Entry<StorageResourceId, StorageResourceId> entry :
sourceToDestinationObjectsMap.entrySet()) {
StorageResourceId source = entry.getKey();
StorageResourceId destination = entry.getValue();
String srcBucketName = source.getBucketName();
String dstBucketName = destination.getBucketName();
// Avoid copy across locations or storage classes.
if (!srcBucketName.equals(dstBucketName)) {
StorageResourceId srcBucketResourceId = new StorageResourceId(srcBucketName);
GoogleCloudStorageItemInfo srcBucketInfo =
getGoogleCloudStorageItemInfo(gcsImpl, bucketInfoCache, srcBucketResourceId);
if (!srcBucketInfo.exists()) {
throw new FileNotFoundException("Bucket not found: " + srcBucketName);
}
StorageResourceId dstBucketResourceId = new StorageResourceId(dstBucketName);
GoogleCloudStorageItemInfo dstBucketInfo =
getGoogleCloudStorageItemInfo(gcsImpl, bucketInfoCache, dstBucketResourceId);
if (!dstBucketInfo.exists()) {
throw new FileNotFoundException("Bucket not found: " + dstBucketName);
}
// TODO: Restrict this only when copy-with-rewrite is enabled
if (!srcBucketInfo.getLocation().equals(dstBucketInfo.getLocation())) {
throw new UnsupportedOperationException(
"This operation is not supported across two different storage locations.");
}
if (!srcBucketInfo.getStorageClass().equals(dstBucketInfo.getStorageClass())) {
throw new UnsupportedOperationException(
"This operation is not supported across two different storage classes.");
}
}
checkArgument(
!isNullOrEmpty(source.getObjectName()), "srcObjectName must not be null or empty");
checkArgument(
!isNullOrEmpty(destination.getObjectName()), "dstObjectName must not be null or empty");
if (srcBucketName.equals(dstBucketName)
&& source.getObjectName().equals(destination.getObjectName())) {
throw new IllegalArgumentException(
String.format(
"Copy destination must be different from source for %s.",
StringPaths.fromComponents(srcBucketName, source.getObjectName())));
}
}
}
private static GoogleCloudStorageItemInfo getGoogleCloudStorageItemInfo(
GoogleCloudStorage gcsImpl,
Map<StorageResourceId, GoogleCloudStorageItemInfo> bucketInfoCache,
StorageResourceId resourceId)
throws IOException {
GoogleCloudStorageItemInfo storageItemInfo = bucketInfoCache.get(resourceId);
if (storageItemInfo != null) {
return storageItemInfo;
}
storageItemInfo = gcsImpl.getItemInfo(resourceId);
bucketInfoCache.put(resourceId, storageItemInfo);
return storageItemInfo;
}
List<GoogleCloudStorageItemInfo> getItemInfos(List<StorageResourceId> resourceIds)
throws IOException {
LOG.trace("getItemInfos({})", resourceIds);
if (resourceIds.isEmpty()) {
return new ArrayList<>();
}
List<GoogleCloudStorageItemInfo> result = new ArrayList<>(resourceIds.size());
for (StorageResourceId resourceId : resourceIds) {
// TODO: Do this concurrently
result.add(getItemInfo(resourceId));
}
return result;
}
List<GoogleCloudStorageItemInfo> listDirectory(String bucketName, String objectNamePrefix)
throws IOException {
checkArgument(
objectNamePrefix == null || objectNamePrefix.endsWith("/"),
String.format("%s should end with /", objectNamePrefix));
try {
List<Blob> blobs = new GcsListOperation.Builder(bucketName, objectNamePrefix, storage)
.forCurrentDirectoryListing().build()
.execute();
ListOperationResult result = new ListOperationResult();
for (Blob blob : blobs) {
result.add(blob);
}
return result.getItems();
} catch (StorageException e) {
throw new IOException(
String.format("listing object '%s' failed.", BlobId.of(bucketName, objectNamePrefix)),
e);
}
}
void compose(
String bucketName, List<String> sources, String destination, String contentType)
throws IOException {
LOG.trace("compose({}, {}, {}, {})", bucketName, sources, destination, contentType);
List<StorageResourceId> sourceIds =
sources.stream()
.map(objectName -> new StorageResourceId(bucketName, objectName))
.collect(Collectors.toList());
StorageResourceId destinationId = new StorageResourceId(bucketName, destination);
CreateObjectOptions options =
CreateObjectOptions.DEFAULT_OVERWRITE.toBuilder()
.setContentType(contentType)
.setEnsureEmptyObjectsMetadataMatch(false)
.build();
composeObjects(sourceIds, destinationId, options);
}
/**
* Get metadata for the given resourceId. The resourceId can be a file or a directory.
*
* For a resourceId gs://b/foo/a, it can be a file or a directory (gs:/b/foo/a/).
* This method checks for both and return the one that is found. "NotFound" is returned
* if not found.
*/
GoogleCloudStorageItemInfo getFileOrDirectoryInfo(StorageResourceId resourceId) {
BlobId blobId = resourceId.toBlobId();
if (resourceId.isDirectory()) {
// Do not check for "file" for directory paths.
Blob blob = storage.get(blobId);
if (blob != null) {
return createItemInfoForBlob(blob);
}
} else {
BlobId dirId = resourceId.toDirectoryId().toBlobId();
// Check for both file and directory.
List<Blob> blobs = storage.get(blobId, dirId);
for (Blob blob : blobs) {
if (blob != null) {
return createItemInfoForBlob(blob);
}
}
}
return GoogleCloudStorageItemInfo.createNotFound(resourceId);
}
/**
* Check if any "implicit" directory exists for the given resourceId.
*
* Note that GCS object store does not have a concept of directories for non-HNS buckets.
* For e.g. one could create an object gs://bucket/foo/bar/a.txt, without creating the
* parent directories (i.e. placeholder emtpy files ending with a /). In this case we might
* want to treat gs://bucket/foo/ and gs://bucket/foo/bar/ as directories.
*
* This method helps check if a given resourceId (e.g. gs://bucket/foo/bar/) is an "implicit"
* directory.
*
* Note that this will result in a list operation and is more expensive than "get metadata".
*/
GoogleCloudStorageItemInfo getImplicitDirectory(StorageResourceId resourceId) {
List<Blob> blobs = new GcsListOperation
.Builder(resourceId.getBucketName(), resourceId.getObjectName(), storage)
.forImplicitDirectoryCheck().build()
.execute();
if (blobs.isEmpty()) {
return GoogleCloudStorageItemInfo.createNotFound(resourceId);
}
return GoogleCloudStorageItemInfo.createInferredDirectory(resourceId.toDirectoryId());
}
public void deleteBuckets(List<String> bucketNames) throws IOException {
LOG.trace("deleteBuckets({})", bucketNames);
// Validate all the inputs first.
for (String bucketName : bucketNames) {
checkArgument(!Strings.isNullOrEmpty(bucketName), "bucketName must not be null or empty");
}
// Gather exceptions to wrap in a composite exception at the end.
List<IOException> innerExceptions = new ArrayList<>();
for (String bucketName : bucketNames) {
try {
boolean isDeleted = storage.delete(bucketName);
if (!isDeleted) {
innerExceptions.add(createFileNotFoundException(bucketName, null, null));
}
} catch (StorageException e) {
innerExceptions.add(
new IOException(String.format("Error deleting '%s' bucket", bucketName), e));
}
}
if (!innerExceptions.isEmpty()) {
throw GoogleCloudStorageExceptions.createCompositeException(innerExceptions);
}
}
// Helper class to capture the results of list operation.
private class ListOperationResult {
private final Map<String, Blob> prefixes = new HashMap<>();
private final List<Blob> objects = new ArrayList<>();
private final Set<String> objectsSet = new HashSet<>();
void add(Blob blob) {
String path = blob.getBlobId().toGsUtilUri();
if (blob.getGeneration() != null) {
prefixes.remove(path);
objects.add(blob);
objectsSet.add(path);
} else if (!objectsSet.contains(path)) {
prefixes.put(path, blob);
}
}
List<GoogleCloudStorageItemInfo> getItems() {
List<GoogleCloudStorageItemInfo> result = new ArrayList<>(prefixes.size() + objects.size());
for (Blob blob : objects) {
result.add(createItemInfoForBlob(blob));
}
for (Blob blob : prefixes.values()) {
result.add(createItemInfoForBlob(blob));
}
return result;
}
}
}