TestSQLFederationStateStore.java

/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with this
 * work for additional information regarding copyright ownership.  The ASF
 * licenses this file to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 * <p>
 * http://www.apache.org/licenses/LICENSE-2.0
 * <p>
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 * License for the specific language governing permissions and limitations under
 * the License.
 */

package org.apache.hadoop.yarn.server.federation.store.impl;

import com.zaxxer.hikari.HikariDataSource;
import org.apache.hadoop.security.token.delegation.DelegationKey;
import org.apache.hadoop.test.LambdaTestUtils;
import org.apache.hadoop.util.Time;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ReservationId;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier;
import org.apache.hadoop.yarn.server.federation.store.FederationStateStore;
import org.apache.hadoop.yarn.server.federation.store.metrics.FederationStateStoreClientMetrics;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterInfo;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterRegisterRequest;
import org.apache.hadoop.yarn.server.federation.store.records.ReservationHomeSubCluster;
import org.apache.hadoop.yarn.server.federation.store.records.AddReservationHomeSubClusterRequest;
import org.apache.hadoop.yarn.server.federation.store.records.UpdateReservationHomeSubClusterRequest;
import org.apache.hadoop.yarn.server.federation.store.records.DeleteReservationHomeSubClusterRequest;
import org.apache.hadoop.yarn.server.federation.store.records.RouterMasterKey;
import org.apache.hadoop.yarn.server.federation.store.records.RouterStoreToken;
import org.apache.hadoop.yarn.server.federation.store.records.RouterMasterKeyRequest;
import org.apache.hadoop.yarn.server.federation.store.records.RouterMasterKeyResponse;
import org.apache.hadoop.yarn.server.federation.store.sql.DatabaseProduct;
import org.apache.hadoop.yarn.server.federation.store.sql.FederationSQLOutParameter;
import org.apache.hadoop.yarn.server.federation.store.sql.FederationQueryRunner;
import org.apache.hadoop.yarn.server.federation.store.sql.RouterMasterKeyHandler;
import org.apache.hadoop.yarn.server.federation.store.sql.RouterStoreTokenHandler;
import org.apache.hadoop.yarn.server.federation.store.utils.FederationStateStoreUtils;
import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.sql.CallableStatement;
import java.sql.Connection;
import java.sql.DatabaseMetaData;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.List;

import static org.apache.hadoop.yarn.server.federation.store.impl.SQLFederationStateStore.CALL_SP_GET_MASTERKEY;
import static org.apache.hadoop.yarn.server.federation.store.impl.SQLFederationStateStore.CALL_SP_ADD_RESERVATION_HOME_SUBCLUSTER;
import static org.apache.hadoop.yarn.server.federation.store.impl.SQLFederationStateStore.CALL_SP_GET_RESERVATION_HOME_SUBCLUSTER;
import static org.apache.hadoop.yarn.server.federation.store.impl.SQLFederationStateStore.CALL_SP_GET_RESERVATIONS_HOME_SUBCLUSTER;
import static org.apache.hadoop.yarn.server.federation.store.impl.SQLFederationStateStore.CALL_SP_DELETE_RESERVATION_HOME_SUBCLUSTER;
import static org.apache.hadoop.yarn.server.federation.store.impl.SQLFederationStateStore.CALL_SP_GET_DELEGATIONTOKEN;
import static org.apache.hadoop.yarn.server.federation.store.impl.SQLFederationStateStore.CALL_SP_UPDATE_RESERVATION_HOME_SUBCLUSTER;
import static org.apache.hadoop.yarn.server.federation.store.sql.DatabaseProduct.DbType;
import static org.apache.hadoop.yarn.server.federation.store.impl.HSQLDBFederationStateStore.SP_DROP_ADDRESERVATIONHOMESUBCLUSTER;
import static org.apache.hadoop.yarn.server.federation.store.impl.HSQLDBFederationStateStore.SP_ADDRESERVATIONHOMESUBCLUSTER2;
import static org.apache.hadoop.yarn.server.federation.store.impl.HSQLDBFederationStateStore.SP_DROP_UPDATERESERVATIONHOMESUBCLUSTER;
import static org.apache.hadoop.yarn.server.federation.store.impl.HSQLDBFederationStateStore.SP_UPDATERESERVATIONHOMESUBCLUSTER2;
import static org.apache.hadoop.yarn.server.federation.store.impl.HSQLDBFederationStateStore.SP_DROP_DELETERESERVATIONHOMESUBCLUSTER;
import static org.apache.hadoop.yarn.server.federation.store.impl.HSQLDBFederationStateStore.SP_DELETERESERVATIONHOMESUBCLUSTER2;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static java.sql.Types.VARCHAR;
import static java.sql.Types.BIGINT;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

/**
 * Unit tests for SQLFederationStateStore.
 */
