// TestDynamometerInfra.java

/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.tools.dynamometer;

import org.apache.hadoop.util.Sets;
import java.util.Optional;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
import org.apache.hadoop.test.PlatformAssumptions;
import org.apache.hadoop.tools.dynamometer.workloadgenerator.audit.AuditLogDirectParser;
import org.apache.hadoop.tools.dynamometer.workloadgenerator.audit.AuditReplayMapper;
import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.URI;
import java.net.URISyntaxException;
import java.net.URL;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.UUID;
import java.util.zip.ZipEntry;
import java.util.zip.ZipOutputStream;
import org.apache.commons.io.FileUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.mapreduce.Counters;
import org.apache.hadoop.util.JarFinder;
import org.apache.hadoop.util.Shell;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationReport;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
import org.apache.hadoop.yarn.client.api.YarnClient;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.server.MiniYARNCluster;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration;
import org.apache.hadoop.yarn.util.resource.DominantResourceCalculator;
import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.Assume;
import org.junit.BeforeClass;
import org.junit.Ignore;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import static org.apache.hadoop.tools.dynamometer.DynoInfraUtils.fetchHadoopTarball;
import static org.apache.hadoop.hdfs.MiniDFSCluster.PROP_TEST_BUILD_DATA;
import static org.hamcrest.CoreMatchers.notNullValue;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;

/**
 * Start a Dynamometer cluster in a MiniYARNCluster. Ensure that the NameNode is
 * able to start correctly, exit safemode, and run some commands. Subsequently
 * the workload job is launched and it is verified that it completes
 * successfully and is able to replay commands as expected.
 *
 * To run this test JAVA_HOME must be set correctly, and the {@code tar} utility
 * must be available.
 *
 * You can optionally specify which version of HDFS should be started within the
 * Dynamometer cluster; the default is {@value HADOOP_BIN_VERSION_DEFAULT}. This
 * can be adjusted by setting the {@value HADOOP_BIN_VERSION_KEY} property. This
 * will automatically download the correct Hadoop tarball for the specified
 * version. It downloads from an Apache mirror (by default
 * {@value DynoInfraUtils#APACHE_DOWNLOAD_MIRROR_DEFAULT}); which mirror is used
 * can be controlled with the {@value DynoInfraUtils#APACHE_DOWNLOAD_MIRROR_KEY}
 * property. Note that mirrors normally contain only the latest releases on any
 * given release line; you may need to use
 * {@code http://archive.apache.org/dist/} for older releases. The downloaded
 * tarball will be stored in the test directory and can be reused between test
 * executions. Alternatively, you can specify the {@value HADOOP_BIN_PATH_KEY}
 * property to point directly to a Hadoop tarball which is present locally and
 * no download will occur.
 */
@Ignore
public class TestDynamometerInfra {

  // Shared logger; also handed to the DynoInfraUtils helpers used by this test.
  private static final Logger LOG =
      LoggerFactory.getLogger(TestDynamometerInfra.class);

  // Sizes of the MiniYARNCluster (NodeManagers) and MiniDFSCluster (DataNodes)
  // started in setupClass(). The test addresses NodeManagers 0, 1 and 2
  // individually, so at least three are needed.
  private static final int MINICLUSTER_NUM_NMS = 3;
  private static final int MINICLUSTER_NUM_DNS = 1;

  // System properties selecting the Hadoop binary: either an explicit local
  // tarball path, or a version to fetch (see the class Javadoc).
  private static final String HADOOP_BIN_PATH_KEY = "dyno.hadoop.bin.path";
  private static final String HADOOP_BIN_VERSION_KEY =
      "dyno.hadoop.bin.version";
  private static final String HADOOP_BIN_VERSION_DEFAULT = "3.1.4";
  // File names of the fsimage and VERSION test resources uploaded to HDFS.
  private static final String FSIMAGE_FILENAME = "fsimage_0000000000000061740";
  private static final String VERSION_FILENAME = "VERSION";

  // Name prefix of the per-run directory (under the test base dir) into which
  // the Hadoop tarball is unpacked; stale directories with this prefix are
  // deleted during setup.
  private static final String HADOOP_BIN_UNPACKED_DIR_PREFIX =
      "hadoop_unpacked_";

