TestFsShellMoveToTrashWithSnapshots.java
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.namenode.snapshot;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeys;
import org.apache.hadoop.fs.Path;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Function;
/**
 * Testing snapshots with the FsShell move-to-trash feature.
 *
 * <p>Each scenario creates files under a tmp directory, moves them into a
 * nested directory chain under a db directory, renames parts of that chain,
 * and finally moves the db directory to trash. Snapshots are created at
 * random points; their deletions and the final move-to-trash are executed in
 * a shuffled order. At the end, the expected location of every file (tmp
 * path, destination path or trash path) is checked via the cluster helper.
 */
public class TestFsShellMoveToTrashWithSnapshots {
  static {
    SnapshotTestHelper.disableLogs();
  }

  private static final Logger LOG =
      LoggerFactory.getLogger(TestFsShellMoveToTrashWithSnapshots.class);

  /** Suffix of the temporary file names created under the tmp directory. */
  private static final String TMP = ".tmp";
  /** Parent of all db and tmp directories created by the tests. */
  private static final String WAREHOUSE_DIR = "/warehouse/sub/";
  /**
   * Path component placed in front of each not-yet-renamed directory of a
   * {@link MyDirs} chain; stripped by {@link #removeSubstring(Path)}.
   */
  private static final String TO_BE_REMOVED = "TMP/";

  private static SnapshotTestHelper.MyCluster cluster;

  @Before
  public void setUp() throws Exception {
    final Configuration conf = new Configuration();
    conf.setInt(CommonConfigurationKeys.FS_TRASH_INTERVAL_KEY, 100);
    cluster = new SnapshotTestHelper.MyCluster(conf);
  }

  @After
  public void tearDown() throws Exception {
    if (cluster != null) {
      cluster.shutdown();
      cluster = null;
    }
  }

  /**
   * A chain of {@code depth} nested directories {@code dir0/dir1/...} under
   * {@code base}. Initially every {@code dirN} sits below a
   * {@link #TO_BE_REMOVED} component; {@link #rename()} lifts the
   * directories out of it one at a time in random order.
   */
  static class MyDirs {
    private final Path base;
    /** moved[i] is true once dir{i} has been renamed by {@link #rename()}. */
    private final boolean[] moved;
    /** Shuffled indices of the directories still to be renamed. */
    private final List<Integer> renames = new ArrayList<>();

    MyDirs(Path base, int depth) {
      this.base = base;
      this.moved = new boolean[depth];
      for (int i = 0; i < depth; i++) {
        renames.add(i);
      }
      Collections.shuffle(renames);
    }

    int depth() {
      return moved.length;
    }

    /**
     * Renames the next randomly chosen directory from its current path to
     * its parent path without the {@link #TO_BE_REMOVED} component
     * (presumably relying on rename-into-existing-directory semantics of
     * {@code cluster.rename} -- confirm in SnapshotTestHelper).
     *
     * @return an op deleting the snapshot name returned by the rename.
     */
    DeleteSnapshotOp rename() throws Exception {
      final int i = renames.remove(renames.size() - 1);
      final String snapshot = cluster.rename(getSubPath(i + 1), getSubPath(i));
      moved[i] = true;
      return new DeleteSnapshotOp(snapshot);
    }

    /**
     * @return the current path of the first {@code n} directories of the
     *     chain; {@code n == 0} returns {@code base}.
     */
    Path getSubPath(int n) {
      if (n == 0) {
        return base;
      }
      final StringBuilder b = new StringBuilder();
      for (int i = 0; i < n; i++) {
        if (!moved[i]) {
          b.append(TO_BE_REMOVED);
        }
        b.append("dir").append(i).append("/");
      }
      return new Path(base, b.toString());
    }

    /** @return the current path of the deepest directory of the chain. */
    Path getPath() {
      return getSubPath(moved.length);
    }
  }

  /**
   * Tracks one test file: its temporary path, the destination it was moved
   * to (if any) and the path it is expected to have in trash (if any).
   */
  static class MyFile {
    private final Path tmp;
    private Path dst;
    private Path trash;

    MyFile(String filePath) {
      this.tmp = new Path(filePath + TMP);
    }

    @Override
    public String toString() {
      return "MyFile{" +
          "tmp=" + tmp +
          ", dst=" + dst +
          ", trash=" + trash +
          '}';
    }