public class TestSQLFederationStateStore extends FederationStateStoreBaseTest {

  /** Logger for this test class; also passed to the statement-release helper. */
  public static final Logger LOG =
      LoggerFactory.getLogger(TestSQLFederationStateStore.class);
  // JDBC class name written into the state store configuration by createStateStore().
  private static final String HSQLDB_DRIVER = "org.hsqldb.jdbc.JDBCDataSource";
  // Base JDBC URL; createStateStore() appends the current time in milliseconds to it.
  private static final String DATABASE_URL = "jdbc:hsqldb:mem:state";
  // Credentials written into the state store configuration by createStateStore().
  private static final String DATABASE_USERNAME = "SA";
  private static final String DATABASE_PASSWORD = "";
  // Store created by createStateStore(); the tests use it for direct SQL access.
  private SQLFederationStateStore sqlFederationStateStore = null;

  /**
   * Builds the state store exercised by this test class.
   *
   * <p>A fresh {@link YarnConfiguration} receives the JDBC class, credentials,
   * a URL made of {@code DATABASE_URL} followed by the current time in
   * milliseconds, and the application / connection limits. The configuration
   * is handed to the base class before the store object is created.
   *
   * @return the new {@link HSQLDBFederationStateStore}, which is also kept in
   *         {@code sqlFederationStateStore}
   */
  @Override
  protected FederationStateStore createStateStore() {
    YarnConfiguration configuration = new YarnConfiguration();

    // Where and how to connect.
    final String jdbcUrl = DATABASE_URL + System.currentTimeMillis();
    configuration.set(YarnConfiguration.FEDERATION_STATESTORE_SQL_URL, jdbcUrl);
    configuration.set(YarnConfiguration.FEDERATION_STATESTORE_SQL_JDBC_CLASS, HSQLDB_DRIVER);
    configuration.set(YarnConfiguration.FEDERATION_STATESTORE_SQL_USERNAME, DATABASE_USERNAME);
    configuration.set(YarnConfiguration.FEDERATION_STATESTORE_SQL_PASSWORD, DATABASE_PASSWORD);

    // Limits.
    configuration.setInt(YarnConfiguration.FEDERATION_STATESTORE_MAX_APPLICATIONS, 10);
    configuration.setInt(YarnConfiguration.FEDERATION_STATESTORE_SQL_MAXCONNECTIONS, 10);

    super.setConf(configuration);

    sqlFederationStateStore = new HSQLDBFederationStateStore();
    return sqlFederationStateStore;
  }

  /**
   * Registers a sub-cluster, maps an application to it, and then checks that
   * the client metrics report exactly one connection.
   *
   * @throws YarnException if a state store operation fails
   */
  @Test
  public void testSqlConnectionsCreatedCount() throws YarnException {
    FederationStateStore store = getStateStore();
    SubClusterId scId = SubClusterId.newInstance("SC");
    SubClusterInfo scInfo = createSubClusterInfo(scId);

    // Register the sub-cluster and read it back.
    SubClusterRegisterRequest registerRequest =
        SubClusterRegisterRequest.newInstance(scInfo);
    store.registerSubCluster(registerRequest);
    assertEquals(scInfo, querySubClusterInfo(scId));

    // Map an application to the sub-cluster and read the mapping back.
    ApplicationId applicationId = ApplicationId.newInstance(1, 1);
    addApplicationHomeSC(applicationId, scId);
    assertEquals(scId, queryApplicationHomeSC(applicationId));

    // The connection must have been created only once, at state store init.
    assertEquals(1, FederationStateStoreClientMetrics.getNumConnections());
  }

  /**
   * Immutable holder for the outcome of one reservation stored-procedure call
   * issued by the helper methods of this test.
   */
  class ReservationHomeSC {
    /** Reservation id the call was made for, or the id read from a result row. */
    private final String reservationId;
    /** Home sub-cluster id tied to the reservation; may be {@code null}. */
    private final String subHomeClusterId;
    /** Row count reported by the procedure; read-only helpers pass 0. */
    private final int dbUpdateCount;

    /**
     * @param rId reservation id
     * @param subHomeSCId home sub-cluster id, possibly {@code null}
     * @param dbUpdateCount row count reported by the database
     */
    ReservationHomeSC(String rId, String subHomeSCId, int dbUpdateCount) {
      this.reservationId = rId;
      this.subHomeClusterId = subHomeSCId;
      this.dbUpdateCount = dbUpdateCount;
    }
  }

