QuorumJournalManager.java

/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hdfs.qjournal.client;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.URI;
import java.net.URL;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.PriorityQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

import org.apache.hadoop.util.Lists;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeys;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.GetJournaledEditsResponseProto;
import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.GetJournalStateResponseProto;
import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.NewEpochResponseProto;
import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.PrepareRecoveryResponseProto;
import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.SegmentStateProto;
import org.apache.hadoop.hdfs.server.common.Storage;
import org.apache.hadoop.hdfs.server.common.StorageInfo;
import org.apache.hadoop.hdfs.server.common.Util;
import org.apache.hadoop.hdfs.server.namenode.EditLogFileInputStream;
import org.apache.hadoop.hdfs.server.namenode.EditLogInputStream;
import org.apache.hadoop.hdfs.server.namenode.EditLogOutputStream;
import org.apache.hadoop.hdfs.server.namenode.JournalManager;
import org.apache.hadoop.hdfs.server.namenode.JournalSet;
import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
import org.apache.hadoop.hdfs.server.protocol.RemoteEditLog;
import org.apache.hadoop.hdfs.server.protocol.RemoteEditLogManifest;
import org.apache.hadoop.hdfs.web.URLConnectionFactory;
import org.apache.hadoop.log.LogThrottlingHelper;
import org.apache.hadoop.log.LogThrottlingHelper.LogAction;

import org.apache.hadoop.classification.VisibleForTesting;
import org.apache.hadoop.thirdparty.com.google.common.base.Joiner;
import org.apache.hadoop.util.Preconditions;
import org.apache.hadoop.thirdparty.protobuf.TextFormat;

/**
 * A JournalManager that writes to a set of remote JournalNodes,
 * requiring a quorum of nodes to ack each write.
 */
@InterfaceAudience.Private
public class QuorumJournalManager implements JournalManager {
  static final Logger LOG = LoggerFactory.getLogger(QuorumJournalManager.class);

  // This config is not publicly exposed
  public static final String QJM_RPC_MAX_TXNS_KEY =
      "dfs.ha.tail-edits.qjm.rpc.max-txns";
  public static final int QJM_RPC_MAX_TXNS_DEFAULT = 5000;

  // Maximum number of transactions to fetch at a time when using the
  // RPC edit fetch mechanism
  private final int maxTxnsPerRpc;
  // Whether or not in-progress tailing is enabled in the configuration
  private final boolean inProgressTailingEnabled;
  // Timeouts for which the QJM will wait for each of the following actions.
  private final int startSegmentTimeoutMs;
  private final int prepareRecoveryTimeoutMs;
  private final int acceptRecoveryTimeoutMs;
  private final int finalizeSegmentTimeoutMs;
  private final int selectInputStreamsTimeoutMs;
  private final int getJournalStateTimeoutMs;
  private final int newEpochTimeoutMs;
  private final int writeTxnsTimeoutMs;

  // This timeout is used for calls that don't occur during normal operation
  // e.g. format, upgrade operations and a few others. So we can use rather
  // lengthy timeouts by default.
  private final int timeoutMs;
  
  private final Configuration conf;
  private final URI uri;
  private final NamespaceInfo nsInfo;
  private final String nameServiceId;
  private boolean isActiveWriter;
  
  private final AsyncLoggerSet loggers;

  private static final int OUTPUT_BUFFER_CAPACITY_DEFAULT = 512 * 1024;
  private int outputBufferCapacity;
  private final URLConnectionFactory connectionFactory;

  /** Limit logging about input stream selection to every 5 seconds max. */
  private static final long SELECT_INPUT_STREAM_LOG_INTERVAL_MS = 5000;
  private final LogThrottlingHelper selectInputStreamLogHelper =
      new LogThrottlingHelper(SELECT_INPUT_STREAM_LOG_INTERVAL_MS);

  @VisibleForTesting
  public QuorumJournalManager(Configuration conf,
                              URI uri,
                              NamespaceInfo nsInfo) throws IOException {
    this(conf, uri, nsInfo, null, IPCLoggerChannel.FACTORY);
  }
  
  public QuorumJournalManager(Configuration conf,
      URI uri, NamespaceInfo nsInfo, String nameServiceId) throws IOException {
    this(conf, uri, nsInfo, nameServiceId, IPCLoggerChannel.FACTORY);
  }

