FederationStateStoreUtils.java

/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.yarn.server.federation.store.utils;

import java.io.IOException;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.nio.ByteBuffer;
import java.sql.CallableStatement;
import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.Base64;

import org.apache.hadoop.io.Writable;
import org.apache.hadoop.security.token.delegation.DelegationKey;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.server.federation.store.exception.FederationStateStoreException;
import org.apache.hadoop.yarn.server.federation.store.exception.FederationStateStoreInvalidInputException;
import org.apache.hadoop.yarn.server.federation.store.exception.FederationStateStoreRetriableException;
import org.apache.hadoop.yarn.server.federation.store.metrics.FederationStateStoreClientMetrics;
import org.apache.hadoop.yarn.server.federation.store.records.RouterMasterKey;
import org.apache.hadoop.yarn.server.federation.store.records.RouterMasterKeyRequest;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.zaxxer.hikari.HikariDataSource;

/**
 * Common utility methods used by the store implementations.
 */
public final class FederationStateStoreUtils {

  public static final Logger LOG =
      LoggerFactory.getLogger(FederationStateStoreUtils.class);

  public final static String FEDERATION_STORE_URL = "url";

  private FederationStateStoreUtils() {
  }

  /**
   * Returns the SQL <code>FederationStateStore</code> connections to the pool.
   *
   * @param log the logger interface
   * @param cstmt the interface used to execute SQL stored procedures
   * @param conn the SQL connection
   * @param rs the ResultSet interface used to execute SQL stored procedures
   * @throws YarnException on failure
   */
  public static void returnToPool(Logger log, CallableStatement cstmt,
      Connection conn, ResultSet rs) throws YarnException {
    if (cstmt != null) {
      try {
        cstmt.close();
      } catch (SQLException e) {
        logAndThrowException(log, "Exception while trying to close Statement",
            e);
      }
    }

    if (conn != null) {
      try {
        conn.close();
        FederationStateStoreClientMetrics.decrConnections();
      } catch (SQLException e) {
        logAndThrowException(log, "Exception while trying to close Connection",
            e);
      }
    }

    if (rs != null) {
      try {
        rs.close();
      } catch (SQLException e) {
        logAndThrowException(log, "Exception while trying to close ResultSet",
            e);
      }
    }
  }

  /**
   * Returns the SQL <code>FederationStateStore</code> connections to the pool.
   *
   * @param log the logger interface
   * @param cstmt the interface used to execute SQL stored procedures
   * @param conn the SQL connection
   * @throws YarnException on failure
   */
  public static void returnToPool(Logger log, CallableStatement cstmt,
      Connection conn) throws YarnException {
    returnToPool(log, cstmt, conn, null);
  }

  /**
   * Returns the SQL <code>FederationStateStore</code> connections to the pool.
   *
   * @param log the logger interface
   * @param cstmt the interface used to execute SQL stored procedures
   * @throws YarnException on failure
   */
  public static void returnToPool(Logger log, CallableStatement cstmt)
      throws YarnException {
    returnToPool(log, cstmt, null);
  }

  /**
   * Throws an exception due to an error in <code>FederationStateStore</code>.
   *
   * @param log the logger interface
   * @param errMsg the error message
   * @param t the throwable raised in the called class.
   * @throws YarnException on failure
   */
  public static void logAndThrowException(Logger log, String errMsg,
      Throwable t) throws YarnException {
    if (t != null) {
      log.error(errMsg, t);
      throw new YarnException(errMsg, t);
    } else {
      log.error(errMsg);
      throw new YarnException(errMsg);
    }
  }

  /**
   * Throws an <code>FederationStateStoreException</code> due to an error in
   * <code>FederationStateStore</code>.
   *
   * @param log the logger interface
   * @param errMsg the error message
   * @throws YarnException on failure
   */
  public static void logAndThrowStoreException(Logger log, String errMsg)
      throws YarnException {
    log.error(errMsg);
    throw new FederationStateStoreException(errMsg);
  }