  /**
   * Calls the given "add reservation home sub-cluster" stored procedure.
   *
   * @param procedure callable statement text to execute
   * @param reservationId value bound to {@code reservationId_IN}
   * @param subHomeClusterId value bound to {@code homeSubCluster_IN}
   * @return the reservation id together with the {@code storedHomeSubCluster_OUT}
   *         and {@code rowCount_OUT} values reported by the procedure
   * @throws SQLException if preparing, executing or reading the call fails
   * @throws YarnException if raised while handing the statement back through
   *         {@code FederationStateStoreUtils.returnToPool}
   */
  private ReservationHomeSC addReservationHomeSubCluster(String procedure,
      String reservationId, String subHomeClusterId) throws SQLException, YarnException {
    CallableStatement cstmt = sqlFederationStateStore.getCallableStatement(procedure);
    try {
      // procedure call parameter preparation
      cstmt.setString("reservationId_IN", reservationId);
      cstmt.setString("homeSubCluster_IN", subHomeClusterId);
      cstmt.registerOutParameter("storedHomeSubCluster_OUT", java.sql.Types.VARCHAR);
      cstmt.registerOutParameter("rowCount_OUT", java.sql.Types.INTEGER);

      // execute procedure
      cstmt.executeUpdate();

      // get call result
      String dbStoredHomeSubCluster = cstmt.getString("storedHomeSubCluster_OUT");
      int dbRowCount = cstmt.getInt("rowCount_OUT");

      return new ReservationHomeSC(reservationId, dbStoredHomeSubCluster, dbRowCount);
    } finally {
      // return cstmt to pool, also when the call fails part-way through
      FederationStateStoreUtils.returnToPool(LOG, cstmt);
    }
  }

  /**
   * Calls the given "get reservation home sub-cluster" stored procedure.
   *
   * @param procedure callable statement text to execute
   * @param reservationId value bound to {@code reservationId_IN}
   * @return the reservation id together with the {@code homeSubCluster_OUT}
   *         value reported by the procedure; the update count is always 0
   * @throws SQLException if preparing, executing or reading the call fails
   * @throws YarnException if raised while handing the statement back through
   *         {@code FederationStateStoreUtils.returnToPool}
   */
  private ReservationHomeSC getReservationHomeSubCluster(String procedure,
      String reservationId) throws SQLException, YarnException {

    CallableStatement cstmt = sqlFederationStateStore.getCallableStatement(procedure);
    try {
      // procedure call parameter preparation
      cstmt.setString("reservationId_IN", reservationId);
      cstmt.registerOutParameter("homeSubCluster_OUT", java.sql.Types.VARCHAR);

      // execute procedure
      cstmt.execute();

      // get call result
      String dBSubClusterHomeId = cstmt.getString("homeSubCluster_OUT");

      // returns the ReservationHomeSubCluster object
      return new ReservationHomeSC(reservationId, dBSubClusterHomeId, 0);
    } finally {
      // return cstmt to pool, also when the call fails part-way through
      FederationStateStoreUtils.returnToPool(LOG, cstmt);
    }
  }

  /**
   * Calls the given "get reservations home sub-cluster" stored procedure and
   * converts every returned row into a {@link ReservationHomeSC}.
   *
   * @param procedure callable statement text to execute
   * @return one entry per result row, in the order the rows were returned,
   *         built from the {@code reservationId} and {@code homeSubCluster}
   *         columns; the update count of each entry is 0
   * @throws SQLException if preparing, executing or reading the call fails
   * @throws IOException declared for callers; not raised by the visible code
   * @throws YarnException if raised while handing the statement back through
   *         {@code FederationStateStoreUtils.returnToPool}
   */
  private List<ReservationHomeSC> getReservationsHomeSubCluster(String procedure)
      throws SQLException, IOException, YarnException {

    List<ReservationHomeSC> results = new ArrayList<>();

    // procedure call parameter preparation
    CallableStatement cstmt = sqlFederationStateStore.getCallableStatement(procedure);

    // execute procedure; the result set is closed before the statement is released
    try (ResultSet rs = cstmt.executeQuery()) {
      while (rs.next()) {
        // 1) OUT reservationId
        String dbReservationId = rs.getString("reservationId");

        // 2) OUT homeSubCluster
        String dbHomeSubCluster = rs.getString("homeSubCluster");
        results.add(new ReservationHomeSC(dbReservationId, dbHomeSubCluster, 0));
      }
    } finally {
      // return cstmt to pool, also when the query or the iteration fails
      FederationStateStoreUtils.returnToPool(LOG, cstmt);
    }

    // return ReservationHomeSubCluster List
    return results;
  }