  // YARN node labels assigned to NodeManager 0 and NodeManager 1 respectively.
  private static final String NAMENODE_NODELABEL = "dyno_namenode";
  private static final String DATANODE_NODELABEL = "dyno_datanode";

  // HDFS path passed to the client as the workload output path.
  private static final String OUTPUT_PATH = "/tmp/trace_output_direct";

  // Class-wide fixtures, created in setupClass() and released in
  // teardownClass().
  private static MiniDFSCluster miniDFSCluster;
  private static MiniYARNCluster miniYARNCluster;
  private static YarnClient yarnClient;
  // FileSystem of the MiniDFSCluster.
  private static FileSystem fs;
  // Base configuration, and the configuration reported by the running
  // MiniYARNCluster.
  private static Configuration conf;
  private static Configuration yarnConf;
  // Fully-qualified HDFS locations of the uploaded test inputs.
  private static Path fsImageTmpPath;
  private static Path fsVersionTmpPath;
  private static Path blockImageOutputDir;
  private static Path auditTraceDir;
  private static Path confZip;
  // Local filesystem locations: test data root, the Hadoop tarball, and the
  // directory the tarball was unpacked into.
  private static File testBaseDir;
  private static File hadoopTarballPath;
  private static File hadoopUnpackedDir;

  // ID of the infrastructure application discovered by
  // awaitApplicationStartup(); killed and reset in tearDown().
  private ApplicationId infraAppId;

  /**
   * One-time setup shared by every test in this class. The class is skipped
   * (via JUnit assumptions) when the platform is Windows, when
   * {@code JAVA_HOME} is unset, or when {@code tar} cannot be located.
   * Otherwise this obtains and unpacks a Hadoop tarball, starts a
   * MiniYARNCluster and a MiniDFSCluster, overwrites the {@code yarn-site.xml}
   * found on the classpath with the live YARN configuration, uploads the test
   * resources to HDFS, and attaches node labels to the first two NodeManagers.
   *
   * @throws Exception if any setup step fails
   */
  @BeforeClass
  public static void setupClass() throws Exception {
    PlatformAssumptions.assumeNotWindows("Dynamometer will not run on Windows");
    Assume.assumeThat("JAVA_HOME must be set properly",
        System.getenv("JAVA_HOME"), notNullValue());
    // Probe for tar up front so that a missing binary skips the test instead
    // of failing it.
    try {
      Shell.ShellCommandExecutor tarCheck = new Shell.ShellCommandExecutor(
          new String[]{"bash", "-c", "command -v tar"});
      tarCheck.execute();
      Assume.assumeTrue("tar command is not available",
          tarCheck.getExitCode() == 0);
    } catch (IOException ioe) {
      Assume.assumeNoException("Unable to execute a shell command", ioe);
    }

    conf = new Configuration();
    // Follow the conventions of MiniDFSCluster
    testBaseDir = new File(
        System.getProperty(PROP_TEST_BUILD_DATA, "build/test/data"));
    String hadoopBinVersion = System.getProperty(HADOOP_BIN_VERSION_KEY,
        HADOOP_BIN_VERSION_DEFAULT);
    // An explicitly supplied tarball path takes precedence over fetching one
    // for the requested version.
    if (System.getProperty(HADOOP_BIN_PATH_KEY) == null) {
      hadoopTarballPath = fetchHadoopTarball(testBaseDir, hadoopBinVersion,
          conf, LOG);
    } else {
      hadoopTarballPath = new File(System.getProperty(HADOOP_BIN_PATH_KEY));
    }
    if (testBaseDir.exists()) {
      // Delete any old unpacked bin dirs that weren't previously cleaned up
      File[] oldUnpackedDirs = testBaseDir.listFiles(
          (dir, name) -> name.startsWith(HADOOP_BIN_UNPACKED_DIR_PREFIX));
      if (oldUnpackedDirs != null) {
        for (File oldDir : oldUnpackedDirs) {
          FileUtils.deleteQuietly(oldDir);
        }
      }
    }
    // Set up the Hadoop binary to be used as the system-level Hadoop install
    hadoopUnpackedDir = new File(testBaseDir,
        HADOOP_BIN_UNPACKED_DIR_PREFIX + UUID.randomUUID());
    assertTrue("Failed to make temporary directory",
        hadoopUnpackedDir.mkdirs());
    Shell.ShellCommandExecutor shexec = new Shell.ShellCommandExecutor(
        new String[] {"tar", "xzf", hadoopTarballPath.getAbsolutePath(), "-C",
            hadoopUnpackedDir.getAbsolutePath()});
    shexec.execute();
    if (shexec.getExitCode() != 0) {
      fail("Unable to execute tar to expand Hadoop binary");
    }

    // Scheduler configuration: small minimum allocation, node labels enabled,
    // and full capacity on both queues for the default partition and for each
    // of the two Dynamometer node labels.
    conf.setInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB, 128);
    conf.setBoolean(YarnConfiguration.NODE_LABELS_ENABLED, true);
    for (String q : new String[] {"root", "root.default"}) {
      conf.setInt(CapacitySchedulerConfiguration.PREFIX + q + "."
          + CapacitySchedulerConfiguration.CAPACITY, 100);
      String accessibleNodeLabelPrefix = CapacitySchedulerConfiguration.PREFIX
          + q + "." + CapacitySchedulerConfiguration.ACCESSIBLE_NODE_LABELS;
      conf.set(accessibleNodeLabelPrefix,
          CapacitySchedulerConfiguration.ALL_ACL);
      conf.setInt(accessibleNodeLabelPrefix + "." + DATANODE_NODELABEL + "."
          + CapacitySchedulerConfiguration.CAPACITY, 100);
      conf.setInt(accessibleNodeLabelPrefix + "." + NAMENODE_NODELABEL + "."
          + CapacitySchedulerConfiguration.CAPACITY, 100);
    }
    // This is necessary to have the RM respect our vcore allocation request
    conf.setClass(CapacitySchedulerConfiguration.RESOURCE_CALCULATOR_CLASS,
        DominantResourceCalculator.class, ResourceCalculator.class);
    conf.setBoolean(YarnConfiguration.NM_DISK_HEALTH_CHECK_ENABLE, false);
    miniYARNCluster = new MiniYARNCluster(TestDynamometerInfra.class.getName(),
        1, MINICLUSTER_NUM_NMS, 1, 1);
    miniYARNCluster.init(conf);
    miniYARNCluster.start();

