MockNamenode.java

/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hdfs.server.federation;

import static java.util.Collections.emptySet;
import static java.util.Collections.singletonList;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_ADMIN;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyBoolean;
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.ArgumentMatchers.anyShort;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

import java.io.FileNotFoundException;
import java.io.IOException;
import java.net.ConnectException;
import java.net.URI;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.Map.Entry;
import java.util.SortedMap;
import java.util.concurrent.ConcurrentSkipListMap;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.ContentSummary;
import org.apache.hadoop.fs.FileAlreadyExistsException;
import org.apache.hadoop.fs.FsServerDefaults;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.StorageType;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState;
import org.apache.hadoop.ha.HAServiceStatus;
import org.apache.hadoop.ha.proto.HAServiceProtocolProtos.HAServiceProtocolService;
import org.apache.hadoop.ha.protocolPB.HAServiceProtocolPB;
import org.apache.hadoop.ha.protocolPB.HAServiceProtocolServerSideTranslatorPB;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
import org.apache.hadoop.hdfs.protocol.DatanodeID;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.DatanodeInfoWithStorage;
import org.apache.hadoop.hdfs.protocol.DirectoryListing;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.ClientNamenodeProtocol;
import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.DatanodeProtocolService;
import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.NamenodeProtocolService;
import org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolPB;
import org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB;
import org.apache.hadoop.hdfs.protocolPB.DatanodeProtocolPB;
import org.apache.hadoop.hdfs.protocolPB.DatanodeProtocolServerSideTranslatorPB;
import org.apache.hadoop.hdfs.protocolPB.NamenodeProtocolPB;
import org.apache.hadoop.hdfs.protocolPB.NamenodeProtocolServerSideTranslatorPB;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
import org.apache.hadoop.hdfs.server.federation.resolver.MembershipNamenodeResolver;
import org.apache.hadoop.hdfs.server.federation.resolver.NamenodeStatusReport;
import org.apache.hadoop.hdfs.server.federation.router.Router;
import org.apache.hadoop.hdfs.server.namenode.LeaseExpiredException;
import org.apache.hadoop.hdfs.server.namenode.NotReplicatedYetException;
import org.apache.hadoop.hdfs.server.namenode.SafeModeException;
import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
import org.apache.hadoop.hdfs.server.protocol.DatanodeStorageReport;
import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols;
import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
import org.apache.hadoop.hdfs.server.protocol.StorageReport;
import org.apache.hadoop.http.HttpServer2;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.ipc.ProtobufRpcEngine2;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.ipc.RPC.Server;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.hadoop.ipc.StandbyException;
import org.apache.hadoop.security.AccessControlException;
import org.apache.hadoop.security.authorize.AccessControlList;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.util.DataChecksum;
import org.apache.hadoop.util.DataChecksum.Type;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hadoop.thirdparty.protobuf.BlockingService;


/**
 * Mock for the network interfaces (e.g., RPC and HTTP) of a Namenode. This is
 * used by the Routers in a mock cluster.
 */
public class MockNamenode {

  private static final Logger LOG =
      LoggerFactory.getLogger(MockNamenode.class);


  /** Mock implementation of the Namenode. */
  private final NamenodeProtocols mockNn;

  /** Name service identifier (subcluster). Assigned once at construction. */
  private final String nsId;
  /**
   * HA state of the Namenode. It is volatile because it is changed through
   * {@link #transitionToActive()} / {@link #transitionToStandby()} and read
   * by the stubbed getServiceStatus() answer, which is served by the RPC
   * server and therefore may run on a different thread than the writer.
   */
  private volatile HAServiceState haState = HAServiceState.STANDBY;
  /** Datanodes registered in this Namenode. */
  private final List<DatanodeInfo> dns = new ArrayList<>();

  /** RPC server of the Namenode that redirects calls to the mock. */
  private Server rpcServer;
  /** HTTP server of the Namenode that redirects calls to the mock. */
  private HttpServer2 httpServer;


  /**
   * Create a Mock Namenode backed by a fresh {@link HdfsConfiguration}.
   * @param nsIdentifier Name service identifier (subcluster).
   * @throws IOException If the mock or its servers cannot be setup.
   */
  public MockNamenode(String nsIdentifier) throws IOException {
    this(nsIdentifier, new HdfsConfiguration());
  }