  /**
   * Calls the given "update reservation home sub-cluster" stored procedure.
   *
   * <p>The statement is released in a {@code finally} block, matching the
   * other stored-procedure helpers of this class.
   *
   * @param procedure callable statement text to execute
   * @param reservationId value bound to {@code reservationId_IN}
   * @param subHomeClusterId value bound to {@code homeSubCluster_IN}
   * @return the two input values together with the {@code rowCount_OUT} value
   *         reported by the procedure
   * @throws SQLException if preparing, executing or reading the call fails
   * @throws IOException declared for callers; not raised by the visible code
   * @throws YarnException if raised while handing the statement back through
   *         {@code FederationStateStoreUtils.returnToPool}
   */
  private ReservationHomeSC updateReservationHomeSubCluster(String procedure,
      String reservationId, String subHomeClusterId)
      throws SQLException, IOException, YarnException {

    // procedure call parameter preparation
    CallableStatement cstmt = sqlFederationStateStore.getCallableStatement(procedure);
    try {
      // 1) IN reservationId_IN varchar(128)
      cstmt.setString("reservationId_IN", reservationId);
      // 2) IN homeSubCluster_IN varchar(256)
      cstmt.setString("homeSubCluster_IN", subHomeClusterId);
      // 3) OUT rowCount_OUT int
      cstmt.registerOutParameter("rowCount_OUT", java.sql.Types.INTEGER);

      // execute procedure
      cstmt.executeUpdate();

      // get rowcount
      int rowCount = cstmt.getInt("rowCount_OUT");

      // returns the ReservationHomeSubCluster object
      return new ReservationHomeSC(reservationId, subHomeClusterId, rowCount);
    } finally {
      // return cstmt to pool, also when the call fails part-way through
      FederationStateStoreUtils.returnToPool(LOG, cstmt);
    }
  }

  /**
   * Calls the given "delete reservation home sub-cluster" stored procedure.
   *
   * <p>The statement is released in a {@code finally} block, matching the
   * other stored-procedure helpers of this class.
   *
   * @param procedure callable statement text to execute
   * @param reservationId value bound to {@code reservationId_IN}
   * @return the reservation id, the placeholder sub-cluster id {@code "-"} and
   *         the {@code rowCount_OUT} value reported by the procedure
   * @throws SQLException if preparing, executing or reading the call fails
   * @throws YarnException if raised while handing the statement back through
   *         {@code FederationStateStoreUtils.returnToPool}
   */
  private ReservationHomeSC deleteReservationHomeSubCluster(String procedure,
      String reservationId) throws SQLException, YarnException {
    // procedure call parameter preparation
    CallableStatement cstmt = sqlFederationStateStore.getCallableStatement(procedure);
    try {
      // Set the parameters for the stored procedure
      // 1) IN reservationId_IN varchar(128)
      cstmt.setString("reservationId_IN", reservationId);
      // 2) OUT rowCount_OUT int
      cstmt.registerOutParameter("rowCount_OUT", java.sql.Types.INTEGER);

      // execute procedure
      cstmt.executeUpdate();

      // get rowcount
      int rowCount = cstmt.getInt("rowCount_OUT");

      // returns the ReservationHomeSubCluster object
      return new ReservationHomeSC(reservationId, "-", rowCount);
    } finally {
      // return cstmt to pool, also when the call fails part-way through
      FederationStateStoreUtils.returnToPool(LOG, cstmt);
    }
  }

  /**
   * Checks that the procedure sp_addReservationHomeSubCluster can be executed.
   *
   * One record is written to the database and the values reported back by
   * the procedure (stored home sub-cluster and row count) are verified.
   *
   * @throws Exception when the error occurs
   */
  @Test
  public void testCheckAddReservationHomeSubCluster() throws Exception {

    final String expectedHomeSC = "SC-1";
    final ReservationId rId = ReservationId.newInstance(Time.now(), 1);

    // insert the record through the stored procedure
    ReservationHomeSC added = addReservationHomeSubCluster(
        CALL_SP_ADD_RESERVATION_HOME_SUBCLUSTER, rId.toString(), expectedHomeSC);

    // exactly one row must be reported, carrying the sub-cluster we sent
    assertNotNull(added);
    assertEquals(expectedHomeSC, added.subHomeClusterId);
    assertEquals(1, added.dbUpdateCount);
  }

  /**
   * Checks that the procedure sp_getReservationHomeSubCluster can be executed.
   *
   * A record is inserted first; the lookup by reservationId must then return
   * the same homeSubCluster together with the reservationId that was queried.
   *
   * @throws Exception when the error occurs
   */
  @Test
  public void testCheckGetReservationHomeSubCluster() throws Exception {

    final String expectedHomeSC = "SC-1";
    final ReservationId rId = ReservationId.newInstance(Time.now(), 1);

    // seed the table with one record
    addReservationHomeSubCluster(
        CALL_SP_ADD_RESERVATION_HOME_SUBCLUSTER, rId.toString(), expectedHomeSC);

    // look the record up again
    ReservationHomeSC fetched = getReservationHomeSubCluster(
        CALL_SP_GET_RESERVATION_HOME_SUBCLUSTER, rId.toString());

    assertNotNull(fetched);
    assertEquals(expectedHomeSC, fetched.subHomeClusterId);
    assertEquals(rId.toString(), fetched.reservationId);
  }

