TestFileAppend3.java

/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hdfs;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;

import java.io.IOException;
import java.util.EnumSet;
import java.util.List;

import org.apache.hadoop.fs.CreateFlag;
import org.apache.hadoop.test.GenericTestUtils;
import org.mockito.invocation.InvocationOnMock;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.when;
import org.mockito.stubbing.Answer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeys;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
import org.apache.hadoop.hdfs.server.protocol.InterDatanodeProtocol;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;
import org.slf4j.event.Level;

/** This class implements some of tests posted in HADOOP-2658. */
public class TestFileAppend3  {
  {
    DFSTestUtil.setNameNodeLogLevel(Level.TRACE);
    GenericTestUtils.setLogLevel(DataNode.LOG, Level.TRACE);
    GenericTestUtils.setLogLevel(DFSClient.LOG, Level.TRACE);
    GenericTestUtils.setLogLevel(InterDatanodeProtocol.LOG, Level.TRACE);
  }

  static final long BLOCK_SIZE = 64 * 1024;
  static final short REPLICATION = 3;
  static final int DATANODE_NUM = 5;

  private static Configuration conf;
  private static int buffersize;
  private static MiniDFSCluster cluster;
  private static DistributedFileSystem fs;

  @BeforeClass
  public static void setUp() throws java.lang.Exception {
    AppendTestUtil.LOG.info("setUp()");
    conf = new HdfsConfiguration();
    conf.setInt(DFSConfigKeys.DFS_BYTES_PER_CHECKSUM_KEY, 512);
    buffersize = conf.getInt(CommonConfigurationKeys.IO_FILE_BUFFER_SIZE_KEY, 4096);
    cluster = new MiniDFSCluster.Builder(conf).numDataNodes(DATANODE_NUM).build();
    fs = cluster.getFileSystem();
  }
   
  @AfterClass
  public static void tearDown() throws Exception {
    AppendTestUtil.LOG.info("tearDown()");
    if(fs != null) fs.close();
    if(cluster != null) cluster.shutdown();
  }

  /**
   * TC1: Append on block boundary.
   * @throws IOException an exception might be thrown
   */
  @Test
  public void testTC1() throws Exception {
    final Path p = new Path("/TC1/foo");
    System.out.println("p=" + p);

    //a. Create file and write one block of data. Close file.
    final int len1 = (int)BLOCK_SIZE; 
    {
      FSDataOutputStream out = fs.create(p, false, buffersize, REPLICATION, BLOCK_SIZE);
      AppendTestUtil.write(out, 0, len1);
      out.close();
    }

    //   Reopen file to append. Append half block of data. Close file.
    final int len2 = (int)BLOCK_SIZE/2; 
    {
      FSDataOutputStream out = fs.append(p);
      AppendTestUtil.write(out, len1, len2);
      out.close();
    }
    
    //b. Reopen file and read 1.5 blocks worth of data. Close file.
    AppendTestUtil.check(fs, p, len1 + len2);
  }

  @Test
  public void testTC1ForAppend2() throws Exception {
    final Path p = new Path("/TC1/foo2");

    //a. Create file and write one block of data. Close file.
    final int len1 = (int) BLOCK_SIZE;
    {
      FSDataOutputStream out = fs.create(p, false, buffersize, REPLICATION,
          BLOCK_SIZE);
      AppendTestUtil.write(out, 0, len1);
      out.close();
    }

    // Reopen file to append. Append half block of data. Close file.
    final int len2 = (int) BLOCK_SIZE / 2;
    {
      FSDataOutputStream out = fs.append(p,
          EnumSet.of(CreateFlag.APPEND, CreateFlag.NEW_BLOCK), 4096, null);
      AppendTestUtil.write(out, len1, len2);
      out.close();
    }

    // b. Reopen file and read 1.5 blocks worth of data. Close file.
    AppendTestUtil.check(fs, p, len1 + len2);
  }