  @VisibleForTesting
  QuorumJournalManager(Configuration conf,
                       URI uri, NamespaceInfo nsInfo,
                       AsyncLogger.Factory loggerFactory) throws IOException {
    this(conf, uri, nsInfo, null, loggerFactory);

  }

  
  QuorumJournalManager(Configuration conf,
      URI uri, NamespaceInfo nsInfo, String nameServiceId,
      AsyncLogger.Factory loggerFactory) throws IOException {
    Preconditions.checkArgument(conf != null, "must be configured");

    this.conf = conf;
    this.uri = uri;
    this.nsInfo = nsInfo;
    this.nameServiceId = nameServiceId;
    this.loggers = new AsyncLoggerSet(createLoggers(loggerFactory));

    this.maxTxnsPerRpc =
        conf.getInt(QJM_RPC_MAX_TXNS_KEY, QJM_RPC_MAX_TXNS_DEFAULT);
    Preconditions.checkArgument(maxTxnsPerRpc > 0,
        "Must specify %s greater than 0!", QJM_RPC_MAX_TXNS_KEY);
    this.inProgressTailingEnabled = conf.getBoolean(
        DFSConfigKeys.DFS_HA_TAILEDITS_INPROGRESS_KEY,
        DFSConfigKeys.DFS_HA_TAILEDITS_INPROGRESS_DEFAULT);
    // Configure timeouts.
    this.startSegmentTimeoutMs = conf.getInt(
        DFSConfigKeys.DFS_QJOURNAL_START_SEGMENT_TIMEOUT_KEY,
        DFSConfigKeys.DFS_QJOURNAL_START_SEGMENT_TIMEOUT_DEFAULT);
    this.prepareRecoveryTimeoutMs = conf.getInt(
        DFSConfigKeys.DFS_QJOURNAL_PREPARE_RECOVERY_TIMEOUT_KEY,
        DFSConfigKeys.DFS_QJOURNAL_PREPARE_RECOVERY_TIMEOUT_DEFAULT);
    this.acceptRecoveryTimeoutMs = conf.getInt(
        DFSConfigKeys.DFS_QJOURNAL_ACCEPT_RECOVERY_TIMEOUT_KEY,
        DFSConfigKeys.DFS_QJOURNAL_ACCEPT_RECOVERY_TIMEOUT_DEFAULT);
    this.finalizeSegmentTimeoutMs = conf.getInt(
        DFSConfigKeys.DFS_QJOURNAL_FINALIZE_SEGMENT_TIMEOUT_KEY,
        DFSConfigKeys.DFS_QJOURNAL_FINALIZE_SEGMENT_TIMEOUT_DEFAULT);
    this.selectInputStreamsTimeoutMs = conf.getInt(
        DFSConfigKeys.DFS_QJOURNAL_SELECT_INPUT_STREAMS_TIMEOUT_KEY,
        DFSConfigKeys.DFS_QJOURNAL_SELECT_INPUT_STREAMS_TIMEOUT_DEFAULT);
    this.getJournalStateTimeoutMs = conf.getInt(
        DFSConfigKeys.DFS_QJOURNAL_GET_JOURNAL_STATE_TIMEOUT_KEY,
        DFSConfigKeys.DFS_QJOURNAL_GET_JOURNAL_STATE_TIMEOUT_DEFAULT);
    this.newEpochTimeoutMs = conf.getInt(
        DFSConfigKeys.DFS_QJOURNAL_NEW_EPOCH_TIMEOUT_KEY,
        DFSConfigKeys.DFS_QJOURNAL_NEW_EPOCH_TIMEOUT_DEFAULT);
    this.writeTxnsTimeoutMs = conf.getInt(
        DFSConfigKeys.DFS_QJOURNAL_WRITE_TXNS_TIMEOUT_KEY,
        DFSConfigKeys.DFS_QJOURNAL_WRITE_TXNS_TIMEOUT_DEFAULT);
    this.timeoutMs = (int) conf.getTimeDuration(DFSConfigKeys
            .DFS_QJM_OPERATIONS_TIMEOUT,
        DFSConfigKeys.DFS_QJM_OPERATIONS_TIMEOUT_DEFAULT, TimeUnit
            .MILLISECONDS);

    int connectTimeoutMs = conf.getInt(
        DFSConfigKeys.DFS_QJOURNAL_HTTP_OPEN_TIMEOUT_KEY,
        DFSConfigKeys.DFS_QJOURNAL_HTTP_OPEN_TIMEOUT_DEFAULT);
    int readTimeoutMs = conf.getInt(
        DFSConfigKeys.DFS_QJOURNAL_HTTP_READ_TIMEOUT_KEY,
        DFSConfigKeys.DFS_QJOURNAL_HTTP_READ_TIMEOUT_DEFAULT);
    this.connectionFactory = URLConnectionFactory
        .newDefaultURLConnectionFactory(connectTimeoutMs, readTimeoutMs, conf);
    setOutputBufferCapacity(OUTPUT_BUFFER_CAPACITY_DEFAULT);
  }
  
