ITestListPerformance.java
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.azure;
import java.util.ArrayList;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import com.microsoft.azure.storage.blob.CloudBlobContainer;
import com.microsoft.azure.storage.blob.CloudBlockBlob;
import org.junit.Assume;
import org.junit.FixMethodOrder;
import org.junit.Test;
import org.junit.runners.MethodSorters;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.fs.azure.integration.AbstractAzureScaleTest;
import org.apache.hadoop.fs.azure.integration.AzureTestUtils;
import org.apache.hadoop.fs.contract.ContractTestUtils;
/**
 * Scale test of WASB directory listing and bulk-delete performance.
 * <p>
 * The tests run in name order ({@link MethodSorters#NAME_ASCENDING}):
 * {@code test_0101} populates {@link #TEST_DIR_PATH} with
 * {@code threads * filesPerThread} empty blobs, {@code test_0200} times
 * {@code listStatus()} and a recursive {@code listFiles()} over that
 * directory, and {@code test_0300} times a recursive delete of it.
 * The thread and per-thread file counts can be overridden through the test
 * properties {@code fs.azure.scale.test.list.performance.threads} and
 * {@code fs.azure.scale.test.list.performance.files}.
 */
@FixMethodOrder(MethodSorters.NAME_ASCENDING)
public class ITestListPerformance extends AbstractAzureScaleTest {
  private static final Logger LOG = LoggerFactory.getLogger(
      ITestListPerformance.class);

  /** Directory (relative to the working directory) holding the test files. */
  private static final Path TEST_DIR_PATH = new Path(
      "DirectoryWithManyFiles");

  /** Default number of writer threads. */
  private static final int NUMBER_OF_THREADS = 10;

  /** Default number of files each writer thread creates. */
  private static final int NUMBER_OF_FILES_PER_THREAD = 1000;

  private int threads;
  private int filesPerThread;
  private int expectedFileCount;

  /**
   * Reads the thread/file counts from the test configuration and derives the
   * number of files the test directory is expected to contain.
   */
  @Override
  public void setUp() throws Exception {
    super.setUp();
    Configuration conf = getConfiguration();
    threads = AzureTestUtils.getTestPropertyInt(conf,
        "fs.azure.scale.test.list.performance.threads", NUMBER_OF_THREADS);
    filesPerThread = AzureTestUtils.getTestPropertyInt(conf,
        "fs.azure.scale.test.list.performance.files",
        NUMBER_OF_FILES_PER_THREAD);
    expectedFileCount = threads * filesPerThread;
    LOG.info("Thread = {}, Files per Thread = {}, expected files = {}",
        threads, filesPerThread, expectedFileCount);
    // fail fast: allow only a single I/O retry.
    // NOTE(review): these settings are applied after super.setUp(); confirm
    // they still reach the file system under test.
    conf.set("fs.azure.io.retry.max.retries", "1");
    conf.set("fs.azure.delete.threads", "16");
    // NOTE(review): the returned account is discarded; confirm whether this
    // extra call is required.
    createTestAccount();
  }

  /**
   * Creates the test account backed by the "itestlistperformance" container,
   * creating the container if necessary.
   *
   * @return the test account
   * @throws Exception on any failure to create the account
   */
  @Override
  protected AzureBlobStorageTestAccount createTestAccount() throws Exception {
    return AzureBlobStorageTestAccount.create(
        "itestlistperformance",
        EnumSet.of(AzureBlobStorageTestAccount.CreateOptions.CreateContainer),
        null,
        true);
  }