  /**
   * Create a Mock Namenode: build the mock, then start the RPC server and
   * the HTTP server in that order.
   * @param nsIdentifier Name service identifier (subcluster).
   * @param conf Configuration used to set up the RPC and HTTP servers.
   * @throws IOException If the mock or any of the servers cannot be setup.
   */
  public MockNamenode(final String nsIdentifier, final Configuration conf)
      throws IOException {
    this.nsId = nsIdentifier;
    this.mockNn = mock(NamenodeProtocols.class);
    setupMock();
    setupRPCServer(conf);
    try {
      setupHTTPServer(conf);
    } catch (IOException | RuntimeException e) {
      // setupRPCServer() has already started the RPC server. Nobody can call
      // stop() on a half-constructed instance, so stop it here to avoid
      // leaking it when the HTTP server cannot be brought up.
      rpcServer.stop();
      rpcServer = null;
      throw e;
    }
  }

  /**
   * Install the baseline stubs on the mock Namenode: the version request
   * returns a {@link NamespaceInfo} built from the name service identifier,
   * and the service status reflects the current HA state of this mock.
   * @throws IOException If the mock cannot be setup.
   */
  protected void setupMock() throws IOException {
    final NamespaceInfo namespaceInfo =
        new NamespaceInfo(1, this.nsId, this.nsId, 1);
    when(mockNn.versionRequest()).thenReturn(namespaceInfo);

    // Evaluated on every call so that HA transitions are visible to callers.
    final Answer<HAServiceStatus> statusAnswer = invocation -> {
      final HAServiceStatus status =
          new HAServiceStatus(getHAServiceState());
      status.setNotReadyToBecomeActive("");
      return status;
    };
    when(mockNn.getServiceStatus()).thenAnswer(statusAnswer);
  }

  /**
   * Build and start the RPC server in front of the mock. The server is built
   * around the client protocol; the Namenode, Datanode and HA service
   * protocols are then added to the same server, all backed by the mock.
   * @param conf Configuration of the server.
   * @throws IOException If the RPC server cannot be setup.
   */
  private void setupRPCServer(final Configuration conf) throws IOException {
    // Client protocol.
    RPC.setProtocolEngine(
        conf, ClientNamenodeProtocolPB.class, ProtobufRpcEngine2.class);
    final BlockingService clientService =
        ClientNamenodeProtocol.newReflectiveBlockingService(
            new ClientNamenodeProtocolServerSideTranslatorPB(mockNn));

    final int handlerCount = conf.getInt(
        DFSConfigKeys.DFS_NAMENODE_HANDLER_COUNT_KEY,
        DFSConfigKeys.DFS_NAMENODE_HANDLER_COUNT_DEFAULT);

    // Port 0 is requested here; the bound port is exposed by getRPCPort().
    final RPC.Builder serverBuilder = new RPC.Builder(conf)
        .setProtocol(ClientNamenodeProtocolPB.class)
        .setInstance(clientService)
        .setBindAddress("0.0.0.0")
        .setPort(0)
        .setNumHandlers(handlerCount);
    this.rpcServer = serverBuilder.build();

    // Namenode protocol.
    final BlockingService namenodeService =
        NamenodeProtocolService.newReflectiveBlockingService(
            new NamenodeProtocolServerSideTranslatorPB(mockNn));
    DFSUtil.addInternalPBProtocol(
        conf, NamenodeProtocolPB.class, namenodeService, this.rpcServer);

    // Datanode protocol.
    final BlockingService datanodeService =
        DatanodeProtocolService.newReflectiveBlockingService(
            new DatanodeProtocolServerSideTranslatorPB(mockNn, 1000));
    DFSUtil.addInternalPBProtocol(
        conf, DatanodeProtocolPB.class, datanodeService, this.rpcServer);

    // HA service protocol.
    final BlockingService haService =
        HAServiceProtocolService.newReflectiveBlockingService(
            new HAServiceProtocolServerSideTranslatorPB(mockNn));
    DFSUtil.addInternalPBProtocol(
        conf, HAServiceProtocolPB.class, haService, this.rpcServer);

    this.rpcServer.addTerseExceptions(
        RemoteException.class,
        SafeModeException.class,
        FileNotFoundException.class,
        FileAlreadyExistsException.class,
        AccessControlException.class,
        LeaseExpiredException.class,
        NotReplicatedYetException.class,
        IOException.class,
        ConnectException.class,
        StandbyException.class);

    this.rpcServer.start();
  }

