HSQLDBFederationStateStore.java
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
*http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.federation.store.impl;
import java.io.File;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.apache.commons.io.FileUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.server.federation.store.FederationStateStore;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
 * HSQLDB implementation of {@link FederationStateStore}.
 *
 * <p>{@link #init(Configuration)} creates the federation tables and stored
 * procedures declared below on the connection obtained from
 * {@link SQLFederationStateStore#getConn()}.
 */
public class HSQLDBFederationStateStore extends SQLFederationStateStore {

  private static final Logger LOG =
      LoggerFactory.getLogger(HSQLDBFederationStateStore.class);

  /** Connection obtained from the parent store; assigned by init/initConnection. */
  private Connection conn;

  // ---------------------------------------------------------------------
  // Table definitions
  // ---------------------------------------------------------------------

  private static final String TABLE_APPLICATIONSHOMESUBCLUSTER =
      " CREATE TABLE applicationsHomeSubCluster ("
      + " applicationId varchar(64) NOT NULL,"
      + " homeSubCluster varchar(256) NOT NULL,"
      + " createTime datetime NOT NULL,"
      + " applicationContext BLOB NULL,"
      + " CONSTRAINT pk_applicationId PRIMARY KEY (applicationId))";

  private static final String TABLE_MEMBERSHIP =
      "CREATE TABLE membership ( subClusterId varchar(256) NOT NULL,"
      + " amRMServiceAddress varchar(256) NOT NULL,"
      + " clientRMServiceAddress varchar(256) NOT NULL,"
      + " rmAdminServiceAddress varchar(256) NOT NULL,"
      + " rmWebServiceAddress varchar(256) NOT NULL,"
      + " lastHeartBeat datetime NOT NULL, state varchar(32) NOT NULL,"
      + " lastStartTime bigint NULL, capability varchar(6000) NOT NULL,"
      + " CONSTRAINT pk_subClusterId PRIMARY KEY (subClusterId))";

  private static final String TABLE_POLICIES =
      "CREATE TABLE policies ( queue varchar(256) NOT NULL,"
      + " policyType varchar(256) NOT NULL, params varbinary(512),"
      + " CONSTRAINT pk_queue PRIMARY KEY (queue))";

  private static final String TABLE_RESERVATIONSHOMESUBCLUSTER =
      " CREATE TABLE reservationsHomeSubCluster ("
      + " reservationId varchar(128) NOT NULL,"
      + " homeSubCluster varchar(256) NOT NULL,"
      + " CONSTRAINT pk_reservationId PRIMARY KEY (reservationId))";

  private static final String TABLE_MASTERKEYS =
      " CREATE TABLE masterKeys ("
      + " keyId bigint NOT NULL,"
      + " masterKey varchar(1024) NOT NULL,"
      + " CONSTRAINT pk_keyId PRIMARY KEY (keyId))";

  private static final String TABLE_DELEGATIONTOKENS =
      " CREATE TABLE delegationTokens ("
      + " sequenceNum bigint NOT NULL,"
      + " tokenIdent varchar(1024) NOT NULL,"
      + " token varchar(1024) NOT NULL,"
      + " renewDate bigint NOT NULL,"
      + " CONSTRAINT pk_sequenceNum PRIMARY KEY (sequenceNum))";

  private static final String TABLE_SEQUENCETABLE =
      " CREATE TABLE sequenceTable ("
      + " sequenceName varchar(255) NOT NULL,"
      + " nextVal bigint NOT NULL,"
      + " CONSTRAINT pk_sequenceName PRIMARY KEY (sequenceName))";

  private static final String TABLE_VERSIONS =
      "CREATE TABLE versions ("
      + " fedVersion varbinary(1024) NOT NULL,"
      + " versionComment VARCHAR(255),"
      + " CONSTRAINT pk_fedVersion PRIMARY KEY (fedVersion))";

  // ---------------------------------------------------------------------
  // Stored procedures: membership
  // ---------------------------------------------------------------------

  private static final String SP_REGISTERSUBCLUSTER =
      "CREATE PROCEDURE sp_registerSubCluster("
      + " IN subClusterId_IN varchar(256),"
      + " IN amRMServiceAddress_IN varchar(256),"
      + " IN clientRMServiceAddress_IN varchar(256),"
      + " IN rmAdminServiceAddress_IN varchar(256),"
      + " IN rmWebServiceAddress_IN varchar(256),"
      + " IN state_IN varchar(256),"
      + " IN lastStartTime_IN bigint, IN capability_IN varchar(6000),"
      + " OUT rowCount_OUT int)MODIFIES SQL DATA BEGIN ATOMIC"
      + " DELETE FROM membership WHERE (subClusterId = subClusterId_IN);"
      + " INSERT INTO membership ( subClusterId,"
      + " amRMServiceAddress, clientRMServiceAddress,"
      + " rmAdminServiceAddress, rmWebServiceAddress,"
      + " lastHeartBeat, state, lastStartTime,"
      + " capability) VALUES ( subClusterId_IN,"
      + " amRMServiceAddress_IN, clientRMServiceAddress_IN,"
      + " rmAdminServiceAddress_IN, rmWebServiceAddress_IN,"
      + " NOW() AT TIME ZONE INTERVAL '0:00' HOUR TO MINUTE,"
      + " state_IN, lastStartTime_IN, capability_IN);"
      + " GET DIAGNOSTICS rowCount_OUT = ROW_COUNT; END";

  private static final String SP_DEREGISTERSUBCLUSTER =
      "CREATE PROCEDURE sp_deregisterSubCluster("
      + " IN subClusterId_IN varchar(256),"
      + " IN state_IN varchar(64), OUT rowCount_OUT int)"
      + " MODIFIES SQL DATA BEGIN ATOMIC"
      + " UPDATE membership SET state = state_IN WHERE ("
      + " subClusterId = subClusterId_IN AND state != state_IN);"
      + " GET DIAGNOSTICS rowCount_OUT = ROW_COUNT; END";

  private static final String SP_SUBCLUSTERHEARTBEAT =
      "CREATE PROCEDURE sp_subClusterHeartbeat("
      + " IN subClusterId_IN varchar(256), IN state_IN varchar(64),"
      + " IN capability_IN varchar(6000), OUT rowCount_OUT int)"
      + " MODIFIES SQL DATA BEGIN ATOMIC UPDATE membership"
      + " SET capability = capability_IN, state = state_IN,"
      + " lastHeartBeat = NOW() AT TIME ZONE INTERVAL '0:00'"
      + " HOUR TO MINUTE WHERE subClusterId = subClusterId_IN;"
      + " GET DIAGNOSTICS rowCount_OUT = ROW_COUNT; END";

  private static final String SP_GETSUBCLUSTER =
      "CREATE PROCEDURE sp_getSubCluster( IN subClusterId_IN varchar(256),"
      + " OUT amRMServiceAddress_OUT varchar(256),"
      + " OUT clientRMServiceAddress_OUT varchar(256),"
      + " OUT rmAdminServiceAddress_OUT varchar(256),"
      + " OUT rmWebServiceAddress_OUT varchar(256),"
      + " OUT lastHeartBeat_OUT datetime, OUT state_OUT varchar(64),"
      + " OUT lastStartTime_OUT bigint,"
      + " OUT capability_OUT varchar(6000))"
      + " MODIFIES SQL DATA BEGIN ATOMIC SELECT amRMServiceAddress,"
      + " clientRMServiceAddress,"
      + " rmAdminServiceAddress, rmWebServiceAddress,"
      + " lastHeartBeat, state, lastStartTime, capability"
      + " INTO amRMServiceAddress_OUT, clientRMServiceAddress_OUT,"
      + " rmAdminServiceAddress_OUT,"
      + " rmWebServiceAddress_OUT, lastHeartBeat_OUT,"
      + " state_OUT, lastStartTime_OUT, capability_OUT"
      + " FROM membership WHERE subClusterId = subClusterId_IN; END";

  private static final String SP_GETSUBCLUSTERS =
      "CREATE PROCEDURE sp_getSubClusters()"
      + " MODIFIES SQL DATA DYNAMIC RESULT SETS 1 BEGIN ATOMIC"
      + " DECLARE result CURSOR FOR"
      + " SELECT subClusterId, amRMServiceAddress, clientRMServiceAddress,"
      + " rmAdminServiceAddress, rmWebServiceAddress, lastHeartBeat,"
      + " state, lastStartTime, capability"
      + " FROM membership; OPEN result; END";

  // ---------------------------------------------------------------------
  // Stored procedures: application home sub-cluster
  // ---------------------------------------------------------------------

  private static final String SP_ADDAPPLICATIONHOMESUBCLUSTER =
      "CREATE PROCEDURE sp_addApplicationHomeSubCluster("
      + " IN applicationId_IN varchar(64),"
      + " IN homeSubCluster_IN varchar(256),"
      + " IN applicationContext_IN BLOB,"
      + " OUT storedHomeSubCluster_OUT varchar(256), OUT rowCount_OUT int)"
      + " MODIFIES SQL DATA BEGIN ATOMIC"
      + " INSERT INTO applicationsHomeSubCluster "
      + " (applicationId,homeSubCluster,createTime,applicationContext) "
      + " (SELECT applicationId_IN, homeSubCluster_IN, "
      + " NOW() AT TIME ZONE INTERVAL '0:00' HOUR TO MINUTE, "
      + " applicationContext_IN "
      + " FROM applicationsHomeSubCluster"
      + " WHERE applicationId = applicationId_IN"
      + " HAVING COUNT(*) = 0 );"
      + " GET DIAGNOSTICS rowCount_OUT = ROW_COUNT;"
      + " SELECT homeSubCluster INTO storedHomeSubCluster_OUT"
      + " FROM applicationsHomeSubCluster"
      + " WHERE applicationId = applicationID_IN; END";

  private static final String SP_UPDATEAPPLICATIONHOMESUBCLUSTER =
      "CREATE PROCEDURE sp_updateApplicationHomeSubCluster("
      + " IN applicationId_IN varchar(64),"
      + " IN homeSubCluster_IN varchar(256), "
      + " IN applicationContext_IN BLOB, OUT rowCount_OUT int)"
      + " MODIFIES SQL DATA BEGIN ATOMIC"
      + " UPDATE applicationsHomeSubCluster"
      + " SET homeSubCluster = homeSubCluster_IN, "
      + " applicationContext = applicationContext_IN "
      + " WHERE applicationId = applicationId_IN;"
      + " GET DIAGNOSTICS rowCount_OUT = ROW_COUNT; END";

  private static final String SP_GETAPPLICATIONHOMESUBCLUSTER =
      "CREATE PROCEDURE sp_getApplicationHomeSubCluster("
      + " IN applicationId_IN varchar(64),"
      + " OUT homeSubCluster_OUT varchar(256),"
      + " OUT createTime_OUT datetime,"
      + " OUT applicationContext_OUT BLOB)"
      + " MODIFIES SQL DATA BEGIN ATOMIC"
      + " SELECT homeSubCluster, applicationContext, createTime "
      + " INTO homeSubCluster_OUT, applicationContext_OUT, createTime_OUT "
      + " FROM applicationsHomeSubCluster"
      + " WHERE applicationId = applicationID_IN; END";

  private static final String SP_GETAPPLICATIONSHOMESUBCLUSTER =
      "CREATE PROCEDURE sp_getApplicationsHomeSubCluster("
      + "IN limit_IN int, IN homeSubCluster_IN varchar(256))"
      + " MODIFIES SQL DATA DYNAMIC RESULT SETS 1 BEGIN ATOMIC"
      + " DECLARE result CURSOR FOR"
      + " SELECT applicationId, homeSubCluster, createTime"
      + " FROM applicationsHomeSubCluster "
      + " WHERE ROWNUM() <= limit_IN AND "
      + " (homeSubCluster_IN = '' OR homeSubCluster = homeSubCluster_IN) "
      + " ORDER BY createTime desc; "
      + " OPEN result; END";

  private static final String SP_DELETEAPPLICATIONHOMESUBCLUSTER =
      "CREATE PROCEDURE sp_deleteApplicationHomeSubCluster("
      + " IN applicationId_IN varchar(64), OUT rowCount_OUT int)"
      + " MODIFIES SQL DATA BEGIN ATOMIC"
      + " DELETE FROM applicationsHomeSubCluster"
      + " WHERE applicationId = applicationId_IN;"
      + " GET DIAGNOSTICS rowCount_OUT = ROW_COUNT; END";

  // ---------------------------------------------------------------------
  // Stored procedures: policies
  // ---------------------------------------------------------------------

  private static final String SP_SETPOLICYCONFIGURATION =
      "CREATE PROCEDURE sp_setPolicyConfiguration("
      + " IN queue_IN varchar(256), IN policyType_IN varchar(256),"
      + " IN params_IN varbinary(512), OUT rowCount_OUT int)"
      + " MODIFIES SQL DATA BEGIN ATOMIC"
      + " DELETE FROM policies WHERE queue = queue_IN;"
      + " INSERT INTO policies (queue, policyType, params)"
      + " VALUES (queue_IN, policyType_IN, params_IN);"
      + " GET DIAGNOSTICS rowCount_OUT = ROW_COUNT; END";

  private static final String SP_GETPOLICYCONFIGURATION =
      "CREATE PROCEDURE sp_getPolicyConfiguration("
      + " IN queue_IN varchar(256), OUT policyType_OUT varchar(256),"
      + " OUT params_OUT varbinary(512)) MODIFIES SQL DATA BEGIN ATOMIC"
      + " SELECT policyType, params INTO policyType_OUT, params_OUT"
      + " FROM policies WHERE queue = queue_IN; END";

  private static final String SP_GETPOLICIESCONFIGURATIONS =
      "CREATE PROCEDURE sp_getPoliciesConfigurations()"
      + " MODIFIES SQL DATA DYNAMIC RESULT SETS 1 BEGIN ATOMIC"
      + " DECLARE result CURSOR FOR"
      + " SELECT * FROM policies; OPEN result; END";

  // ---------------------------------------------------------------------
  // Stored procedures: reservation home sub-cluster
  // ---------------------------------------------------------------------

  private static final String SP_ADDRESERVATIONHOMESUBCLUSTER =
      "CREATE PROCEDURE sp_addReservationHomeSubCluster("
      + " IN reservationId_IN varchar(128),"
      + " IN homeSubCluster_IN varchar(256),"
      + " OUT storedHomeSubCluster_OUT varchar(256), OUT rowCount_OUT int)"
      + " MODIFIES SQL DATA BEGIN ATOMIC"
      + " INSERT INTO reservationsHomeSubCluster "
      + " (reservationId,homeSubCluster) "
      + " (SELECT reservationId_IN, homeSubCluster_IN"
      + " FROM reservationsHomeSubCluster"
      + " WHERE reservationId = reservationId_IN"
      + " HAVING COUNT(*) = 0 );"
      + " GET DIAGNOSTICS rowCount_OUT = ROW_COUNT;"
      + " SELECT homeSubCluster INTO storedHomeSubCluster_OUT"
      + " FROM reservationsHomeSubCluster"
      + " WHERE reservationId = reservationId_IN; END";

  private static final String SP_GETRESERVATIONHOMESUBCLUSTER =
      "CREATE PROCEDURE sp_getReservationHomeSubCluster("
      + " IN reservationId_IN varchar(128),"
      + " OUT homeSubCluster_OUT varchar(256))"
      + " MODIFIES SQL DATA BEGIN ATOMIC"
      + " SELECT homeSubCluster INTO homeSubCluster_OUT"
      + " FROM reservationsHomeSubCluster"
      + " WHERE reservationId = reservationId_IN; END";

  private static final String SP_GETRESERVATIONSHOMESUBCLUSTER =
      "CREATE PROCEDURE sp_getReservationsHomeSubCluster()"
      + " MODIFIES SQL DATA DYNAMIC RESULT SETS 1 BEGIN ATOMIC"
      + " DECLARE result CURSOR FOR"
      + " SELECT reservationId, homeSubCluster"
      + " FROM reservationsHomeSubCluster; OPEN result; END";

  private static final String SP_DELETERESERVATIONHOMESUBCLUSTER =
      "CREATE PROCEDURE sp_deleteReservationHomeSubCluster("
      + " IN reservationId_IN varchar(128), OUT rowCount_OUT int)"
      + " MODIFIES SQL DATA BEGIN ATOMIC"
      + " DELETE FROM reservationsHomeSubCluster"
      + " WHERE reservationId = reservationId_IN;"
      + " GET DIAGNOSTICS rowCount_OUT = ROW_COUNT; END";

  private static final String SP_UPDATERESERVATIONHOMESUBCLUSTER =
      "CREATE PROCEDURE sp_updateReservationHomeSubCluster("
      + " IN reservationId_IN varchar(128),"
      + " IN homeSubCluster_IN varchar(256), OUT rowCount_OUT int)"
      + " MODIFIES SQL DATA BEGIN ATOMIC"
      + " UPDATE reservationsHomeSubCluster"
      + " SET homeSubCluster = homeSubCluster_IN"
      + " WHERE reservationId = reservationId_IN;"
      + " GET DIAGNOSTICS rowCount_OUT = ROW_COUNT; END";

  // ---------------------------------------------------------------------
  // Alternate reservation procedures (the "...2" constants) and the DROP
  // statements for the procedures they replace. These variants report a
  // fixed rowCount_OUT of 2 instead of the real ROW_COUNT. They are not
  // executed by init(); presumably subclasses/tests install them to
  // exercise unexpected-row-count handling -- TODO confirm with callers.
  // ---------------------------------------------------------------------

  protected static final String SP_DROP_ADDRESERVATIONHOMESUBCLUSTER =
      "DROP PROCEDURE sp_addReservationHomeSubCluster";

  protected static final String SP_ADDRESERVATIONHOMESUBCLUSTER2 =
      "CREATE PROCEDURE sp_addReservationHomeSubCluster("
      + " IN reservationId_IN varchar(128),"
      + " IN homeSubCluster_IN varchar(256),"
      + " OUT storedHomeSubCluster_OUT varchar(256), OUT rowCount_OUT int)"
      + " MODIFIES SQL DATA BEGIN ATOMIC"
      + " INSERT INTO reservationsHomeSubCluster "
      + " (reservationId,homeSubCluster) "
      + " (SELECT reservationId_IN, homeSubCluster_IN"
      + " FROM reservationsHomeSubCluster"
      + " WHERE reservationId = reservationId_IN"
      + " HAVING COUNT(*) = 0 );"
      + " SELECT homeSubCluster, 2 INTO storedHomeSubCluster_OUT, rowCount_OUT"
      + " FROM reservationsHomeSubCluster"
      + " WHERE reservationId = reservationId_IN; END";

  protected static final String SP_DROP_UPDATERESERVATIONHOMESUBCLUSTER =
      "DROP PROCEDURE sp_updateReservationHomeSubCluster";

  protected static final String SP_UPDATERESERVATIONHOMESUBCLUSTER2 =
      "CREATE PROCEDURE sp_updateReservationHomeSubCluster("
      + " IN reservationId_IN varchar(128),"
      + " IN homeSubCluster_IN varchar(256), OUT rowCount_OUT int)"
      + " MODIFIES SQL DATA BEGIN ATOMIC"
      + " UPDATE reservationsHomeSubCluster"
      + " SET homeSubCluster = homeSubCluster_IN"
      + " WHERE reservationId = reservationId_IN;"
      + " SET rowCount_OUT = 2; END";

  protected static final String SP_DROP_DELETERESERVATIONHOMESUBCLUSTER =
      "DROP PROCEDURE sp_deleteReservationHomeSubCluster";

  protected static final String SP_DELETERESERVATIONHOMESUBCLUSTER2 =
      "CREATE PROCEDURE sp_deleteReservationHomeSubCluster("
      + " IN reservationId_IN varchar(128), OUT rowCount_OUT int)"
      + " MODIFIES SQL DATA BEGIN ATOMIC"
      + " DELETE FROM reservationsHomeSubCluster"
      + " WHERE reservationId = reservationId_IN;"
      + " SET rowCount_OUT = 2; END";

  // ---------------------------------------------------------------------
  // Stored procedures: master keys
  // ---------------------------------------------------------------------

  protected static final String SP_DROP_ADDMASTERKEY = "DROP PROCEDURE sp_addMasterKey";

  protected static final String SP_ADDMASTERKEY =
      "CREATE PROCEDURE sp_addMasterKey("
      + " IN keyId_IN int, IN masterKey_IN varchar(1024),"
      + " OUT rowCount_OUT int)"
      + " MODIFIES SQL DATA BEGIN ATOMIC "
      + " INSERT INTO masterKeys(keyId, masterKey)"
      + " (SELECT keyId_IN, masterKey_IN"
      + " FROM masterKeys "
      + " WHERE keyId = keyId_IN "
      + " HAVING COUNT(*) = 0);"
      + " GET DIAGNOSTICS rowCount_OUT = ROW_COUNT;"
      + " END";

  protected static final String SP_DROP_GETMASTERKEY = "DROP PROCEDURE sp_getMasterKey";

  protected static final String SP_GETMASTERKEY =
      "CREATE PROCEDURE sp_getMasterKey("
      + " IN keyId_IN int,"
      + " OUT masterKey_OUT varchar(1024))"
      + " MODIFIES SQL DATA BEGIN ATOMIC "
      + " SELECT masterKey INTO masterKey_OUT "
      + " FROM masterKeys "
      + " WHERE keyId = keyId_IN; "
      + " END ";

  protected static final String SP_DROP_DELETEMASTERKEY = "DROP PROCEDURE sp_deleteMasterKey";

  protected static final String SP_DELETEMASTERKEY =
      "CREATE PROCEDURE sp_deleteMasterKey("
      + " IN keyId_IN int, OUT rowCount_OUT int)"
      + " MODIFIES SQL DATA BEGIN ATOMIC"
      + " DELETE FROM masterKeys WHERE keyId = keyId_IN;"
      + " GET DIAGNOSTICS rowCount_OUT = ROW_COUNT; END";

  // ---------------------------------------------------------------------
  // Stored procedures: delegation tokens
  // ---------------------------------------------------------------------

  protected static final String SP_DROP_ADD_DELEGATIONTOKEN =
      "DROP PROCEDURE sp_addDelegationToken";

  protected static final String SP_ADD_DELEGATIONTOKEN =
      "CREATE PROCEDURE sp_addDelegationToken("
      + " IN sequenceNum_IN bigint, IN tokenIdent_IN varchar(1024),"
      + " IN token_IN varchar(1024), IN renewDate_IN bigint, OUT rowCount_OUT int)"
      + " MODIFIES SQL DATA BEGIN ATOMIC "
      + " INSERT INTO delegationTokens(sequenceNum, tokenIdent, token, renewDate)"
      + " (SELECT sequenceNum_IN, tokenIdent_IN, token_IN, renewDate_IN"
      + " FROM delegationTokens"
      + " WHERE sequenceNum = sequenceNum_IN"
      + " HAVING COUNT(*) = 0);"
      + " GET DIAGNOSTICS rowCount_OUT = ROW_COUNT;"
      + " END";

  protected static final String SP_DROP_GET_DELEGATIONTOKEN =
      "DROP PROCEDURE sp_getDelegationToken";

  protected static final String SP_GET_DELEGATIONTOKEN =
      "CREATE PROCEDURE sp_getDelegationToken("
      + " IN sequenceNum_IN bigint, OUT tokenIdent_OUT varchar(1024), "
      + " OUT token_OUT varchar(1024), OUT renewDate_OUT bigint)"
      + " MODIFIES SQL DATA BEGIN ATOMIC "
      + " SELECT tokenIdent, token, renewDate INTO "
      + " tokenIdent_OUT, token_OUT, renewDate_OUT"
      + " FROM delegationTokens"
      + " WHERE sequenceNum = sequenceNum_IN; "
      + " END ";

  protected static final String SP_DROP_UPDATE_DELEGATIONTOKEN =
      "DROP PROCEDURE sp_updateDelegationToken";

  protected static final String SP_UPDATE_DELEGATIONTOKEN =
      "CREATE PROCEDURE sp_updateDelegationToken("
      + " IN sequenceNum_IN bigint, IN tokenIdent_IN varchar(1024),"
      + " IN token_IN varchar(1024), IN renewDate_IN bigint, OUT rowCount_OUT int)"
      + " MODIFIES SQL DATA BEGIN ATOMIC"
      + " UPDATE delegationTokens"
      + " SET tokenIdent = tokenIdent_IN,"
      + " token = token_IN, renewDate = renewDate_IN"
      + " WHERE sequenceNum = sequenceNum_IN;"
      + " GET DIAGNOSTICS rowCount_OUT = ROW_COUNT; "
      + " END ";

  protected static final String SP_DROP_DELETE_DELEGATIONTOKEN =
      "DROP PROCEDURE sp_deleteDelegationToken";

  protected static final String SP_DELETE_DELEGATIONTOKEN =
      "CREATE PROCEDURE sp_deleteDelegationToken("
      + " IN sequenceNum_IN bigint, OUT rowCount_OUT int)"
      + " MODIFIES SQL DATA BEGIN ATOMIC"
      + " DELETE FROM delegationTokens"
      + " WHERE sequenceNum = sequenceNum_IN;"
      + " GET DIAGNOSTICS rowCount_OUT = ROW_COUNT; "
      + " END ";

  // ---------------------------------------------------------------------
  // Stored procedures: versions
  // ---------------------------------------------------------------------

  protected static final String SP_STORE_VERSION =
      "CREATE PROCEDURE sp_storeVersion("
      + " IN fedVersion_IN varbinary(1024), IN versionComment_IN varchar(256), "
      + " OUT rowCount_OUT int)"
      + " MODIFIES SQL DATA BEGIN ATOMIC"
      + " DELETE FROM versions;"
      + " INSERT INTO versions (fedVersion, versionComment)"
      + " VALUES (fedVersion_IN, versionComment_IN);"
      + " GET DIAGNOSTICS rowCount_OUT = ROW_COUNT; "
      + " END ";

  protected static final String SP_GET_VERSION =
      "CREATE PROCEDURE sp_getVersion("
      + " OUT fedVersion_OUT varbinary(1024), OUT versionComment_OUT varchar(256))"
      + " MODIFIES SQL DATA BEGIN ATOMIC"
      + " SELECT fedVersion, versionComment INTO fedVersion_OUT, versionComment_OUT"
      + " FROM versions; "
      + " END ";

  /** Regex matches collected by {@link #extractCreateTableSQL(String, String)}. */
  private List<String> tables = new ArrayList<>();

  /**
   * Initializes the parent store and then creates every table and stored
   * procedure used by this store, tables first.
   *
   * <p>Note that this method overwrites
   * {@link YarnConfiguration#FEDERATION_STATESTORE_MAX_APPLICATIONS} with 10
   * on the supplied configuration before delegating to the parent.
   *
   * <p>Failures are logged (with stack trace) and not rethrown; statements
   * after the failing one are not executed.
   *
   * @param conf configuration handed to the parent store; modified in place.
   */
  @Override
  public void init(Configuration conf) {
    try {
      conf.setInt(YarnConfiguration.FEDERATION_STATESTORE_MAX_APPLICATIONS, 10);
      super.init(conf);
      conn = super.getConn();

      LOG.info("Database Init: Start");

      // Tables must exist before the procedures that reference them.
      executeHsqldbStatements(
          TABLE_APPLICATIONSHOMESUBCLUSTER,
          TABLE_MEMBERSHIP,
          TABLE_POLICIES,
          TABLE_RESERVATIONSHOMESUBCLUSTER,
          TABLE_MASTERKEYS,
          TABLE_DELEGATIONTOKENS,
          TABLE_SEQUENCETABLE,
          TABLE_VERSIONS);

      executeHsqldbStatements(
          SP_REGISTERSUBCLUSTER,
          SP_DEREGISTERSUBCLUSTER,
          SP_SUBCLUSTERHEARTBEAT,
          SP_GETSUBCLUSTER,
          SP_GETSUBCLUSTERS,
          SP_ADDAPPLICATIONHOMESUBCLUSTER,
          SP_UPDATEAPPLICATIONHOMESUBCLUSTER,
          SP_GETAPPLICATIONHOMESUBCLUSTER,
          SP_GETAPPLICATIONSHOMESUBCLUSTER,
          SP_DELETEAPPLICATIONHOMESUBCLUSTER,
          SP_SETPOLICYCONFIGURATION,
          SP_GETPOLICYCONFIGURATION,
          SP_GETPOLICIESCONFIGURATIONS,
          SP_ADDRESERVATIONHOMESUBCLUSTER,
          SP_GETRESERVATIONHOMESUBCLUSTER,
          SP_GETRESERVATIONSHOMESUBCLUSTER,
          SP_DELETERESERVATIONHOMESUBCLUSTER,
          SP_UPDATERESERVATIONHOMESUBCLUSTER,
          SP_ADDMASTERKEY,
          SP_GETMASTERKEY,
          SP_DELETEMASTERKEY,
          SP_ADD_DELEGATIONTOKEN,
          SP_GET_DELEGATIONTOKEN,
          SP_UPDATE_DELEGATIONTOKEN,
          SP_DELETE_DELEGATIONTOKEN,
          SP_STORE_VERSION,
          SP_GET_VERSION);

      LOG.info("Database Init: Complete");
    } catch (Exception e) {
      // Trailing throwable makes SLF4J log the stack trace as well.
      LOG.error("ERROR: failed to initialize HSQLDB {}.", e.getMessage(), e);
    }
  }

  /**
   * Executes the given SQL statements in order on {@link #conn}, closing each
   * prepared statement as soon as it has run.
   *
   * @param statements SQL texts to execute, in order.
   * @throws SQLException if preparing or executing any statement fails; the
   *         remaining statements are skipped.
   */
  private void executeHsqldbStatements(String... statements) throws SQLException {
    for (String sql : statements) {
      try (PreparedStatement statement = conn.prepareStatement(sql)) {
        statement.execute();
      }
    }
  }

  /**
   * Initializes the parent store and picks up its connection without creating
   * any tables or procedures. A {@link YarnException} from the parent is
   * logged and not rethrown.
   *
   * @param conf configuration handed to the parent store.
   */
  public void initConnection(Configuration conf) {
    try {
      super.init(conf);
      conn = super.getConn();
    } catch (YarnException e1) {
      LOG.error("ERROR: failed open connection to HSQLDB DB {}.", e1.getMessage(), e1);
    }
  }

  /**
   * Closes the connection held by this store, if any. A {@link SQLException}
   * raised while closing is logged and not rethrown.
   */
  public void closeConnection() {
    if (conn == null) {
      // Nothing to close, e.g. initialization failed before a connection was obtained.
      return;
    }
    try {
      conn.close();
    } catch (SQLException e) {
      LOG.error("ERROR: failed to close connection to HSQLDB DB {}.", e.getMessage(), e);
    }
  }

  /**
   * Extract The Create Table Sql From The Script.
   *
   * <p>Reads {@code ./target/test-classes/<dbIdentifier>/FederationStateStoreTables.sql}
   * (relative to the working directory) as UTF-8 and appends every match of
   * {@code regex} to the list returned by {@link #getTables()}.
   *
   * @param dbIdentifier database identifier, Like Mysql / SqlServer
   * @param regex the regex
   * @throws IOException IO exception.
   */
  protected void extractCreateTableSQL(String dbIdentifier, String regex) throws IOException {
    String[] createTableScriptPathItems = new String[] {
        ".", "target", "test-classes", dbIdentifier, "FederationStateStoreTables.sql" };
    String createTableScriptPath = StringUtils.join(createTableScriptPathItems, File.separator);

    String createTableSQL =
        FileUtils.readFileToString(new File(createTableScriptPath), StandardCharsets.UTF_8);

    Matcher m = Pattern.compile(regex).matcher(createTableSQL);
    while (m.find()) {
      tables.add(m.group());
    }
  }

  /**
   * Returns the list of extracted statements.
   *
   * @return the live, mutable list held by this store (not a copy).
   */
  public List<String> getTables() {
    return tables;
  }

  /**
   * Replaces the list of extracted statements.
   *
   * @param tables the list to hold; stored by reference.
   */
  public void setTables(List<String> tables) {
    this.tables = tables;
  }
}