  /**
   * Checks that the procedure sp_getReservationsHomeSubCluster can be executed.
   *
   * Two records are written to the database. The test then compares the
   * number of returned records and the content of each record, in the order
   * in which they were inserted.
   *
   * @throws Exception when the error occurs
   */
  @Test
  public void testCheckGetReservationsHomeSubCluster() throws Exception {

    final String[] expectedHomeSCs = {"SC-1", "SC-2"};
    final String[] expectedReservationIds = new String[expectedHomeSCs.length];

    // insert the records one after the other: ids 1 and 2, homes SC-1 and SC-2
    for (int i = 0; i < expectedHomeSCs.length; i++) {
      ReservationId rId = ReservationId.newInstance(Time.now(), i + 1);
      expectedReservationIds[i] = rId.toString();
      addReservationHomeSubCluster(
          CALL_SP_ADD_RESERVATION_HOME_SUBCLUSTER, rId.toString(), expectedHomeSCs[i]);
    }

    List<ReservationHomeSC> rows =
        getReservationsHomeSubCluster(CALL_SP_GET_RESERVATIONS_HOME_SUBCLUSTER);

    assertNotNull(rows);
    assertEquals(2, rows.size());

    // every row must match the record inserted at the same position
    for (int i = 0; i < expectedHomeSCs.length; i++) {
      ReservationHomeSC row = rows.get(i);
      assertNotNull(row);
      assertEquals(expectedReservationIds[i], row.reservationId);
      assertEquals(expectedHomeSCs[i], row.subHomeClusterId);
    }
  }

  /**
   * Checks that the procedure sp_updateReservationHomeSubCluster can be executed.
   *
   * One record is inserted with home sub-cluster SC-1 and read back. The
   * record is then updated to SC-2 by reservationId; the update must report
   * one changed row and a second read must return SC-2.
   *
   * @throws Exception when the error occurs
   */
  @Test
  public void testCheckUpdateReservationHomeSubCluster() throws Exception {

    final String initialHomeSC = "SC-1";
    final String updatedHomeSC = "SC-2";
    final ReservationId rId = ReservationId.newInstance(Time.now(), 1);

    // step 1: insert, then confirm the stored home sub-cluster is SC-1
    addReservationHomeSubCluster(
        CALL_SP_ADD_RESERVATION_HOME_SUBCLUSTER, rId.toString(), initialHomeSC);
    ReservationHomeSC beforeUpdate = getReservationHomeSubCluster(
        CALL_SP_GET_RESERVATION_HOME_SUBCLUSTER, rId.toString());
    assertNotNull(beforeUpdate);
    assertEquals(initialHomeSC, beforeUpdate.subHomeClusterId);

    // step 2: update to SC-2; one row is expected to change
    ReservationHomeSC updateResult = updateReservationHomeSubCluster(
        CALL_SP_UPDATE_RESERVATION_HOME_SUBCLUSTER, rId.toString(), updatedHomeSC);
    assertNotNull(updateResult);
    assertEquals(1, updateResult.dbUpdateCount);

    // step 3: confirm the stored home sub-cluster is now SC-2
    ReservationHomeSC afterUpdate = getReservationHomeSubCluster(
        CALL_SP_GET_RESERVATION_HOME_SUBCLUSTER, rId.toString());
    assertNotNull(afterUpdate);
    assertEquals(updatedHomeSC, afterUpdate.subHomeClusterId);
  }

  /**
   * Checks that the procedure sp_deleteReservationHomeSubCluster can be executed.
   *
   * One record is written and then deleted by reservationId; the delete must
   * report one affected row. A later lookup of the homeSubCluster for the
   * same reservationId is expected to yield NULL.
   *
   * @throws Exception when the error occurs
   */
  @Test
  public void testCheckDeleteReservationHomeSubCluster() throws Exception {

    final ReservationId rId = ReservationId.newInstance(Time.now(), 1);

    // seed the table with one record
    addReservationHomeSubCluster(
        CALL_SP_ADD_RESERVATION_HOME_SUBCLUSTER, rId.toString(), "SC-1");

    // remove it again; one row must be reported
    ReservationHomeSC deleted = deleteReservationHomeSubCluster(
        CALL_SP_DELETE_RESERVATION_HOME_SUBCLUSTER, rId.toString());
    assertNotNull(deleted);
    assertEquals(1, deleted.dbUpdateCount);

    // the lookup must no longer find a home sub-cluster
    ReservationHomeSC afterDelete = getReservationHomeSubCluster(
        CALL_SP_GET_RESERVATION_HOME_SUBCLUSTER, rId.toString());
    assertNotNull(afterDelete);
    assertEquals(null, afterDelete.subHomeClusterId);
  }