  /**
   * Build and start the HTTP server of the mock Namenode. The admin ACL is
   * read from the {@code DFS_ADMIN} configuration key, with a single space
   * as the default value.
   * @param conf Configuration of the server.
   * @throws IOException If the HTTP server cannot be setup.
   */
  private void setupHTTPServer(Configuration conf) throws IOException {
    final AccessControlList adminAcl =
        new AccessControlList(conf.get(DFS_ADMIN, " "));
    // Port 0 is requested here; the bound port is exposed by getHTTPPort().
    final URI endpoint = URI.create("http://0.0.0.0:0");

    this.httpServer = new HttpServer2.Builder()
        .setName("hdfs")
        .setConf(conf)
        .setACL(adminAcl)
        .addEndpoint(endpoint)
        .build();
    this.httpServer.start();
  }

  /**
   * Get the port of the listener address of the RPC server.
   * @return RPC port.
   */
  public int getRPCPort() {
    final int port = this.rpcServer.getListenerAddress().getPort();
    return port;
  }

  /**
   * Get the port of the first connector (index 0) of the HTTP server.
   * @return HTTP port.
   */
  public int getHTTPPort() {
    final int port = this.httpServer.getConnectorAddress(0).getPort();
    return port;
  }

  /**
   * Expose the underlying mock so callers can add their own stubs to it.
   * @return Mock Namenode protocol to be extended.
   */
  public NamenodeProtocols getMock() {
    return this.mockNn;
  }

  /**
   * Get the name service (subcluster) this Mock Namenode belongs to.
   * @return Name service identifier.
   */
  public String getNameserviceId() {
    return this.nsId;
  }

  /**
   * Get the HA state this Mock Namenode currently reports.
   * @return HA state (ACTIVE or STANDBY).
   */
  public HAServiceState getHAServiceState() {
    return this.haState;
  }

  /**
   * Make the Mock Namenode report itself as Active.
   */
  public void transitionToActive() {
    haState = HAServiceState.ACTIVE;
  }

  /**
   * Make the Mock Namenode report itself as Standby.
   */
  public void transitionToStandby() {
    haState = HAServiceState.STANDBY;
  }

  /**
   * Get the datanodes that this NN will return. The internal list itself is
   * returned (not a copy), so additions made by the caller are reflected in
   * the datanode reports served by the mock.
   * @return The datanodes that this NN will return.
   */
  public List<DatanodeInfo> getDatanodes() {
    return dns;
  }

  /**
   * Stop the Mock Namenode. It stops all the servers. The HTTP server is
   * stopped even if stopping the RPC server fails. Calling it again after
   * a successful stop is a no-op.
   * @throws Exception If it cannot stop the Namenode.
   */
  public void stop() throws Exception {
    try {
      if (rpcServer != null) {
        rpcServer.stop();
        rpcServer = null;
      }
    } finally {
      // Runs regardless of the outcome above so that a failure while
      // stopping the RPC server does not leave the HTTP server running.
      if (httpServer != null) {
        httpServer.stop();
        httpServer = null;
      }
    }
  }