    /** @return the most advanced known location: trash, then dst, then tmp. */
    synchronized Path getPath() {
      return trash != null ? trash
          : dst != null ? dst
          : tmp;
    }

    /**
     * Renames the tmp file into {@code dstDir} with the {@link #TMP} suffix
     * stripped, and records the trash path computed for the destination.
     *
     * @return the snapshot name returned by {@code cluster.rename}.
     */
    synchronized String moveFromTmp2Dst(Path dstDir) throws Exception {
      final String tmpName = tmp.getName();
      dst = new Path(dstDir,
          tmpName.substring(0, tmpName.length() - TMP.length()));
      final String snapshot = cluster.rename(tmp, dst);
      trash = cluster.getTrashPath(dst);
      return snapshot;
    }
  }

  /** Creates the temporary file {@code filePath + ".tmp"}. */
  MyFile createTmp(String filePath) throws Exception {
    final MyFile f = new MyFile(filePath);
    cluster.createFile(f.tmp);
    return f;
  }

  /** Moves {@code file} into {@code dstDir}; returns its snapshot-delete op. */
  DeleteSnapshotOp moveFromTmp2Dst(MyFile file, Path dstDir) throws Exception {
    final String snapshot = file.moveFromTmp2Dst(dstDir);
    return new DeleteSnapshotOp(snapshot);
  }

  /** Runs the scenario with depth 4 and no random sleeps. */
  List<MyFile> runTestMoveToTrashWithShell(
      Path dbDir, Path tmpDir, int numFiles)
      throws Exception {
    return runTestMoveToTrashWithShell(dbDir, tmpDir, numFiles, 4, null);
  }

  /**
   * Runs one scenario: builds a nested directory chain under {@code dbDir},
   * creates {@code numFiles} tmp files under {@code tmpDir}, moves all but
   * the last {@code numFiles / 4} of them into the chain, shuffles the
   * directory layout through renames, and finally executes all queued
   * snapshot deletions plus a move-to-trash of {@code dbDir} in random order.
   *
   * @param randomSleepMaxMs if non-null, upper bound (exclusive) of a random
   *     sleep after each file iteration.
   * @return the tracked files, with their trash paths updated to the actual
   *     trash location of {@code dbDir}.
   */
  List<MyFile> runTestMoveToTrashWithShell(
      Path dbDir, Path tmpDir, int numFiles, int depth, Integer randomSleepMaxMs)
      throws Exception {
    LOG.info("dbDir={}", dbDir);
    LOG.info("tmpDir={}", tmpDir);
    LOG.info("numFiles={}, depth={}, randomSleepMaxMs={}", numFiles, depth, randomSleepMaxMs);
    cluster.setPrintTree(numFiles < 10);

    final List<Op> ops = new ArrayList<>();
    createSnapshot(ops);

    // Create dbDir/sub1/sub2, then swap the nesting to dbDir/sub2/sub1.
    Path sub1 = cluster.mkdirs(new Path(dbDir, "sub1"));
    Path sub2 = cluster.mkdirs(new Path(sub1, "sub2"));
    ops.add(new DeleteSnapshotOp(cluster.rename(sub2, dbDir)));
    sub2 = new Path(dbDir, "sub2");
    ops.add(new DeleteSnapshotOp(cluster.rename(sub1, sub2)));
    sub1 = new Path(sub2, "sub1");

    final MyDirs dirs = new MyDirs(sub1, depth);
    cluster.mkdirs(dirs.getPath());
    final List<MyFile> buckets = new ArrayList<>();

    // Rename half of the chain before files are moved into it ...
    for (int i = 0; i < dirs.depth() / 2; i++) {
      ops.add(dirs.rename());
    }

    // Each file is moved `offset` iterations after it was created, so the
    // last `offset` files stay in tmpDir.
    final int offset = numFiles / 4;
    for (int i = 0; i < numFiles; i++) {
      final String bucket = tmpDir + String.format("/bucket_%04d", i);
      createSnapshot(ops);
      buckets.add(createTmp(bucket));
      if (i >= offset) {
        final int j = i - offset;
        ops.add(moveFromTmp2Dst(buckets.get(j), dirs.getPath()));
      }
      if (randomSleepMaxMs != null) {
        Thread.sleep(ThreadLocalRandom.current().nextInt(randomSleepMaxMs));
      }
    }

    // ... and the other half afterwards.
    for (int i = dirs.depth() / 2; i < dirs.depth(); i++) {
      ops.add(dirs.rename());
    }

    // Further renames shuffling sub1/sub2 around.
    ops.add(new DeleteSnapshotOp(cluster.rename(dirs.getSubPath(1), sub2)));
    ops.add(new DeleteSnapshotOp(cluster.rename(sub1, dbDir)));
    sub1 = new Path(dbDir, "sub1");
    ops.add(new DeleteSnapshotOp(cluster.rename(sub2, sub1)));
    sub2 = new Path(sub1, "sub2");
    ops.add(new DeleteSnapshotOp(cluster.rename(sub2, new Path(sub1, "sub1"))));
    ops.add(new DeleteSnapshotOp(cluster.rename(sub1, new Path(dbDir, "sub2"))));

    final MoveToTrashOp m = new MoveToTrashOp(dbDir);
    // Keep the dependent future: an exception thrown inside thenAccept is
    // captured by the returned future and would otherwise be silently lost.
    final CompletableFuture<Void> trashPathUpdated =
        m.trashPath.thenAccept(p -> updateTrashPath(p, buckets));
    ops.add(m);
    LOG.info("ops count: {}", ops.size());
    while (!ops.isEmpty()) {
      runOneOp(ops);
    }
    // Every op, including m, has executed; surface updateTrashPath failures.
    trashPathUpdated.join();

    cluster.printFs("END");
    return buckets;
  }