  /**
   * This test case is used to verify the processing logic of the incorrect number of
   * updated records returned by the database when AddReservationHomeSubCluster is used.
   *
   * The stock procedure is dropped and replaced by the variant defined in
   * {@code SP_ADDRESERVATIONHOMESUBCLUSTER2} (presumably one that reports more
   * than one inserted row - confirm against HSQLDBFederationStateStore). The
   * store is then expected to reject the call with a YarnException.
   *
   * The probability of the database returning an update record greater than 1 is very low.
   *
   * @throws Exception when the error occurs
   */
  @Test
  public void testAddReservationHomeSubClusterAbnormalSituation() throws Exception {

    // The connection belongs to the store and stays open; only the statements are closed.
    Connection conn = sqlFederationStateStore.getConn();
    try (PreparedStatement dropProcedure =
        conn.prepareStatement(SP_DROP_ADDRESERVATIONHOMESUBCLUSTER)) {
      dropProcedure.execute();
    }
    try (PreparedStatement createProcedure =
        conn.prepareStatement(SP_ADDRESERVATIONHOMESUBCLUSTER2)) {
      createProcedure.execute();
    }

    ReservationId reservationId = ReservationId.newInstance(Time.now(), 1);
    SubClusterId subClusterId = SubClusterId.newInstance("SC");

    ReservationHomeSubCluster reservationHomeSubCluster =
        ReservationHomeSubCluster.newInstance(reservationId, subClusterId);
    AddReservationHomeSubClusterRequest request =
        AddReservationHomeSubClusterRequest.newInstance(reservationHomeSubCluster);

    String errorMsg = String.format(
        "Wrong behavior during the insertion of subCluster %s according to reservation %s. " +
        "The database expects to insert 1 record, but the number of " +
        "inserted changes is greater than 1, " +
        "please check the records of the database.", subClusterId, reservationId);

    LambdaTestUtils.intercept(YarnException.class, errorMsg,
        () -> sqlFederationStateStore.addReservationHomeSubCluster(request));
  }

  /**
   * This test case is used to verify the logic when calling the updateReservationHomeSubCluster
   * method if the database returns an inaccurate result.
   *
   * The stock procedure is dropped and replaced by the variant defined in
   * {@code SP_UPDATERESERVATIONHOMESUBCLUSTER2} (presumably one that reports more
   * than one updated row - confirm against HSQLDBFederationStateStore). After a
   * reservation has been added, updating it is expected to fail with a YarnException.
   *
   * The probability of the database returning an update record greater than 1 is very low.
   *
   * @throws Exception when the error occurs
   */
  @Test
  public void testUpdateReservationHomeSubClusterAbnormalSituation() throws Exception {

    // The connection belongs to the store and stays open; only the statements are closed.
    Connection conn = sqlFederationStateStore.getConn();
    try (PreparedStatement dropProcedure =
        conn.prepareStatement(SP_DROP_UPDATERESERVATIONHOMESUBCLUSTER)) {
      dropProcedure.execute();
    }
    try (PreparedStatement createProcedure =
        conn.prepareStatement(SP_UPDATERESERVATIONHOMESUBCLUSTER2)) {
      createProcedure.execute();
    }

    ReservationId reservationId = ReservationId.newInstance(Time.now(), 1);
    SubClusterId subClusterId1 = SubClusterId.newInstance("SC");

    // add Reservation data.
    ReservationHomeSubCluster reservationHomeSubCluster =
        ReservationHomeSubCluster.newInstance(reservationId, subClusterId1);
    AddReservationHomeSubClusterRequest addRequest =
        AddReservationHomeSubClusterRequest.newInstance(reservationHomeSubCluster);
    sqlFederationStateStore.addReservationHomeSubCluster(addRequest);

    // build the update that moves the reservation to a second sub-cluster.
    SubClusterId subClusterId2 = SubClusterId.newInstance("SC2");
    ReservationHomeSubCluster reservationHomeSubCluster2 =
        ReservationHomeSubCluster.newInstance(reservationId, subClusterId2);
    UpdateReservationHomeSubClusterRequest updateRequest =
        UpdateReservationHomeSubClusterRequest.newInstance(reservationHomeSubCluster2);

    String errorMsg = String.format(
        "Wrong behavior during update the subCluster %s according to reservation %s. " +
        "The database is expected to update 1 record, " +
        "but the number of database update records is greater than 1, " +
        "the records of the database should be checked.",
        subClusterId2, reservationId);

    LambdaTestUtils.intercept(YarnException.class, errorMsg,
        () -> sqlFederationStateStore.updateReservationHomeSubCluster(updateRequest));
  }