  /**
   * Throws an <code>FederationStateStoreException</code> due to an error in
   * <code>FederationStateStore</code>.
   *
   * @param log the logger interface
   * @param errMsgFormat the error message format string.
   * @param args referenced by the format specifiers in the format string.
   * @throws YarnException on failure
   */
  public static void logAndThrowStoreException(Logger log, String errMsgFormat, Object... args)
      throws YarnException {
    String errMsg = String.format(errMsgFormat, args);
    log.error(errMsg);
    throw new FederationStateStoreException(errMsg);
  }


  /**
   * Throws an <code>FederationStateStoreException</code> due to an error in
   * <code>FederationStateStore</code>.
   *
   * @param t the throwable raised in the called class.
   * @param log the logger interface.
   * @param errMsgFormat the error message format string.
   * @param args referenced by the format specifiers in the format string.
   * @throws YarnException on failure
   */
  public static void logAndThrowStoreException(
      Throwable t, Logger log, String errMsgFormat, Object... args) throws YarnException {
    String errMsg = String.format(errMsgFormat, args);
    if (t != null) {
      log.error(errMsg, t);
      throw new FederationStateStoreException(errMsg, t);
    } else {
      log.error(errMsg);
      throw new FederationStateStoreException(errMsg);
    }
  }

  /**
   * Throws an <code>FederationStateStoreInvalidInputException</code> due to an
   * error in <code>FederationStateStore</code>.
   *
   * @param log the logger interface
   * @param errMsg the error message
   * @throws YarnException on failure
   */
  public static void logAndThrowInvalidInputException(Logger log, String errMsg)
      throws YarnException {
    log.error(errMsg);
    throw new FederationStateStoreInvalidInputException(errMsg);
  }

  /**
   * Throws an <code>FederationStateStoreRetriableException</code> due to an
   * error in <code>FederationStateStore</code>.
   *
   * @param log the logger interface
   * @param errMsg the error message
   * @param t the throwable raised in the called class.
   * @throws YarnException on failure
   */
  public static void logAndThrowRetriableException(Logger log, String errMsg,
      Throwable t) throws YarnException {
    if (t != null) {
      log.error(errMsg, t);
      throw new FederationStateStoreRetriableException(errMsg, t);
    } else {
      log.error(errMsg);
      throw new FederationStateStoreRetriableException(errMsg);
    }
  }

  /**
   * Throws an <code>FederationStateStoreRetriableException</code> due to an
   * error in <code>FederationStateStore</code>.
   *
   * @param t the throwable raised in the called class.
   * @param log the logger interface.
   * @param errMsgFormat the error message format string.
   * @param args referenced by the format specifiers in the format string.
   * @throws YarnException on failure
   */
  public static void logAndThrowRetriableException(
      Throwable t, Logger log, String errMsgFormat, Object... args) throws YarnException {
    String errMsg = String.format(errMsgFormat, args);
    if (t != null) {
      log.error(errMsg, t);
      throw new FederationStateStoreRetriableException(errMsg, t);
    } else {
      log.error(errMsg);
      throw new FederationStateStoreRetriableException(errMsg);
    }
  }

  /**
   * Throws an <code>FederationStateStoreRetriableException</code> due to an
   * error in <code>FederationStateStore</code>.
   *
   * @param log the logger interface.
   * @param errMsgFormat the error message format string.
   * @param args referenced by the format specifiers in the format string.
   * @throws YarnException on failure
   */
  public static void logAndThrowRetriableException(
      Logger log, String errMsgFormat, Object... args) throws YarnException {
    String errMsg = String.format(errMsgFormat, args);
    log.error(errMsg);
    throw new FederationStateStoreRetriableException(errMsg);
  }

  /**
   * Sets a specific value for a specific property of
   * <code>HikariDataSource</code> SQL connections.
   *
   * @param dataSource the <code>HikariDataSource</code> connections
   * @param property the property to set
   * @param value the value to set
   */
  public static void setProperty(HikariDataSource dataSource, String property,
      String value) {
    LOG.debug("Setting property {} with value {}", property, value);
    if (property != null && !property.isEmpty() && value != null) {
      dataSource.addDataSourceProperty(property, value);
    }
  }

