BulkDeleteOperationCallbacksImpl.java
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.s3a.impl;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.nio.file.AccessDeniedException;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import software.amazon.awssdk.services.s3.model.DeleteObjectsResponse;
import software.amazon.awssdk.services.s3.model.ObjectIdentifier;
import software.amazon.awssdk.services.s3.model.S3Error;
import org.apache.hadoop.fs.s3a.Retries;
import org.apache.hadoop.fs.s3a.S3AStore;
import org.apache.hadoop.fs.store.audit.AuditSpan;
import org.apache.hadoop.util.functional.Tuples;
import static java.util.Collections.emptyList;
import static java.util.Collections.singletonList;
import static org.apache.hadoop.fs.s3a.Invoker.once;
import static org.apache.hadoop.util.Preconditions.checkArgument;
import static org.apache.hadoop.util.functional.Tuples.pair;
/**
 * Implementation of the callbacks used by the bulk delete operation.
 * All delete requests are issued through the {@link S3AStore} handed
 * to the constructor.
 */
public class BulkDeleteOperationCallbacksImpl implements
    BulkDeleteOperation.BulkDeleteOperationCallbacks {

  /** Store through which every delete request is issued. */
  private final S3AStore store;

  /** Path for logging; passed to the invoker with each request. */
  private final String path;

  /** Page size: the most keys accepted by one {@code bulkDelete} call. */
  private final int pageSize;

  /** Audit span, activated at the start of each {@code bulkDelete} call. */
  private final AuditSpan span;

  /**
   * Create the callbacks.
   * @param store store to issue the delete requests through
   * @param path path for logging
   * @param pageSize maximum number of keys per bulk delete call
   * @param span audit span to activate for the operations
   */
  public BulkDeleteOperationCallbacksImpl(final S3AStore store,
      String path, int pageSize, AuditSpan span) {
    this.store = store;
    this.path = path;
    this.pageSize = pageSize;
    this.span = span;
  }

  /**
   * Delete a list of objects, choosing the request type by list size:
   * an empty list issues no request, a single key is removed with a
   * single-object delete and anything larger goes out as one bulk
   * delete request.
   * @param keysToDelete keys to delete; must not exceed the page size
   * @return (key, error text) entries for the keys which were not deleted;
   * empty if there were no failures.
   * @throws IOException IO problem raised by the delete request
   * @throws IllegalArgumentException if more keys are supplied than the page size
   */
  @Override
  @Retries.RetryTranslated
  public List<Map.Entry<String, String>> bulkDelete(final List<ObjectIdentifier> keysToDelete)
      throws IOException, IllegalArgumentException {
    span.activate();
    final int count = keysToDelete.size();
    checkArgument(count <= pageSize,
        "Too many paths to delete in one operation: %s", count);
    switch (count) {
    case 0:
      // nothing to delete: no request is made.
      return emptyList();
    case 1:
      return deleteSingleObject(keysToDelete.get(0).key());
    default:
      return deleteMultipleObjects(keysToDelete);
    }
  }

  /**
   * Issue one bulk delete request for all the supplied keys and
   * convert any errors in the response into (key, error text) entries.
   * @param keys keys to delete
   * @return entries for the keys reported as errors; empty if there were none.
   * @throws IOException IO problem raised by the delete request
   */
  @Retries.RetryTranslated
  private List<Map.Entry<String, String>> deleteMultipleObjects(
      final List<ObjectIdentifier> keys) throws IOException {
    // the request is built inside the lambda so that it is created
    // within the once() invocation.
    final DeleteObjectsResponse outcome = once("bulkDelete", path, () ->
        store.deleteObjects(store.getRequestFactory()
            .newBulkDeleteRequestBuilder(keys)
            .build())).getValue();
    final List<S3Error> failures = outcome.errors();
    if (failures.isEmpty()) {
      // every key was deleted.
      return emptyList();
    }
    return failures.stream()
        .map(failure -> pair(failure.key(), failure.toString()))
        .collect(Collectors.toList());
  }

  /**
   * Delete one object with a single-object delete request.
   * An {@code AccessDeniedException} is not rethrown: it is reported
   * as a failure entry for the key instead.
   * @param key key to delete
   * @return list of keys which failed to delete: length 0 or 1.
   * @throws IOException IO problem other than AccessDeniedException
   */
  @Retries.RetryTranslated
  private List<Map.Entry<String, String>> deleteSingleObject(final String key) throws IOException {
    try {
      once("bulkDelete", path, () ->
          store.deleteObject(store.getRequestFactory()
              .newDeleteObjectRequestBuilder(key)
              .build()));
      return emptyList();
    } catch (AccessDeniedException denied) {
      // report the denial as a per-key failure rather than failing the call.
      return singletonList(pair(key, denied.toString()));
    }
  }
}