  protected List<AsyncLogger> createLoggers(
      AsyncLogger.Factory factory) throws IOException {
    return createLoggers(conf, uri, nsInfo, factory, nameServiceId);
  }

  static String parseJournalId(URI uri) {
    String path = uri.getPath();
    Preconditions.checkArgument(path != null && !path.isEmpty(),
        "Bad URI '%s': must identify journal in path component",
        uri);
    String journalId = path.substring(1);
    checkJournalId(journalId);
    return journalId;
  }
  
  public static void checkJournalId(String jid) {
    Preconditions.checkArgument(jid != null &&
        !jid.isEmpty() &&
        !jid.contains("/") &&
        !jid.startsWith("."),
        "bad journal id: " + jid);
  }

  
  /**
   * Fence any previous writers, and obtain a unique epoch number
   * for write-access to the journal nodes.
   *
   * @return the new, unique epoch number
   */
  Map<AsyncLogger, NewEpochResponseProto> createNewUniqueEpoch()
      throws IOException {
    Preconditions.checkState(!loggers.isEpochEstablished(),
        "epoch already created");
    
    Map<AsyncLogger, GetJournalStateResponseProto> lastPromises =
      loggers.waitForWriteQuorum(loggers.getJournalState(),
          getJournalStateTimeoutMs, "getJournalState()");
    
    long maxPromised = Long.MIN_VALUE;
    for (GetJournalStateResponseProto resp : lastPromises.values()) {
      maxPromised = Math.max(maxPromised, resp.getLastPromisedEpoch());
    }
    assert maxPromised >= 0;
    
    long myEpoch = maxPromised + 1;
    Map<AsyncLogger, NewEpochResponseProto> resps =
        loggers.waitForWriteQuorum(loggers.newEpoch(nsInfo, myEpoch),
            newEpochTimeoutMs, "newEpoch(" + myEpoch + ")");
        
    loggers.setEpoch(myEpoch);
    return resps;
  }
  
  @Override
  public void format(NamespaceInfo nsInfo, boolean force) throws IOException {
    QuorumCall<AsyncLogger, Void> call = loggers.format(nsInfo, force);
    try {
      call.waitFor(loggers.size(), loggers.size(), 0, timeoutMs,
          "format");
    } catch (InterruptedException e) {
      throw new IOException("Interrupted waiting for format() response");
    } catch (TimeoutException e) {
      throw new IOException("Timed out waiting for format() response");
    }
    
    if (call.countExceptions() > 0) {
      call.rethrowException("Could not format one or more JournalNodes");
    }
  }

  @Override
  public boolean hasSomeData() throws IOException {
    QuorumCall<AsyncLogger, Boolean> call =
        loggers.isFormatted();

    try {
      call.waitFor(loggers.size(), 0, 0, timeoutMs, "hasSomeData");
    } catch (InterruptedException e) {
      throw new IOException("Interrupted while determining if JNs have data");
    } catch (TimeoutException e) {
      throw new IOException("Timed out waiting for response from loggers");
    }
    
    if (call.countExceptions() > 0) {
      call.rethrowException(
          "Unable to check if JNs are ready for formatting");
    }
    
    // If any of the loggers returned with a non-empty manifest, then
    // we should prompt for format.
    for (Boolean hasData : call.getResults().values()) {
      if (hasData) {
        return true;
      }
    }

    // Otherwise, none were formatted, we can safely format.
    return false;
  }

