TestJournaledEditsCache.java

/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hdfs.qjournal.server;

import org.apache.hadoop.thirdparty.com.google.common.primitives.Bytes;
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.File;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
import org.apache.commons.io.FileUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.server.namenode.EditLogFileOutputStream;
import org.apache.hadoop.hdfs.server.namenode.NameNodeLayoutVersion;
import org.apache.hadoop.test.GenericTestUtils.LogCapturer;
import org.apache.hadoop.test.PathUtils;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;

import static org.apache.hadoop.hdfs.qjournal.QJMTestUtil.createGabageTxns;
import static org.apache.hadoop.hdfs.qjournal.QJMTestUtil.createTxnData;
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;


/**
 * Test the {@link JournaledEditsCache} used for caching edits in-memory on the
 * {@link Journal}.
 */
public class TestJournaledEditsCache {

  private static final int EDITS_CAPACITY = 100;

  private static final File TEST_DIR =
      PathUtils.getTestDir(TestJournaledEditsCache.class, false);
  private JournaledEditsCache cache;

  @Before
  public void setup() throws Exception {
    Configuration conf = new Configuration();
    conf.setInt(DFSConfigKeys.DFS_JOURNALNODE_EDIT_CACHE_SIZE_KEY,
        createTxnData(1, 1).length * EDITS_CAPACITY);
    cache = new JournaledEditsCache(conf);
    TEST_DIR.mkdirs();
  }

  @After
  public void cleanup() throws Exception {
    FileUtils.deleteQuietly(TEST_DIR);
  }

  @Test
  public void testCacheSingleSegment() throws Exception {
    storeEdits(1, 20);
    // Leading part of the segment
    assertTxnCountAndContents(1, 5, 5);
    // All of the segment
    assertTxnCountAndContents(1, 20, 20);
    // Past the segment
    assertTxnCountAndContents(1, 40, 20);
    // Trailing part of the segment
    assertTxnCountAndContents(10, 11, 20);
    // Trailing part of the segment, past the end
    assertTxnCountAndContents(10, 20, 20);
  }

  @Test
  public void testCacheBelowCapacityRequestOnBoundary() throws Exception {
    storeEdits(1, 5);
    storeEdits(6, 20);
    storeEdits(21, 30);

    // First segment only
    assertTxnCountAndContents(1, 3, 3);
    // Second segment only
    assertTxnCountAndContents(6, 10, 15);
    // First and second segment
    assertTxnCountAndContents(1, 7, 7);
    // All three segments
    assertTxnCountAndContents(1, 25, 25);
    // Second and third segment
    assertTxnCountAndContents(6, 20, 25);
    // Second and third segment; request past the end
    assertTxnCountAndContents(6, 50, 30);
    // Third segment only; request past the end
    assertTxnCountAndContents(21, 20, 30);
  }

  @Test
  public void testCacheBelowCapacityRequestOffBoundary() throws Exception {
    storeEdits(1, 5);
    storeEdits(6, 20);
    storeEdits(21, 30);

    // First segment only
    assertTxnCountAndContents(3, 1, 3);
    // First and second segment
    assertTxnCountAndContents(3, 6, 8);
    // Second and third segment
    assertTxnCountAndContents(15, 10, 24);
    // Second and third segment; request past the end
    assertTxnCountAndContents(15, 50, 30);
    // Start read past the end
    List<ByteBuffer> buffers = new ArrayList<>();
    assertEquals(0, cache.retrieveEdits(31, 10, buffers));
    assertTrue(buffers.isEmpty());
  }

  @Test
  public void testCacheAboveCapacity() throws Exception {
    int thirdCapacity = EDITS_CAPACITY / 3;
    storeEdits(1, thirdCapacity);
    storeEdits(thirdCapacity + 1, thirdCapacity * 2);
    storeEdits(thirdCapacity * 2 + 1, EDITS_CAPACITY);
    storeEdits(EDITS_CAPACITY + 1, thirdCapacity * 4);
    storeEdits(thirdCapacity * 4 + 1, thirdCapacity * 5);

    try {
      cache.retrieveEdits(1, 10, new ArrayList<>());
      fail();
    } catch (IOException ioe) {
      // expected
    }
    assertTxnCountAndContents(EDITS_CAPACITY + 1, EDITS_CAPACITY,
        thirdCapacity * 5);
  }

  @Test
  public void testCacheSingleAdditionAboveCapacity() throws Exception {
    LogCapturer logs = LogCapturer.captureLogs(Journal.LOG);
    storeEdits(1, EDITS_CAPACITY * 2);
    logs.stopCapturing();
    assertTrue(logs.getOutput().contains("batch of edits was too large"));
    try {
      cache.retrieveEdits(1, 1, new ArrayList<>());
      fail();
    } catch (IOException ioe) {
      // expected
    }
    storeEdits(EDITS_CAPACITY * 2 + 1, EDITS_CAPACITY * 2 + 5);
    assertTxnCountAndContents(EDITS_CAPACITY * 2 + 1, 5,
        EDITS_CAPACITY * 2 + 5);
  }