  /**
   * Add the mock for the FileSystem calls in ClientProtocol. The namespace
   * is kept in a sorted in-memory map from path to entry type ("FILE" or
   * "DIRECTORY") which is local to this call and shared by all the stubs
   * installed here: getListing, getFileInfo, create, getBlockLocations,
   * complete, addBlock, mkdirs, getServerDefaults and getContentSummary.
   * @throws IOException If it cannot be setup.
   */
  public void addFileSystemMock() throws IOException {
    final SortedMap<String, String> fs = new ConcurrentSkipListMap<>();

    DirectoryListing l = mockNn.getListing(anyString(), any(), anyBoolean());
    when(l).thenAnswer(invocation -> {
      String src = getSrc(invocation);
      LOG.info("{} getListing({})", nsId, src);
      if (fs.get(src) == null) {
        throw new FileNotFoundException("File does not exist " + src);
      }
      if (!src.endsWith("/")) {
        src += "/";
      }
      // All the entries whose path starts with the directory prefix.
      Map<String, String> files =
          fs.subMap(src, src + Character.MAX_VALUE);
      List<HdfsFileStatus> list = new ArrayList<>();
      for (Entry<String, String> entry : files.entrySet()) {
        String file = entry.getKey();
        // Keep direct children only: no further separator after the prefix.
        if (file.substring(src.length()).indexOf('/') < 0) {
          list.add(getMockHdfsFileStatus(file, entry.getValue()));
        }
      }
      HdfsFileStatus[] array = list.toArray(new HdfsFileStatus[0]);
      return new DirectoryListing(array, 0);
    });
    when(mockNn.getFileInfo(anyString())).thenAnswer(invocation -> {
      String src = getSrc(invocation);
      LOG.info("{} getFileInfo({})", nsId, src);
      // Unknown paths have no type, which yields a null status.
      return getMockHdfsFileStatus(src, fs.get(src));
    });
    HdfsFileStatus c = mockNn.create(anyString(), any(), anyString(), any(),
        anyBoolean(), anyShort(), anyLong(), any(), any(), any());
    when(c).thenAnswer(invocation -> {
      String src = getSrc(invocation);
      LOG.info("{} create({})", nsId, src);
      boolean createParent = (boolean)invocation.getArgument(4);
      if (createParent) {
        addParentDirectories(fs, src, "{} create parent {}");
      }
      fs.put(src, "FILE");
      return getMockHdfsFileStatus(src, "FILE");
    });
    LocatedBlocks b = mockNn.getBlockLocations(
        anyString(), anyLong(), anyLong());
    when(b).thenAnswer(invocation -> {
      String src = getSrc(invocation);
      LOG.info("{} getBlockLocations({})", nsId, src);
      if (!fs.containsKey(src)) {
        LOG.error("{} cannot find {} for getBlockLocations", nsId, src);
        throw new FileNotFoundException("File does not exist " + src);
      }
      return mock(LocatedBlocks.class);
    });
    boolean f = mockNn.complete(anyString(), anyString(), any(), anyLong());
    when(f).thenAnswer(invocation -> {
      String src = getSrc(invocation);
      if (!fs.containsKey(src)) {
        LOG.error("{} cannot find {} for complete", nsId, src);
        throw new FileNotFoundException("File does not exist " + src);
      }
      return true;
    });
    LocatedBlock a = mockNn.addBlock(
        anyString(), anyString(), any(), any(), anyLong(), any(), any());
    when(a).thenAnswer(invocation -> {
      String src = getSrc(invocation);
      if (!fs.containsKey(src)) {
        LOG.error("{} cannot find {} for addBlock", nsId, src);
        throw new FileNotFoundException("File does not exist " + src);
      }
      return getMockLocatedBlock(nsId);
    });
    boolean m = mockNn.mkdirs(anyString(), any(), anyBoolean());
    when(m).thenAnswer(invocation -> {
      String src = getSrc(invocation);
      LOG.info("{} mkdirs({})", nsId, src);
      boolean createParent = (boolean)invocation.getArgument(2);
      if (createParent) {
        addParentDirectories(fs, src, "{} mkdir parent {}");
      }
      fs.put(src, "DIRECTORY");
      return true;
    });
    when(mockNn.getServerDefaults()).thenAnswer(invocation -> {
      LOG.info("{} getServerDefaults", nsId);
      FsServerDefaults defaults = mock(FsServerDefaults.class);
      when(defaults.getChecksumType()).thenReturn(
          Type.valueOf(DataChecksum.CHECKSUM_CRC32));
      when(defaults.getKeyProviderUri()).thenReturn(nsId);
      return defaults;
    });
    when(mockNn.getContentSummary(anyString())).thenAnswer(invocation -> {
      String src = getSrc(invocation);
      LOG.info("{} getContentSummary({})", nsId, src);
      if (fs.get(src) == null) {
        throw new FileNotFoundException("File does not exist " + src);
      }
      if (!src.endsWith("/")) {
        src += "/";
      }
      Map<String, String> files =
          fs.subMap(src, src + Character.MAX_VALUE);
      int numFiles = 0;
      int numDirs = 0;
      // Accumulated as a long so the running total cannot wrap around.
      long length = 0;
      for (Entry<String, String> entry : files.entrySet()) {
        String file = entry.getKey();
        // Only direct children are counted.
        if (file.substring(src.length()).indexOf('/') < 0) {
          String type = entry.getValue();
          if ("DIRECTORY".equals(type)) {
            numDirs++;
          } else if ("FILE".equals(type)) {
            numFiles++;
            // Each file adds 100, the length used for mock file statuses.
            length += 100;
          }
        }
      }
      return new ContentSummary.Builder()
          .fileCount(numFiles)
          .directoryCount(numDirs)
          .length(length)
          .erasureCodingPolicy("")
          .build();
    });
  }

