RouterDelegationTokenSecretManager.java
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.router.security;
import org.apache.hadoop.classification.InterfaceAudience.Public;
import org.apache.hadoop.classification.VisibleForTesting;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenSecretManager;
import org.apache.hadoop.security.token.delegation.DelegationKey;
import org.apache.hadoop.security.token.delegation.RouterDelegationTokenSupport;
import org.apache.hadoop.util.ExitUtil;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier;
import org.apache.hadoop.yarn.security.client.YARNDelegationTokenIdentifier;
import org.apache.hadoop.yarn.server.federation.store.records.RouterMasterKey;
import org.apache.hadoop.yarn.server.federation.store.records.RouterMasterKeyResponse;
import org.apache.hadoop.yarn.server.federation.store.records.RouterRMTokenResponse;
import org.apache.hadoop.yarn.server.federation.store.records.RouterStoreToken;
import org.apache.hadoop.yarn.server.federation.utils.FederationStateStoreFacade;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.Base64;
/**
* A Router specific delegation token secret manager.
* The secret manager is responsible for generating and accepting the password
* for each token.
*/
public class RouterDelegationTokenSecretManager
extends AbstractDelegationTokenSecretManager<RMDelegationTokenIdentifier> {
private static final Logger LOG = LoggerFactory
.getLogger(RouterDelegationTokenSecretManager.class);
private FederationStateStoreFacade federationFacade;
/**
* Create a Router Secret manager.
*
* @param delegationKeyUpdateInterval the number of milliseconds for rolling
* new secret keys.
* @param delegationTokenMaxLifetime the maximum lifetime of the delegation
* tokens in milliseconds
* @param delegationTokenRenewInterval how often the tokens must be renewed
* in milliseconds
* @param delegationTokenRemoverScanInterval how often the tokens are scanned
* @param conf Configuration.
*/
public RouterDelegationTokenSecretManager(long delegationKeyUpdateInterval,
long delegationTokenMaxLifetime, long delegationTokenRenewInterval,
long delegationTokenRemoverScanInterval, Configuration conf) {
super(delegationKeyUpdateInterval, delegationTokenMaxLifetime,
delegationTokenRenewInterval, delegationTokenRemoverScanInterval);
this.federationFacade = FederationStateStoreFacade.getInstance(conf);
}
@Override
public RMDelegationTokenIdentifier createIdentifier() {
return new RMDelegationTokenIdentifier();
}
private boolean shouldIgnoreException(Exception e) {
return !running && e.getCause() instanceof InterruptedException;
}
/**
* The Router Supports Store the New Master Key.
* During this Process, Facade will call the specific StateStore to store the MasterKey.
*
* @param newKey DelegationKey
*/
@Override
public void storeNewMasterKey(DelegationKey newKey) {
try {
federationFacade.storeNewMasterKey(newKey);
} catch (Exception e) {
if (!shouldIgnoreException(e)) {
LOG.error("Error in storing master key with KeyID: {}.", newKey.getKeyId());
ExitUtil.terminate(1, e);
}
}
}
/**
* The Router Supports Remove the master key.
* During this Process, Facade will call the specific StateStore to remove the MasterKey.
*
* @param delegationKey DelegationKey
*/
@Override
public void removeStoredMasterKey(DelegationKey delegationKey) {
try {
federationFacade.removeStoredMasterKey(delegationKey);
} catch (Exception e) {
if (!shouldIgnoreException(e)) {
LOG.error("Error in removing master key with KeyID: {}.", delegationKey.getKeyId());
ExitUtil.terminate(1, e);
}
}
}
/**
* The Router Supports Store new Token.
*
* @param identifier RMDelegationToken
* @param renewDate renewDate
* @throws IOException IO exception occurred.
*/
@Override
public void storeNewToken(RMDelegationTokenIdentifier identifier,
long renewDate) throws IOException {
try {
federationFacade.storeNewToken(identifier, renewDate);
} catch (Exception e) {
if (!shouldIgnoreException(e)) {
LOG.error("Error in storing RMDelegationToken with sequence number: {}.",
identifier.getSequenceNumber());
ExitUtil.terminate(1, e);
}
}
}
/**
* The Router Supports Store new Token.
*
* @param identifier RMDelegationToken.
* @param tokenInfo DelegationTokenInformation.
*/
public void storeNewToken(RMDelegationTokenIdentifier identifier,
DelegationTokenInformation tokenInfo) {
try {
String token =
RouterDelegationTokenSupport.encodeDelegationTokenInformation(tokenInfo);
long renewDate = tokenInfo.getRenewDate();
federationFacade.storeNewToken(identifier, renewDate, token);
} catch (Exception e) {
if (!shouldIgnoreException(e)) {
LOG.error("Error in storing RMDelegationToken with sequence number: {}.",
identifier.getSequenceNumber());
ExitUtil.terminate(1, e);
}
}
}
/**
* The Router Supports Update Token.
*
* @param id RMDelegationToken
* @param renewDate renewDate
* @throws IOException IO exception occurred
*/
@Override
public void updateStoredToken(RMDelegationTokenIdentifier id, long renewDate) throws IOException {
try {
federationFacade.updateStoredToken(id, renewDate);
} catch (Exception e) {
if (!shouldIgnoreException(e)) {
LOG.error("Error in updating persisted RMDelegationToken with sequence number: {}.",
id.getSequenceNumber());
ExitUtil.terminate(1, e);
}
}
}
/**
* The Router Supports Update Token.
*
* @param identifier RMDelegationToken.
* @param tokenInfo DelegationTokenInformation.
*/
public void updateStoredToken(RMDelegationTokenIdentifier identifier,
DelegationTokenInformation tokenInfo) {
try {
long renewDate = tokenInfo.getRenewDate();
String token = RouterDelegationTokenSupport.encodeDelegationTokenInformation(tokenInfo);
federationFacade.updateStoredToken(identifier, renewDate, token);
} catch (Exception e) {
if (!shouldIgnoreException(e)) {
LOG.error("Error in updating persisted RMDelegationToken with sequence number: {}.",
identifier.getSequenceNumber());
ExitUtil.terminate(1, e);
}
}
}
/**
* The Router Supports Remove Token.
*
* @param identifier Delegation Token
* @throws IOException IO exception occurred.
*/
@Override
public void removeStoredToken(RMDelegationTokenIdentifier identifier) throws IOException {
try {
federationFacade.removeStoredToken(identifier);
} catch (Exception e) {
if (!shouldIgnoreException(e)) {
LOG.error("Error in removing RMDelegationToken with sequence number: {}",
identifier.getSequenceNumber());
ExitUtil.terminate(1, e);
}
}
}
/**
* The Router supports obtaining the DelegationKey stored in the Router StateStote
* according to the DelegationKey.
*
* @param key Param DelegationKey
* @return Delegation Token
* @throws YarnException An internal conversion error occurred when getting the Token
* @throws IOException IO exception occurred
*/
public DelegationKey getMasterKeyByDelegationKey(DelegationKey key)
throws YarnException, IOException {
try {
RouterMasterKeyResponse response = federationFacade.getMasterKeyByDelegationKey(key);
RouterMasterKey masterKey = response.getRouterMasterKey();
ByteBuffer keyByteBuf = masterKey.getKeyBytes();
byte[] keyBytes = new byte[keyByteBuf.remaining()];
keyByteBuf.get(keyBytes);
DelegationKey delegationKey =
new DelegationKey(masterKey.getKeyId(), masterKey.getExpiryDate(), keyBytes);
return delegationKey;
} catch (IOException ex) {
throw new IOException(ex);
} catch (YarnException ex) {
throw new YarnException(ex);
}
}
/**
* Get RMDelegationTokenIdentifier according to RouterStoreToken.
*
* @param identifier RMDelegationTokenIdentifier
* @return RMDelegationTokenIdentifier
* @throws YarnException An internal conversion error occurred when getting the Token
* @throws IOException IO exception occurred
*/
public RMDelegationTokenIdentifier getTokenByRouterStoreToken(
RMDelegationTokenIdentifier identifier) throws YarnException, IOException {
try {
RouterRMTokenResponse response = federationFacade.getTokenByRouterStoreToken(identifier);
YARNDelegationTokenIdentifier responseIdentifier =
response.getRouterStoreToken().getTokenIdentifier();
return (RMDelegationTokenIdentifier) responseIdentifier;
} catch (Exception ex) {
throw new YarnException(ex);
}
}
public void setFederationFacade(FederationStateStoreFacade federationFacade) {
this.federationFacade = federationFacade;
}
@Public
@VisibleForTesting
public int getLatestDTSequenceNumber() {
return delegationTokenSequenceNumber;
}
@Public
@VisibleForTesting
public synchronized Set<DelegationKey> getAllMasterKeys() {
return new HashSet<>(allKeys.values());
}
@Public
@VisibleForTesting
public synchronized Map<RMDelegationTokenIdentifier, Long> getAllTokens() {
Map<RMDelegationTokenIdentifier, Long> allTokens = new HashMap<>();
for (Map.Entry<RMDelegationTokenIdentifier,
DelegationTokenInformation> entry : currentTokens.entrySet()) {
RMDelegationTokenIdentifier keyIdentifier = entry.getKey();
DelegationTokenInformation tokenInformation = entry.getValue();
allTokens.put(keyIdentifier, tokenInformation.getRenewDate());
}
return allTokens;
}
public long getRenewDate(RMDelegationTokenIdentifier ident)
throws InvalidToken {
DelegationTokenInformation info = currentTokens.get(ident);
if (info == null) {
throw new InvalidToken("token (" + ident.toString()
+ ") can't be found in cache");
}
return info.getRenewDate();
}
@Override
protected synchronized int incrementDelegationTokenSeqNum() {
return federationFacade.incrementDelegationTokenSeqNum();
}
@Override
protected void storeToken(RMDelegationTokenIdentifier rmDelegationTokenIdentifier,
DelegationTokenInformation tokenInfo) throws IOException {
this.currentTokens.put(rmDelegationTokenIdentifier, tokenInfo);
this.addTokenForOwnerStats(rmDelegationTokenIdentifier);
storeNewToken(rmDelegationTokenIdentifier, tokenInfo);
}
@Override
protected void updateToken(RMDelegationTokenIdentifier rmDelegationTokenIdentifier,
DelegationTokenInformation tokenInfo) throws IOException {
this.currentTokens.put(rmDelegationTokenIdentifier, tokenInfo);
updateStoredToken(rmDelegationTokenIdentifier, tokenInfo);
}
@Override
protected DelegationTokenInformation getTokenInfo(
RMDelegationTokenIdentifier ident) {
// First check if I have this..
DelegationTokenInformation tokenInfo = currentTokens.get(ident);
if (tokenInfo == null) {
try {
RouterRMTokenResponse response = federationFacade.getTokenByRouterStoreToken(ident);
RouterStoreToken routerStoreToken = response.getRouterStoreToken();
String tokenStr = routerStoreToken.getTokenInfo();
byte[] tokenBytes = Base64.getUrlDecoder().decode(tokenStr);
tokenInfo = RouterDelegationTokenSupport.decodeDelegationTokenInformation(tokenBytes);
} catch (Exception e) {
LOG.error("Error retrieving tokenInfo [" + ident.getSequenceNumber()
+ "] from StateStore.", e);
throw new YarnRuntimeException(e);
}
}
return tokenInfo;
}
@Override
protected synchronized int getDelegationTokenSeqNum() {
return federationFacade.getDelegationTokenSeqNum();
}
@Override
protected synchronized void setDelegationTokenSeqNum(int seqNum) {
federationFacade.setDelegationTokenSeqNum(seqNum);
}
@Override
protected synchronized int getCurrentKeyId() {
return federationFacade.getCurrentKeyId();
}
@Override
protected synchronized int incrementCurrentKeyId() {
return federationFacade.incrementCurrentKeyId();
}
}