PrefixStorage.java
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.tosfs.object;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.tosfs.object.request.ListObjectsRequest;
import org.apache.hadoop.fs.tosfs.object.response.ListObjectsResponse;
import org.apache.hadoop.thirdparty.com.google.common.base.Strings;
import org.apache.hadoop.thirdparty.com.google.common.collect.Iterables;
import org.apache.hadoop.util.Preconditions;
import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
public class PrefixStorage implements DirectoryStorage {
private final ObjectStorage storage;
private final String prefix;
public PrefixStorage(ObjectStorage storage, String prefix) {
this.storage = storage;
this.prefix = prefix;
}
@Override
public String scheme() {
return storage.scheme();
}
@Override
public BucketInfo bucket() {
return storage.bucket();
}
@Override
public void initialize(Configuration conf, String bucket) {
storage.initialize(conf, bucket);
}
@Override
public Configuration conf() {
return storage.conf();
}
@Override
public ObjectContent get(String key, long offset, long limit) {
Preconditions.checkArgument(key != null && key.length() > 0,
"Object key cannot be null or empty.");
return storage.get(prefix + key, offset, limit);
}
@Override
public byte[] put(String key, InputStreamProvider streamProvider, long contentLength) {
Preconditions.checkArgument(key != null && key.length() > 0,
"Object key cannot be null or empty.");
return storage.put(prefix + key, streamProvider, contentLength);
}
@Override
public byte[] append(String key, InputStreamProvider streamProvider, long contentLength) {
Preconditions.checkArgument(key != null && key.length() > 0,
"Object key cannot be null or empty.");
return storage.append(prefix + key, streamProvider, contentLength);
}
@Override
public void delete(String key) {
Preconditions.checkArgument(key != null, "Object key cannot be null or empty.");
storage.delete(prefix + key);
}
@Override
public List<String> batchDelete(List<String> keys) {
return storage.batchDelete(keys.stream().map(key -> prefix + key).collect(Collectors.toList()));
}
@Override
public void deleteAll(String prefixToDelete) {
storage.deleteAll(this.prefix + prefixToDelete);
}
@Override
public ObjectInfo head(String key) {
Preconditions.checkArgument(key != null && key.length() > 0,
"Object key cannot be null or empty.");
return removePrefix(storage.head(prefix + key));
}
private ListObjectsResponse removePrefix(ListObjectsResponse response) {
List<ObjectInfo> objects = response.objects().stream()
.map(this::removePrefix)
.collect(Collectors.toList());
List<String> commonPrefixKeys = response.commonPrefixes().stream()
.map(this::removePrefix)
.collect(Collectors.toList());
return new ListObjectsResponse(objects, commonPrefixKeys);
}
@Override
public Iterable<ListObjectsResponse> list(ListObjectsRequest request) {
String startAfter = Strings.isNullOrEmpty(request.startAfter()) ?
request.startAfter() : prefix + request.startAfter();
ListObjectsRequest newReq = ListObjectsRequest.builder()
.prefix(prefix + request.prefix())
.startAfter(startAfter)
.maxKeys(request.maxKeys())
.delimiter(request.delimiter())
.build();
return Iterables.transform(storage.list(newReq), this::removePrefix);
}
@Override
public MultipartUpload createMultipartUpload(String key) {
Preconditions.checkArgument(key != null && key.length() > 0,
"Object key cannot be null or empty.");
return removePrefix(storage.createMultipartUpload(prefix + key));
}
@Override
public Part uploadPart(
String key, String uploadId, int partNum,
InputStreamProvider streamProvider, long contentLength) {
Preconditions.checkArgument(key != null && key.length() > 0,
"Object key cannot be null or empty.");
return storage.uploadPart(prefix + key, uploadId, partNum, streamProvider, contentLength);
}
@Override
public byte[] completeUpload(String key, String uploadId, List<Part> uploadParts) {
Preconditions.checkArgument(key != null && key.length() > 0,
"Object key cannot be null or empty.");
return storage.completeUpload(prefix + key, uploadId, uploadParts);
}
@Override
public void abortMultipartUpload(String key, String uploadId) {
Preconditions.checkArgument(key != null && key.length() > 0,
"Object key cannot be null or empty.");
storage.abortMultipartUpload(prefix + key, uploadId);
}
@Override
public Iterable<MultipartUpload> listUploads(String keyPrefix) {
return Iterables.transform(storage.listUploads(prefix + keyPrefix), this::removePrefix);
}
@Override
public Part uploadPartCopy(
String srcKey, String dstKey, String uploadId, int partNum, long copySourceRangeStart,
long copySourceRangeEnd) {
return storage.uploadPartCopy(prefix + srcKey, prefix + dstKey, uploadId, partNum,
copySourceRangeStart, copySourceRangeEnd);
}
@Override
public void copy(String srcKey, String dstKey) {
storage.copy(prefix + srcKey, prefix + dstKey);
}
@Override
public void rename(String srcKey, String dstKey) {
storage.rename(prefix + srcKey, prefix + dstKey);
}
private ObjectInfo removePrefix(ObjectInfo o) {
if (o == null) {
return null;
}
return new ObjectInfo(removePrefix(o.key()), o.size(), o.mtime(), o.checksum(), o.isDir());
}
private MultipartUpload removePrefix(MultipartUpload u) {
if (u == null) {
return null;
}
return new MultipartUpload(removePrefix(u.key()), u.uploadId(), u.minPartSize(),
u.maxPartCount());
}
private String removePrefix(String key) {
if (key == null) {
return null;
} else if (key.startsWith(prefix)) {
return key.substring(prefix.length());
} else {
return key;
}
}
@Override
public void putTags(String key, Map<String, String> newTags) {
storage.putTags(prefix + key, newTags);
}
@Override
public Map<String, String> getTags(String key) {
return storage.getTags(prefix + key);
}
@Override
public ObjectInfo objectStatus(String key) {
Preconditions.checkArgument(key != null && !key.isEmpty(),
"Object key cannot be null or empty.");
return removePrefix(storage.objectStatus(prefix + key));
}
@Override
public ChecksumInfo checksumInfo() {
return storage.checksumInfo();
}
@Override
public void close() throws IOException {
storage.close();
}
@Override
public Iterable<ObjectInfo> listDir(String key, boolean recursive) {
Preconditions.checkArgument(storage instanceof DirectoryStorage);
return Iterables.transform(((DirectoryStorage) storage).listDir(prefix + key, recursive),
this::removePrefix);
}
@Override
public void deleteDir(String key, boolean recursive) {
Preconditions.checkArgument(storage instanceof DirectoryStorage);
((DirectoryStorage) storage).deleteDir(prefix + key, recursive);
}
@Override
public boolean isEmptyDir(String key) {
Preconditions.checkArgument(storage instanceof DirectoryStorage);
return ((DirectoryStorage) storage).isEmptyDir(prefix + key);
}
}