/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hdfs.qjournal.server;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.classification.VisibleForTesting;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.server.namenode.EditLogFileOutputStream;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogLoader;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp;
import org.apache.hadoop.util.AutoCloseableLock;
import org.apache.hadoop.util.Preconditions;

/**
 * An in-memory cache of edits in their serialized form. This is used to serve
 * the {@link Journal#getJournaledEdits(long, int)} call, used by the
 * QJM when {@value DFSConfigKeys#DFS_HA_TAILEDITS_INPROGRESS_KEY} is
 * enabled.
 *
 * <p>When a batch of edits is received by the JournalNode, it is put into this
 * cache via {@link #storeEdits(byte[], long, long, int)}. Edits must be
 * stored contiguously; if a batch of edits is stored that does not align with
 * the previously stored edits, the cache will be cleared before storing the
 * new edits to avoid gaps. Gaps are only handled during recovery, which this
 * cache is not intended to serve.
 *
 * <p>Batches of edits are stored in a {@link TreeMap} mapping the starting
 * transaction ID of the batch to the data buffer. Upon retrieval, the
 * relevant data buffers are concatenated together and a header is added
 * to construct a fully-formed edit data stream.
 *
 * <p>The cache has a limited capacity, determined by
 * {@value DFSConfigKeys#DFS_JOURNALNODE_EDIT_CACHE_SIZE_KEY}. If the capacity
 * is exceeded after adding a new batch of edits, batches of edits are removed
 * until the total size is less than the capacity, starting from the ones
 * containing the oldest transactions. Transactions range in size, but a
 * decent rule of thumb is that 200 bytes are needed per transaction. Monitoring
 * the {@link JournalMetrics#rpcRequestCacheMissAmount} metric is recommended
 * to determine if the cache is too small; it will indicate both how many
 * cache misses occurred, and how many more transactions would have been
 * needed in the cache to serve the request.
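 *
 * <p>A minimal usage sketch; the batch contents, transaction IDs, and layout
 * version below are illustrative only:
 * <pre>{@code
 * JournaledEditsCache cache = new JournaledEditsCache(conf);
 * // Store a contiguous batch of serialized edits covering txns 1-5
 * cache.storeEdits(serializedBatch, 1, 5, layoutVersion);
 * // Fetch up to 100 txns starting at txn 3; the returned buffers,
 * // concatenated, form a header-prefixed edit stream
 * List<ByteBuffer> buffers = new ArrayList<>();
 * int count = cache.retrieveEdits(3, 100, buffers);
 * }</pre>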
 */
@InterfaceAudience.Private
@InterfaceStability.Evolving
class JournaledEditsCache {

  private static final int INVALID_LAYOUT_VERSION = 0;
  private static final long INVALID_TXN_ID = -1;

  /** The capacity, in bytes, of this cache. */
  private final int capacity;

  /**
   * Read/write lock pair wrapped in AutoCloseable; these refer to the same
   * underlying lock.
   */
  private final AutoCloseableLock readLock;
  private final AutoCloseableLock writeLock;

  // ** Start lock-protected fields **

  /**
   * Stores the actual data as a mapping of the StartTxnId of a batch of edits
   * to the serialized batch of edits. Stores only contiguous ranges; that is,
   * the last transaction ID in one batch is always one less than the first
   * transaction ID in the next batch. Though the map is protected by the lock,
   * individual data buffers are immutable and can be accessed without locking.
   */
  private final NavigableMap<Long, byte[]> dataMap = new TreeMap<>();
  /** Stores the layout version currently present in the cache. */
  private int layoutVersion = INVALID_LAYOUT_VERSION;
  /** Stores the serialized version of the header for the current version. */
  private ByteBuffer layoutHeader;

  /**
   * The lowest/highest transaction IDs present in the cache.
   * {@value #INVALID_TXN_ID} if there are no transactions in the cache.
   */
  private long lowestTxnId;
  private long highestTxnId;
  /**
   * The lowest transaction ID that was ever present in the cache since last
   * being reset (i.e. since initialization or since reset due to being out of
   * sync with the Journal). Until the cache size goes above capacity, this is
   * equal to lowestTxnId.
   */
  private long initialTxnId;
  /** The current total size of all buffers in this cache. */
  private int totalSize;

  // ** End lock-protected fields **

  JournaledEditsCache(Configuration conf) {
    float fraction = conf.getFloat(DFSConfigKeys.DFS_JOURNALNODE_EDIT_CACHE_SIZE_FRACTION_KEY,
        DFSConfigKeys.DFS_JOURNALNODE_EDIT_CACHE_SIZE_FRACTION_DEFAULT);
    Preconditions.checkArgument((fraction > 0 && fraction < 1.0f),
        String.format("Cache config %s is set at %f, it should be a positive float value, " +
            "less than 1.0. The recommended value is less than 0.9.",
            DFSConfigKeys.DFS_JOURNALNODE_EDIT_CACHE_SIZE_FRACTION_KEY, fraction));
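    // If no explicit byte size is configured, the capacity defaults to the
    // configured fraction of the JVM's max heap; e.g. a 1 GiB max heap with
    // a fraction of 0.5 yields a capacity of roughly 512 MiB.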
    capacity = conf.getInt(DFSConfigKeys.DFS_JOURNALNODE_EDIT_CACHE_SIZE_KEY,
        (int) (Runtime.getRuntime().maxMemory() * fraction));
    if (capacity > 0.9 * Runtime.getRuntime().maxMemory()) {
      Journal.LOG.warn(String.format("Cache capacity is set at %d bytes but " +
          "maximum JVM memory is only %d bytes. It is recommended that you " +
          "decrease the cache size/fraction or increase the heap size.",
          capacity, Runtime.getRuntime().maxMemory()));
    }
    Journal.LOG.info("Enabling the journaled edits cache with a capacity " +
        "of bytes: " + capacity);
    ReadWriteLock lock = new ReentrantReadWriteLock(true);
    readLock = new AutoCloseableLock(lock.readLock());
    writeLock = new AutoCloseableLock(lock.writeLock());
    initialize(INVALID_TXN_ID);
  }

  /**
   * Fetch the data for edits starting at the specified transaction ID,
   * fetching up to {@code maxTxns} transactions. Populates a list of output
   * buffers which together contain a serialized version of the edits, and
   * returns the count of edits contained within them. The serialized edits are
   * prefixed with a standard edit log header containing information about the
   * layout version. The transactions returned are guaranteed to have contiguous
   * transaction IDs.
   *
   * If {@code requestedStartTxn} is higher than the highest transaction which
   * has been added to this cache, no output buffers will be populated and a
   * transaction count of 0 will be returned. If {@code requestedStartTxn} is
   * lower than the lowest transaction currently contained in this cache, or no
   * transactions have yet been added to the cache, an exception will be thrown.
   *
   * @param requestedStartTxn The ID of the first transaction to return. If any
   *                          transactions are returned, it is guaranteed that
   *                          the first one will have this ID.
   * @param maxTxns The maximum number of transactions to return.
   * @param outputBuffers A list to populate with output buffers. When
   *                      concatenated, these form a full response.
   * @return The number of transactions contained within the set of output
   *         buffers.
   * @throws IOException If transactions are requested which cannot be served
   *                     by this cache.
   */
  int retrieveEdits(long requestedStartTxn, int maxTxns,
      List<ByteBuffer> outputBuffers) throws IOException {
    int txnCount = 0;

    try (AutoCloseableLock l = readLock.acquire()) {
      if (lowestTxnId == INVALID_TXN_ID || requestedStartTxn < lowestTxnId) {
        throw getCacheMissException(requestedStartTxn);
      } else if (requestedStartTxn > highestTxnId) {
        return 0;
      }
      outputBuffers.add(layoutHeader);
      Iterator<Map.Entry<Long, byte[]>> incrBuffIter =
          dataMap.tailMap(dataMap.floorKey(requestedStartTxn), true)
              .entrySet().iterator();
      long prevTxn = requestedStartTxn;
      byte[] prevBuf = null;
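      // Worked example: suppose dataMap holds batches keyed at txns {10, 20,
      // 30} with highestTxnId = 39, and requestedStartTxn = 25 with
      // maxTxns = 10. floorKey(25) = 20, so the batches at 20 and 30 are
      // visited. The batch at 20 is added with txnCount += 30 - max(25, 20)
      // = 5; the trailing iteration (currTxn = 40) adds the batch at 30 with
      // txnCount += 10. Since txnCount = 15 exceeds maxTxns, the leading and
      // trailing trims below narrow the result to txns 25 through 34.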
      // Stop when maximum transactions reached...
      while ((txnCount < maxTxns) &&
          // ... or there are no more entries ...
          (incrBuffIter.hasNext() || prevBuf != null)) {
        long currTxn;
        byte[] currBuf;
        if (incrBuffIter.hasNext()) {
          Map.Entry<Long, byte[]> ent = incrBuffIter.next();
          currTxn = ent.getKey();
          currBuf = ent.getValue();
        } else {
          // This accounts for the trailing entry
          currTxn = highestTxnId + 1;
          currBuf = null;
        }
        if (prevBuf != null) { // True except for the first loop iteration
          outputBuffers.add(ByteBuffer.wrap(prevBuf));
          // if prevTxn < requestedStartTxn, the extra transactions will get
          // removed after the loop, so don't include them in the txn count
          txnCount += currTxn - Math.max(requestedStartTxn, prevTxn);
        }
        prevTxn = currTxn;
        prevBuf = currBuf;
      }
      // Release the lock before doing operations on the buffers (deserializing
      // to find transaction boundaries, and copying into an output buffer)
    }
    // Remove extra leading transactions in the first buffer
    ByteBuffer firstBuf = outputBuffers.get(1); // 0th is the header
    firstBuf.position(
        findTransactionPosition(firstBuf.array(), requestedStartTxn));
    // Remove trailing transactions in the last buffer if necessary
    if (txnCount > maxTxns) {
      ByteBuffer lastBuf = outputBuffers.get(outputBuffers.size() - 1);
      int limit =
          findTransactionPosition(lastBuf.array(), requestedStartTxn + maxTxns);
      lastBuf.limit(limit);
      txnCount = maxTxns;
    }

    return txnCount;
  }

  /**
   * Store a batch of serialized edits into this cache. Removes old batches
   * as necessary to keep the total size of the cache below the capacity.
   * See the class Javadoc for more info.
   *
   * This attempts to always handle malformed inputs gracefully rather than
   * throwing an exception, to allow the rest of the Journal's operations
   * to proceed normally.
   *
   * @param inputData A buffer containing edits in serialized form
   * @param newStartTxn The txn ID of the first edit in {@code inputData}
   * @param newEndTxn The txn ID of the last edit in {@code inputData}
   * @param newLayoutVersion The version of the layout used to serialize
   *                         the edits
   */
  void storeEdits(byte[] inputData, long newStartTxn, long newEndTxn,
      int newLayoutVersion) {
    if (newStartTxn < 0 || newEndTxn < newStartTxn) {
      Journal.LOG.error(String.format("Attempted to cache data of length %d " +
          "with newStartTxn %d and newEndTxn %d",
          inputData.length, newStartTxn, newEndTxn));
      return;
    }
    try (AutoCloseableLock l = writeLock.acquire()) {
      if (newLayoutVersion != layoutVersion) {
        try {
          updateLayoutVersion(newLayoutVersion, newStartTxn);
        } catch (IOException ioe) {
          Journal.LOG.error(String.format("Unable to save new edits [%d, %d] " +
              "due to exception when updating to new layout version %d",
              newStartTxn, newEndTxn, newLayoutVersion), ioe);
          return;
        }
      } else if (lowestTxnId == INVALID_TXN_ID) {
        Journal.LOG.info("Initializing edits cache starting from txn ID " +
            newStartTxn);
        initialize(newStartTxn);
      } else if (highestTxnId + 1 != newStartTxn) {
        // Cache is out of sync; clear to avoid storing noncontiguous regions
        Journal.LOG.error(String.format("Edits cache is out of sync; " +
            "looked for next txn id at %d but got start txn id for " +
            "cache put request at %d. Reinitializing at new request.",
            highestTxnId + 1, newStartTxn));
        initialize(newStartTxn);
      }

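      // Evict the oldest batches until the new batch fits within capacity;
      // e.g. with capacity = 1000, totalSize = 900, and a 200-byte batch,
      // batches are removed from the front of the map until totalSize is
      // at most 800.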
      while ((totalSize + inputData.length) > capacity && !dataMap.isEmpty()) {
        Map.Entry<Long, byte[]> lowest = dataMap.firstEntry();
        dataMap.remove(lowest.getKey());
        totalSize -= lowest.getValue().length;
      }
      if (inputData.length > capacity) {
        initialize(INVALID_TXN_ID);
        Journal.LOG.warn(String.format("A single batch of edits was too " +
                "large to fit into the cache: startTxn = %d, endTxn = %d, " +
                "input length = %d. The cache size (%s) or cache fraction (%s) must be " +
                "increased for it to work properly (current capacity %d)." +
                "Cache is now empty.",
            newStartTxn, newEndTxn, inputData.length,
            DFSConfigKeys.DFS_JOURNALNODE_EDIT_CACHE_SIZE_KEY,
            DFSConfigKeys.DFS_JOURNALNODE_EDIT_CACHE_SIZE_FRACTION_KEY, capacity));
        return;
      }
      if (dataMap.isEmpty()) {
        lowestTxnId = newStartTxn;
      } else {
        lowestTxnId = dataMap.firstKey();
      }

      dataMap.put(newStartTxn, inputData);
      highestTxnId = newEndTxn;
      totalSize += inputData.length;
    }
  }

  /**
   * Skip through a given stream of edits until the given transaction ID is
   * found. Return the number of bytes that appear prior to the given
   * transaction.
   *
   * @param buf A buffer containing a stream of serialized edits
   * @param txnId The transaction ID to search for
   * @return The number of bytes appearing in {@code buf} <i>before</i>
   *         the start of the transaction with ID {@code txnId}.
   */
  private int findTransactionPosition(byte[] buf, long txnId)
      throws IOException {
    ByteArrayInputStream bais = new ByteArrayInputStream(buf);
    FSEditLogLoader.PositionTrackingInputStream tracker =
        new FSEditLogLoader.PositionTrackingInputStream(bais);
    FSEditLogOp.Reader reader = FSEditLogOp.Reader.create(
        new DataInputStream(tracker), tracker, layoutVersion);
    long previousPos = 0;
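    // scanOp() scans past one op and returns its txn ID. Record the stream
    // position after each op with a lower ID, so that when the loop exits,
    // previousPos is the byte offset of the first op with ID >= txnId.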
    while (reader.scanOp() < txnId) {
      previousPos = tracker.getPos();
    }
    // tracker is backed by a byte[]; position cannot go above an integer
    return (int) previousPos;
  }

  /**
   * Update the layout version of the cache. This clears out all existing
   * entries, and populates the new layout version and header for that version.
   *
   * @param newLayoutVersion The new layout version to be stored in the cache
   * @param newStartTxn The new lowest transaction in the cache
   */
  private void updateLayoutVersion(int newLayoutVersion, long newStartTxn)
      throws IOException {
    StringBuilder logMsg = new StringBuilder()
        .append("Updating edits cache to use layout version ")
        .append(newLayoutVersion)
        .append(" starting from txn ID ")
        .append(newStartTxn);
    if (layoutVersion != INVALID_LAYOUT_VERSION) {
      logMsg.append("; previous version was ").append(layoutVersion)
          .append("; old entries will be cleared.");
    }
    Journal.LOG.info(logMsg.toString());
    initialize(newStartTxn);
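    // Pre-serialize the standard edit log file header for the new layout
    // version; retrieveEdits() prepends this buffer to every response.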
    ByteArrayOutputStream baos = new ByteArrayOutputStream();
    EditLogFileOutputStream.writeHeader(newLayoutVersion,
        new DataOutputStream(baos));
    layoutVersion = newLayoutVersion;
    layoutHeader = ByteBuffer.wrap(baos.toByteArray());
  }

  /**
   * Initialize the cache back to a clear state.
   *
   * @param newInitialTxnId The new lowest transaction ID stored in the cache.
   *                        This should be {@value #INVALID_TXN_ID} if the cache
   *                        is to remain empty at this time.
   */
  private void initialize(long newInitialTxnId) {
    dataMap.clear();
    totalSize = 0;
    initialTxnId = newInitialTxnId;
    lowestTxnId = initialTxnId;
    highestTxnId = INVALID_TXN_ID; // this will be set later
  }

  /**
   * Return the underlying data buffer used to store information about the
   * given transaction ID.
   *
   * @param txnId Transaction ID whose containing buffer should be fetched.
   * @return The data buffer for the transaction
   */
  @VisibleForTesting
  byte[] getRawDataForTests(long txnId) {
    try (AutoCloseableLock l = readLock.acquire()) {
      return dataMap.floorEntry(txnId).getValue();
    }
  }

  private CacheMissException getCacheMissException(long requestedTxnId) {
    if (lowestTxnId == INVALID_TXN_ID) {
      return new CacheMissException(0, "Cache is empty; either it was never " +
          "written to or the last write overflowed the cache capacity.");
    } else if (requestedTxnId < initialTxnId) {
      return new CacheMissException(initialTxnId - requestedTxnId,
          "Cache started at txn ID %d but requested txns starting at %d.",
          initialTxnId, requestedTxnId);
    } else {
      return new CacheMissException(lowestTxnId - requestedTxnId,
          "Oldest txn ID available in the cache is %d, but requested txns " +
              "starting at %d. The cache size (%s) or cache fraction (%s) may need to be " +
              "increased to hold more transactions (currently %d bytes containing %d " +
              "transactions)", lowestTxnId, requestedTxnId,
              DFSConfigKeys.DFS_JOURNALNODE_EDIT_CACHE_SIZE_KEY,
              DFSConfigKeys.DFS_JOURNALNODE_EDIT_CACHE_SIZE_FRACTION_KEY, capacity,
          highestTxnId - lowestTxnId + 1);
    }
  }

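  /**
   * Thrown when a request cannot be served from this cache. The cache miss
   * amount indicates how many transactions earlier than the oldest cached
   * transaction the request began, i.e. how many additional transactions the
   * cache would have needed to retain to serve the request (0 if the cache
   * was empty).
   */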
  static class CacheMissException extends IOException {

    private static final long serialVersionUID = 0L;

    private final long cacheMissAmount;

    CacheMissException(long cacheMissAmount, String msgFormat,
        Object... msgArgs) {
      super(String.format(msgFormat, msgArgs));
      this.cacheMissAmount = cacheMissAmount;
    }

    long getCacheMissAmount() {
      return cacheMissAmount;
    }

  }

  @VisibleForTesting
  int getCapacity() {
    return capacity;
  }

}