  /**
   * TC2: Append on non-block boundary.
   * @throws IOException an exception might be thrown
   */
  @Test
  public void testTC2() throws Exception {
    final Path p = new Path("/TC2/foo");
    System.out.println("p=" + p);

    //a. Create file with one and a half block of data. Close file.
    final int len1 = (int)(BLOCK_SIZE + BLOCK_SIZE/2); 
    {
      FSDataOutputStream out = fs.create(p, false, buffersize, REPLICATION, BLOCK_SIZE);
      AppendTestUtil.write(out, 0, len1);
      out.close();
    }

    AppendTestUtil.check(fs, p, len1);

    //   Reopen file to append quarter block of data. Close file.
    final int len2 = (int)BLOCK_SIZE/4; 
    {
      FSDataOutputStream out = fs.append(p);
      AppendTestUtil.write(out, len1, len2);
      out.close();
    }

    //b. Reopen file and read 1.75 blocks of data. Close file.
    AppendTestUtil.check(fs, p, len1 + len2);
  }

  @Test
  public void testTC2ForAppend2() throws Exception {
    final Path p = new Path("/TC2/foo2");

    //a. Create file with one and a half block of data. Close file.
    final int len1 = (int) (BLOCK_SIZE + BLOCK_SIZE / 2);
    {
      FSDataOutputStream out = fs.create(p, false, buffersize, REPLICATION,
          BLOCK_SIZE);
      AppendTestUtil.write(out, 0, len1);
      out.close();
    }

    AppendTestUtil.check(fs, p, len1);

    //   Reopen file to append quarter block of data. Close file.
    final int len2 = (int) BLOCK_SIZE / 4;
    {
      FSDataOutputStream out = fs.append(p, EnumSet.of(CreateFlag.APPEND, CreateFlag.NEW_BLOCK),
          4096, null);
      AppendTestUtil.write(out, len1, len2);
      out.close();
    }

    // b. Reopen file and read 1.75 blocks of data. Close file.
    AppendTestUtil.check(fs, p, len1 + len2);
    List<LocatedBlock> blocks = fs.getClient().getLocatedBlocks(
        p.toString(), 0L).getLocatedBlocks();
    Assert.assertEquals(3, blocks.size());
    Assert.assertEquals(BLOCK_SIZE, blocks.get(0).getBlockSize());
    Assert.assertEquals(BLOCK_SIZE / 2, blocks.get(1).getBlockSize());
    Assert.assertEquals(BLOCK_SIZE / 4, blocks.get(2).getBlockSize());
  }

  /**
   * TC5: Only one simultaneous append.
   * @throws IOException an exception might be thrown
   */
  @Test
  public void testTC5() throws Exception {
    final Path p = new Path("/TC5/foo");
    System.out.println("p=" + p);

    //a. Create file on Machine M1. Write half block to it. Close file.
    {
      FSDataOutputStream out = fs.create(p, false, buffersize, REPLICATION, BLOCK_SIZE);
      AppendTestUtil.write(out, 0, (int)(BLOCK_SIZE/2));
      out.close();
    }

    //b. Reopen file in "append" mode on Machine M1.
    FSDataOutputStream out = fs.append(p);

    //c. On Machine M2, reopen file in "append" mode. This should fail.
    try {
      AppendTestUtil.createHdfsWithDifferentUsername(conf).append(p);
      fail("This should fail.");
    } catch(IOException ioe) {
      AppendTestUtil.LOG.info("GOOD: got an exception", ioe);
    }

    try {
      ((DistributedFileSystem) AppendTestUtil
          .createHdfsWithDifferentUsername(conf)).append(p,
          EnumSet.of(CreateFlag.APPEND, CreateFlag.NEW_BLOCK), 4096, null);
      fail("This should fail.");
    } catch(IOException ioe) {
      AppendTestUtil.LOG.info("GOOD: got an exception", ioe);
    }

    //d. On Machine M1, close file.
    out.close();        
  }

  @Test
  public void testTC5ForAppend2() throws Exception {
    final Path p = new Path("/TC5/foo2");

    // a. Create file on Machine M1. Write half block to it. Close file.
    {
      FSDataOutputStream out = fs.create(p, false, buffersize, REPLICATION,
          BLOCK_SIZE);
      AppendTestUtil.write(out, 0, (int)(BLOCK_SIZE/2));
      out.close();
    }

    // b. Reopen file in "append" mode on Machine M1.
    FSDataOutputStream out = fs.append(p, EnumSet.of(CreateFlag.APPEND, CreateFlag.NEW_BLOCK),
        4096, null);

    // c. On Machine M2, reopen file in "append" mode. This should fail.
    try {
      ((DistributedFileSystem) AppendTestUtil
          .createHdfsWithDifferentUsername(conf)).append(p,
          EnumSet.of(CreateFlag.APPEND, CreateFlag.NEW_BLOCK), 4096, null);
      fail("This should fail.");
    } catch(IOException ioe) {
      AppendTestUtil.LOG.info("GOOD: got an exception", ioe);
    }

    try {
      AppendTestUtil.createHdfsWithDifferentUsername(conf).append(p);
      fail("This should fail.");
    } catch(IOException ioe) {
      AppendTestUtil.LOG.info("GOOD: got an exception", ioe);
    }

    // d. On Machine M1, close file.
    out.close();
  }