  /**
   * Run recovery/synchronization for a specific segment.
   * Postconditions:
   * <ul>
   * <li>This segment will be finalized on a majority
   * of nodes.</li>
   * <li>All nodes which contain the finalized segment will
   * agree on the length.</li>
   * </ul>
   * 
   * @param segmentTxId the starting txid of the segment
   * @throws IOException
   */
  private void recoverUnclosedSegment(long segmentTxId) throws IOException {
    Preconditions.checkArgument(segmentTxId > 0);
    LOG.info("Beginning recovery of unclosed segment starting at txid " +
        segmentTxId);
    
    // Step 1. Prepare recovery
    QuorumCall<AsyncLogger,PrepareRecoveryResponseProto> prepare =
        loggers.prepareRecovery(segmentTxId);
    Map<AsyncLogger, PrepareRecoveryResponseProto> prepareResponses=
        loggers.waitForWriteQuorum(prepare, prepareRecoveryTimeoutMs,
            "prepareRecovery(" + segmentTxId + ")");
    LOG.info("Recovery prepare phase complete. Responses:\n" +
        QuorumCall.mapToString(prepareResponses));

    // Determine the logger who either:
    // a) Has already accepted a previous proposal that's higher than any
    //    other
    //
    //  OR, if no such logger exists:
    //
    // b) Has the longest log starting at this transaction ID
    
    // TODO: we should collect any "ties" and pass the URL for all of them
    // when syncing, so we can tolerate failure during recovery better.
    Entry<AsyncLogger, PrepareRecoveryResponseProto> bestEntry = Collections.max(
        prepareResponses.entrySet(), SegmentRecoveryComparator.INSTANCE); 
    AsyncLogger bestLogger = bestEntry.getKey();
    PrepareRecoveryResponseProto bestResponse = bestEntry.getValue();
    
    // Log the above decision, check invariants.
    if (bestResponse.hasAcceptedInEpoch()) {
      LOG.info("Using already-accepted recovery for segment " +
          "starting at txid " + segmentTxId + ": " +
          bestEntry);
    } else if (bestResponse.hasSegmentState()) {
      LOG.info("Using longest log: " + bestEntry);
    } else {
      // None of the responses to prepareRecovery() had a segment at the given
      // txid. This can happen for example in the following situation:
      // - 3 JNs: JN1, JN2, JN3
      // - writer starts segment 101 on JN1, then crashes before
      //   writing to JN2 and JN3
      // - during newEpoch(), we saw the segment on JN1 and decide to
      //   recover segment 101
      // - before prepare(), JN1 crashes, and we only talk to JN2 and JN3,
      //   neither of which has any entry for this log.
      // In this case, it is allowed to do nothing for recovery, since the
      // segment wasn't started on a quorum of nodes.

      // Sanity check: we should only get here if none of the responses had
      // a log. This should be a postcondition of the recovery comparator,
      // but a bug in the comparator might cause us to get here.
      for (PrepareRecoveryResponseProto resp : prepareResponses.values()) {
        assert !resp.hasSegmentState() :
          "One of the loggers had a response, but no best logger " +
          "was found.";
      }

      LOG.info("None of the responders had a log to recover: " +
          QuorumCall.mapToString(prepareResponses));
      return;
    }
    
    SegmentStateProto logToSync = bestResponse.getSegmentState();
    assert segmentTxId == logToSync.getStartTxId();
    
    // Sanity check: none of the loggers should be aware of a higher
    // txid than the txid we intend to truncate to
    for (Map.Entry<AsyncLogger, PrepareRecoveryResponseProto> e :
         prepareResponses.entrySet()) {
      AsyncLogger logger = e.getKey();
      PrepareRecoveryResponseProto resp = e.getValue();

      if (resp.hasLastCommittedTxId() &&
          resp.getLastCommittedTxId() > logToSync.getEndTxId()) {
        throw new AssertionError("Decided to synchronize log to " + logToSync +
            " but logger " + logger + " had seen txid " +
            resp.getLastCommittedTxId() + " committed");
      }
    }
    
    URL syncFromUrl = bestLogger.buildURLToFetchLogs(segmentTxId);
    
    QuorumCall<AsyncLogger,Void> accept = loggers.acceptRecovery(logToSync, syncFromUrl);
    loggers.waitForWriteQuorum(accept, acceptRecoveryTimeoutMs,
        "acceptRecovery(" + TextFormat.shortDebugString(logToSync) + ")");

    // If one of the loggers above missed the synchronization step above, but
    // we send a finalize() here, that's OK. It validates the log before
    // finalizing. Hence, even if it is not "in sync", it won't incorrectly
    // finalize.
    QuorumCall<AsyncLogger, Void> finalize =
        loggers.finalizeLogSegment(logToSync.getStartTxId(), logToSync.getEndTxId()); 
    loggers.waitForWriteQuorum(finalize, finalizeSegmentTimeoutMs,
        String.format("finalizeLogSegment(%s-%s)",
            logToSync.getStartTxId(),
            logToSync.getEndTxId()));
  }
  