  /**
   * Populates the test directory with {@code threads * filesPerThread} empty
   * blobs, written in parallel directly through the blob container API.
   * Skipped if the directory already exists.
   */
  @Test
  public void test_0101_CreateDirectoryWithFiles() throws Exception {
    Assume.assumeFalse("Test path exists; skipping", fs.exists(TEST_DIR_PATH));
    final CloudBlobContainer container = testAccount.getRealContainer();
    // Blob names are container-relative, so strip the leading "/".
    final String basePath = (fs.getWorkingDirectory().toUri().getPath()
        + "/" + TEST_DIR_PATH + "/").substring(1);
    fs.mkdirs(TEST_DIR_PATH);

    List<Callable<Integer>> tasks = new ArrayList<>(threads);
    for (int i = 0; i < threads; i++) {
      tasks.add(new Callable<Integer>() {
        /** Writes up to filesPerThread blobs; stops at the first failure. */
        @Override
        public Integer call() {
          int written = 0;
          for (int j = 0; j < filesPerThread; j++) {
            String blobName = basePath + UUID.randomUUID().toString();
            try {
              CloudBlockBlob blob = container.getBlockBlobReference(blobName);
              blob.uploadText("");
              written++;
            } catch (Exception e) {
              // Logged rather than rethrown: the shortfall is caught by the
              // per-future file count assertion below.
              LOG.error("Failed to write {}", blobName, e);
              break;
            }
          }
          LOG.info("Thread completed with {} files written", written);
          return written;
        }
      });
    }

    ExecutorService executorService = Executors.newFixedThreadPool(threads);
    try {
      ContractTestUtils.NanoTimer timer = new ContractTestUtils.NanoTimer();
      List<Future<Integer>> futures = executorService.invokeAll(tasks,
          getTestTimeoutMillis(), TimeUnit.MILLISECONDS);
      long elapsedMs = timer.elapsedTimeMs();
      LOG.info("time to create files: {} millis", elapsedMs);
      for (Future<Integer> future : futures) {
        // invokeAll() cancels tasks still running at the timeout, and a
        // cancelled future also reports isDone(), so check cancellation.
        assertFalse("Future timed out", future.isCancelled());
        assertEquals("Future did not write all files",
            filesPerThread, future.get().intValue());
      }
    } finally {
      // All tasks have completed or been cancelled; release the pool threads.
      executorService.shutdownNow();
    }
  }

  /**
   * Times a flat {@code listStatus()} of the test directory, then a recursive
   * {@code listFiles()}, and checks both listings contain the same paths.
   */
  @Test
  public void test_0200_ListStatusPerformance() throws Exception {
    ContractTestUtils.NanoTimer timer = new ContractTestUtils.NanoTimer();
    FileStatus[] fileList = fs.listStatus(TEST_DIR_PATH);
    long elapsedMs = timer.elapsedTimeMs();
    LOG.info("files={}, elapsedMs={}", fileList.length, elapsedMs);
    Map<Path, FileStatus> foundInList = new HashMap<>(expectedFileCount);
    for (FileStatus fileStatus : fileList) {
      foundInList.put(fileStatus.getPath(), fileStatus);
      LOG.info("{}: {}", fileStatus.getPath(),
          fileStatus.isDirectory() ? "dir" : "file");
    }
    assertEquals("Mismatch between expected files and actual",
        expectedFileCount, fileList.length);

    // now do a listFiles() recursive
    ContractTestUtils.NanoTimer initialStatusCallTimer
        = new ContractTestUtils.NanoTimer();
    RemoteIterator<LocatedFileStatus> listing
        = fs.listFiles(TEST_DIR_PATH, true);
    long initialListTime = initialStatusCallTimer.elapsedTimeMs();
    timer = new ContractTestUtils.NanoTimer();
    while (listing.hasNext()) {
      Path path = listing.next().getPath();
      // Removing each match also detects duplicates in the second listing.
      FileStatus removed = foundInList.remove(path);
      assertNotNull("Did not find " + path + " in the previous listing",
          removed);
    }
    elapsedMs = timer.elapsedTimeMs();
    LOG.info("time for listFiles() initial call: {} millis;"
        + " time to iterate: {} millis", initialListTime, elapsedMs);
    assertEquals("Not all files from listStatus() were found in listFiles()",
        0, foundInList.size());
  }

  /**
   * Times a recursive delete of the test directory and logs the average cost
   * per expected file.
   */
  @Test
  public void test_0300_BulkDeletePerformance() throws Exception {
    ContractTestUtils.NanoTimer timer = new ContractTestUtils.NanoTimer();
    fs.delete(TEST_DIR_PATH, true);
    // Record the end time so per-operation statistics cover only the delete.
    timer.end();
    long elapsedMs = timer.elapsedTimeMs();
    LOG.info("time for delete(): {} millis; {} nanoS per file",
        elapsedMs, timer.nanosPerOperation(expectedFileCount));
  }
}