  /**
   * TC7: Corrupted replicas are present.
   * @throws IOException an exception might be thrown
   */
  private void testTC7(boolean appendToNewBlock) throws Exception {
    final short repl = 2;
    final Path p = new Path("/TC7/foo" + (appendToNewBlock ? "0" : "1"));
    System.out.println("p=" + p);
    
    //a. Create file with replication factor of 2. Write half block of data. Close file.
    final int len1 = (int)(BLOCK_SIZE/2); 
    {
      FSDataOutputStream out = fs.create(p, false, buffersize, repl, BLOCK_SIZE);
      AppendTestUtil.write(out, 0, len1);
      out.close();
    }
    DFSTestUtil.waitReplication(fs, p, repl);

    //b. Log into one datanode that has one replica of this block.
    //   Find the block file on this datanode and truncate it to zero size.
    final LocatedBlocks locatedblocks = fs.dfs.getNamenode().getBlockLocations(p.toString(), 0L, len1);
    assertEquals(1, locatedblocks.locatedBlockCount());
    final LocatedBlock lb = locatedblocks.get(0);
    final ExtendedBlock blk = lb.getBlock();
    assertEquals(len1, lb.getBlockSize());

    DatanodeInfo[] datanodeinfos = lb.getLocations();
    assertEquals(repl, datanodeinfos.length);
    final DataNode dn = cluster.getDataNode(datanodeinfos[0].getIpcPort());
    cluster.getMaterializedReplica(dn, blk).truncateData(0);

    //c. Open file in "append mode".  Append a new block worth of data. Close file.
    final int len2 = (int)BLOCK_SIZE; 
    {
      FSDataOutputStream out = appendToNewBlock ?
          fs.append(p, EnumSet.of(CreateFlag.APPEND, CreateFlag.NEW_BLOCK), 4096, null) : fs.append(p);
      AppendTestUtil.write(out, len1, len2);
      out.close();
    }

    //d. Reopen file and read two blocks worth of data.
    AppendTestUtil.check(fs, p, len1 + len2);
  }

  @Test
  public void testTC7() throws Exception {
    testTC7(false);
  }

  @Test
  public void testTC7ForAppend2() throws Exception {
    testTC7(true);
  }

  /**
   * TC11: Racing rename
   */
  private void testTC11(boolean appendToNewBlock) throws Exception {
    final Path p = new Path("/TC11/foo" + (appendToNewBlock ? "0" : "1"));
    System.out.println("p=" + p);

    //a. Create file and write one block of data. Close file.
    final int len1 = (int)BLOCK_SIZE; 
    {
      FSDataOutputStream out = fs.create(p, false, buffersize, REPLICATION, BLOCK_SIZE);
      AppendTestUtil.write(out, 0, len1);
      out.close();
    }

    //b. Reopen file in "append" mode. Append half block of data.
    FSDataOutputStream out = appendToNewBlock ?
        fs.append(p, EnumSet.of(CreateFlag.APPEND, CreateFlag.NEW_BLOCK), 4096, null) :
        fs.append(p);
    final int len2 = (int)BLOCK_SIZE/2; 
    AppendTestUtil.write(out, len1, len2);
    out.hflush();
    
    //c. Rename file to file.new.
    final Path pnew = new Path(p + ".new");
    assertTrue(fs.rename(p, pnew));

    //d. Close file handle that was opened in (b). 
    out.close();

    //check block sizes
    final long len = fs.getFileStatus(pnew).getLen();
    final LocatedBlocks locatedblocks = fs.dfs.getNamenode().getBlockLocations(pnew.toString(), 0L, len);
    final int numblock = locatedblocks.locatedBlockCount();
    for(int i = 0; i < numblock; i++) {
      final LocatedBlock lb = locatedblocks.get(i);
      final ExtendedBlock blk = lb.getBlock();
      final long size = lb.getBlockSize();
      if (i < numblock - 1) {
        assertEquals(BLOCK_SIZE, size);
      }
      for(DatanodeInfo datanodeinfo : lb.getLocations()) {
        final DataNode dn = cluster.getDataNode(datanodeinfo.getIpcPort());
        final Block metainfo = DataNodeTestUtils.getFSDataset(dn).getStoredBlock(
            blk.getBlockPoolId(), blk.getBlockId());
        assertEquals(size, metainfo.getNumBytes());
      }
    }
  }