  /** @return {@code p} with every {@link #TO_BE_REMOVED} component removed. */
  static Path removeSubstring(Path p) {
    if (p == null) {
      return null;
    }
    return new Path(p.toUri().getPath().replace(TO_BE_REMOVED, ""));
  }

  /**
   * Rewrites the recorded trash path of each moved file so that it lies under
   * {@code trashPathPrefix}, the actual trash path of the db directory (which
   * may differ from the originally computed one, e.g. when the trash already
   * contained an entry with the same name).
   */
  void updateTrashPath(String trashPathPrefix, List<MyFile> files) {
    final int j = trashPathPrefix.lastIndexOf('/');
    // Parent of the trashed db directory, including the trailing '/'.
    final String commonPrefix = trashPathPrefix.substring(0, j + 1);

    for (MyFile f : files) {
      if (f.trash == null) {
        // Never moved out of the tmp dir, hence not part of the trashed db.
        continue;
      }
      final String original = f.trash.toUri().getPath();
      if (!original.startsWith(trashPathPrefix)) {
        Assert.assertTrue("Unexpected trash path " + original
                + " for prefix " + commonPrefix,
            original.startsWith(commonPrefix));
        // Skip the original db directory name and keep the rest.
        final int i = original.indexOf('/', commonPrefix.length());
        Assert.assertTrue("No path below the db dir in " + original, i >= 0);
        final String suffix = original.substring(i + 1);
        f.trash = new Path(trashPathPrefix, suffix);
      }
    }
  }

  @Test(timeout = 300_000)
  public void test100tasks20files() throws Exception {
    runMultipleTasks(100, 20);
  }

  @Test(timeout = 300_000)
  public void test10tasks200files() throws Exception {
    runMultipleTasks(10, 200);
  }

  /**
   * Runs {@code numTasks} scenarios concurrently on a 10-thread pool, each
   * with its own db/tmp directories, then checks all resulting files.
   */
  void runMultipleTasks(int numTasks, int filesPerTask) throws Exception {
    final List<Future<List<MyFile>>> futures = new ArrayList<>();
    final List<MyFile> buckets = new ArrayList<>();
    final ExecutorService executor = Executors.newFixedThreadPool(10);
    try {
      for (int i = 0; i < numTasks; i++) {
        final String db = "db" + i;
        final String tmp = "tmp" + i;
        futures.add(executor.submit(() -> {
          final Path dbDir = cluster.mkdirs(WAREHOUSE_DIR + db);
          final Path tmpDir = cluster.mkdirs(WAREHOUSE_DIR + tmp);
          return runTestMoveToTrashWithShell(dbDir, tmpDir, filesPerTask, 4, 100);
        }));
      }
      for (Future<List<MyFile>> f : futures) {
        buckets.addAll(f.get());
      }
    } finally {
      executor.shutdown();
    }
    assertExists(buckets, f -> removeSubstring(f.getPath()));
  }

