// DefaultFsOps.java
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.tosfs.ops;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathIsNotEmptyDirectoryException;
import org.apache.hadoop.fs.tosfs.RawFileStatus;
import org.apache.hadoop.fs.tosfs.conf.ConfKeys;
import org.apache.hadoop.fs.tosfs.object.Constants;
import org.apache.hadoop.fs.tosfs.object.ObjectInfo;
import org.apache.hadoop.fs.tosfs.object.ObjectStorage;
import org.apache.hadoop.fs.tosfs.object.ObjectUtils;
import org.apache.hadoop.fs.tosfs.object.request.ListObjectsRequest;
import org.apache.hadoop.fs.tosfs.object.response.ListObjectsResponse;
import org.apache.hadoop.fs.tosfs.util.CommonUtils;
import org.apache.hadoop.fs.tosfs.util.Iterables;
import org.apache.hadoop.util.Lists;
import java.io.IOException;
import java.util.Date;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.function.Function;
import java.util.function.Predicate;
import static org.apache.hadoop.fs.tosfs.object.ObjectUtils.SLASH;
/**
* Provides rename, delete, list capabilities for general purpose bucket.
*/
public class DefaultFsOps implements FsOps {
private final ObjectStorage storage;
private final ExecutorService taskThreadPool;
private final Function<ObjectInfo, RawFileStatus> objMapper;
private final RenameOp renameOp;
private final boolean asyncCreateParentDir;
public DefaultFsOps(
ObjectStorage storage,
Configuration conf,
ExecutorService taskThreadPool,
Function<ObjectInfo, RawFileStatus> objMapper) {
this.storage = storage;
this.taskThreadPool = taskThreadPool;
this.objMapper = objMapper;
this.renameOp = new RenameOp(conf, storage, taskThreadPool);
this.asyncCreateParentDir =
conf.getBoolean(ConfKeys.FS_ASYNC_CREATE_MISSED_PARENT.key(storage.scheme()),
ConfKeys.FS_ASYNC_CREATE_MISSED_PARENT_DEFAULT);
}
@Override
public void renameFile(Path src, Path dst, long length) {
renameOp.renameFile(src, dst, length);
mkdirIfNecessary(src.getParent(), asyncCreateParentDir);
}
@Override
public void renameDir(Path src, Path dst) {
renameOp.renameDir(src, dst);
mkdirIfNecessary(src.getParent(), asyncCreateParentDir);
}
@Override
public void deleteFile(Path file) {
storage.delete(ObjectUtils.pathToKey(file));
mkdirIfNecessary(file.getParent(), asyncCreateParentDir);
}
@Override
public void deleteDir(Path dir, boolean recursive) throws IOException {
String dirKey = ObjectUtils.pathToKey(dir, true);
if (recursive) {
storage.deleteAll(dirKey);
} else {
if (isEmptyDirectory(dir)) {
storage.delete(dirKey);
} else {
throw new PathIsNotEmptyDirectoryException(dir.toString());
}
}
}
@Override
public Iterable<RawFileStatus> listDir(Path dir, boolean recursive,
Predicate<String> postFilter) {
String key = ObjectUtils.pathToKey(dir, true);
String delimiter = recursive ? null : SLASH;
ListObjectsRequest req = ListObjectsRequest.builder()
.prefix(key)
.startAfter(key)
.delimiter(delimiter)
.build();
return Iterables.transform(asObjectInfo(storage.list(req), postFilter), objMapper);
}
@Override
public boolean isEmptyDirectory(Path dir) {
String key = ObjectUtils.pathToKey(dir, true);
ListObjectsRequest req = ListObjectsRequest.builder()
.prefix(key)
.startAfter(key)
.delimiter(SLASH)
.maxKeys(1)
.build();
return !asObjectInfo(storage.list(req), s -> true).iterator().hasNext();
}
@Override
public void mkdirs(Path dir) {
if (dir.isRoot()) {
return;
}
String key = ObjectUtils.pathToKey(dir, true);
storage.put(key, new byte[0]);
// Create parent dir if missed.
Path parentPath = dir.getParent();
String parentKey = ObjectUtils.pathToKey(parentPath, true);
while (!parentPath.isRoot() && storage.head(parentKey) == null) {
storage.put(parentKey, new byte[0]);
parentPath = parentPath.getParent();
parentKey = ObjectUtils.pathToKey(parentPath, true);
}
}
private void mkdirIfNecessary(Path path, boolean async) {
if (path != null) {
CommonUtils.runQuietly(() -> {
Future<?> future = taskThreadPool.submit(() -> {
String key = ObjectUtils.pathToKey(path, true);
if (!key.isEmpty() && storage.head(key) == null) {
mkdirs(ObjectUtils.keyToPath(key));
}
});
if (!async) {
future.get();
}
});
}
}
/**
* Convert ListObjectResponse iterable to FileStatus iterable,
* using file status acceptor to filter the expected objects and common prefixes.
*
* @param listResponses the iterable of ListObjectsResponse
* @param filter the file status acceptor
* @return the iterable of TosFileStatus
*/
private Iterable<ObjectInfo> asObjectInfo(Iterable<ListObjectsResponse> listResponses,
Predicate<String> filter) {
Iterable<List<ObjectInfo>> results = Iterables.transform(listResponses, listResp -> {
List<ObjectInfo> objs = Lists.newArrayList();
// Add object files.
objs.addAll(listResp.objects());
// Add object directories.
for (String prefix : listResp.commonPrefixes()) {
objs.add(new ObjectInfo(prefix, 0, new Date(), Constants.MAGIC_CHECKSUM, true));
}
return objs;
});
return Iterables.filter(Iterables.concat(results), o -> filter.test(o.key()));
}
}