  @Test
  public void testTC11() throws Exception {
    testTC11(false);
  }

  @Test
  public void testTC11ForAppend2() throws Exception {
    testTC11(true);
  }

  /** 
   * TC12: Append to partial CRC chunk
   */
  private void testTC12(boolean appendToNewBlock) throws Exception {
    final Path p = new Path("/TC12/foo" + (appendToNewBlock ? "0" : "1"));
    System.out.println("p=" + p);
    
    //a. Create file with a block size of 64KB
    //   and a default io.bytes.per.checksum of 512 bytes.
    //   Write 25687 bytes of data. Close file.
    final int len1 = 25687; 
    {
      FSDataOutputStream out = fs.create(p, false, buffersize, REPLICATION, BLOCK_SIZE);
      AppendTestUtil.write(out, 0, len1);
      out.close();
    }

    //b. Reopen file in "append" mode. Append another 5877 bytes of data. Close file.
    final int len2 = 5877; 
    {
      FSDataOutputStream out = appendToNewBlock ?
          fs.append(p, EnumSet.of(CreateFlag.APPEND, CreateFlag.NEW_BLOCK), 4096, null) :
          fs.append(p);
      AppendTestUtil.write(out, len1, len2);
      out.close();
    }

    //c. Reopen file and read 25687+5877 bytes of data from file. Close file.
    AppendTestUtil.check(fs, p, len1 + len2);
    if (appendToNewBlock) {
      LocatedBlocks blks = fs.dfs.getLocatedBlocks(p.toString(), 0);
      Assert.assertEquals(2, blks.getLocatedBlocks().size());
      Assert.assertEquals(len1, blks.getLocatedBlocks().get(0).getBlockSize());
      Assert.assertEquals(len2, blks.getLocatedBlocks().get(1).getBlockSize());
      AppendTestUtil.check(fs, p, 0, len1);
      AppendTestUtil.check(fs, p, len1, len2);
    }
  }

  @Test
  public void testTC12() throws Exception {
    testTC12(false);
  }

  @Test
  public void testTC12ForAppend2() throws Exception {
    testTC12(true);
  }

  /**
   * Append to a partial CRC chunk and the first write does not fill up the
   * partial CRC trunk
   */
  private void testAppendToPartialChunk(boolean appendToNewBlock)
      throws IOException {
    final Path p = new Path("/partialChunk/foo"
        + (appendToNewBlock ? "0" : "1"));
    final int fileLen = 513;
    System.out.println("p=" + p);
    
    byte[] fileContents = AppendTestUtil.initBuffer(fileLen);

    // create a new file.
    FSDataOutputStream stm = AppendTestUtil.createFile(fs, p, 1);

    // create 1 byte file
    stm.write(fileContents, 0, 1);
    stm.close();
    System.out.println("Wrote 1 byte and closed the file " + p);

    // append to file
    stm = appendToNewBlock ?
        fs.append(p, EnumSet.of(CreateFlag.APPEND, CreateFlag.NEW_BLOCK), 4096, null) :
        fs.append(p);
    // Append to a partial CRC trunk
    stm.write(fileContents, 1, 1);
    stm.hflush();
    // The partial CRC trunk is not full yet and close the file
    stm.close();
    System.out.println("Append 1 byte and closed the file " + p);

    // write the remainder of the file
    stm = appendToNewBlock ?
        fs.append(p, EnumSet.of(CreateFlag.APPEND, CreateFlag.NEW_BLOCK), 4096, null) :
        fs.append(p);

    // ensure getPos is set to reflect existing size of the file
    assertEquals(2, stm.getPos());

    // append to a partial CRC trunk
    stm.write(fileContents, 2, 1);
    // The partial chunk is not full yet, force to send a packet to DN
    stm.hflush();
    System.out.println("Append and flush 1 byte");
    // The partial chunk is not full yet, force to send another packet to DN
    stm.write(fileContents, 3, 2);
    stm.hflush();
    System.out.println("Append and flush 2 byte");

    // fill up the partial chunk and close the file
    stm.write(fileContents, 5, fileLen-5);
    stm.close();
    System.out.println("Flush 508 byte and closed the file " + p);

    // verify that entire file is good
    AppendTestUtil.checkFullFile(fs, p, fileLen,
        fileContents, "Failed to append to a partial chunk");
  }