  /**
   * Record every ancestor of a path, excluding the root, as a directory.
   * @param fs Mock namespace mapping paths to their type.
   * @param src Path whose ancestors are added.
   * @param logFormat Log format with two placeholders: the name service
   *                  identifier and the ancestor being added.
   */
  private void addParentDirectories(final Map<String, String> fs,
      final String src, final String logFormat) {
    Path path = new Path(src).getParent();
    // The parent of the root is null; without this check a request on "/"
    // with createParent set would fail with a NullPointerException.
    while (path != null && !path.isRoot()) {
      LOG.info(logFormat, nsId, path);
      fs.put(path.toString(), "DIRECTORY");
      path = path.getParent();
    }
  }

  /**
   * Add datanode related operations: the datanode report and the datanode
   * storage report. Both are computed from the current contents of the
   * datanode list every time they are requested; the requested report type
   * is only logged.
   * @throws IOException If it cannot be setup.
   */
  public void addDatanodeMock() throws IOException {
    when(mockNn.getDatanodeReport(any(DatanodeReportType.class))).thenAnswer(
        invocation -> {
          LOG.info("{} getDatanodeReport({})",
              nsId, invocation.getArgument(0));
          // A typed array is required: the no-argument toArray() returns an
          // Object[], which cannot be returned by a method declared to
          // return DatanodeInfo[].
          return dns.toArray(new DatanodeInfo[0]);
        });
    when(mockNn.getDatanodeStorageReport(any(DatanodeReportType.class)))
        .thenAnswer(invocation -> {
          LOG.info("{} getDatanodeStorageReport({})",
              nsId, invocation.getArgument(0));
          DatanodeStorageReport[] ret = new DatanodeStorageReport[dns.size()];
          for (int i = 0; i < dns.size(); i++) {
            DatanodeInfo dn = dns.get(i);
            // One storage per datanode, identified by the datanode name.
            DatanodeStorage storage = new DatanodeStorage(dn.getName());
            StorageReport[] storageReports = new StorageReport[] {
                new StorageReport(storage, false, 0L, 0L, 0L, 0L, 0L)
            };
            ret[i] = new DatanodeStorageReport(dn, storageReports);
          }
          return ret;
        });
  }

  /**
   * Extract the first argument of a mocked invocation as a String. The
   * stubs in this class use it for calls whose first argument is the path.
   * @param invocation Invocation on the mock.
   * @return First argument of the invocation.
   */
  private static String getSrc(InvocationOnMock invocation) {
    final Object[] args = invocation.getArguments();
    return (String) args[0];
  }

  /**
   * Build a mocked HDFS file status for an entry of the mock namespace.
   * Files report a length of 100, a replication of 1 and the default block
   * size; directories report themselves as directories.
   * @param filename Name of the file.
   * @param type Type of the file (FILE, DIRECTORY, or null).
   * @return HDFS file status, or null if the type is null.
   */
  private static HdfsFileStatus getMockHdfsFileStatus(
      final String filename, final String type) {
    if (type == null) {
      return null;
    }

    final HdfsFileStatus status = mock(HdfsFileStatus.class);
    // NOTE(review): getBytes() encodes with the platform default charset.
    when(status.getLocalNameInBytes()).thenReturn(filename.getBytes());
    when(status.getPermission()).thenReturn(mock(FsPermission.class));
    when(status.getOwner()).thenReturn("owner");
    when(status.getGroup()).thenReturn("group");

    switch (type) {
    case "FILE":
      when(status.getLen()).thenReturn(100L);
      when(status.getReplication()).thenReturn((short) 1);
      when(status.getBlockSize()).thenReturn(
          HdfsClientConfigKeys.DFS_BLOCK_SIZE_DEFAULT);
      break;
    case "DIRECTORY":
      when(status.isDir()).thenReturn(true);
      when(status.isDirectory()).thenReturn(true);
      break;
    default:
      // Any other type only gets the common attributes above.
      break;
    }
    return status;
  }