  static List<AsyncLogger> createLoggers(Configuration conf,
                                         URI uri,
                                         NamespaceInfo nsInfo,
                                         AsyncLogger.Factory factory,
                                         String nameServiceId)
      throws IOException {
    List<AsyncLogger> ret = Lists.newArrayList();
    List<InetSocketAddress> addrs = Util.getAddressesList(uri, conf);
    if (addrs.size() % 2 == 0) {
      LOG.warn("Quorum journal URI '" + uri + "' has an even number " +
          "of Journal Nodes specified. This is not recommended!");
    }
    String jid = parseJournalId(uri);
    for (InetSocketAddress addr : addrs) {
      ret.add(factory.createLogger(conf, nsInfo, jid, nameServiceId, addr));
    }
    return ret;
  }
  
  @Override
  public EditLogOutputStream startLogSegment(long txId, int layoutVersion)
      throws IOException {
    Preconditions.checkState(isActiveWriter,
        "must recover segments before starting a new one");
    QuorumCall<AsyncLogger, Void> q = loggers.startLogSegment(txId,
        layoutVersion);
    loggers.waitForWriteQuorum(q, startSegmentTimeoutMs,
        "startLogSegment(" + txId + ")");
    return new QuorumOutputStream(loggers, txId, outputBufferCapacity,
        writeTxnsTimeoutMs, layoutVersion);
  }

  @Override
  public void finalizeLogSegment(long firstTxId, long lastTxId)
      throws IOException {
    QuorumCall<AsyncLogger,Void> q = loggers.finalizeLogSegment(
        firstTxId, lastTxId);
    loggers.waitForWriteQuorum(q, finalizeSegmentTimeoutMs,
        String.format("finalizeLogSegment(%s-%s)", firstTxId, lastTxId));
  }

  @Override
  public void setOutputBufferCapacity(int size) {
    int ipcMaxDataLength = conf.getInt(
        CommonConfigurationKeys.IPC_MAXIMUM_DATA_LENGTH,
        CommonConfigurationKeys.IPC_MAXIMUM_DATA_LENGTH_DEFAULT);
    if (size >= ipcMaxDataLength) {
      throw new IllegalArgumentException("Attempted to use QJM output buffer "
          + "capacity (" + size + ") greater than the IPC max data length ("
          + CommonConfigurationKeys.IPC_MAXIMUM_DATA_LENGTH + " = "
          + ipcMaxDataLength + "). This will cause journals to reject edits.");
    }
    outputBufferCapacity = size;
  }

  @Override
  public void purgeLogsOlderThan(long minTxIdToKeep) throws IOException {
    // This purges asynchronously -- there's no need to wait for a quorum
    // here, because it's always OK to fail.
    LOG.info("Purging remote journals older than txid " + minTxIdToKeep);
    loggers.purgeLogsOlderThan(minTxIdToKeep);
  }

  @Override
  public void recoverUnfinalizedSegments() throws IOException {
    Preconditions.checkState(!isActiveWriter, "already active writer");
    
    LOG.info("Starting recovery process for unclosed journal segments...");
    Map<AsyncLogger, NewEpochResponseProto> resps = createNewUniqueEpoch();
    LOG.info("Successfully started new epoch " + loggers.getEpoch());

    if (LOG.isDebugEnabled()) {
      LOG.debug("newEpoch({}) responses:\n{}", loggers.getEpoch(), QuorumCall.mapToString(resps));
    }

    long mostRecentSegmentTxId = Long.MIN_VALUE;
    for (NewEpochResponseProto r : resps.values()) {
      if (r.hasLastSegmentTxId()) {
        mostRecentSegmentTxId = Math.max(mostRecentSegmentTxId,
            r.getLastSegmentTxId());
      }
    }
    
    // On a completely fresh system, none of the journals have any
    // segments, so there's nothing to recover.
    if (mostRecentSegmentTxId != Long.MIN_VALUE) {
      recoverUnclosedSegment(mostRecentSegmentTxId);
    }
    isActiveWriter = true;
  }