    // Both configurations are pointed at the MiniDFSCluster as the default
    // filesystem once it is up.
    yarnConf = miniYARNCluster.getConfig();
    miniDFSCluster = new MiniDFSCluster.Builder(conf).format(true)
        .numDataNodes(MINICLUSTER_NUM_DNS).build();
    miniDFSCluster.waitClusterUp();
    FileSystem.setDefaultUri(conf, miniDFSCluster.getURI());
    FileSystem.setDefaultUri(yarnConf, miniDFSCluster.getURI());
    fs = miniDFSCluster.getFileSystem();

    // Locate the placeholder yarn-site.xml on the classpath; its directory
    // becomes the YARN application classpath and the file itself is
    // overwritten below with the live configuration.
    URL url = Thread.currentThread().getContextClassLoader()
        .getResource("yarn-site.xml");
    if (url == null) {
      throw new RuntimeException(
          "Could not find 'yarn-site.xml' dummy file in classpath");
    }
    yarnConf.set(YarnConfiguration.YARN_APPLICATION_CLASSPATH,
        new File(url.getPath()).getParent());
    // Write the XML to a buffer before writing to the file. writeXml() can
    // trigger a read of the existing yarn-site.xml, so writing directly could
    // trigger a read of the file while it is in an inconsistent state
    // (partially written)
    try (ByteArrayOutputStream bytesOut = new ByteArrayOutputStream()) {
      yarnConf.writeXml(bytesOut);
      try (OutputStream fileOut = new FileOutputStream(
          new File(url.getPath()))) {
        fileOut.write(bytesOut.toByteArray());
      }
    }

