TestStripedFileAppend.java

/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hdfs;

import org.apache.hadoop.fs.CreateFlag;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
import org.apache.hadoop.hdfs.protocol.OpenFileEntry;
import org.apache.hadoop.hdfs.protocol.OpenFilesIterator.OpenFilesType;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.event.Level;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.EnumSet;
import java.util.List;
import java.util.Random;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;

/**
 * Tests append on erasure coded (striped) files.
 *
 * <p>Each test runs against a fresh {@link MiniDFSCluster} with
 * {@link #NUM_DN} datanodes. The test directory has an erasure coding policy
 * set on it (a {@code null} policy name is passed to
 * {@code setErasureCodingPolicy}), so every file created below it is striped.
 */
public class TestStripedFileAppend {
  public static final Logger LOG = LoggerFactory.getLogger(TestStripedFileAppend.class);

  static {
    DFSTestUtil.setNameNodeLogLevel(Level.TRACE);
  }

  private static final int NUM_DATA_BLOCKS =
      StripedFileTestUtil.getDefaultECPolicy().getNumDataUnits();
  private static final int CELL_SIZE =
      StripedFileTestUtil.getDefaultECPolicy().getCellSize();
  private static final int NUM_DN = 9;
  private static final int STRIPES_PER_BLOCK = 4;
  private static final int BLOCK_SIZE = CELL_SIZE * STRIPES_PER_BLOCK;
  private static final int BLOCK_GROUP_SIZE = BLOCK_SIZE * NUM_DATA_BLOCKS;
  private static final Random RANDOM = new Random();

  /** Buffer size handed to {@code append} and used for stateful reads. */
  private static final int BUFFER_SIZE = 4096;

  private MiniDFSCluster cluster;
  private DistributedFileSystem dfs;
  private final Path dir = new Path("/TestFileAppendStriped");
  private final HdfsConfiguration conf = new HdfsConfiguration();

  /**
   * Starts the mini cluster and prepares the erasure coded test directory.
   *
   * @throws IOException if the cluster cannot be started or the directory
   *     cannot be created/configured
   */
  @Before
  public void setup() throws IOException {
    conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, BLOCK_SIZE);
    cluster = new MiniDFSCluster.Builder(conf).numDataNodes(NUM_DN).build();
    cluster.waitActive();
    dfs = cluster.getFileSystem();
    dfs.mkdirs(dir);
    dfs.setErasureCodingPolicy(dir, null);
  }

  /**
   * Shuts the mini cluster down, if it was started.
   *
   * @throws IOException declared for symmetry with {@link #setup()}
   */
  @After
  public void tearDown() throws IOException {
    if (cluster != null) {
      cluster.shutdown();
    }
  }

  /**
   * Test simple append to a closed striped file, with NEW_BLOCK flag enabled.
   *
   * <p>The file is written in {@code totalSplit} pieces of random, non-zero
   * length; the first piece creates the file and every following piece is
   * appended with {@code APPEND | NEW_BLOCK}. The test then checks that the
   * file has exactly one located block per piece and that its content can be
   * read back and seeked.
   *
   * @throws IOException on any file system failure
   */
  @Test
  public void testAppendToNewBlock() throws IOException {
    int fileLength = 0;
    int totalSplit = 6;
    byte[] expected =
        StripedFileTestUtil.generateBytes(BLOCK_GROUP_SIZE * totalSplit);

    Path file = new Path(dir, "testAppendToNewBlock");
    for (int split = 0; split < totalSplit; split++) {
      // Draw from [1, BLOCK_GROUP_SIZE - 1]. A zero-length split is excluded
      // because the block count assertion below expects one block per split
      // (presumably no block is allocated when nothing is written).
      int splitLength = 1 + RANDOM.nextInt(BLOCK_GROUP_SIZE - 1);
      // Log the random sizes so that a failing run can be reproduced.
      LOG.info("Split {}: writing {} bytes at offset {}",
          split, splitLength, fileLength);
      // try-with-resources closes the stream even if the write fails.
      try (FSDataOutputStream out = (split == 0)
          ? dfs.create(file)
          : dfs.append(file,
              EnumSet.of(CreateFlag.APPEND, CreateFlag.NEW_BLOCK),
              BUFFER_SIZE, null)) {
        out.write(expected, fileLength, splitLength);
      }
      fileLength += splitLength;
    }
    expected = Arrays.copyOf(expected, fileLength);
    LocatedBlocks lbs =
        dfs.getClient().getLocatedBlocks(file.toString(), 0L, Long.MAX_VALUE);
    assertEquals(totalSplit, lbs.getLocatedBlocks().size());
    StripedFileTestUtil.verifyStatefulRead(dfs, file, fileLength, expected,
        new byte[BUFFER_SIZE]);
    // NOTE(review): the last argument is totalSplit (6); confirm against
    // StripedFileTestUtil.verifySeek whether a block group size in bytes
    // (e.g. BLOCK_GROUP_SIZE) is expected here instead.
    StripedFileTestUtil.verifySeek(dfs, file, fileLength,
        StripedFileTestUtil.getDefaultECPolicy(), totalSplit);
  }

  /**
   * Test that appending to a striped file without the NEW_BLOCK flag is
   * rejected, and that the failed append does not leave the file open.
   *
   * @throws IOException on any unexpected file system failure
   */
  @Test
  public void testAppendWithoutNewBlock() throws IOException {
    Path file = new Path(dir, "testAppendWithoutNewBlock");
    // Explicit charset: do not depend on the platform default encoding.
    byte[] content =
        "testAppendWithoutNewBlock".getBytes(StandardCharsets.UTF_8);

    // Create file
    try (FSDataOutputStream out = dfs.create(file)) {
      out.write(content);
    }

    // Append file
    // If the append unexpectedly succeeds, try-with-resources still closes
    // the stream; fail() throws an AssertionError, which is not caught by
    // the catch clause below and therefore fails the test.
    try (FSDataOutputStream out =
        dfs.append(file, EnumSet.of(CreateFlag.APPEND), BUFFER_SIZE, null)) {
      out.write(content);
      fail("Should throw unsupported operation");
    } catch (Exception e) {
      // Null-safe check so that an unrelated exception without a message
      // produces a readable assertion failure instead of an NPE.
      String message = e.getMessage();
      assertTrue("Unexpected exception: " + e,
          message != null && message.contains(
              "Append on EC file without new block is not supported"));
    }

    List<OpenFilesType> types = new ArrayList<>();
    types.add(OpenFilesType.ALL_OPEN_FILES);

    RemoteIterator<OpenFileEntry> listOpenFiles = dfs
        .listOpenFiles(EnumSet.copyOf(types), file.toString());
    assertFalse("No file should be open after append failure",
        listOpenFiles.hasNext());
  }
}