ITestAbfsDelegationTokens.java
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.azurebfs.extensions;
import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.IOException;
import java.io.PrintStream;
import java.net.URI;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.azurebfs.AbstractAbfsIntegrationTest;
import org.apache.hadoop.fs.azurebfs.AzureBlobFileSystem;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.security.TokenCache;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.SecurityUtil;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.DtUtilShell;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.token.TokenIdentifier;
import org.apache.hadoop.service.ServiceOperations;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.util.ToolRunner;
import static org.apache.hadoop.fs.CommonConfigurationKeys.HADOOP_SECURITY_TOKEN_SERVICE_USE_IP;
import static org.apache.hadoop.test.LambdaTestUtils.doAs;
/**
* Test custom DT support in ABFS.
* This brings up a mini KDC in class setup/teardown, as the FS checks
* for that when it enables security.
*
* Much of this code is copied from
* {@code org.apache.hadoop.fs.s3a.auth.delegation.AbstractDelegationIT}
*/
public class ITestAbfsDelegationTokens extends AbstractAbfsIntegrationTest {
private static final Logger LOG = LoggerFactory.getLogger(
ITestAbfsDelegationTokens.class);
/**
* Created in static {@link #setupCluster()} call.
*/
@SuppressWarnings("StaticNonFinalField")
private static KerberizedAbfsCluster cluster;
private UserGroupInformation aliceUser;
/***
* Set up the clusters.
*/
@BeforeClass
public static void setupCluster() throws Exception {
resetUGI();
cluster = new KerberizedAbfsCluster();
cluster.init(new Configuration());
cluster.start();
}
/**
* Tear down the Cluster.
*/
@SuppressWarnings("ThrowableNotThrown")
@AfterClass
public static void teardownCluster() throws Exception {
resetUGI();
ServiceOperations.stopQuietly(LOG, cluster);
}
public ITestAbfsDelegationTokens() throws Exception {
}
@Override
public void setup() throws Exception {
// create the FS
Configuration conf = getRawConfiguration();
cluster.bindConfToCluster(conf);
conf.setBoolean(HADOOP_SECURITY_TOKEN_SERVICE_USE_IP,
false);
resetUGI();
UserGroupInformation.setConfiguration(conf);
aliceUser = cluster.createAliceUser();
assertSecurityEnabled();
// log in as alice so that filesystems belong to that user
UserGroupInformation.setLoginUser(aliceUser);
StubDelegationTokenManager.useStubDTManager(conf);
FileSystem.closeAllForUGI(UserGroupInformation.getLoginUser());
super.setup();
assertNotNull("No StubDelegationTokenManager created in filesystem init",
getStubDTManager());
}
protected StubDelegationTokenManager getStubDTManager() throws IOException {
return (StubDelegationTokenManager) getDelegationTokenManager().getTokenManager();
}
/**
* Cleanup removes cached filesystems and the last instance of the
* StubDT manager.
*/
@Override
public void teardown() throws Exception {
// clean up all of alice's instances.
FileSystem.closeAllForUGI(UserGroupInformation.getLoginUser());
super.teardown();
}
/**
* General assertion that security is turred on for a cluster.
*/
public static void assertSecurityEnabled() {
assertTrue("Security is needed for this test",
UserGroupInformation.isSecurityEnabled());
}
/**
* Reset UGI info.
*/
protected static void resetUGI() {
UserGroupInformation.reset();
}
/**
* Create credentials with the DTs of the given FS.
* @param fs filesystem
* @return a non-empty set of credentials.
* @throws IOException failure to create.
*/
protected static Credentials mkTokens(final FileSystem fs)
throws IOException {
Credentials cred = new Credentials();
fs.addDelegationTokens("rm/rm1@EXAMPLE.COM", cred);
return cred;
}
@Test
public void testTokenManagerBinding() throws Throwable {
StubDelegationTokenManager instance
= getStubDTManager();
assertNotNull("No StubDelegationTokenManager created in filesystem init",
instance);
assertTrue("token manager not initialized: " + instance,
instance.isInitialized());
}
/**
* When bound to a custom DT manager, it provides the service name.
* The stub returns the URI by default.
*/
@Test
public void testCanonicalization() throws Throwable {
String service = getCanonicalServiceName();
assertNotNull("No canonical service name from filesystem " + getFileSystem(),
service);
assertEquals("canonical URI and service name mismatch",
getFilesystemURI(), new URI(service));
}
protected URI getFilesystemURI() throws IOException {
return getFileSystem().getUri();
}
protected String getCanonicalServiceName() throws IOException {
return getFileSystem().getCanonicalServiceName();
}
/**
* Checks here to catch any regressions in canonicalization
* logic.
*/
@Test
public void testDefaultCanonicalization() throws Throwable {
FileSystem fs = getFileSystem();
clearTokenServiceName();
assertEquals("canonicalServiceName is not the default",
getDefaultServiceName(fs), getCanonicalServiceName());
}
protected String getDefaultServiceName(final FileSystem fs) {
return SecurityUtil.buildDTServiceName(fs.getUri(), 0);
}
protected void clearTokenServiceName() throws IOException {
getStubDTManager().setCanonicalServiceName(null);
}
/**
* Request a token; this tests the collection workflow.
*/
@Test
public void testRequestToken() throws Throwable {
AzureBlobFileSystem fs = getFileSystem();
Credentials credentials = mkTokens(fs);
assertEquals("Number of collected tokens", 1,
credentials.numberOfTokens());
verifyCredentialsContainsToken(credentials, fs);
}
/**
* Request a token; this tests the collection workflow.
*/
@Test
public void testRequestTokenDefault() throws Throwable {
clearTokenServiceName();
AzureBlobFileSystem fs = getFileSystem();
assertEquals("canonicalServiceName is not the default",
getDefaultServiceName(fs), fs.getCanonicalServiceName());
Credentials credentials = mkTokens(fs);
assertEquals("Number of collected tokens", 1,
credentials.numberOfTokens());
verifyCredentialsContainsToken(credentials,
getDefaultServiceName(fs), getFilesystemURI().toString());
}
public void verifyCredentialsContainsToken(final Credentials credentials,
FileSystem fs) throws IOException {
verifyCredentialsContainsToken(credentials,
fs.getCanonicalServiceName(),
fs.getUri().toString());
}
/**
* Verify that the set of credentials contains a token for the given
* canonical service name, and that it is of the given kind.
* @param credentials set of credentials
* @param serviceName canonical service name for lookup.
* @param tokenService service kind; also expected in string value.
* @return the retrieved token.
* @throws IOException IO failure
*/
public StubAbfsTokenIdentifier verifyCredentialsContainsToken(
final Credentials credentials,
final String serviceName,
final String tokenService) throws IOException {
Token<? extends TokenIdentifier> token = credentials.getToken(
new Text(serviceName));
assertEquals("Token Kind in " + token,
StubAbfsTokenIdentifier.TOKEN_KIND, token.getKind());
assertEquals("Token Service Kind in " + token,
tokenService, token.getService().toString());
StubAbfsTokenIdentifier abfsId = (StubAbfsTokenIdentifier)
token.decodeIdentifier();
LOG.info("Created token {}", abfsId);
assertEquals("token URI in " + abfsId,
tokenService, abfsId.getUri().toString());
return abfsId;
}
/**
* This mimics the DT collection performed inside FileInputFormat to
* collect DTs for a job.
* @throws Throwable on failure.
*/
@Test
public void testJobsCollectTokens() throws Throwable {
// get tokens for all the required FileSystems..
AzureBlobFileSystem fs = getFileSystem();
Credentials credentials = new Credentials();
Path root = fs.makeQualified(new Path("/"));
Path[] paths = {root};
Configuration conf = fs.getConf();
TokenCache.obtainTokensForNamenodes(credentials,
paths,
conf);
verifyCredentialsContainsToken(credentials, fs);
}
/**
* Run the DT Util command.
* @param expected expected outcome
* @param conf configuration for the command (hence: FS to create)
* @param args other arguments
* @return the output of the command.
*/
protected String dtutil(final int expected,
final Configuration conf,
final String... args) throws Exception {
final ByteArrayOutputStream dtUtilContent = new ByteArrayOutputStream();
DtUtilShell dt = new DtUtilShell();
dt.setOut(new PrintStream(dtUtilContent));
dtUtilContent.reset();
int r = doAs(aliceUser,
() -> ToolRunner.run(conf, dt, args));
String s = dtUtilContent.toString();
LOG.info("\n{}", s);
assertEquals("Exit code from command dtutil "
+ StringUtils.join(" ", args) + " with output " + s,
expected, r);
return s;
}
/**
* Verify the dtutil shell command can fetch tokens
*/
@Test
public void testDTUtilShell() throws Throwable {
File tokenfile = cluster.createTempTokenFile();
String tfs = tokenfile.toString();
String fsURI = getFileSystem().getUri().toString();
dtutil(0, getRawConfiguration(),
"get", fsURI,
"-format", "protobuf",
tfs);
assertTrue("not created: " + tokenfile,
tokenfile.exists());
assertTrue("File is empty " + tokenfile,
tokenfile.length() > 0);
assertTrue("File only contains header " + tokenfile,
tokenfile.length() > 6);
String printed = dtutil(0, getRawConfiguration(), "print", tfs);
assertTrue("no " + fsURI + " in " + printed,
printed.contains(fsURI));
assertTrue("no " + StubAbfsTokenIdentifier.ID + " in " + printed,
printed.contains(StubAbfsTokenIdentifier.ID));
}
/**
* Creates a new FS instance with the simplest binding lifecycle;
* get a token.
* This verifies the classic binding mechanism works.
*/
@Test
public void testBaseDTLifecycle() throws Throwable {
Configuration conf = new Configuration(getRawConfiguration());
ClassicDelegationTokenManager.useClassicDTManager(conf);
try (FileSystem fs = FileSystem.newInstance(getFilesystemURI(), conf)) {
Credentials credentials = mkTokens(fs);
assertEquals("Number of collected tokens", 1,
credentials.numberOfTokens());
verifyCredentialsContainsToken(credentials,
fs.getCanonicalServiceName(),
ClassicDelegationTokenManager.UNSET);
}
}
}