  @Override
  public void close() throws IOException {
    loggers.close();
    connectionFactory.destroy();
  }

  public void selectInputStreams(Collection<EditLogInputStream> streams,
      long fromTxnId, boolean inProgressOk) throws IOException {
    selectInputStreams(streams, fromTxnId, inProgressOk, false);
  }

  @Override
  public void selectInputStreams(Collection<EditLogInputStream> streams,
      long fromTxnId, boolean inProgressOk,
      boolean onlyDurableTxns) throws IOException {
    // Some calls will use inProgressOK to get in-progress edits even if
    // the cache used for RPC calls is not enabled; fall back to using the
    // streaming mechanism to serve such requests
    if (inProgressOk && inProgressTailingEnabled) {
      LOG.debug("Tailing edits starting from txn ID {} via RPC mechanism", fromTxnId);
      try {
        Collection<EditLogInputStream> rpcStreams = new ArrayList<>();
        selectRpcInputStreams(rpcStreams, fromTxnId, onlyDurableTxns);
        streams.addAll(rpcStreams);
        return;
      } catch (IOException ioe) {
        LOG.warn("Encountered exception while tailing edits >= " + fromTxnId +
            " via RPC; falling back to streaming.", ioe);
      }
    }
    selectStreamingInputStreams(streams, fromTxnId, inProgressOk,
        onlyDurableTxns);
  }

  /**
   * Select input streams from the journals, specifically using the RPC
   * mechanism optimized for low latency.
   *
   * @param streams The collection to store the return streams into.
   * @param fromTxnId Select edits starting from this transaction ID
   * @param onlyDurableTxns Iff true, only include transactions which have been
   *                        committed to a quorum of the journals.
   * @throws IOException Upon issues, including cache misses on the journals.
   */
  private void selectRpcInputStreams(Collection<EditLogInputStream> streams,
      long fromTxnId, boolean onlyDurableTxns) throws IOException {
    QuorumCall<AsyncLogger, GetJournaledEditsResponseProto> q =
        loggers.getJournaledEdits(fromTxnId, maxTxnsPerRpc);
    Map<AsyncLogger, GetJournaledEditsResponseProto> responseMap =
        loggers.waitForWriteQuorum(q, selectInputStreamsTimeoutMs,
            "selectRpcInputStreams");
    assert responseMap.size() >= loggers.getMajoritySize() :
        "Quorum call returned without a majority";

    List<Integer> responseCounts = new ArrayList<>();
    for (GetJournaledEditsResponseProto resp : responseMap.values()) {
      responseCounts.add(resp.getTxnCount());
    }
    Collections.sort(responseCounts);
    int highestTxnCount = responseCounts.get(responseCounts.size() - 1);
    if (LOG.isDebugEnabled() || highestTxnCount < 0) {
      StringBuilder msg = new StringBuilder("Requested edits starting from ");
      msg.append(fromTxnId).append("; got ").append(responseMap.size())
          .append(" responses: <");
      for (Map.Entry<AsyncLogger, GetJournaledEditsResponseProto> ent :
          responseMap.entrySet()) {
        msg.append("[").append(ent.getKey()).append(", ")
            .append(ent.getValue().getTxnCount()).append("],");
      }
      msg.append(">");
      if (highestTxnCount < 0) {
        throw new IOException("Did not get any valid JournaledEdits " +
            "responses: " + msg);
      } else {
        LOG.debug(msg.toString());
      }
    }
    // Cancel any outstanding calls to JN's.
    q.cancelCalls();

    int maxAllowedTxns = !onlyDurableTxns ? highestTxnCount :
        responseCounts.get(responseCounts.size() - loggers.getMajoritySize());
    if (maxAllowedTxns == 0) {
      LOG.debug("No new edits available in logs; requested starting from ID {}",
          fromTxnId);
      return;
    }
    LogAction logAction = selectInputStreamLogHelper.record(fromTxnId);
    if (logAction.shouldLog()) {
      LOG.info("Selected loggers with >= " + maxAllowedTxns + " transactions " +
          "starting from lowest txn ID " + logAction.getStats(0).getMin() +
          LogThrottlingHelper.getLogSupressionMessage(logAction));
    }
    PriorityQueue<EditLogInputStream> allStreams = new PriorityQueue<>(
        JournalSet.EDIT_LOG_INPUT_STREAM_COMPARATOR);
    for (GetJournaledEditsResponseProto resp : responseMap.values()) {
      long endTxnId = fromTxnId - 1 +
          Math.min(maxAllowedTxns, resp.getTxnCount());
      allStreams.add(EditLogFileInputStream.fromByteString(
          resp.getEditLog(), fromTxnId, endTxnId, true));
    }
    JournalSet.chainAndMakeRedundantStreams(streams, allStreams, fromTxnId);
  }