    // The client gets its own copy of the YARN configuration.
    yarnClient = YarnClient.createYarnClient();
    yarnClient.init(new Configuration(yarnConf));
    yarnClient.start();

    // Fully-qualified HDFS destinations for the inputs uploaded below.
    fsImageTmpPath = fs.makeQualified(new Path("/tmp/" + FSIMAGE_FILENAME));
    fsVersionTmpPath = fs.makeQualified(new Path("/tmp/" + VERSION_FILENAME));
    blockImageOutputDir = fs.makeQualified(new Path("/tmp/blocks"));
    auditTraceDir = fs.makeQualified(new Path("/tmp/audit_trace_direct"));
    confZip = fs.makeQualified(new Path("/tmp/conf.zip"));

    uploadFsimageResourcesToHDFS(hadoopBinVersion);

    // NOTE(review): the result of this wait is not checked here — confirm
    // whether a timeout should abort setup.
    miniYARNCluster.waitForNodeManagersToConnect(30000);

    // Register both labels with the cluster, then attach the NameNode label to
    // NodeManager 0 and the DataNode label to NodeManager 1. NodeManager 2 is
    // left unlabeled.
    RMNodeLabelsManager nodeLabelManager = miniYARNCluster.getResourceManager()
        .getRMContext().getNodeLabelManager();
    nodeLabelManager.addToCluserNodeLabelsWithDefaultExclusivity(
        Sets.newHashSet(NAMENODE_NODELABEL, DATANODE_NODELABEL));
    Map<NodeId, Set<String>> nodeLabels = new HashMap<>();
    nodeLabels.put(miniYARNCluster.getNodeManager(0).getNMContext().getNodeId(),
        Sets.newHashSet(NAMENODE_NODELABEL));
    nodeLabels.put(miniYARNCluster.getNodeManager(1).getNMContext().getNodeId(),
        Sets.newHashSet(DATANODE_NODELABEL));
    nodeLabelManager.addLabelsToNode(nodeLabels);
  }

  /**
   * Release everything created by {@link #setupClass()}: the MiniDFSCluster,
   * the YARN client, the MiniYARNCluster (including its work directory) and
   * the directory the Hadoop tarball was unpacked into.
   *
   * The steps are chained with {@code finally} so that a failure while
   * stopping one component does not prevent the remaining components from
   * being stopped and the temporary directories from being removed. If more
   * than one step throws, the exception from the last failing step is the one
   * that propagates.
   *
   * @throws Exception if any of the cleanup steps fails
   */
  @AfterClass
  public static void teardownClass() throws Exception {
    try {
      if (miniDFSCluster != null) {
        miniDFSCluster.shutdown(true);
        miniDFSCluster = null;
      }
    } finally {
      try {
        if (yarnClient != null) {
          yarnClient.stop();
          yarnClient = null;
        }
      } finally {
        try {
          if (miniYARNCluster != null) {
            // Stop the ResourceManager and wait for it before stopping the
            // cluster as a whole.
            miniYARNCluster.getResourceManager().stop();
            miniYARNCluster.getResourceManager().waitForServiceToStop(30000);
            miniYARNCluster.stop();
            miniYARNCluster.waitForServiceToStop(30000);
            FileUtils.deleteDirectory(miniYARNCluster.getTestWorkDir());
            miniYARNCluster = null;
          }
        } finally {
          if (hadoopUnpackedDir != null) {
            FileUtils.deleteDirectory(hadoopUnpackedDir);
          }
        }
      }
    }
  }

  /**
   * Kill the infrastructure application started by the test, if one was
   * discovered and the YARN client is still available, then forget its ID.
   *
   * @throws Exception if the kill request fails
   */
  @After
  public void tearDown() throws Exception {
    final boolean canKillApp = yarnClient != null && infraAppId != null;
    if (canKillApp) {
      yarnClient.killApplication(infraAppId);
    }
    infraAppId = null;
  }

  /**
   * End-to-end test: launch the Dynamometer infrastructure application, wait
   * for its NameNode to come up and become ready, verify that the cluster can
   * be written to and read from, check which NodeManager each container
   * landed on, wait for the workload job to finish and validate its counters,
   * and finally wait for the infrastructure application to be killed and for
   * the NameNode metrics file and workload output to appear in HDFS.
   *
   * @throws Exception if any step fails or times out
   */
  @Test(timeout = 15 * 60 * 1000)
  public void testNameNodeInYARN() throws Exception {
    Configuration localConf = new Configuration(yarnConf);
    localConf.setLong(AuditLogDirectParser.AUDIT_START_TIMESTAMP_KEY, 60000);

    final Client client = createAndStartClient(localConf);

    awaitApplicationStartup();

    long startTime = System.currentTimeMillis();
    long maxWaitTimeMs = TimeUnit.MINUTES.toMillis(10);
    // Tells the DynoInfraUtils wait helpers when to give up: on timeout, when
    // the application has failed, or when its state cannot be queried.
    Supplier<Boolean> exitCheckSupplier = () -> {
      if (System.currentTimeMillis() - startTime > maxWaitTimeMs) {
        // Wait at most 10 minutes for the NameNode to start and be ready
        return true;
      }
      try {
        // Exit immediately if the YARN app fails
        return yarnClient.getApplicationReport(infraAppId)
            .getYarnApplicationState() == YarnApplicationState.FAILED;
      } catch (IOException | YarnException e) {
        return true;
      }
    };
    Optional<Properties> namenodeProperties = DynoInfraUtils
        .waitForAndGetNameNodeProperties(exitCheckSupplier, localConf,
            client.getNameNodeInfoPath(), LOG);
    if (!namenodeProperties.isPresent()) {
      fail("Unable to fetch NameNode properties");
    }

    DynoInfraUtils.waitForNameNodeReadiness(namenodeProperties.get(), 3, false,
        exitCheckSupplier, localConf, LOG);

    assertClusterIsFunctional(localConf, namenodeProperties.get());

    // NodeManager 0 and 1 carry the NameNode and DataNode labels; NodeManager 2
    // is unlabeled and is expected to host the ApplicationMaster.
    Map<ContainerId, Container> namenodeContainers = miniYARNCluster
        .getNodeManager(0).getNMContext().getContainers();
    Map<ContainerId, Container> datanodeContainers = miniYARNCluster
        .getNodeManager(1).getNMContext().getContainers();
    Map<ContainerId, Container> amContainers = miniYARNCluster.getNodeManager(2)
        .getNMContext().getContainers();
    assertEquals(1, namenodeContainers.size());
    assertEquals(2,
        namenodeContainers.keySet().iterator().next().getContainerId());
    assertEquals(2, datanodeContainers.size());
    assertEquals(1, amContainers.size());
    assertEquals(1, amContainers.keySet().iterator().next().getContainerId());

    LOG.info("Waiting for workload job to start and complete");
    GenericTestUtils.waitFor(() -> {
      try {
        return client.getWorkloadJob() != null
            && client.getWorkloadJob().isComplete();
      } catch (IOException | IllegalStateException e) {
        return false;
      }
    }, 3000, 60000);
    LOG.info("Workload job completed");

    if (!client.getWorkloadJob().isSuccessful()) {
      fail("Workload job failed");
    }
    Counters counters = client.getWorkloadJob().getCounters();
    assertEquals(6,
        counters.findCounter(AuditReplayMapper.REPLAYCOUNTERS.TOTALCOMMANDS)
            .getValue());
    assertEquals(1,
        counters
            .findCounter(AuditReplayMapper.REPLAYCOUNTERS.TOTALINVALIDCOMMANDS)
            .getValue());

    LOG.info("Waiting for infra application to exit");
    GenericTestUtils.waitFor(() -> {
      try {
        ApplicationReport report = yarnClient
            .getApplicationReport(infraAppId);
        return report
              .getYarnApplicationState() == YarnApplicationState.KILLED;
      } catch (IOException | YarnException e) {
        return false;
      }
    }, 3000, 300000);

    LOG.info("Waiting for metrics file to be ready");
    // Try to read the metrics file
    Path hdfsStoragePath = new Path(fs.getHomeDirectory(),
        DynoConstants.DYNAMOMETER_STORAGE_DIR + "/" + infraAppId);
    final Path metricsPath = new Path(hdfsStoragePath, "namenode_metrics");
    GenericTestUtils.waitFor(() -> {
      // try-with-resources: this lambda is polled repeatedly, and a failed
      // read must not leave a stream open on every attempt.
      // NOTE(review): readUTF() expects DataOutput.writeUTF() framing; confirm
      // this matches how the metrics file is written.
      try (FSDataInputStream in = fs.open(metricsPath)) {
        String metricsOutput = in.readUTF();
        // Just assert that there is some metrics content in there
        assertTrue(metricsOutput.contains("JvmMetrics"));
        return true;
      } catch (IOException ioe) {
        return false;
      }
    }, 3000, 60000);
    assertTrue(fs.exists(new Path(OUTPUT_PATH)));
  }

  /**
   * Verify that the Dynamometer-hosted HDFS accepts writes and reads: create
   * the directory {@code /tmp/test}, write a one-byte file {@code foo} into
   * it, and assert that listing the directory returns exactly that file.
   *
   * @param localConf configuration used to obtain the filesystem client
   * @param namenodeProperties properties describing the launched NameNode,
   *          from which its HDFS URI is derived
   * @throws IOException if any filesystem operation fails; the failure is
   *           logged before being rethrown
   */
  private void assertClusterIsFunctional(Configuration localConf,
      Properties namenodeProperties) throws IOException {
    // Test that we can successfully write to / read from the cluster
    try {
      URI nameNodeUri = DynoInfraUtils.getNameNodeHdfsUri(namenodeProperties);
      DistributedFileSystem dynoFS =
          (DistributedFileSystem) FileSystem.get(nameNodeUri, localConf);
      Path testFile = new Path("/tmp/test/foo");
      dynoFS.mkdir(testFile.getParent(), FsPermission.getDefault());
      // try-with-resources so the stream is released even when the write or
      // the sync fails.
      try (FSDataOutputStream out = dynoFS.create(testFile, (short) 1)) {
        out.write(42);
        out.hsync();
      }
      FileStatus[] stats = dynoFS.listStatus(testFile.getParent());
      assertEquals(1, stats.length);
      assertEquals("foo", stats[0].getPath().getName());
    } catch (IOException e) {
      LOG.error("Failed to write or read", e);
      throw e;
    }
  }

  /**
   * Poll YARN once per second, for up to a minute, until exactly one
   * application is known, and record its ID in {@code infraAppId}. The test
   * fails immediately if more than one application is reported or if the
   * application list cannot be fetched.
   *
   * @throws TimeoutException if no application shows up in time
   * @throws InterruptedException if the wait is interrupted
   */
  private void awaitApplicationStartup()
      throws TimeoutException, InterruptedException {
    LOG.info("Waiting for application ID to become available");
    GenericTestUtils.waitFor(() -> {
      final List<ApplicationReport> reports;
      try {
        reports = yarnClient.getApplications();
      } catch (IOException | YarnException e) {
        fail("Unexpected exception: " + e);
        // Not reached: fail() always throws.
        return false;
      }
      final int appCount = reports.size();
      if (appCount > 1) {
        fail("Unexpected: more than one application");
      }
      if (appCount != 1) {
        // Nothing submitted yet; keep polling.
        return false;
      }
      infraAppId = reports.get(0).getApplicationId();
      return true;
    }, 1000, 60000);
  }

  /**
   * Create a Dynamometer {@link Client} configured with {@code localConf} and
   * run it on a background thread with the arguments needed for this test
   * (resource sizes, node labels, fsimage / block list / conf locations, and
   * workload replay settings).
   *
   * The argument list is assembled on the calling thread so that a problem
   * while building it, such as an unexpected layout of the unpacked Hadoop
   * directory, fails the test directly. Errors from the client run itself
   * happen on the background thread and are only logged.
   *
   * @param localConf the configuration to hand to the client
   * @return the client, whose {@code run} is executing asynchronously
   */
  private Client createAndStartClient(Configuration localConf) {
    final Client client = new Client(JarFinder.getJar(ApplicationMaster.class),
        JarFinder.getJar(Assert.class));
    client.setConf(localConf);

    // Resolved once, here: an assertion failure raised on the background
    // thread would not be caught by its catch (Exception) handler and would
    // surface only as a later timeout in the test.
    final String hadoopHome = getHadoopHomeLocation();
    final String[] clientArgs = new String[] {
        "-" + Client.MASTER_MEMORY_MB_ARG, "128",
        "-" + Client.CONF_PATH_ARG, confZip.toString(),
        "-" + Client.BLOCK_LIST_PATH_ARG, blockImageOutputDir.toString(),
        "-" + Client.FS_IMAGE_DIR_ARG, fsImageTmpPath.getParent().toString(),
        "-" + Client.HADOOP_BINARY_PATH_ARG,
        hadoopTarballPath.getAbsolutePath(),
        "-" + AMOptions.DATANODES_PER_CLUSTER_ARG, "2",
        "-" + AMOptions.DATANODE_MEMORY_MB_ARG, "128",
        "-" + AMOptions.DATANODE_NODELABEL_ARG, DATANODE_NODELABEL,
        "-" + AMOptions.NAMENODE_MEMORY_MB_ARG, "256",
        "-" + AMOptions.NAMENODE_METRICS_PERIOD_ARG, "1",
        "-" + AMOptions.NAMENODE_NODELABEL_ARG, NAMENODE_NODELABEL,
        "-" + AMOptions.SHELL_ENV_ARG, "HADOOP_HOME=" + hadoopHome,
        "-" + AMOptions.SHELL_ENV_ARG,
        "HADOOP_CONF_DIR=" + hadoopHome + "/etc/hadoop",
        "-" + Client.WORKLOAD_REPLAY_ENABLE_ARG,
        // Same qualified path the audit trace was uploaded to during setup.
        "-" + Client.WORKLOAD_INPUT_PATH_ARG, auditTraceDir.toString(),
        "-" + Client.WORKLOAD_OUTPUT_PATH_ARG,
        fs.makeQualified(new Path(OUTPUT_PATH)).toString(),
        "-" + Client.WORKLOAD_THREADS_PER_MAPPER_ARG, "1",
        "-" + Client.WORKLOAD_START_DELAY_ARG, "10s",
        "-" + AMOptions.NAMENODE_ARGS_ARG,
        "-Ddfs.namenode.safemode.extension=0"};

    Thread appThread = new Thread(() -> {
      try {
        client.run(clientArgs);
      } catch (Exception e) {
        LOG.error("Error running client", e);
      }
    });

    appThread.start();
    return client;
  }

  /**
   * Resolve a test resource on this class's classpath to a URI.
   *
   * @param resourceName classpath-relative name of the resource
   * @return the URI of the resource; never {@code null}
   * @throws AssertionError if the resource is not on the classpath, or if its
   *           URL cannot be converted to a URI
   */
  private static URI getResourcePath(String resourceName) {
    URL resource = TestDynamometerInfra.class.getClassLoader()
        .getResource(resourceName);
    if (resource == null) {
      // getResource() signals "not found" with null; report which resource is
      // missing instead of letting an anonymous NullPointerException escape.
      throw new AssertionError(
          "Could not find resource on the test classpath: " + resourceName);
    }
    try {
      return resource.toURI();
    } catch (URISyntaxException e) {
      // Returning null here would only postpone the failure to the caller.
      throw new AssertionError(
          "Resource URL cannot be converted to a URI: " + resource, e);
    }
  }

  /**
   * Locate the Hadoop home (the value to use for {@code HADOOP_HOME}): the
   * single entry found inside the directory the Hadoop tarball was unpacked
   * into. The test fails if that directory cannot be listed or does not
   * contain exactly one entry.
   *
   * @return The absolute path to the Hadoop home directory.
   */
  private String getHadoopHomeLocation() {
    final File[] unpackedEntries = hadoopUnpackedDir.listFiles();
    final boolean hasSingleEntry =
        unpackedEntries != null && unpackedEntries.length == 1;
    if (!hasSingleEntry) {
      fail("Should be 1 directory within the Hadoop unpacked dir");
    }
    final File hadoopHome = unpackedEntries[0];
    return hadoopHome.getAbsolutePath();
  }

  /**
   * Look for the resource files relevant to {@code hadoopBinVersion} and upload
   * them onto the MiniDFSCluster's HDFS for use by the subsequent jobs: the
   * fsimage with its MD5 file, the VERSION file, the audit trace, the block
   * list files, and a {@code conf.zip} assembled from the configuration files
   * on the classpath.
   *
   * @param hadoopBinVersion
   *          The version string (e.g. "3.1.1") for which to look for resources.
   * @throws IOException
   *           if a resource is missing from the classpath or any copy fails
   */
  private static void uploadFsimageResourcesToHDFS(String hadoopBinVersion)
      throws IOException {
    // Keep only the major/minor version for the resources path
    String[] versionComponents = hadoopBinVersion.split("\\.");
    if (versionComponents.length < 2) {
      fail(
          "At least major and minor version are required to be specified; got: "
              + hadoopBinVersion);
    }
    String hadoopResourcesPath = "hadoop_" + versionComponents[0] + "_"
        + versionComponents[1];
    String fsImageResourcePath = hadoopResourcesPath + "/" + FSIMAGE_FILENAME;
    fs.copyFromLocalFile(new Path(getResourcePath(fsImageResourcePath)),
        fsImageTmpPath);
    fs.copyFromLocalFile(
        new Path(getResourcePath(fsImageResourcePath + ".md5")),
        fsImageTmpPath.suffix(".md5"));
    fs.copyFromLocalFile(
        new Path(getResourcePath(hadoopResourcesPath + "/" + VERSION_FILENAME)),
        fsVersionTmpPath);
    fs.mkdirs(auditTraceDir);
    copyResourceToHDFS("audit_trace_direct/audit0",
        new Path(auditTraceDir, "audit0"));
    fs.mkdirs(blockImageOutputDir);
    for (String blockFile : new String[] {"dn0-a-0-r-00000", "dn1-a-0-r-00001",
        "dn2-a-0-r-00002"}) {
      copyResourceToHDFS("blocks/" + blockFile,
          new Path(blockImageOutputDir, blockFile));
    }
    File tempConfZip = new File(testBaseDir, "conf.zip");
    try {
      try (ZipOutputStream zos = new ZipOutputStream(
          new FileOutputStream(tempConfZip))) {
        for (String file : new String[] {"core-site.xml", "hdfs-site.xml",
            "log4j.properties"}) {
          zos.putNextEntry(new ZipEntry("etc/hadoop/" + file));
          try (InputStream is =
              openResourceStream("conf/etc/hadoop/" + file)) {
            // close=false: the zip stream must stay open for the next entry
            IOUtils.copyBytes(is, zos, conf, false);
          }
          zos.closeEntry();
        }
      }
      fs.copyFromLocalFile(new Path(tempConfZip.toURI()), confZip);
    } finally {
      // Remove the local scratch archive whether or not the upload succeeded
      FileUtils.deleteQuietly(tempConfZip);
    }
  }

  /**
   * Copy a classpath resource to a new file on the MiniDFSCluster's HDFS.
   *
   * @param resourceName classpath-relative name of the resource to copy
   * @param destination HDFS path of the file to create
   * @throws IOException if the resource is missing or the copy fails
   */
  private static void copyResourceToHDFS(String resourceName,
      Path destination) throws IOException {
    // The surrounding try covers the case where fs.create() itself throws,
    // before copyBytes() has had a chance to take over closing the input.
    try (InputStream in = openResourceStream(resourceName)) {
      IOUtils.copyBytes(in, fs.create(destination), conf, true);
    }
  }

  /**
   * Open a classpath resource, failing with a descriptive error rather than
   * returning {@code null} when it does not exist.
   *
   * @param resourceName classpath-relative name of the resource
   * @return an open stream over the resource; the caller must close it
   * @throws IOException if the resource is not on the classpath
   */
  private static InputStream openResourceStream(String resourceName)
      throws IOException {
    InputStream in = TestDynamometerInfra.class.getClassLoader()
        .getResourceAsStream(resourceName);
    if (in == null) {
      throw new IOException(
          "Test resource not found on classpath: " + resourceName);
    }
    return in;
  }

}