TestWebHdfsTokens.java

/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.hdfs.web;

import static org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod.KERBEROS;
import static org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod.SIMPLE;
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_DATA_TRANSFER_PROTECTION_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_HTTPS_KEYSTORE_RESOURCE_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_SERVER_HTTPS_KEYSTORE_RESOURCE_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCK_ACCESS_TOKEN_ENABLE_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_DELEGATION_TOKEN_ALWAYS_USE_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_HTTP_ADDRESS_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_HTTPS_ADDRESS_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_KERBEROS_PRINCIPAL_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_KEYTAB_FILE_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HTTP_POLICY_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_HTTP_ADDRESS_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_HTTPS_ADDRESS_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_KERBEROS_PRINCIPAL_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_KEYTAB_FILE_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_WEB_AUTHENTICATION_KERBEROS_PRINCIPAL_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.IGNORE_SECURE_PORTS_FOR_TESTING_KEY;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNotSame;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertSame;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;
import static org.mockito.Mockito.*;

import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.net.HttpURLConnection;
import java.net.URI;
import java.net.URL;
import java.net.URLConnection;
import java.security.PrivilegedExceptionAction;
import java.util.Map;
import java.util.Properties;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
import org.apache.hadoop.hdfs.web.resources.*;
import org.apache.hadoop.http.HttpConfig;
import org.apache.hadoop.http.HttpServer2;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.minikdc.MiniKdc;
import org.apache.hadoop.security.SecurityUtil;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.authentication.client.ConnectionConfigurator;
import org.apache.hadoop.security.authentication.util.KerberosName;
import org.apache.hadoop.security.AuthenticationFilterInitializer;
import org.apache.hadoop.security.ssl.KeyStoreTestUtil;
import org.apache.hadoop.security.token.SecretManager.InvalidToken;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.test.Whitebox;
import org.apache.hadoop.security.token.Token;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;

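/**
 * Tests delegation token handling in WebHDFS: which REST operations require
 * authentication, lazy acquisition and reuse of delegation tokens,
 * replacement of expired tokens, and use of tokens already present in the
 * caller's UGI.
 */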
public class TestWebHdfsTokens {
  private static final String PREFIX = "hadoop.http.authentication.";
  private static Configuration conf;
  URI uri = null;

  //secure cluster
  private static MiniKdc kdc = null;
  private static File baseDir;
  private static File keytabFile;
  private static String username = "webhdfs-tokens-test";
  private static String principal;
  private static String keystoresDir;
  private static String sslConfDir;

  @BeforeAll
  public static void setUp() {
    conf = new Configuration();
  }

  @AfterAll
  public static void destroy() throws Exception {
    if (kdc != null) {
      kdc.stop();
      FileUtil.fullyDelete(baseDir);
      KeyStoreTestUtil.cleanupSSLConfig(keystoresDir, sslConfDir);
    }
  }

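  /**
   * Switches the shared conf to Kerberos authentication and installs a fake
   * login user so the client-side token code paths run without a real KDC.
   */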
  private static void initEnv() {
    SecurityUtil.setAuthenticationMethod(KERBEROS, conf);
    UserGroupInformation.setConfiguration(conf);
    UserGroupInformation.setLoginUser(
        UserGroupInformation.createUserForTesting(
            "LoginUser", new String[]{"supergroup"}));
  }

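  /**
   * Starts a MiniKdc, creates the test user and SPNEGO principals in a shared
   * keytab, and configures the given conf for a Kerberized mini cluster:
   * NameNode/DataNode principals and keytabs, block access tokens, SASL data
   * transfer protection, HTTP_AND_HTTPS with test SSL keystores, and
   * ephemeral ports.
   */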
  private static void initSecureConf(Configuration secureConf)
      throws Exception {

    baseDir = GenericTestUtils.getTestDir(
        TestWebHdfsTokens.class.getSimpleName());
    FileUtil.fullyDelete(baseDir);
    assertTrue(baseDir.mkdirs());

    Properties kdcConf = MiniKdc.createConf();
    kdc = new MiniKdc(kdcConf, baseDir);
    kdc.start();

    SecurityUtil.setAuthenticationMethod(
        UserGroupInformation.AuthenticationMethod.KERBEROS, secureConf);
    UserGroupInformation.setConfiguration(secureConf);
    KerberosName.resetDefaultRealm();
    assertTrue(UserGroupInformation.isSecurityEnabled(),
        "Expected secureConfiguration to enable security");

    keytabFile = new File(baseDir, username + ".keytab");
    String keytab = keytabFile.getAbsolutePath();
    // Windows will not reverse name lookup "127.0.0.1" to "localhost".
    String krbInstance = Path.WINDOWS ? "127.0.0.1" : "localhost";
    principal = username + "/" + krbInstance + "@" + kdc.getRealm();
    String spnegoPrincipal = "HTTP/" + krbInstance + "@" + kdc.getRealm();
    kdc.createPrincipal(keytabFile, username, username + "/" + krbInstance,
        "HTTP/" + krbInstance);

    secureConf.set(HttpServer2.FILTER_INITIALIZER_PROPERTY,
        AuthenticationFilterInitializer.class.getName());
    secureConf.set(PREFIX + "type", "kerberos");
    secureConf.set(PREFIX + "kerberos.keytab", keytab);
    secureConf.set(PREFIX + "kerberos.principal", spnegoPrincipal);
    secureConf.set(DFS_NAMENODE_KERBEROS_PRINCIPAL_KEY, principal);
    secureConf.set(DFS_NAMENODE_KEYTAB_FILE_KEY, keytab);
    secureConf.set(DFS_DATANODE_KERBEROS_PRINCIPAL_KEY, principal);
    secureConf.set(DFS_DATANODE_KEYTAB_FILE_KEY, keytab);
    secureConf.set(DFS_WEB_AUTHENTICATION_KERBEROS_PRINCIPAL_KEY,
        spnegoPrincipal);
    secureConf.setBoolean(DFS_BLOCK_ACCESS_TOKEN_ENABLE_KEY, true);
    secureConf.set(DFS_DATA_TRANSFER_PROTECTION_KEY, "authentication");
    secureConf.set(DFS_HTTP_POLICY_KEY,
        HttpConfig.Policy.HTTP_AND_HTTPS.name());
    secureConf.set(DFS_NAMENODE_HTTPS_ADDRESS_KEY, "localhost:0");
    secureConf.set(DFS_NAMENODE_HTTP_ADDRESS_KEY, "localhost:0");
    secureConf.set(DFS_DATANODE_HTTPS_ADDRESS_KEY, "localhost:0");
    secureConf.set(DFS_DATANODE_HTTP_ADDRESS_KEY, "localhost:0");
    secureConf.setBoolean(DFS_NAMENODE_DELEGATION_TOKEN_ALWAYS_USE_KEY, true);
    secureConf.setBoolean(IGNORE_SECURE_PORTS_FOR_TESTING_KEY, true);

    keystoresDir = baseDir.getAbsolutePath();
    sslConfDir = KeyStoreTestUtil.getClasspathDir(TestWebHdfsTokens.class);
    KeyStoreTestUtil.setupSSLConfig(keystoresDir, sslConfDir,
        secureConf, false);

    secureConf.set(DFS_CLIENT_HTTPS_KEYSTORE_RESOURCE_KEY,
        KeyStoreTestUtil.getClientSSLConfigFileName());
    secureConf.set(DFS_SERVER_HTTPS_KEYSTORE_RESOURCE_KEY,
        KeyStoreTestUtil.getServerSSLConfigFileName());
  }
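
  /**
   * Creates a WebHdfsFileSystem bound to a placeholder URI and wraps it in a
   * Mockito spy so that delegation token calls can be stubbed and verified.
   */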
  private WebHdfsFileSystem spyWebhdfsInSecureSetup() throws IOException {
    WebHdfsFileSystem fsOrig = new WebHdfsFileSystem();
    fsOrig.initialize(URI.create("webhdfs://127.0.0.1:0"), conf);
    WebHdfsFileSystem fs = spy(fsOrig);
    return fs;
  }

  @Test
  @Timeout(value = 5)
  public void testTokenForNonTokenOp() throws IOException {
    initEnv();
    WebHdfsFileSystem fs = spyWebhdfsInSecureSetup();
    Token<?> token = mock(Token.class);
    doReturn(token).when(fs).getDelegationToken(null);

    // should get/set/renew token
    fs.toUrl(GetOpParam.Op.OPEN, null);
    verify(fs).getDelegationToken();
    verify(fs).getDelegationToken(null);
    verify(fs).setDelegationToken(token);
    reset(fs);

    // should return prior token
    fs.toUrl(GetOpParam.Op.OPEN, null);
    verify(fs).getDelegationToken();
    verify(fs, never()).getDelegationToken(null);
    verify(fs, never()).setDelegationToken(token);
  }

  @Test
  @Timeout(value = 5)
  public void testNoTokenForGetToken() throws IOException {
    initEnv();
    checkNoTokenForOperation(GetOpParam.Op.GETDELEGATIONTOKEN);
  }

  @Test
  @Timeout(value = 5)
  public void testNoTokenForRenewToken() throws IOException {
    initEnv();
    checkNoTokenForOperation(PutOpParam.Op.RENEWDELEGATIONTOKEN);
  }

  @Test
  @Timeout(value = 5)
  public void testNoTokenForCancelToken() throws IOException {
    initEnv();
    checkNoTokenForOperation(PutOpParam.Op.CANCELDELEGATIONTOKEN);
  }

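  /**
   * Asserts that building the request URL for the given token management
   * operation neither fetches nor caches a delegation token.
   */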
  private void checkNoTokenForOperation(HttpOpParam.Op op) throws IOException {
    WebHdfsFileSystem fs = spyWebhdfsInSecureSetup();
    doReturn(null).when(fs).getDelegationToken(null);
    fs.initialize(URI.create("webhdfs://127.0.0.1:0"), conf);

    // do not get a token!
    fs.toUrl(op, null);
    verify(fs, never()).getDelegationToken();
    verify(fs, never()).getDelegationToken(null);
    verify(fs, never()).setDelegationToken(any());
  }

  @Test
  @Timeout(value = 10)
  public void testGetOpRequireAuth() {
    for (HttpOpParam.Op op : GetOpParam.Op.values()) {
      boolean expect = (op == GetOpParam.Op.GETDELEGATIONTOKEN);
      assertEquals(expect, op.getRequireAuth());
    }
  }

  @Test
  @Timeout(value = 10)
  public void testPutOpRequireAuth() {
    for (HttpOpParam.Op op : PutOpParam.Op.values()) {
      boolean expect = (op == PutOpParam.Op.RENEWDELEGATIONTOKEN
          || op == PutOpParam.Op.CANCELDELEGATIONTOKEN);
      assertEquals(expect, op.getRequireAuth());
    }
  }

  @Test
  @Timeout(value = 10)
  public void testPostOpRequireAuth() {
    for (HttpOpParam.Op op : PostOpParam.Op.values()) {
      assertFalse(op.getRequireAuth());
    }
  }

  @Test
  @Timeout(value = 10)
  public void testDeleteOpRequireAuth() {
    for (HttpOpParam.Op op : DeleteOpParam.Op.values()) {
      assertFalse(op.getRequireAuth());
    }
  }
  
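  /**
   * Brings up a Kerberized MiniDFSCluster, logs in from the generated keytab,
   * and runs the lazy token fetch checks over both swebhdfs (HTTPS) and
   * webhdfs (HTTP).
   */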
  @Test
  public void testLazyTokenFetchForWebhdfs() throws Exception {
    MiniDFSCluster cluster = null;
    UserGroupInformation ugi = null;
    try {
      final Configuration clusterConf = new HdfsConfiguration(conf);
      initSecureConf(clusterConf);

      cluster = new MiniDFSCluster.Builder(clusterConf).numDataNodes(1).build();
      cluster.waitActive();

      ugi = UserGroupInformation.loginUserFromKeytabAndReturnUGI(
          principal, keytabFile.getAbsolutePath());

      //test with swebhdfs
      uri = DFSUtil.createUri(
          "swebhdfs", cluster.getNameNode().getHttpsAddress());
      validateLazyTokenFetch(ugi, clusterConf);

      //test with webhdfs
      uri = DFSUtil.createUri(
          "webhdfs", cluster.getNameNode().getHttpAddress());
      validateLazyTokenFetch(ugi, clusterConf);

    } finally {
      if (cluster != null) {
        cluster.shutdown();
      }

      // Reset UGI so that other tests are not affected.
      UserGroupInformation.reset();
      UserGroupInformation.setConfiguration(new Configuration());
    }
  }


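  /**
   * Appends service=foo and kind=bar to every request URL through a custom
   * URLConnectionFactory and verifies that the delegation token returned by
   * GETDELEGATIONTOKEN carries that service and kind.
   */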
  @Test
  public void testSetTokenServiceAndKind() throws Exception {
    initEnv();
    MiniDFSCluster cluster = null;

    try {
      final Configuration clusterConf = new HdfsConfiguration(conf);
      SecurityUtil.setAuthenticationMethod(SIMPLE, clusterConf);
      clusterConf.setBoolean(
          DFSConfigKeys.DFS_NAMENODE_DELEGATION_TOKEN_ALWAYS_USE_KEY, true);

      // trick the NN into thinking security is enabled w/o it trying
      // to login from a keytab
      UserGroupInformation.setConfiguration(clusterConf);
      cluster = new MiniDFSCluster.Builder(clusterConf).numDataNodes(0).build();
      cluster.waitActive();
      SecurityUtil.setAuthenticationMethod(KERBEROS, clusterConf);
      final WebHdfsFileSystem fs =
          WebHdfsTestUtil.getWebHdfsFileSystem(clusterConf, "webhdfs");
      Whitebox.setInternalState(fs, "canRefreshDelegationToken", true);

      URLConnectionFactory factory = new URLConnectionFactory(new ConnectionConfigurator() {
        @Override
        public HttpURLConnection configure(HttpURLConnection conn)
                throws IOException {
          return conn;
        }
      }) {
        @Override
        public URLConnection openConnection(URL url) throws IOException {
          return super.openConnection(new URL(url + "&service=foo&kind=bar"));
        }
      };
      Whitebox.setInternalState(fs, "connectionFactory", factory);
      Token<?> token1 = fs.getDelegationToken();
      assertEquals(new Text("bar"), token1.getKind());

      final HttpOpParam.Op op = GetOpParam.Op.GETDELEGATIONTOKEN;
      Token<DelegationTokenIdentifier> token2 =
          fs.new FsPathResponseRunner<Token<DelegationTokenIdentifier>>(
              op, null, new RenewerParam(null)) {
            @Override
            Token<DelegationTokenIdentifier> decodeResponse(Map<?, ?> json)
                throws IOException {
              return WebHdfsTestUtil.convertJsonToDelegationToken(json);
            }
          }.run();

      assertEquals(new Text("bar"), token2.getKind());
      assertEquals(new Text("foo"), token2.getService());
    } finally {
      if (cluster != null) {
        cluster.shutdown();
      }
    }
  }
  
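  /**
   * Walks the lazy delegation token lifecycle on a spied WebHdfsFileSystem
   * bound to {@link #uri}: token operations never fetch a token, the first
   * regular operation fetches and caches one, the cached token is reused,
   * expired tokens are replaced on demand, and close() cancels only
   * self-acquired tokens, never tokens supplied through the UGI.
   */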
  private void validateLazyTokenFetch(UserGroupInformation ugi,
      final Configuration clusterConf) throws Exception {

    String testUser = ugi.getShortUserName();

    WebHdfsFileSystem fs = ugi.doAs(
        new PrivilegedExceptionAction<WebHdfsFileSystem>() {
        @Override
        public WebHdfsFileSystem run() throws IOException {
          return spy((WebHdfsFileSystem) FileSystem.newInstance(uri,
              clusterConf));
        }
      });

    // verify token ops don't get a token
    assertNull(fs.getRenewToken());
    Token<?> token = fs.getDelegationToken(null);
    fs.renewDelegationToken(token);
    fs.cancelDelegationToken(token);
    verify(fs, never()).getDelegationToken();
    verify(fs, never()).replaceExpiredDelegationToken();
    verify(fs, never()).setDelegationToken(any());
    assertNull(fs.getRenewToken());
    reset(fs);

    // verify first non-token op gets a token
    final Path p = new Path("/f");
    fs.create(p, (short)1).close();
    verify(fs, times(1)).getDelegationToken();
    verify(fs, never()).replaceExpiredDelegationToken();
    verify(fs, times(1)).getDelegationToken(any());
    verify(fs, times(1)).setDelegationToken(any());
    token = fs.getRenewToken();
    assertNotNull(token);
    assertEquals(testUser, getTokenOwner(token));
    assertEquals(fs.getTokenKind(), token.getKind());
    reset(fs);

    // verify prior token is reused
    fs.getFileStatus(p);
    verify(fs, times(1)).getDelegationToken();
    verify(fs, never()).replaceExpiredDelegationToken();
    verify(fs, never()).getDelegationToken(anyString());
    verify(fs, never()).setDelegationToken(any());
    Token<?> token2 = fs.getRenewToken();
    assertNotNull(token2);
    assertEquals(fs.getTokenKind(), token.getKind());
    assertSame(token, token2);
    reset(fs);

    // verify renew of expired token fails w/o getting a new token
    token = fs.getRenewToken();
    fs.cancelDelegationToken(token);
    try {
      fs.renewDelegationToken(token);
      fail("should have failed");
    } catch (InvalidToken it) {
    } catch (Exception ex) {
      fail("wrong exception:"+ex);
    }
    verify(fs, never()).getDelegationToken();
    verify(fs, never()).replaceExpiredDelegationToken();
    verify(fs, never()).getDelegationToken(anyString());
    verify(fs, never()).setDelegationToken(any());
    token2 = fs.getRenewToken();
    assertNotNull(token2);
    assertEquals(fs.getTokenKind(), token.getKind());
    assertSame(token, token2);
    reset(fs);

    // verify cancel of expired token fails w/o getting a new token
    try {
      fs.cancelDelegationToken(token);
      fail("should have failed");
    } catch (InvalidToken it) {
    } catch (Exception ex) {
      fail("wrong exception:"+ex);
    }
    verify(fs, never()).getDelegationToken();
    verify(fs, never()).replaceExpiredDelegationToken();
    verify(fs, never()).getDelegationToken(anyString());
    verify(fs, never()).setDelegationToken(any());
    token2 = fs.getRenewToken();
    assertNotNull(token2);
    assertEquals(fs.getTokenKind(), token.getKind());
    assertSame(token, token2);
    reset(fs);

    // verify an expired token is replaced with a new token
    InputStream is = fs.open(p);
    is.read();
    is.close();
    verify(fs, times(3)).getDelegationToken(); // first bad, then good
    verify(fs, times(1)).replaceExpiredDelegationToken();
    verify(fs, times(1)).getDelegationToken(null);
    verify(fs, times(1)).setDelegationToken(any());
    token2 = fs.getRenewToken();
    assertNotNull(token2);
    assertNotSame(token, token2);
    assertEquals(fs.getTokenKind(), token.getKind());
    assertEquals(testUser, getTokenOwner(token2));
    reset(fs);

    // verify with open because it's a little different in how it
    // opens connections
    fs.cancelDelegationToken(fs.getRenewToken());
    is = fs.open(p);
    is.read();
    is.close();
    verify(fs, times(3)).getDelegationToken(); // first bad, then good
    verify(fs, times(1)).replaceExpiredDelegationToken();
    verify(fs, times(1)).getDelegationToken(null);
    verify(fs, times(1)).setDelegationToken(any());
    token2 = fs.getRenewToken();
    assertNotNull(token2);
    assertNotSame(token, token2);
    assertEquals(fs.getTokenKind(), token.getKind());
    assertEquals(testUser, getTokenOwner(token2));
    reset(fs);

    // verify fs close cancels the token
    fs.close();
    verify(fs, never()).getDelegationToken();
    verify(fs, never()).replaceExpiredDelegationToken();
    verify(fs, never()).getDelegationToken(anyString());
    verify(fs, never()).setDelegationToken(any());
    verify(fs, times(1)).cancelDelegationToken(eq(token2));

    // add a token to ugi for a new fs, verify it uses that token
    token = fs.getDelegationToken(null);
    ugi.addToken(token);
    fs = ugi.doAs(new PrivilegedExceptionAction<WebHdfsFileSystem>() {
      @Override
      public WebHdfsFileSystem run() throws IOException {
        return spy((WebHdfsFileSystem) FileSystem.newInstance(uri, clusterConf));
      }
    });
    assertNull(fs.getRenewToken());
    fs.getFileStatus(new Path("/"));
    verify(fs, times(1)).getDelegationToken();
    verify(fs, never()).replaceExpiredDelegationToken();
    verify(fs, never()).getDelegationToken(anyString());
    verify(fs, times(1)).setDelegationToken(eq(token));
    token2 = fs.getRenewToken();
    assertNotNull(token2);
    assertEquals(fs.getTokenKind(), token.getKind());
    assertSame(token, token2);
    reset(fs);

    // verify it reuses the prior ugi token
    fs.getFileStatus(new Path("/"));
    verify(fs, times(1)).getDelegationToken();
    verify(fs, never()).replaceExpiredDelegationToken();
    verify(fs, never()).getDelegationToken(anyString());
    verify(fs, never()).setDelegationToken(any());
    token2 = fs.getRenewToken();
    assertNotNull(token2);
    assertEquals(fs.getTokenKind(), token.getKind());
    assertSame(token, token2);
    reset(fs);

    // verify an expired ugi token is not replaced with a new token if the
    // ugi does not contain a new token
    fs.cancelDelegationToken(token);
    for (int i=0; i<2; i++) {
      try {
        fs.getFileStatus(new Path("/"));
        fail("didn't fail");
      } catch (InvalidToken it) {
      } catch (Exception ex) {
        fail("wrong exception:"+ex);
      }
      verify(fs, times(1)).getDelegationToken();
      verify(fs, times(1)).replaceExpiredDelegationToken();
      verify(fs, never()).getDelegationToken(anyString());
      verify(fs, never()).setDelegationToken(any());
      token2 = fs.getRenewToken();
      assertNotNull(token2);
      assertEquals(fs.getTokenKind(), token.getKind());
      assertSame(token, token2);
      reset(fs);
    }
    
    // verify an expired ugi token is replaced with a new token
    // if one is available in UGI
    token = fs.getDelegationToken(null);
    ugi.addToken(token);
    reset(fs);
    fs.getFileStatus(new Path("/"));
    verify(fs, times(2)).getDelegationToken(); // first bad, then good
    verify(fs, times(1)).replaceExpiredDelegationToken();
    verify(fs, never()).getDelegationToken(anyString());
    verify(fs, times(1)).setDelegationToken(eq(token));
    token2 = fs.getRenewToken();
    assertNotNull(token2);
    assertEquals(fs.getTokenKind(), token.getKind());
    assertSame(token, token2);
    reset(fs);

    // verify fs close does NOT cancel the ugi token
    fs.close();
    verify(fs, never()).getDelegationToken();
    verify(fs, never()).replaceExpiredDelegationToken();
    verify(fs, never()).getDelegationToken(anyString());
    verify(fs, never()).setDelegationToken(any());
    verify(fs, never()).cancelDelegationToken(any(Token.class));
  } 
  
  private String getTokenOwner(Token<?> token) throws IOException {
    // webhdfs doesn't register properly with the class loader
    @SuppressWarnings({ "rawtypes", "unchecked" })
    Token<?> clone = new Token(token);
    clone.setKind(DelegationTokenIdentifier.HDFS_DELEGATION_KIND);
    return clone.decodeIdentifier().getUser().getUserName();
  }
}