TestBlockToken.java

/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.hdfs.security.token.block;

import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION;
import static org.junit.jupiter.api.Assertions.assertArrayEquals;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;
import static org.junit.jupiter.api.Assumptions.assumeTrue;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;

import java.io.ByteArrayInputStream;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.File;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.Calendar;
import java.util.EnumSet;
import java.util.GregorianCalendar;
import java.util.Set;

import org.mockito.Mockito;
import org.apache.commons.lang3.reflect.FieldUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DFSTestUtil;
import org.apache.hadoop.hdfs.DFSUtilClient;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.ClientDatanodeProtocol;
import org.apache.hadoop.hdfs.protocol.DatanodeID;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.ClientDatanodeProtocolService;
import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.GetReplicaVisibleLengthRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.GetReplicaVisibleLengthResponseProto;
import org.apache.hadoop.hdfs.protocolPB.ClientDatanodeProtocolPB;
import org.apache.hadoop.hdfs.protocolPB.PBHelperClient;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
import org.apache.hadoop.hdfs.server.namenode.NameNode;
import org.apache.hadoop.io.DataInputBuffer;
import org.apache.hadoop.io.DataOutputBuffer;
import org.apache.hadoop.io.TestWritable;
import org.apache.hadoop.ipc.Client;
import org.apache.hadoop.ipc.ProtobufRpcEngine2;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.ipc.Server;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.SaslInputStream;
import org.apache.hadoop.security.SaslRpcClient;
import org.apache.hadoop.security.SaslRpcServer;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.SecretManager;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.token.TokenIdentifier;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.util.Time;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;

import org.apache.hadoop.thirdparty.protobuf.BlockingService;
import org.apache.hadoop.thirdparty.protobuf.ServiceException;

import org.apache.hadoop.fs.StorageType;
import org.slf4j.event.Level;

/** Unit tests for block tokens */
public class TestBlockToken {
  public static final Logger LOG =
      LoggerFactory.getLogger(TestBlockToken.class);
  private static final String ADDRESS = "0.0.0.0";

  static {
    GenericTestUtils.setLogLevel(Client.LOG, Level.TRACE);
    GenericTestUtils.setLogLevel(Server.LOG, Level.TRACE);
    GenericTestUtils.setLogLevel(SaslRpcClient.LOG, Level.TRACE);
    GenericTestUtils.setLogLevel(SaslRpcServer.LOG, Level.TRACE);
    GenericTestUtils.setLogLevel(SaslInputStream.LOG, Level.TRACE);
  }

  /**
   * Directory where we can count our open file descriptors under Linux
   */
  static final File FD_DIR = new File("/proc/self/fd/");

  final long blockKeyUpdateInterval = 10 * 60 * 1000; // 10 mins
  final long blockTokenLifetime = 2 * 60 * 1000; // 2 mins
  final ExtendedBlock block1 = new ExtendedBlock("0", 0L);
  final ExtendedBlock block2 = new ExtendedBlock("10", 10L);
  final ExtendedBlock block3 = new ExtendedBlock("-10", -108L);

  @BeforeEach
  public void disableKerberos() {
    Configuration conf = new Configuration();
    conf.set(HADOOP_SECURITY_AUTHENTICATION, "simple");
    UserGroupInformation.setConfiguration(conf);
  }

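  /**
   * Mockito {@link Answer} for getReplicaVisibleLength calls: asserts that
   * the caller presented exactly the expected BlockTokenIdentifier, checks
   * WRITE access via the secret manager, and echoes the block ID back as
   * the reported length.
   */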
  private static class GetLengthAnswer implements
      Answer<GetReplicaVisibleLengthResponseProto> {
    final BlockTokenSecretManager sm;
    final BlockTokenIdentifier ident;

    public GetLengthAnswer(BlockTokenSecretManager sm,
                           BlockTokenIdentifier ident) {
      this.sm = sm;
      this.ident = ident;
    }

    @Override
    public GetReplicaVisibleLengthResponseProto answer(
        InvocationOnMock invocation) throws IOException {
      Object[] args = invocation.getArguments();
      assertEquals(2, args.length);
      GetReplicaVisibleLengthRequestProto req =
          (GetReplicaVisibleLengthRequestProto) args[1];
      Set<TokenIdentifier> tokenIds = UserGroupInformation.getCurrentUser()
          .getTokenIdentifiers();
      assertEquals(1, tokenIds.size(), "Only one BlockTokenIdentifier expected");
      long result = 0;
      for (TokenIdentifier tokenId : tokenIds) {
        BlockTokenIdentifier id = (BlockTokenIdentifier) tokenId;
        LOG.info("Got: {}", id);
        assertEquals(ident, id, "Received BlockTokenIdentifier is wrong");
        sm.checkAccess(id, null, PBHelperClient.convert(req.getBlock()),
            BlockTokenIdentifier.AccessMode.WRITE,
            new StorageType[]{StorageType.DEFAULT}, null);
        result = id.getBlockId();
      }
      return GetReplicaVisibleLengthResponseProto.newBuilder()
          .setLength(result).build();
    }
  }

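  /**
   * Generates a token for the given block and modes, then round-trips its
   * identifier bytes through readFields() to exercise the serialized form.
   */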
  private BlockTokenIdentifier generateTokenId(BlockTokenSecretManager sm,
      ExtendedBlock block, EnumSet<BlockTokenIdentifier.AccessMode> accessModes,
      StorageType[] storageTypes, String[] storageIds)
      throws IOException {
    Token<BlockTokenIdentifier> token = sm.generateToken(block, accessModes,
        storageTypes, storageIds);
    BlockTokenIdentifier id = sm.createIdentifier();
    id.readFields(new DataInputStream(new ByteArrayInputStream(token
        .getIdentifier())));
    return id;
  }

  private void testWritable(boolean enableProtobuf) throws Exception {
    TestWritable.testWritable(new BlockTokenIdentifier());
    BlockTokenSecretManager sm = new BlockTokenSecretManager(
        blockKeyUpdateInterval, blockTokenLifetime, 0, 1, "fake-pool", null,
        enableProtobuf);
    TestWritable.testWritable(generateTokenId(sm, block3,
        EnumSet.noneOf(BlockTokenIdentifier.AccessMode.class),
        new StorageType[]{StorageType.DEFAULT}, null));
    TestWritable.testWritable(generateTokenId(sm, block3,
        EnumSet.of(BlockTokenIdentifier.AccessMode.WRITE),
        new StorageType[]{StorageType.DEFAULT}, null));
    TestWritable.testWritable(generateTokenId(sm, block3,
        EnumSet.allOf(BlockTokenIdentifier.AccessMode.class),
        new StorageType[]{StorageType.DEFAULT}, null));
    TestWritable.testWritable(generateTokenId(sm, block1,
        EnumSet.allOf(BlockTokenIdentifier.AccessMode.class),
        new StorageType[]{StorageType.DEFAULT}, null));
    TestWritable.testWritable(generateTokenId(sm, block2,
        EnumSet.of(BlockTokenIdentifier.AccessMode.WRITE),
        new StorageType[]{StorageType.DEFAULT}, null));
    TestWritable.testWritable(generateTokenId(sm, block3,
        EnumSet.noneOf(BlockTokenIdentifier.AccessMode.class),
        new StorageType[]{StorageType.DEFAULT}, null));
    // We must be backwards compatible when adding storageType
    TestWritable.testWritable(generateTokenId(sm, block3,
        EnumSet.noneOf(BlockTokenIdentifier.AccessMode.class), null, null));
    TestWritable.testWritable(generateTokenId(sm, block3,
        EnumSet.noneOf(BlockTokenIdentifier.AccessMode.class),
        StorageType.EMPTY_ARRAY, null));
  }

  @Test
  public void testWritableLegacy() throws Exception {
    testWritable(false);
  }

  @Test
  public void testWritableProtobuf() throws Exception {
    testWritable(true);
  }

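  /**
   * Checks access with the most specific checkAccess overload and, whenever
   * the omitted arguments are null, with the narrower overloads as well, so
   * every public variant stays covered.
   */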
  private static void checkAccess(BlockTokenSecretManager m,
      Token<BlockTokenIdentifier> t, ExtendedBlock blk,
      BlockTokenIdentifier.AccessMode mode, StorageType[] storageTypes,
      String[] storageIds) throws IOException {
    if (storageIds == null) {
      // Test overloaded checkAccess method.
      m.checkAccess(t.decodeIdentifier(), null, blk, mode, storageTypes);

      if (storageTypes == null) {
        // Test overloaded checkAccess method.
        m.checkAccess(t, null, blk, mode);
      }
    }
    m.checkAccess(t, null, blk, mode, storageTypes, storageIds);
  }

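  /**
   * Generates single-mode and multi-mode tokens on both secret managers and
   * verifies that each manager accepts tokens generated by the other,
   * confirming that the exported keys are interchangeable.
   */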
  private void tokenGenerationAndVerification(BlockTokenSecretManager master,
      BlockTokenSecretManager slave, StorageType[] storageTypes,
      String[] storageIds) throws Exception {
    // single-mode tokens
    for (BlockTokenIdentifier.AccessMode mode : BlockTokenIdentifier.AccessMode
        .values()) {
      // generated by master
      Token<BlockTokenIdentifier> token1 = master.generateToken(block1,
          EnumSet.of(mode), storageTypes, storageIds);
      checkAccess(master, token1, block1, mode, storageTypes, storageIds);
      checkAccess(slave, token1, block1, mode, storageTypes, storageIds);
      // generated by slave
      Token<BlockTokenIdentifier> token2 = slave.generateToken(block2,
          EnumSet.of(mode), storageTypes, storageIds);
      checkAccess(master, token2, block2, mode, storageTypes, storageIds);
      checkAccess(slave, token2, block2, mode, storageTypes, storageIds);
    }
    // multi-mode tokens
    Token<BlockTokenIdentifier> mtoken = master.generateToken(block3,
        EnumSet.allOf(BlockTokenIdentifier.AccessMode.class),
        storageTypes, storageIds);
    for (BlockTokenIdentifier.AccessMode mode : BlockTokenIdentifier.AccessMode
        .values()) {
      checkAccess(master, mtoken, block3, mode, storageTypes, storageIds);
      checkAccess(slave, mtoken, block3, mode, storageTypes, storageIds);
    }
  }

  /** test block key and token handling */
  private void testBlockTokenSecretManager(boolean enableProtobuf)
      throws Exception {
    BlockTokenSecretManager masterHandler = new BlockTokenSecretManager(
        blockKeyUpdateInterval, blockTokenLifetime, 0, 1, "fake-pool", null,
        enableProtobuf);
    BlockTokenSecretManager slaveHandler = new BlockTokenSecretManager(
        blockKeyUpdateInterval, blockTokenLifetime, "fake-pool", null,
        enableProtobuf);
    ExportedBlockKeys keys = masterHandler.exportKeys();
    slaveHandler.addKeys(keys);
    tokenGenerationAndVerification(masterHandler, slaveHandler,
        new StorageType[]{StorageType.DEFAULT}, null);
    tokenGenerationAndVerification(masterHandler, slaveHandler, null, null);
    // key updating
    masterHandler.updateKeys();
    tokenGenerationAndVerification(masterHandler, slaveHandler,
        new StorageType[]{StorageType.DEFAULT}, null);
    tokenGenerationAndVerification(masterHandler, slaveHandler, null, null);
    keys = masterHandler.exportKeys();
    slaveHandler.addKeys(keys);
    tokenGenerationAndVerification(masterHandler, slaveHandler,
        new StorageType[]{StorageType.DEFAULT}, null);
    tokenGenerationAndVerification(masterHandler, slaveHandler, null, null);
  }

  @Test
  public void testBlockTokenSecretManagerLegacy() throws Exception {
    testBlockTokenSecretManager(false);
  }

  @Test
  public void testBlockTokenSecretManagerProtobuf() throws Exception {
    testBlockTokenSecretManager(true);
  }

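  /**
   * Builds an RPC server backed by a mocked ClientDatanodeProtocolPB whose
   * getReplicaVisibleLength answer validates the caller's block token
   * against the given secret manager.
   */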
  private static Server createMockDatanode(BlockTokenSecretManager sm,
      Token<BlockTokenIdentifier> token, Configuration conf)
      throws IOException, ServiceException {
    ClientDatanodeProtocolPB mockDN = mock(ClientDatanodeProtocolPB.class);

    BlockTokenIdentifier id = sm.createIdentifier();
    id.readFields(new DataInputStream(new ByteArrayInputStream(token
        .getIdentifier())));

    doAnswer(new GetLengthAnswer(sm, id)).when(mockDN)
        .getReplicaVisibleLength(any(), any());

    RPC.setProtocolEngine(conf, ClientDatanodeProtocolPB.class,
        ProtobufRpcEngine2.class);
    BlockingService service = ClientDatanodeProtocolService
        .newReflectiveBlockingService(mockDN);
    return new RPC.Builder(conf).setProtocol(ClientDatanodeProtocolPB.class)
        .setInstance(service).setBindAddress(ADDRESS).setPort(0)
        .setNumHandlers(5).setVerbose(true).setSecretManager(sm).build();
  }

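  /**
   * Verifies that a client can authenticate to the mock DataNode RPC server
   * using only the block token (no Kerberos credentials) and invoke
   * getReplicaVisibleLength end to end.
   */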
  private void testBlockTokenRpc(boolean enableProtobuf) throws Exception {
    Configuration conf = new Configuration();
    conf.set(HADOOP_SECURITY_AUTHENTICATION, "kerberos");
    UserGroupInformation.setConfiguration(conf);

    BlockTokenSecretManager sm = new BlockTokenSecretManager(
        blockKeyUpdateInterval, blockTokenLifetime, 0, 1, "fake-pool", null,
        enableProtobuf);
    Token<BlockTokenIdentifier> token = sm.generateToken(block3,
        EnumSet.allOf(BlockTokenIdentifier.AccessMode.class),
        new StorageType[]{StorageType.DEFAULT}, new String[0]);

    final Server server = createMockDatanode(sm, token, conf);

    server.start();

    final InetSocketAddress addr = NetUtils.getConnectAddress(server);
    final UserGroupInformation ticket = UserGroupInformation
        .createRemoteUser(block3.toString());
    ticket.addToken(token);

    ClientDatanodeProtocol proxy = null;
    try {
      proxy = DFSUtilClient.createClientDatanodeProtocolProxy(addr, ticket, conf,
          NetUtils.getDefaultSocketFactory(conf));
      assertEquals(block3.getBlockId(), proxy.getReplicaVisibleLength(block3));
    } finally {
      server.stop();
      if (proxy != null) {
        RPC.stopProxy(proxy);
      }
    }
  }

  @Test
  public void testBlockTokenRpcLegacy() throws Exception {
    testBlockTokenRpc(false);
  }

  @Test
  public void testBlockTokenRpcProtobuf() throws Exception {
    testBlockTokenRpc(true);
  }

  /**
   * Test that fast repeated invocations of createClientDatanodeProtocolProxy
   * will not end up using up thousands of sockets. This is a regression test
   * for HDFS-1965.
   */
  private void testBlockTokenRpcLeak(boolean enableProtobuf) throws Exception {
    Configuration conf = new Configuration();
    conf.set(HADOOP_SECURITY_AUTHENTICATION, "kerberos");
    UserGroupInformation.setConfiguration(conf);

    assumeTrue(FD_DIR.exists());
    BlockTokenSecretManager sm = new BlockTokenSecretManager(
        blockKeyUpdateInterval, blockTokenLifetime, 0, 1, "fake-pool", null,
        enableProtobuf);
    Token<BlockTokenIdentifier> token = sm.generateToken(block3,
        EnumSet.allOf(BlockTokenIdentifier.AccessMode.class),
        new StorageType[]{StorageType.DEFAULT}, new String[0]);

    final Server server = createMockDatanode(sm, token, conf);
    server.start();

    final InetSocketAddress addr = NetUtils.getConnectAddress(server);
    DatanodeID fakeDnId = DFSTestUtil.getLocalDatanodeID(addr.getPort());

    ExtendedBlock b = new ExtendedBlock("fake-pool", new Block(12345L));
    LocatedBlock fakeBlock = new LocatedBlock(b, DatanodeInfo.EMPTY_ARRAY);
    fakeBlock.setBlockToken(token);

    // Create another RPC proxy with the same configuration. This proxy never
    // attempts to connect anywhere, but it keeps the refcount on the RPC
    // "Client" object above 0, so RPC.stopProxy doesn't actually close the
    // TCP connections to the real target DN.
    ClientDatanodeProtocol proxyToNoWhere = RPC.getProxy(
        ClientDatanodeProtocol.class, ClientDatanodeProtocol.versionID,
        new InetSocketAddress("1.1.1.1", 1),
        UserGroupInformation.createRemoteUser("junk"), conf,
        NetUtils.getDefaultSocketFactory(conf));

    ClientDatanodeProtocol proxy = null;

    int fdsAtStart = countOpenFileDescriptors();
    try {
      long endTime = Time.now() + 3000;
      while (Time.now() < endTime) {
        proxy = DFSUtilClient.createClientDatanodeProtocolProxy(fakeDnId, conf, 1000,
            false, fakeBlock);
        assertEquals(block3.getBlockId(), proxy.getReplicaVisibleLength(block3));
        if (proxy != null) {
          RPC.stopProxy(proxy);
        }
        LOG.info("Num open fds:" + countOpenFileDescriptors());
      }

      int fdsAtEnd = countOpenFileDescriptors();

      if (fdsAtEnd - fdsAtStart > 50) {
        fail("Leaked " + (fdsAtEnd - fdsAtStart) + " fds!");
      }
    } finally {
      server.stop();
    }

    RPC.stopProxy(proxyToNoWhere);
  }

  @Test
  public void testBlockTokenRpcLeakLegacy() throws Exception {
    testBlockTokenRpcLeak(false);
  }

  @Test
  public void testBlockTokenRpcLeakProtobuf() throws Exception {
    testBlockTokenRpcLeak(true);
  }

  /**
   * @return the current number of file descriptors open by this process.
   */
  private static int countOpenFileDescriptors() {
    return FD_DIR.list().length;
  }

  /**
   * Test {@link BlockPoolTokenSecretManager}
   */
  private void testBlockPoolTokenSecretManager(boolean enableProtobuf)
      throws Exception {
    BlockPoolTokenSecretManager bpMgr = new BlockPoolTokenSecretManager();

    // Test BlockPoolTokenSecretManager with up to 10 block pools
    for (int i = 0; i < 10; i++) {
      String bpid = Integer.toString(i);
      BlockTokenSecretManager masterHandler = new BlockTokenSecretManager(
          blockKeyUpdateInterval, blockTokenLifetime, 0, 1, "fake-pool", null,
          enableProtobuf);
      BlockTokenSecretManager slaveHandler = new BlockTokenSecretManager(
          blockKeyUpdateInterval, blockTokenLifetime, "fake-pool", null,
          enableProtobuf);
      bpMgr.addBlockPool(bpid, slaveHandler);

      ExportedBlockKeys keys = masterHandler.exportKeys();
      bpMgr.addKeys(bpid, keys, true);
      String[] storageIds = new String[] {"DS-9001"};
      tokenGenerationAndVerification(masterHandler, bpMgr.get(bpid),
          new StorageType[]{StorageType.DEFAULT}, storageIds);
      tokenGenerationAndVerification(masterHandler, bpMgr.get(bpid), null,
          null);
      // Test key updating
      masterHandler.updateKeys();
      tokenGenerationAndVerification(masterHandler, bpMgr.get(bpid),
          new StorageType[]{StorageType.DEFAULT}, storageIds);
      tokenGenerationAndVerification(masterHandler, bpMgr.get(bpid), null,
          null);
      keys = masterHandler.exportKeys();
      bpMgr.addKeys(bpid, keys, true);
      tokenGenerationAndVerification(masterHandler, bpMgr.get(bpid),
          new StorageType[]{StorageType.DEFAULT}, new String[]{"DS-9001"});
      tokenGenerationAndVerification(masterHandler, bpMgr.get(bpid), null,
          null);
    }
  }

  @Test
  public void testBlockPoolTokenSecretManagerLegacy() throws Exception {
    testBlockPoolTokenSecretManager(false);
  }

  @Test
  public void testBlockPoolTokenSecretManagerProtobuf() throws Exception {
    testBlockPoolTokenSecretManager(true);
  }

  /**
   * This test writes a file and gets the block locations without closing the
   * file, then checks the block token in the last located block. The token
   * is verified by ensuring it is of the correct kind.
   *
   * @throws IOException
   * @throws InterruptedException
   */
  private void testBlockTokenInLastLocatedBlock(boolean enableProtobuf)
      throws IOException, InterruptedException {
    Configuration conf = new HdfsConfiguration();
    conf.setBoolean(DFSConfigKeys.DFS_BLOCK_ACCESS_TOKEN_ENABLE_KEY, true);
    conf.setInt(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, 512);
    conf.setBoolean(DFSConfigKeys.DFS_BLOCK_ACCESS_TOKEN_PROTOBUF_ENABLE,
        enableProtobuf);
    MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
        .numDataNodes(1).build();
    cluster.waitActive();

    try {
      FileSystem fs = cluster.getFileSystem();
      String fileName = "/testBlockTokenInLastLocatedBlock";
      Path filePath = new Path(fileName);
      FSDataOutputStream out = fs.create(filePath, (short) 1);
      out.write(new byte[1000]);
      // ensure that the first block is written out (see FSOutputSummer#flush)
      out.flush();
      LocatedBlocks locatedBlocks = cluster.getNameNodeRpc().getBlockLocations(
          fileName, 0, 1000);
      while (locatedBlocks.getLastLocatedBlock() == null) {
        Thread.sleep(100);
        locatedBlocks = cluster.getNameNodeRpc().getBlockLocations(fileName, 0,
            1000);
      }
      Token<BlockTokenIdentifier> token = locatedBlocks.getLastLocatedBlock()
          .getBlockToken();
      assertEquals(BlockTokenIdentifier.KIND_NAME, token.getKind());
      out.close();
    } finally {
      cluster.shutdown();
    }
  }

  @Test
  public void testBlockTokenInLastLocatedBlockLegacy() throws IOException,
      InterruptedException {
    testBlockTokenInLastLocatedBlock(false);
  }

  @Test
  public void testBlockTokenInLastLocatedBlockProtobuf() throws IOException,
      InterruptedException {
    testBlockTokenInLastLocatedBlock(true);
  }

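  /*
   * The format tests below rely on BlockTokenIdentifier#readFields()
   * detecting whether the identifier bytes are in the legacy Writable format
   * or the protobuf format and dispatching to readFieldsLegacy() or
   * readFieldsProtobuf() accordingly.
   */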
  @Test
  public void testLegacyBlockTokenBytesIsLegacy() throws IOException {
    final boolean useProto = false;
    BlockTokenSecretManager sm = new BlockTokenSecretManager(
        blockKeyUpdateInterval, blockTokenLifetime, 0, 1, "fake-pool", null,
        useProto);
    Token<BlockTokenIdentifier> token = sm.generateToken(block1,
        EnumSet.noneOf(BlockTokenIdentifier.AccessMode.class),
        new StorageType[]{StorageType.DEFAULT}, new String[0]);
    final byte[] tokenBytes = token.getIdentifier();
    BlockTokenIdentifier legacyToken = new BlockTokenIdentifier();
    BlockTokenIdentifier protobufToken = new BlockTokenIdentifier();
    BlockTokenIdentifier readToken = new BlockTokenIdentifier();

    DataInputBuffer dib = new DataInputBuffer();

    dib.reset(tokenBytes, tokenBytes.length);
    legacyToken.readFieldsLegacy(dib);

    boolean invalidProtobufMessage = false;
    try {
      dib.reset(tokenBytes, tokenBytes.length);
      protobufToken.readFieldsProtobuf(dib);
    } catch (IOException e) {
      invalidProtobufMessage = true;
    }
    assertTrue(invalidProtobufMessage);

    dib.reset(tokenBytes, tokenBytes.length);
    readToken.readFields(dib);

    // Using legacy, the token parses as a legacy block token and not a protobuf
    assertEquals(legacyToken, readToken);
    assertNotEquals(protobufToken, readToken);
  }

  @Test
  public void testEmptyLegacyBlockTokenBytesIsLegacy() throws IOException {
    BlockTokenIdentifier emptyIdent = new BlockTokenIdentifier();
    DataOutputBuffer dob = new DataOutputBuffer(4096);
    DataInputBuffer dib = new DataInputBuffer();

    emptyIdent.writeLegacy(dob);
    byte[] emptyIdentBytes = Arrays.copyOf(dob.getData(), dob.getLength());

    BlockTokenIdentifier legacyToken = new BlockTokenIdentifier();
    BlockTokenIdentifier protobufToken = new BlockTokenIdentifier();
    BlockTokenIdentifier readToken = new BlockTokenIdentifier();

    dib.reset(emptyIdentBytes, emptyIdentBytes.length);
    legacyToken.readFieldsLegacy(dib);

    boolean invalidProtobufMessage = false;
    try {
      dib.reset(emptyIdentBytes, emptyIdentBytes.length);
      protobufToken.readFieldsProtobuf(dib);
    } catch (IOException e) {
      invalidProtobufMessage = true;
    }
    assertTrue(invalidProtobufMessage);

    dib.reset(emptyIdentBytes, emptyIdentBytes.length);
    readToken.readFields(dib);
  }

  /**
   * If the NameNode predates HDFS-6708 and HDFS-9807, then the LocatedBlocks
   * that it returns to the client will have block tokens that don't include
   * the storage types or storage IDs. Simulate this by setting the storage
   * type and storage ID to null to test backwards compatibility.
   */
  @Test
  public void testLegacyBlockTokenWithoutStorages() throws IOException,
          IllegalAccessException {
    BlockTokenIdentifier identifier = new BlockTokenIdentifier("user",
            "blockpool", 123,
            EnumSet.allOf(BlockTokenIdentifier.AccessMode.class), null, null,
            false);
    FieldUtils.writeField(identifier, "storageTypes", null, true);
    FieldUtils.writeField(identifier, "storageIds", null, true);
    testCraftedBlockTokenIdentifier(identifier, false, false, false);
  }

  @Test
  public void testProtobufBlockTokenBytesIsProtobuf() throws IOException {
    final boolean useProto = true;
    BlockTokenSecretManager sm = new BlockTokenSecretManager(
        blockKeyUpdateInterval, blockTokenLifetime, 0, 1, "fake-pool", null,
        useProto);
    Token<BlockTokenIdentifier> token = sm.generateToken(block1,
        EnumSet.noneOf(BlockTokenIdentifier.AccessMode.class),
        StorageType.EMPTY_ARRAY, new String[0]);
    final byte[] tokenBytes = token.getIdentifier();
    BlockTokenIdentifier legacyToken = new BlockTokenIdentifier();
    BlockTokenIdentifier protobufToken = new BlockTokenIdentifier();
    BlockTokenIdentifier readToken = new BlockTokenIdentifier();

    DataInputBuffer dib = new DataInputBuffer();

    /* We expect a NegativeArraySizeException here because we skip
     * readFields() and instead try to parse the bytes directly as a legacy
     * BlockTokenIdentifier.
     *
     * Note: because the parsing depends on the expiryDate, which is based on
     * Time.now(), it can sometimes fail with IOException and sometimes with
     * NegativeArraySizeException.
     */
    boolean invalidLegacyMessage = false;
    try {
      dib.reset(tokenBytes, tokenBytes.length);
      legacyToken.readFieldsLegacy(dib);
    } catch (IOException | NegativeArraySizeException e) {
      invalidLegacyMessage = true;
    }
    assertTrue(invalidLegacyMessage);

    dib.reset(tokenBytes, tokenBytes.length);
    protobufToken.readFieldsProtobuf(dib);

    dib.reset(tokenBytes, tokenBytes.length);
    readToken.readFields(dib);

    // Using protobuf, the token parses as a protobuf and not a legacy block
    // token
    assertNotEquals(legacyToken, readToken);
    assertEquals(protobufToken, readToken);
  }

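  /**
   * Serializes the given identifier (legacy or protobuf) and attempts to
   * parse it as a legacy token, failing the test on any unexpected
   * IOException or RuntimeException. For protobuf identifiers the legacy
   * parse must fail, and both readFieldsProtobuf() and readFields() must
   * recover the original identifier.
   */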
  private void testCraftedBlockTokenIdentifier(
      BlockTokenIdentifier identifier, boolean expectIOE,
      boolean expectRTE, boolean isProtobuf) throws IOException {
    DataOutputBuffer dob = new DataOutputBuffer(4096);
    DataInputBuffer dib = new DataInputBuffer();

    if (isProtobuf) {
      identifier.writeProtobuf(dob);
    } else {
      identifier.writeLegacy(dob);
    }
    byte[] identBytes = Arrays.copyOf(dob.getData(), dob.getLength());

    BlockTokenIdentifier legacyToken = new BlockTokenIdentifier();
    BlockTokenIdentifier protobufToken = new BlockTokenIdentifier();
    BlockTokenIdentifier readToken = new BlockTokenIdentifier();

    boolean invalidLegacyMessage = false;
    try {
      dib.reset(identBytes, identBytes.length);
      legacyToken.readFieldsLegacy(dib);
    } catch (IOException e) {
      if (!expectIOE) {
        fail("Received IOException but it was not expected.");
      }
      invalidLegacyMessage = true;
    } catch (RuntimeException e) {
      if (!expectRTE) {
        fail("Received RuntimeException but it was not expected.");
      }
      invalidLegacyMessage = true;
    }

    if (isProtobuf) {
      assertTrue(invalidLegacyMessage);

      dib.reset(identBytes, identBytes.length);
      protobufToken.readFieldsProtobuf(dib);
      dib.reset(identBytes, identBytes.length);
      readToken.readFields(dib);
      assertEquals(identifier, readToken);
      assertEquals(protobufToken, readToken);
    }
  }

  @Test
  public void testEmptyProtobufBlockTokenBytesIsProtobuf() throws IOException {
    // An empty BlockTokenIdentifier fails protobuf parsing with an IOException
    BlockTokenIdentifier identifier = new BlockTokenIdentifier();
    testCraftedBlockTokenIdentifier(identifier, true, false, true);
  }

  @Test
  public void testCraftedProtobufBlockTokenBytesIsProtobuf() throws
      IOException {
    /* Parsing BlockTokenIdentifier with expiryDate
     * 2017-02-09 00:12:35,072+0100 will throw IOException.
     * However, expiryDate of
     * 2017-02-09 00:12:35,071+0100 will throw NegativeArraySizeException.
     */
    BlockTokenIdentifier identifier = new BlockTokenIdentifier("user",
        "blockpool", 123, EnumSet.allOf(BlockTokenIdentifier.AccessMode.class),
        new StorageType[]{StorageType.DISK, StorageType.ARCHIVE},
        new String[] {"fake-storage-id"}, true);
    Calendar cal = new GregorianCalendar();
    cal.set(2017, 1, 9, 0, 12, 35);
    long datetime = cal.getTimeInMillis();
    datetime = ((datetime / 1000) * 1000); // strip milliseconds.
    datetime = datetime + 71; // 2017-02-09 00:12:35,071+0100
    identifier.setExpiryDate(datetime);
    testCraftedBlockTokenIdentifier(identifier, false, true, true);
    datetime += 1; // 2017-02-09 00:12:35,072+0100
    identifier.setExpiryDate(datetime);
    testCraftedBlockTokenIdentifier(identifier, true, false, true);
  }

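  /**
   * Round-trips an identifier through write()/readFields() and asserts that
   * the copy equals the original before returning it for field-level checks.
   */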
  private BlockTokenIdentifier writeAndReadBlockToken(
      BlockTokenIdentifier identifier) throws IOException {
    DataOutputBuffer dob = new DataOutputBuffer(4096);
    DataInputBuffer dib = new DataInputBuffer();
    identifier.write(dob);
    byte[] identBytes = Arrays.copyOf(dob.getData(), dob.getLength());

    BlockTokenIdentifier readToken = new BlockTokenIdentifier();

    dib.reset(identBytes, identBytes.length);
    readToken.readFields(dib);
    assertEquals(identifier, readToken);
    return readToken;
  }

  @Test
  public void testEmptyBlockTokenSerialization() throws IOException {
    BlockTokenIdentifier ident = new BlockTokenIdentifier();
    BlockTokenIdentifier ret = writeAndReadBlockToken(ident);
    assertEquals(0, ret.getExpiryDate());
    assertEquals(0, ret.getKeyId());
    assertEquals(null, ret.getUserId());
    assertEquals(null, ret.getBlockPoolId());
    assertEquals(0, ret.getBlockId());
    assertEquals(EnumSet.noneOf(BlockTokenIdentifier.AccessMode.class),
        ret.getAccessModes());
    assertArrayEquals(StorageType.EMPTY_ARRAY, ret.getStorageTypes());
  }

  private void testBlockTokenSerialization(boolean useProto) throws
      IOException {
    EnumSet<BlockTokenIdentifier.AccessMode> accessModes =
        EnumSet.allOf(BlockTokenIdentifier.AccessMode.class);
    StorageType[] storageTypes =
        new StorageType[]{StorageType.RAM_DISK, StorageType.SSD,
            StorageType.DISK, StorageType.ARCHIVE, StorageType.NVDIMM};
    BlockTokenIdentifier ident = new BlockTokenIdentifier("user", "bpool",
        123, accessModes, storageTypes, new String[] {"fake-storage-id"},
        useProto);
    ident.setExpiryDate(1487080345L);
    BlockTokenIdentifier ret = writeAndReadBlockToken(ident);
    assertEquals(1487080345L, ret.getExpiryDate());
    assertEquals(0, ret.getKeyId());
    assertEquals("user", ret.getUserId());
    assertEquals("bpool", ret.getBlockPoolId());
    assertEquals(123, ret.getBlockId());
    assertEquals(EnumSet.allOf(BlockTokenIdentifier.AccessMode.class),
        ret.getAccessModes());
    assertArrayEquals(storageTypes, ret.getStorageTypes());
    assertArrayEquals(new String[] {"fake-storage-id"}, ret.getStorageIds());
  }

  @Test
  public void testBlockTokenSerialization() throws IOException {
    testBlockTokenSerialization(false);
    testBlockTokenSerialization(true);
  }

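  /**
   * A token bound to a specific storage ID must be rejected for a different
   * storage ID, but accepted when the storage IDs are empty, null, or
   * omitted, for compatibility with older clients.
   */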
  private void testBadStorageIDCheckAccess(boolean enableProtobuf)
      throws IOException {
    BlockTokenSecretManager sm = new BlockTokenSecretManager(
        blockKeyUpdateInterval, blockTokenLifetime, 0, 1, "fake-pool", null,
        enableProtobuf);
    StorageType[] storageTypes = new StorageType[] {StorageType.DISK};
    String[] storageIds = new String[] {"fake-storage-id"};
    String[] badStorageIds = new String[] {"BAD-STORAGE-ID"};
    String[] emptyStorageIds = new String[] {};
    BlockTokenIdentifier.AccessMode mode = BlockTokenIdentifier.AccessMode.READ;
    BlockTokenIdentifier id = generateTokenId(sm, block3,
        EnumSet.of(mode), storageTypes, storageIds);
    sm.checkAccess(id, null, block3, mode, storageTypes, storageIds);

    try {
      sm.checkAccess(id, null, block3, mode, storageTypes, badStorageIds);
      fail("Expected strict BlockTokenSecretManager to fail");
    } catch (SecretManager.InvalidToken e) {
      // expected: the token is bound to a different storage ID
    }
    // We allow tokens with empty storageIds for backwards compatibility:
    // old clients may not have known to pass the storageId parameter to
    // the writeBlock API.
    sm.checkAccess(id, null, block3, mode, storageTypes, emptyStorageIds);
    sm.checkAccess(id, null, block3, mode, storageTypes, null);
    sm.checkAccess(id, null, block3, mode, storageTypes);
    sm.checkAccess(id, null, block3, mode);
  }

  @Test
  public void testBadStorageIDCheckAccess() throws IOException {
    testBadStorageIDCheckAccess(false);
    testBadStorageIDCheckAccess(true);
  }

  /**
   * Verify that the block token serialNo is always within the range
   * designated to the NameNode.
   */
  @Test
  public void testBlockTokenRanges() throws IOException {
    final int interval = 1024;
    final int numNNs = Integer.MAX_VALUE / interval;
    for(int nnIdx = 0; nnIdx < 64; nnIdx++) {
      BlockTokenSecretManager sm = new BlockTokenSecretManager(
          blockKeyUpdateInterval, blockTokenLifetime, nnIdx, numNNs,
          "fake-pool", null, false);
      int rangeStart = nnIdx * interval;
      for(int i = 0; i < interval * 3; i++) {
        int serialNo = sm.getSerialNoForTesting();
        assertTrue(serialNo >= rangeStart && serialNo < (rangeStart + interval),
            "serialNo " + serialNo + " is not in the designated range: [" + rangeStart
                + ", " + (rangeStart + interval) + ")");
        sm.updateKeys();
      }
    }
  }

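  /**
   * Simulates an identifier written by a newer client with an extra trailing
   * byte. readFields() must ignore the unknown field, and retrievePassword()
   * must still compute the password over the original serialized bytes so
   * both sides agree.
   */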
  @Test
  public void testRetrievePasswordWithUnknownFields() throws IOException {
    BlockTokenIdentifier id = new BlockTokenIdentifier();
    BlockTokenIdentifier spyId = Mockito.spy(id);
    Mockito.doAnswer(new Answer<Void>() {
      @Override
      public Void answer(InvocationOnMock invocation) throws Throwable {
        DataOutput out = (DataOutput) invocation.getArguments()[0];
        invocation.callRealMethod();
        // write something at the end that BlockTokenIdentifier#readFields()
        // will ignore, but which is still a part of the password
        out.write(7);
        return null;
      }
    }).when(spyId).write(Mockito.any());

    BlockTokenSecretManager sm =
        new BlockTokenSecretManager(blockKeyUpdateInterval, blockTokenLifetime,
            0, 1, "fake-pool", null, false);
    // the master creates the password
    byte[] password = sm.createPassword(spyId);

    BlockTokenIdentifier slaveId = new BlockTokenIdentifier();
    slaveId.readFields(
        new DataInputStream(new ByteArrayInputStream(spyId.getBytes())));

    // the slave must retrieve the same password
    assertArrayEquals(password, sm.retrievePassword(slaveId));
  }

  @Test
  public void testRetrievePasswordWithRecognizableFieldsOnly()
      throws IOException {
    BlockTokenSecretManager sm =
        new BlockTokenSecretManager(blockKeyUpdateInterval, blockTokenLifetime,
            0, 1, "fake-pool", null, false);
    // the master creates the password
    BlockTokenIdentifier masterId = new BlockTokenIdentifier();
    byte[] password = sm.createPassword(masterId);
    // Invalidate the cached bytes so that masterId.getBytes() returns only
    // the recognizable fields.
    masterId.setExpiryDate(masterId.getExpiryDate());
    BlockTokenIdentifier slaveId = new BlockTokenIdentifier();
    slaveId.readFields(
        new DataInputStream(new ByteArrayInputStream(masterId.getBytes())));
    assertArrayEquals(password, sm.retrievePassword(slaveId));
  }

  /** Test for last in-progress block token expiry.
   * 1. Write a file with one block, which remains in progress.
   * 2. Open an input stream, then close the output stream.
   * 3. Wait for the block token to expire and read the data.
   * 4. The read should succeed.
   */
  @Test
  public void testLastLocatedBlockTokenExpiry()
      throws IOException, InterruptedException {
    Configuration conf = new Configuration();
    conf.setBoolean(DFSConfigKeys.DFS_BLOCK_ACCESS_TOKEN_ENABLE_KEY, true);
    try (MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
        .numDataNodes(1).build()) {
      cluster.waitClusterUp();
      final NameNode nn = cluster.getNameNode();
      final BlockManager bm = nn.getNamesystem().getBlockManager();
      final BlockTokenSecretManager sm = bm.getBlockTokenSecretManager();

      // set a short token lifetime (1 second)
      SecurityTestUtil.setBlockTokenLifetime(sm, 1000L);

      DistributedFileSystem fs = cluster.getFileSystem();
      Path p = new Path("/tmp/abc.log");
      FSDataOutputStream out = fs.create(p);
      byte[] data = "hello\n".getBytes(StandardCharsets.UTF_8);
      out.write(data);
      out.hflush();
      FSDataInputStream in = fs.open(p);
      out.close();

      // wait for last block token to expire
      Thread.sleep(2000L);

      byte[] readData = new byte[data.length];
      long startTime = System.currentTimeMillis();
      in.read(readData);
      // DFSInputStream#refetchLocations() waits at least 1 second before
      // refetching the located blocks; a successful read must not enter
      // that wait.
      assertTrue(1000L > (System.currentTimeMillis() - startTime),
          "Should not have waited to refetch the located blocks");
    }
  }
}