SpyQJournalUtil.java

/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hdfs.qjournal.client;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.GetJournaledEditsResponseProto;
import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.ListenableFuture;
import org.mockito.Mockito;
import org.mockito.stubbing.Answer;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.URI;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Semaphore;

/**
 * One Util class to mock QJM for some UTs not in this package.
 */
public final class SpyQJournalUtil {

  private SpyQJournalUtil() {
  }

  /**
   * Mock a QuorumJournalManager with input uri, nsInfo and namServiceId.
   * @param conf input configuration.
   * @param uri input uri.
   * @param nsInfo input nameservice info.
   * @param nameServiceId input nameservice Id.
   * @return one mocked QuorumJournalManager.
   * @throws IOException throw IOException.
   */
  public static QuorumJournalManager createSpyingQJM(Configuration conf,
      URI uri, NamespaceInfo nsInfo, String nameServiceId) throws IOException {
    AsyncLogger.Factory spyFactory = new AsyncLogger.Factory() {
      @Override
      public AsyncLogger createLogger(Configuration conf, NamespaceInfo nsInfo,
          String journalId, String nameServiceId, InetSocketAddress addr) {
        AsyncLogger logger = new IPCLoggerChannel(conf, nsInfo, journalId,
            nameServiceId, addr) {
          protected ExecutorService createSingleThreadExecutor() {
            // Don't parallelize calls to the quorum in the tests.
            // This makes the tests more deterministic.
            return new DirectExecutorService();
          }
        };
        return Mockito.spy(logger);
      }
    };
    return new QuorumJournalManager(conf, uri, nsInfo, nameServiceId, spyFactory);
  }

  /**
   * Mock Journals with different response for getJournaledEdits rpc with the input startTxid.
   * 1. First journal with one empty response.
   * 2. Second journal with one normal response.
   * 3. Third journal with one slow response.
   * @param manager input QuorumJournalManager.
   * @param startTxid input start txid.
   */
  public static void mockJNWithEmptyOrSlowResponse(QuorumJournalManager manager, long startTxid) {
    List<AsyncLogger> spies = manager.getLoggerSetForTests().getLoggersForTests();
    Semaphore semaphore = new Semaphore(0);

    // Mock JN0 return an empty response.
    Mockito.doAnswer(invocation -> {
      semaphore.release();
      return GetJournaledEditsResponseProto.newBuilder().setTxnCount(0).build();
    }).when(spies.get(0))
        .getJournaledEdits(startTxid, QuorumJournalManager.QJM_RPC_MAX_TXNS_DEFAULT);

    // Mock JN1 return a normal response.
    spyGetJournaledEdits(spies, 1, startTxid, () -> semaphore.release(1));

    // Mock JN2 return a slow response
    spyGetJournaledEdits(spies, 2, startTxid, () -> semaphore.acquireUninterruptibly(2));
  }

  public static void spyGetJournaledEdits(List<AsyncLogger> spies,
      int jnSpyIdx, long fromTxId, Runnable preHook) {
    Mockito.doAnswer((Answer<ListenableFuture<GetJournaledEditsResponseProto>>) invocation -> {
      preHook.run();
      @SuppressWarnings("unchecked")
      ListenableFuture<GetJournaledEditsResponseProto> result =
          (ListenableFuture<GetJournaledEditsResponseProto>) invocation.callRealMethod();
      return result;
    }).when(spies.get(jnSpyIdx)).getJournaledEdits(fromTxId,
        QuorumJournalManager.QJM_RPC_MAX_TXNS_DEFAULT);
  }
}