  /**
   * This test case is used to verify the logic when calling the deleteReservationHomeSubCluster
   * method if the database returns an inaccurate result.
   *
   * The stock procedure is dropped and replaced by the variant defined in
   * {@code SP_DELETERESERVATIONHOMESUBCLUSTER2} (presumably one that reports more
   * than one deleted row - confirm against HSQLDBFederationStateStore). After a
   * reservation has been added, deleting it is expected to fail with a YarnException.
   *
   * The probability of the database returning an update record greater than 1 is very low.
   *
   * @throws Exception when the error occurs
   */
  @Test
  public void testDeleteReservationHomeSubClusterAbnormalSituation() throws Exception {

    // The connection belongs to the store and stays open; only the statements are closed.
    Connection conn = sqlFederationStateStore.getConn();
    try (PreparedStatement dropProcedure =
        conn.prepareStatement(SP_DROP_DELETERESERVATIONHOMESUBCLUSTER)) {
      dropProcedure.execute();
    }
    try (PreparedStatement createProcedure =
        conn.prepareStatement(SP_DELETERESERVATIONHOMESUBCLUSTER2)) {
      createProcedure.execute();
    }

    ReservationId reservationId = ReservationId.newInstance(Time.now(), 1);
    SubClusterId subClusterId1 = SubClusterId.newInstance("SC");

    // add Reservation data.
    ReservationHomeSubCluster reservationHomeSubCluster =
        ReservationHomeSubCluster.newInstance(reservationId, subClusterId1);
    AddReservationHomeSubClusterRequest addRequest =
        AddReservationHomeSubClusterRequest.newInstance(reservationHomeSubCluster);
    sqlFederationStateStore.addReservationHomeSubCluster(addRequest);

    DeleteReservationHomeSubClusterRequest delRequest =
        DeleteReservationHomeSubClusterRequest.newInstance(reservationId);

    String errorMsg = String.format(
        "Wrong behavior during deleting the reservation %s. " +
        "The database is expected to delete 1 record, " +
        "but the number of deleted records returned by the database is greater than 1, " +
        "indicating that a duplicate reservationId occurred during the deletion process.",
        reservationId);

    LambdaTestUtils.intercept(YarnException.class, errorMsg,
        () -> sqlFederationStateStore.deleteReservationHomeSubCluster(delRequest));
  }

  /**
   * Cross-checks a router master key three ways: the expected value, the value
   * read straight from the database through {@code CALL_SP_GET_MASTERKEY}, and
   * the value returned by the state store API.
   *
   * @param delegationKey key whose id is used for the direct database lookup
   * @param routerMasterKey expected master key
   * @throws YarnException if the state store lookup fails
   * @throws IOException declared by the overridden method
   * @throws SQLException if the direct database query fails
   */
  @Override
  protected void checkRouterMasterKey(DelegationKey delegationKey,
      RouterMasterKey routerMasterKey) throws YarnException, IOException, SQLException {
    // Read the key directly from the database.
    FederationSQLOutParameter<String> masterKeyOut =
        new FederationSQLOutParameter<>("masterKey_OUT", VARCHAR, String.class);
    FederationQueryRunner queryRunner = new FederationQueryRunner();
    RouterMasterKey keyFromDb = queryRunner.execute(
        sqlFederationStateStore.getConn(), CALL_SP_GET_MASTERKEY,
        new RouterMasterKeyHandler(), delegationKey.getKeyId(), masterKeyOut);

    // Read the same key through the state store API.
    RouterMasterKeyRequest lookupRequest =
        RouterMasterKeyRequest.newInstance(routerMasterKey);
    RouterMasterKeyResponse lookupResponse =
        getStateStore().getMasterKeyByDelegationKey(lookupRequest);
    assertNotNull(lookupResponse);
    RouterMasterKey keyFromStore = lookupResponse.getRouterMasterKey();

    // All three views of the key must agree.
    assertEquals(routerMasterKey, keyFromStore);
    assertEquals(routerMasterKey, keyFromDb);
    assertEquals(keyFromDb, keyFromStore);
  }

  /**
   * Reads a delegation token straight from the database through
   * {@code CALL_SP_GET_DELEGATIONTOKEN} and compares it with the expected token.
   *
   * @param identifier token identifier whose sequence number selects the row
   * @param token expected stored token
   * @throws YarnException declared by the overridden method
   * @throws IOException declared by the overridden method
   * @throws SQLException if the direct database query fails
   */
  @Override
  protected void checkRouterStoreToken(RMDelegationTokenIdentifier identifier,
      RouterStoreToken token) throws YarnException, IOException, SQLException {
    // The sequence number is the lookup key of the stored procedure.
    final int seqNum = identifier.getSequenceNumber();

    // OUT parameters filled by the stored procedure.
    FederationSQLOutParameter<String> identOut =
        new FederationSQLOutParameter<>("tokenIdent_OUT", VARCHAR, String.class);
    FederationSQLOutParameter<String> tokenOut =
        new FederationSQLOutParameter<>("token_OUT", VARCHAR, String.class);
    FederationSQLOutParameter<Long> renewDateOut =
        new FederationSQLOutParameter<>("renewDate_OUT", BIGINT, Long.class);

    // Query the database directly.
    Connection dbConnection = sqlFederationStateStore.getConn();
    FederationQueryRunner queryRunner = new FederationQueryRunner();
    RouterStoreToken tokenFromDb = queryRunner.execute(dbConnection,
        CALL_SP_GET_DELEGATIONTOKEN, new RouterStoreTokenHandler(),
        seqNum, identOut, tokenOut, renewDateOut);

    assertEquals(token, tokenFromDb);
  }