  // Do small appends.
  void doSmallAppends(Path file, DistributedFileSystem fs, int iterations)
    throws IOException {
    for (int i = 0; i < iterations; i++) {
      FSDataOutputStream stm;
      try {
        stm = fs.append(file);
      } catch (IOException e) {
        // If another thread is already appending, skip this time.
        continue;
      }
      // Failure in write or close will be terminal.
      AppendTestUtil.write(stm, 0, 123);
      stm.close();
    }
  }


  @Test
  public void testSmallAppendRace()  throws Exception {
    final Path file = new Path("/testSmallAppendRace");
    final String fName = file.toUri().getPath();

    // Create the file and write a small amount of data.
    FSDataOutputStream stm = fs.create(file);
    AppendTestUtil.write(stm, 0, 123);
    stm.close();

    // Introduce a delay between getFileInfo and calling append() against NN.
    final DFSClient client = DFSClientAdapter.getDFSClient(fs);
    DFSClient spyClient = spy(client);
    when(spyClient.getFileInfo(fName)).thenAnswer(new Answer<HdfsFileStatus>() {
      @Override
      public HdfsFileStatus answer(InvocationOnMock invocation){
        try {
          HdfsFileStatus stat = client.getFileInfo(fName);
          Thread.sleep(100);
          return stat;
        } catch (Exception e) {
          return null;
        }
      }
    });

    DFSClientAdapter.setDFSClient(fs, spyClient);

    // Create two threads for doing appends to the same file.
    Thread worker1 = new Thread() {
      @Override
      public void run() {
        try {
          doSmallAppends(file, fs, 20);
        } catch (IOException e) {
        }
      }
    };

    Thread worker2 = new Thread() {
      @Override
      public void run() {
        try {
          doSmallAppends(file, fs, 20);
        } catch (IOException e) {
        }
      }
    };

    worker1.start();
    worker2.start();

    // append will fail when the file size crosses the checksum chunk boundary,
    // if append was called with a stale file stat.
    doSmallAppends(file, fs, 20);
  }

  @Test
  public void testAppendToPartialChunk() throws IOException {
    testAppendToPartialChunk(false);
  }

  @Test
  public void testAppendToPartialChunkforAppend2() throws IOException {
    testAppendToPartialChunk(true);
  }

  @Test
  public void testApppendToPartialChunkWithMiddleBlockNotComplete() throws IOException {
    final Path p = new Path("/TC8/foo");

    //a. Create file and write one block of data. Close file.
    final int len1 = (int) BLOCK_SIZE;
    try (FSDataOutputStream out = fs.create(p, false, buffersize, REPLICATION,
        BLOCK_SIZE)) {
      AppendTestUtil.write(out, 0, len1);
    }

    // Reopen file to append. This length is not the multiple of 512.
    final int len2 = (int) BLOCK_SIZE / 2 + 68;
    try (FSDataOutputStream out = fs.append(p,
        EnumSet.of(CreateFlag.APPEND, CreateFlag.NEW_BLOCK), 4096, null)) {
      AppendTestUtil.write(out, len1, len2);
    }

    // Reopen file to append with NEW_BLOCK flag again, this will make middle block
    // in file /TC8/foo not full.
    final int len3 = (int) BLOCK_SIZE / 2;
    try (FSDataOutputStream out = fs.append(p,
        EnumSet.of(CreateFlag.APPEND, CreateFlag.NEW_BLOCK), 4096, null)) {
      AppendTestUtil.write(out, len1 + len2, len3);
    }

    // Not use NEW_BLOCK flag.
    final int len4 = 600;
    try(FSDataOutputStream out = fs.append(p,
        EnumSet.of(CreateFlag.APPEND), 4096, null)) {
      AppendTestUtil.write(out, len1 + len2 + len3, len4);
    }

    AppendTestUtil.check(fs, p, len1 + len2 + len3 + len4);
  }
}