  /**
   * Sets a specific username for <code>HikariDataSource</code> SQL connections.
   *
   * @param dataSource the <code>HikariDataSource</code> connections
   * @param userNameDB the value to set
   */
  public static void setUsername(HikariDataSource dataSource,
      String userNameDB) {
    if (userNameDB != null) {
      dataSource.setUsername(userNameDB);
      LOG.debug("Setting non NULL Username for Store connection");
    } else {
      LOG.debug("NULL Username specified for Store connection, so ignoring");
    }
  }

  /**
   * Sets a specific password for <code>HikariDataSource</code> SQL connections.
   *
   * @param dataSource the <code>HikariDataSource</code> connections
   * @param password the value to set
   */
  public static void setPassword(HikariDataSource dataSource, String password) {
    if (password != null) {
      dataSource.setPassword(password);
      LOG.debug("Setting non NULL Credentials for Store connection");
    } else {
      LOG.debug("NULL Credentials specified for Store connection, so ignoring");
    }
  }

  /**
   * Filter HomeSubCluster based on Filter SubCluster.
   *
   * @param filterSubCluster filter query conditions
   * @param homeSubCluster homeSubCluster
   * @return return true, if match filter conditions,
   *         return false, if not match filter conditions.
   */
  public static boolean filterHomeSubCluster(SubClusterId filterSubCluster,
      SubClusterId homeSubCluster) {

    // If the filter condition is empty,
    // it means that homeSubCluster needs to be added
    if (filterSubCluster == null) {
      return true;
    }

    // If the filter condition filterSubCluster is not empty,
    // and filterSubCluster is equal to homeSubCluster, it needs to be added
    if (filterSubCluster.equals(homeSubCluster)) {
      return true;
    }

    return false;
  }

  /**
   * Encode for Writable objects.
   * This method will convert the writable object to a base64 string.
   *
   * @param key Writable Key.
   * @return base64 string.
   * @throws IOException raised on errors performing I/O.
   */
  public static String encodeWritable(Writable key) throws IOException {
    ByteArrayOutputStream bos = new ByteArrayOutputStream();
    DataOutputStream dos = new DataOutputStream(bos);
    key.write(dos);
    dos.flush();
    return Base64.getUrlEncoder().encodeToString(bos.toByteArray());
  }

  /**
   * Decode Base64 string to Writable object.
   *
   * @param w Writable Key.
   * @param idStr base64 string.
   * @throws IOException raised on errors performing I/O.
   */
  public static void decodeWritable(Writable w, String idStr) throws IOException {
    DataInputStream in = new DataInputStream(
        new ByteArrayInputStream(Base64.getUrlDecoder().decode(idStr)));
    w.readFields(in);
  }

  /**
   * Convert MasterKey to DelegationKey.
   *
   * Before using this function,
   * please use FederationRouterRMTokenInputValidator to verify the request.
   * By default, the request is not empty, and the internal object is not empty.
   *
   * @param request RouterMasterKeyRequest
   * @return DelegationKey.
   */
  public static DelegationKey convertMasterKeyToDelegationKey(RouterMasterKeyRequest request) {
    RouterMasterKey masterKey = request.getRouterMasterKey();
    return convertMasterKeyToDelegationKey(masterKey);
  }

  /**
   * Convert MasterKey to DelegationKey.
   *
   * @param masterKey masterKey.
   * @return DelegationKey.
   */
  private static DelegationKey convertMasterKeyToDelegationKey(RouterMasterKey masterKey) {
    ByteBuffer keyByteBuf = masterKey.getKeyBytes();
    byte[] keyBytes = new byte[keyByteBuf.remaining()];
    keyByteBuf.get(keyBytes);
    return new DelegationKey(masterKey.getKeyId(), masterKey.getExpiryDate(), keyBytes);
  }
}