  @Test
  public void testCacheWithFutureLayoutVersion() throws Exception {
    byte[] firstHalf = createGabageTxns(1, 5);
    byte[] secondHalf = createGabageTxns(6, 5);
    int futureVersion = NameNodeLayoutVersion.CURRENT_LAYOUT_VERSION - 1;
    cache.storeEdits(Bytes.concat(firstHalf, secondHalf), 1, 10,
        futureVersion);
    List<ByteBuffer> buffers = new ArrayList<>();
    assertEquals(5, cache.retrieveEdits(6, 5, buffers));
    assertArrayEquals(getHeaderForLayoutVersion(futureVersion),
        buffers.get(0).array());
    byte[] retBytes = new byte[buffers.get(1).remaining()];
    System.arraycopy(buffers.get(1).array(), buffers.get(1).position(),
        retBytes, 0, buffers.get(1).remaining());
    assertArrayEquals(secondHalf, retBytes);
  }

  @Test
  public void testCacheWithMultipleLayoutVersions() throws Exception {
    int oldLayout = NameNodeLayoutVersion.CURRENT_LAYOUT_VERSION + 1;
    cache.storeEdits(createTxnData(1, 5), 1, 5, oldLayout);
    storeEdits(6, 10);
    // Ensure the cache will only return edits from a single
    // layout version at a time
    try {
      cache.retrieveEdits(1, 50, new ArrayList<>());
      fail("Expected a cache miss");
    } catch (JournaledEditsCache.CacheMissException cme) {
      // expected
    }
    assertTxnCountAndContents(6, 50, 10);
  }

  @Test
  public void testCacheEditsWithGaps() throws Exception {
    storeEdits(1, 5);
    storeEdits(10, 15);

    try {
      cache.retrieveEdits(1, 20, new ArrayList<>());
      fail();
    } catch (JournaledEditsCache.CacheMissException cme) {
      assertEquals(9, cme.getCacheMissAmount());
    }
    assertTxnCountAndContents(10, 10, 15);
  }

  @Test(expected = JournaledEditsCache.CacheMissException.class)
  public void testReadUninitializedCache() throws Exception {
    cache.retrieveEdits(1, 10, new ArrayList<>());
  }

  @Test(expected = JournaledEditsCache.CacheMissException.class)
  public void testCacheMalformedInput() throws Exception {
    storeEdits(1, 1);
    cache.retrieveEdits(-1, 10, new ArrayList<>());
  }

  @Test
  public void testCacheSizeConfigs() {
    // Assert the default configs.
    Configuration config = new Configuration();
    cache = new JournaledEditsCache(config);
    assertEquals((int) (Runtime.getRuntime().maxMemory() * 0.5f), cache.getCapacity());

    // Set dfs.journalnode.edit-cache-size.bytes.
    Configuration config1 = new Configuration();
    config1.setInt(DFSConfigKeys.DFS_JOURNALNODE_EDIT_CACHE_SIZE_KEY, 1);
    config1.setFloat(DFSConfigKeys.DFS_JOURNALNODE_EDIT_CACHE_SIZE_FRACTION_KEY, 0.1f);
    cache = new JournaledEditsCache(config1);
    assertEquals(1, cache.getCapacity());

    // Don't set dfs.journalnode.edit-cache-size.bytes.
    Configuration config2 = new Configuration();
    config2.setFloat(DFSConfigKeys.DFS_JOURNALNODE_EDIT_CACHE_SIZE_FRACTION_KEY, 0.1f);
    cache = new JournaledEditsCache(config2);
    assertEquals((int) (Runtime.getRuntime().maxMemory() * 0.1f), cache.getCapacity());
  }

  private void storeEdits(int startTxn, int endTxn) throws Exception {
    cache.storeEdits(createTxnData(startTxn, endTxn - startTxn + 1), startTxn,
        endTxn, NameNodeLayoutVersion.CURRENT_LAYOUT_VERSION);
  }

  private void assertTxnCountAndContents(int startTxn, int requestedMaxTxns,
      int expectedEndTxn) throws Exception {
    List<ByteBuffer> buffers = new ArrayList<>();
    int expectedTxnCount = expectedEndTxn - startTxn + 1;
    assertEquals(expectedTxnCount,
        cache.retrieveEdits(startTxn, requestedMaxTxns, buffers));

    byte[] expectedBytes = Bytes.concat(
        getHeaderForLayoutVersion(NameNodeLayoutVersion.CURRENT_LAYOUT_VERSION),
        createTxnData(startTxn, expectedTxnCount));
    byte[] actualBytes =
        new byte[buffers.stream().mapToInt(ByteBuffer::remaining).sum()];
    int pos = 0;
    for (ByteBuffer buf : buffers) {
      System.arraycopy(buf.array(), buf.position(), actualBytes, pos,
          buf.remaining());
      pos += buf.remaining();
    }
    assertArrayEquals(expectedBytes, actualBytes);
  }

  private static byte[] getHeaderForLayoutVersion(int version)
      throws IOException {
    ByteArrayOutputStream baos = new ByteArrayOutputStream();
    EditLogFileOutputStream.writeHeader(version, new DataOutputStream(baos));
    return baos.toByteArray();
  }

}