  /**
   * Select input streams from the journals, specifically using the streaming
   * mechanism optimized for resiliency / bulk load.
   */
  private void selectStreamingInputStreams(
      Collection<EditLogInputStream> streams, long fromTxnId,
      boolean inProgressOk, boolean onlyDurableTxns) throws IOException {
    QuorumCall<AsyncLogger, RemoteEditLogManifest> q =
        loggers.getEditLogManifest(fromTxnId, inProgressOk);
    Map<AsyncLogger, RemoteEditLogManifest> resps =
        loggers.waitForWriteQuorum(q, selectInputStreamsTimeoutMs,
            "selectStreamingInputStreams");
    if (LOG.isDebugEnabled()) {
      LOG.debug("selectStreamingInputStream manifests:\n {}",
          Joiner.on("\n").withKeyValueSeparator(": ").join(resps));
    }

    final PriorityQueue<EditLogInputStream> allStreams =
        new PriorityQueue<EditLogInputStream>(64,
            JournalSet.EDIT_LOG_INPUT_STREAM_COMPARATOR);
    for (Map.Entry<AsyncLogger, RemoteEditLogManifest> e : resps.entrySet()) {
      AsyncLogger logger = e.getKey();
      RemoteEditLogManifest manifest = e.getValue();
      long committedTxnId = manifest.getCommittedTxnId();

      for (RemoteEditLog remoteLog : manifest.getLogs()) {
        URL url = logger.buildURLToFetchLogs(remoteLog.getStartTxId());

        long endTxId = remoteLog.getEndTxId();

        // If it's bounded by durable Txns, endTxId could not be larger
        // than committedTxnId. This ensures the consistency.
        // We don't do the following for finalized log segments, since all
        // edits in those are guaranteed to be committed.
        if (onlyDurableTxns && inProgressOk && remoteLog.isInProgress()) {
          endTxId = Math.min(endTxId, committedTxnId);
          if (endTxId < remoteLog.getStartTxId()) {
            LOG.warn("Found endTxId (" + endTxId + ") that is less than " +
                "the startTxId (" + remoteLog.getStartTxId() +
                ") - setting it to startTxId.");
            endTxId = remoteLog.getStartTxId();
          }
        }

        EditLogInputStream elis = EditLogFileInputStream.fromUrl(
            connectionFactory, url, remoteLog.getStartTxId(),
            endTxId, remoteLog.isInProgress());
        allStreams.add(elis);
      }
    }
    JournalSet.chainAndMakeRedundantStreams(streams, allStreams, fromTxnId);
  }
  
  @Override
  public String toString() {
    return "QJM to " + loggers;
  }

  @VisibleForTesting
  AsyncLoggerSet getLoggerSetForTests() {
    return loggers;
  }

  @Override
  public void doPreUpgrade() throws IOException {
    QuorumCall<AsyncLogger, Void> call = loggers.doPreUpgrade();
    try {
      call.waitFor(loggers.size(), loggers.size(), 0, timeoutMs,
          "doPreUpgrade");
      
      if (call.countExceptions() > 0) {
        call.rethrowException("Could not do pre-upgrade of one or more JournalNodes");
      }
    } catch (InterruptedException e) {
      throw new IOException("Interrupted waiting for doPreUpgrade() response");
    } catch (TimeoutException e) {
      throw new IOException("Timed out waiting for doPreUpgrade() response");
    }
  }

  @Override
  public void doUpgrade(Storage storage) throws IOException {
    QuorumCall<AsyncLogger, Void> call = loggers.doUpgrade(storage);
    try {
      call.waitFor(loggers.size(), loggers.size(), 0, timeoutMs,
          "doUpgrade");
      
      if (call.countExceptions() > 0) {
        call.rethrowException("Could not perform upgrade of one or more JournalNodes");
      }
    } catch (InterruptedException e) {
      throw new IOException("Interrupted waiting for doUpgrade() response");
    } catch (TimeoutException e) {
      throw new IOException("Timed out waiting for doUpgrade() response");
    }
  }
  