  @Test(timeout = 100_000)
  public void test4files() throws Exception {
    final Path dbDir = cluster.mkdirs(WAREHOUSE_DIR + "db");
    final Path tmpDir = cluster.mkdirs(WAREHOUSE_DIR + "tmp");
    final List<MyFile> buckets = runTestMoveToTrashWithShell(
        dbDir, tmpDir, 4, 2, null);
    assertExists(buckets, f -> removeSubstring(f.getPath()));
  }

  @Test(timeout = 300_000)
  public void test200files() throws Exception {
    final Path dbDir = cluster.mkdirs(WAREHOUSE_DIR + "db");
    final Path tmpDir = cluster.mkdirs(WAREHOUSE_DIR + "tmp");
    final List<MyFile> buckets = runTestMoveToTrashWithShell(
        dbDir, tmpDir, 200);
    assertExists(buckets, f -> removeSubstring(f.getPath()));
  }

  @Test(timeout = 300_000)
  public void test50files10times() throws Exception {
    final Path tmpDir = cluster.mkdirs(WAREHOUSE_DIR + "tmp");
    final List<MyFile> buckets = new ArrayList<>();
    for (int i = 0; i < 10; i++) {
      // Recreate the same db dir each round; it was moved to trash last time.
      final Path dbDir = cluster.mkdirs(WAREHOUSE_DIR + "db");
      buckets.addAll(runTestMoveToTrashWithShell(dbDir, tmpDir, 50));
    }
    cluster.setPrintTree(true);
    cluster.printFs("test_50files_10times");
    assertExists(buckets, f -> removeSubstring(f.getPath()));
  }

  /** With probability 1/2, creates a snapshot and queues its deletion. */
  static void createSnapshot(List<Op> ops) throws Exception {
    if (ThreadLocalRandom.current().nextBoolean()) {
      ops.add(new DeleteSnapshotOp(cluster.createSnapshot()));
    }
  }

  /**
   * Removes and executes a randomly chosen op; before a move-to-trash, a
   * snapshot may be created (queuing one more op).
   */
  void runOneOp(List<Op> ops) throws Exception {
    Collections.shuffle(ops);
    final Op op = ops.remove(ops.size() - 1);
    if (op instanceof MoveToTrashOp) {
      createSnapshot(ops);
    }
    op.execute();
  }

  /** A deferred operation that runs its implementation at most once. */
  static abstract class Op {
    private final AtomicBoolean executed = new AtomicBoolean();

    final void execute() throws Exception {
      if (executed.compareAndSet(false, true)) {
        executeImpl();
      }
    }

    final boolean isExecuted() {
      return executed.get();
    }

    abstract void executeImpl() throws Exception;
  }

  /**
   * Moves a path to trash and completes {@link #trashPath} with the path
   * string returned by {@code cluster.moveToTrash}.
   */
  static class MoveToTrashOp extends Op {
    private final Path path;
    private final CompletableFuture<String> trashPath = new CompletableFuture<>();

    MoveToTrashOp(Path path) {
      this.path = path;
    }

    @Override
    public void executeImpl() throws Exception {
      final Path p = cluster.moveToTrash(path, true);
      LOG.info("MoveToTrash: {} -> {}", path, p);
      trashPath.complete(p.toUri().getPath());
    }
  }

  /** Deletes the named snapshot. */
  static class DeleteSnapshotOp extends Op {
    private final String name;

    DeleteSnapshotOp(String name) {
      this.name = name;
    }

    @Override
    void executeImpl() throws Exception {
      cluster.deleteSnapshot(name);
    }
  }

  /**
   * Calls {@code cluster.assertExists} on the path computed by
   * {@code getPath} for each file, logging the result when tree printing is
   * enabled.
   */
  void assertExists(List<MyFile> files, Function<MyFile, Path> getPath)
      throws Exception {
    for (MyFile f : files) {
      final Path p = getPath.apply(f);
      final boolean exists = cluster.assertExists(p);
      if (cluster.getPrintTree()) {
        LOG.info("{} exists? {}, {}", p, exists, f);
      }
    }
  }
}