  /**
   * Build a mocked located block for a subcluster. The block has a single
   * location on a fake Datanode on localhost, and its block pool identifier
   * and block token fields are all derived from the name service identifier.
   * @param nsId Name service identifier (subcluster).
   * @return Mock located block.
   */
  private static LocatedBlock getMockLocatedBlock(final String nsId) {
    final LocatedBlock block = mock(LocatedBlock.class);
    when(block.getCachedLocations()).thenReturn(DatanodeInfo.EMPTY_ARRAY);

    // Single fake Datanode holding the block.
    final DatanodeID fakeId = new DatanodeID(
        "localhost", "localhost", "dn0", 1111, 1112, 1113, 1114);
    final DatanodeInfo fakeDn = new DatanodeDescriptor(fakeId);
    final DatanodeInfoWithStorage location =
        new DatanodeInfoWithStorage(fakeDn, "storageID", StorageType.DEFAULT);
    final DatanodeInfoWithStorage[] locations =
        new DatanodeInfoWithStorage[] {location};
    when(block.getLocations()).thenReturn(locations);

    // The block pool is named after the subcluster.
    final ExtendedBlock extendedBlock = mock(ExtendedBlock.class);
    when(extendedBlock.getBlockPoolId()).thenReturn(nsId);
    when(block.getBlock()).thenReturn(extendedBlock);

    @SuppressWarnings("unchecked")
    final Token<BlockTokenIdentifier> token = mock(Token.class);
    when(token.getIdentifier()).thenReturn(nsId.getBytes());
    when(token.getPassword()).thenReturn(nsId.getBytes());
    when(token.getKind()).thenReturn(new Text(nsId));
    when(token.getService()).thenReturn(new Text(nsId));
    when(block.getBlockToken()).thenReturn(token);

    return block;
  }

  /**
   * Register a set of NameNodes in a single Router, with no subcluster
   * marked as unavailable.
   * @param router Router to register to.
   * @param namenodes Set of NameNodes.
   * @throws IOException If it cannot register them.
   */
  public static void registerSubclusters(Router router,
      Collection<MockNamenode> namenodes) throws IOException {
    final List<Router> routers = singletonList(router);
    final Set<String> noneUnavailable = emptySet();
    registerSubclusters(routers, namenodes, noneUnavailable);
  }

  /**
   * Register a set of NameNodes in a set of Routers. Every NameNode is
   * registered in the membership resolver of every Router using its
   * localhost RPC and HTTP addresses, and the resolver cache is reloaded
   * once per Router after all its registrations.
   * @param routers Set of Routers.
   * @param namenodes Set of NameNodes.
   * @param unavailableSubclusters Set of unavailable subclusters.
   * @throws IOException If it cannot register them.
   */
  public static void registerSubclusters(List<Router> routers,
      Collection<MockNamenode> namenodes,
      Set<String> unavailableSubclusters) throws IOException {

    for (final Router router : routers) {
      final MembershipNamenodeResolver membership =
          (MembershipNamenodeResolver) router.getNamenodeResolver();
      for (final MockNamenode namenode : namenodes) {
        final String ns = namenode.getNameserviceId();
        final String rpc = "localhost:" + namenode.getRPCPort();
        final String http = "localhost:" + namenode.getHTTPPort();
        // The same RPC address is passed for all three address arguments.
        final NamenodeStatusReport statusReport = new NamenodeStatusReport(
            ns, null, rpc, rpc, rpc, "http", http);

        final boolean available = !unavailableSubclusters.contains(ns);
        LOG.info(available
            ? "Register {} as ACTIVE"
            : "Register {} as UNAVAILABLE", ns);
        statusReport.setRegistrationValid(available);

        // NOTE(review): the namespace info is set after the registration
        // flag; confirm that setNamespaceInfo() does not reset that flag.
        statusReport.setNamespaceInfo(new NamespaceInfo(0, ns, ns, 0));
        membership.registerNamenode(statusReport);
      }
      membership.loadCache(true);
    }
  }
}