  /**
   * The connection of the store under test must be detected as HSQLDB.
   *
   * @throws SQLException if the database type cannot be determined
   */
  @Test
  public void testCheckHSQLDB() throws SQLException {
    Connection storeConnection = sqlFederationStateStore.getConn();
    DbType detected = DatabaseProduct.getDbType(storeConnection);
    assertEquals(DbType.HSQLDB, detected);
  }

  /**
   * A {@code null} connection must be reported as {@code DbType.UNDEFINED}.
   *
   * @throws SQLException declared by {@code DatabaseProduct.getDbType}
   */
  @Test
  public void testGetDbTypeNullConn() throws SQLException {
    DbType detected = DatabaseProduct.getDbType(null);
    assertEquals(DbType.UNDEFINED, detected);
  }

  /**
   * A connection whose metadata reports an empty product name must be
   * reported as {@code DbType.UNDEFINED}.
   *
   * @throws SQLException declared by the mocked JDBC calls
   */
  @Test
  public void testGetDBTypeEmptyConn() throws SQLException {
    // Metadata stub that reports an empty product name.
    DatabaseMetaData emptyNameMetaData = mock(DatabaseMetaData.class);
    when(emptyNameMetaData.getDatabaseProductName()).thenReturn("");

    // Connection stub that hands out the metadata above.
    Connection mockedConnection = mock(Connection.class);
    when(mockedConnection.getMetaData()).thenReturn(emptyNameMetaData);

    DbType detected = DatabaseProduct.getDbType(mockedConnection);
    assertEquals(DbType.UNDEFINED, detected);
  }

  /**
   * For HSQLDB, {@code addForUpdateClause} is expected to append
   * {@code " for update"} to the query.
   *
   * @throws SQLException declared by {@code DatabaseProduct.addForUpdateClause}
   */
  @Test
  public void testCheckForHSQLDBUpdateSQL() throws SQLException {
    final String baseQuery = "select sequenceName, nextVal from sequenceTable";
    final String expected = "select sequenceName, nextVal from sequenceTable for update";

    String actual = DatabaseProduct.addForUpdateClause(DbType.HSQLDB, baseQuery);

    assertEquals(expected, actual);
  }

  /**
   * For SQL Server, {@code addForUpdateClause} is expected to append
   * {@code " with (updlock)"} to the query.
   *
   * @throws SQLException declared by {@code DatabaseProduct.addForUpdateClause}
   */
  @Test
  public void testCheckForSqlServerDBUpdateSQL() throws SQLException {
    final String baseQuery = "select sequenceName, nextVal from sequenceTable";
    final String expected = "select sequenceName, nextVal from sequenceTable with (updlock)";

    String actual = DatabaseProduct.addForUpdateClause(DbType.SQLSERVER, baseQuery);

    assertEquals(expected, actual);
  }

  /**
   * Verifies the pool settings of the store's {@link HikariDataSource}:
   * lifetimes and timeouts, pool name, and pool sizes.
   *
   * @throws SQLException declared for symmetry with the other tests
   */
  @Test
  public void testCheckHikariDataSourceParam() throws SQLException {
    HikariDataSource pool = sqlFederationStateStore.getDataSource();

    // Take a snapshot of every setting before asserting anything.
    final long actualMaxLifetime = pool.getMaxLifetime();
    final long actualIdleTimeout = pool.getIdleTimeout();
    final long actualConnectionTimeout = pool.getConnectionTimeout();
    final String actualPoolName = pool.getPoolName();
    final int actualMinimumIdle = pool.getMinimumIdle();
    final int actualMaximumPoolSize = pool.getMaximumPoolSize();

    // Timing: 30 minutes max lifetime, 10 minutes idle, 10 seconds to connect (all in ms).
    assertEquals(1800000, actualMaxLifetime);
    assertEquals(600000, actualIdleTimeout);
    assertEquals(10000, actualConnectionTimeout);

    // Identity and sizing.
    assertEquals("YARN-Federation-DataBasePool", actualPoolName);
    assertEquals(1, actualMinimumIdle);
    assertEquals(10, actualMaximumPoolSize);
  }
}