AbstractDelegationIT.java
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.s3a.auth.delegation;
import java.io.DataOutputStream;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.net.URI;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.s3a.AbstractS3ATestBase;
import org.apache.hadoop.fs.s3a.S3AFileSystem;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token;
import static java.util.Objects.requireNonNull;
import static org.apache.hadoop.fs.s3a.Constants.AWS_CREDENTIALS_PROVIDER;
import static org.apache.hadoop.fs.s3a.S3ATestUtils.removeBaseAndBucketOverrides;
import static org.apache.hadoop.fs.s3a.auth.delegation.DelegationConstants.DELEGATION_TOKEN_BINDING;
import static org.apache.hadoop.fs.s3a.auth.delegation.S3ADelegationTokens.lookupS3ADelegationToken;
/**
* superclass class for DT tests.
*/
public abstract class AbstractDelegationIT extends AbstractS3ATestBase {
protected static final String YARN_RM = "yarn-rm@EXAMPLE.COM";
private static final Logger LOG =
LoggerFactory.getLogger(AbstractDelegationIT.class);
/**
* Look up a token from the submitted credentials.
* @param submittedCredentials credentials
* @param uri URI of the FS
* @param kind required kind of the token (which is asserted on)
* @return the token
* @throws IOException IO failure
*/
public static AbstractS3ATokenIdentifier lookupToken(
Credentials submittedCredentials,
URI uri,
Text kind) throws IOException {
final Token<AbstractS3ATokenIdentifier> token =
requireNonNull(
lookupS3ADelegationToken(submittedCredentials, uri),
"No Token for " + uri);
assertEquals(kind, token.getKind(), "Kind of token " + token);
AbstractS3ATokenIdentifier tid
= token.decodeIdentifier();
LOG.info("Found for URI {}, token {}", uri, tid);
return tid;
}
/**
* Create credentials with the DTs of the given FS.
* @param fs filesystem
* @return a non-empty set of credentials.
* @throws IOException failure to create.
*/
protected static Credentials mkTokens(final S3AFileSystem fs)
throws IOException {
Credentials cred = new Credentials();
fs.addDelegationTokens(AbstractDelegationIT.YARN_RM, cred);
return cred;
}
/**
* Create and Init an FS instance.
* @param uri URI
* @param conf config to use
* @return the instance
* @throws IOException failure to create/init
*/
protected static S3AFileSystem newS3AInstance(final URI uri,
final Configuration conf)
throws IOException {
S3AFileSystem fs = new S3AFileSystem();
fs.initialize(uri, conf);
return fs;
}
/**
* Assert that a filesystem is bound to a DT; that is: it is a delegate FS.
* @param fs filesystem
* @param tokenKind the kind of the token to require
*/
protected static void assertBoundToDT(final S3AFileSystem fs,
final Text tokenKind) {
final S3ADelegationTokens dtSupport = fs.getDelegationTokens().get();
assertTrue(dtSupport.isBoundToDT(),
"Expected bound to a delegation token: " + dtSupport);
assertEquals(tokenKind, dtSupport.getBoundDT().get().getKind(),
"Wrong token kind");
}
/**
* Assert that the number of tokens created by an FS matches the
* expected value.
* @param fs filesystem
* @param expected expected creation count.
*/
protected static void assertTokenCreationCount(final S3AFileSystem fs,
final int expected) {
assertEquals(expected, getTokenCreationCount(fs),
"DT creation count from " + fs.getDelegationTokens().get());
}
/**
* Get the token creation count of a filesystem.
* @param fs FS
* @return creation count
*/
private static int getTokenCreationCount(final S3AFileSystem fs) {
return fs.getDelegationTokens()
.map(S3ADelegationTokens::getCreationCount)
.get();
}
/**
* Patch the current config with the DT binding.
* @param conf configuration to patch
* @param binding binding to use
*/
protected void enableDelegationTokens(Configuration conf, String binding) {
removeBaseAndBucketOverrides(conf,
DELEGATION_TOKEN_BINDING);
LOG.info("Enabling delegation token support for {}", binding);
conf.set(DELEGATION_TOKEN_BINDING, binding);
}
/**
* Reset UGI info.
*/
protected void resetUGI() {
UserGroupInformation.reset();
}
/**
* Bind the provider list to the args supplied.
* At least one must be provided, to stop the default list being
* picked up.
* @param config configuration to patch.
* @param bucket bucket to clear.
* @param providerClassnames providers
*/
protected void bindProviderList(String bucket,
Configuration config,
String... providerClassnames) {
removeBaseAndBucketOverrides(bucket, config, AWS_CREDENTIALS_PROVIDER);
assertTrue(providerClassnames.length > 0, "No providers to bind to");
config.setStrings(AWS_CREDENTIALS_PROVIDER, providerClassnames);
}
/**
* Save a DT to a file.
* @param tokenFile destination file
* @param token token to save
* @throws IOException failure
*/
protected void saveDT(final File tokenFile, final Token<?> token)
throws IOException {
requireNonNull(token, "Null token");
Credentials cred = new Credentials();
cred.addToken(token.getService(), token);
try(DataOutputStream out = new DataOutputStream(
new FileOutputStream(tokenFile))) {
cred.writeTokenStorageToStream(out);
}
}
/**
* Create and init an S3a DT instance, but don't start it.
* @param conf conf to use
* @return a new instance
* @throws IOException IOE
*/
public S3ADelegationTokens instantiateDTSupport(Configuration conf)
throws IOException {
S3AFileSystem fs = getFileSystem();
S3ADelegationTokens tokens = new S3ADelegationTokens();
tokens.bindToFileSystem(
fs.getCanonicalUri(),
fs.createStoreContext(),
fs.createDelegationOperations());
tokens.init(conf);
return tokens;
}
}