  @Override
  public void doFinalize() throws IOException {
    QuorumCall<AsyncLogger, Void> call = loggers.doFinalize();
    try {
      call.waitFor(loggers.size(), loggers.size(), 0, timeoutMs,
          "doFinalize");
      
      if (call.countExceptions() > 0) {
        call.rethrowException("Could not finalize one or more JournalNodes");
      }
    } catch (InterruptedException e) {
      throw new IOException("Interrupted waiting for doFinalize() response");
    } catch (TimeoutException e) {
      throw new IOException("Timed out waiting for doFinalize() response");
    }
  }
  
  @Override
  public boolean canRollBack(StorageInfo storage, StorageInfo prevStorage,
      int targetLayoutVersion) throws IOException {
    QuorumCall<AsyncLogger, Boolean> call = loggers.canRollBack(storage,
        prevStorage, targetLayoutVersion);
    try {
      call.waitFor(loggers.size(), loggers.size(), 0, timeoutMs,
          "lockSharedStorage");
      
      if (call.countExceptions() > 0) {
        call.rethrowException("Could not check if roll back possible for"
            + " one or more JournalNodes");
      }
      
      // Either they all return the same thing or this call fails, so we can
      // just return the first result.
      try {
        DFSUtil.assertAllResultsEqual(call.getResults().values());
      } catch (AssertionError ae) {
        throw new IOException("Results differed for canRollBack", ae);
      }
      for (Boolean result : call.getResults().values()) {
        return result;
      }
    } catch (InterruptedException e) {
      throw new IOException("Interrupted waiting for lockSharedStorage() " +
          "response");
    } catch (TimeoutException e) {
      throw new IOException("Timed out waiting for lockSharedStorage() " +
          "response");
    }
    
    throw new AssertionError("Unreachable code.");
  }

  @Override
  public void doRollback() throws IOException {
    QuorumCall<AsyncLogger, Void> call = loggers.doRollback();
    try {
      call.waitFor(loggers.size(), loggers.size(), 0, timeoutMs,
          "doRollback");
      
      if (call.countExceptions() > 0) {
        call.rethrowException("Could not perform rollback of one or more JournalNodes");
      }
    } catch (InterruptedException e) {
      throw new IOException("Interrupted waiting for doFinalize() response");
    } catch (TimeoutException e) {
      throw new IOException("Timed out waiting for doFinalize() response");
    }
  }
  
  @Override
  public void discardSegments(long startTxId) throws IOException {
    QuorumCall<AsyncLogger, Void> call = loggers.discardSegments(startTxId);
    try {
      call.waitFor(loggers.size(), loggers.size(), 0,
          timeoutMs, "discardSegments");
      if (call.countExceptions() > 0) {
        call.rethrowException(
            "Could not perform discardSegments of one or more JournalNodes");
      }
    } catch (InterruptedException e) {
      throw new IOException(
          "Interrupted waiting for discardSegments() response");
    } catch (TimeoutException e) {
      throw new IOException(
          "Timed out waiting for discardSegments() response");
    }
  }
  
  @Override
  public long getJournalCTime() throws IOException {
    QuorumCall<AsyncLogger, Long> call = loggers.getJournalCTime();
    try {
      call.waitFor(loggers.size(), loggers.size(), 0,
          timeoutMs, "getJournalCTime");
      
      if (call.countExceptions() > 0) {
        call.rethrowException("Could not journal CTime for one "
            + "more JournalNodes");
      }
      
      // Either they all return the same thing or this call fails, so we can
      // just return the first result.
      try {
        DFSUtil.assertAllResultsEqual(call.getResults().values());
      } catch (AssertionError ae) {
        throw new IOException("Results differed for getJournalCTime", ae);
      }
      for (Long result : call.getResults().values()) {
        return result;
      }
    } catch (InterruptedException e) {
      throw new IOException("Interrupted waiting for getJournalCTime() " +
          "response");
    } catch (TimeoutException e) {
      throw new IOException("Timed out waiting for getJournalCTime() " +
          "response");
    }
    
    throw new AssertionError("Unreachable code.");
  }
}