ConfigurationUpdateAssembler.java
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.conf;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.thirdparty.com.google.common.base.Joiner;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.QueuePath;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.QueuePrefixes;
import org.apache.hadoop.yarn.webapp.dao.QueueConfigInfo;
import org.apache.hadoop.yarn.webapp.dao.SchedConfUpdateInfo;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
public final class ConfigurationUpdateAssembler {
private ConfigurationUpdateAssembler() {
}
public static Map<String, String> constructKeyValueConfUpdate(
CapacitySchedulerConfiguration proposedConf,
SchedConfUpdateInfo mutationInfo) throws IOException {
Map<String, String> confUpdate = new HashMap<>();
for (String queueToRemove : mutationInfo.getRemoveQueueInfo()) {
removeQueue(queueToRemove, proposedConf, confUpdate);
}
for (QueueConfigInfo addQueueInfo : mutationInfo.getAddQueueInfo()) {
addQueue(addQueueInfo, proposedConf, confUpdate);
}
for (QueueConfigInfo updateQueueInfo : mutationInfo.getUpdateQueueInfo()) {
updateQueue(updateQueueInfo, proposedConf, confUpdate);
}
for (Map.Entry<String, String> global : mutationInfo.getGlobalParams()
.entrySet()) {
confUpdate.put(global.getKey(), global.getValue());
}
return confUpdate;
}
private static void removeQueue(
String queueToRemove, CapacitySchedulerConfiguration proposedConf,
Map<String, String> confUpdate) throws IOException {
if (queueToRemove == null) {
return;
}
QueuePath queuePath = new QueuePath(queueToRemove);
if (queuePath.isRoot() || queuePath.isInvalid()) {
throw new IOException("Can't remove queue " + queuePath.getFullPath());
}
String queueName = queuePath.getLeafName();
List<String> siblingQueues = getSiblingQueues(queuePath,
proposedConf);
if (!siblingQueues.contains(queueName)) {
throw new IOException("Queue " + queuePath.getFullPath() + " not found");
}
siblingQueues.remove(queueName);
QueuePath parentPath = queuePath.getParentObject();
proposedConf.setQueues(parentPath, siblingQueues.toArray(
new String[0]));
String queuesConfig = getQueuesConfig(parentPath);
if (siblingQueues.isEmpty()) {
confUpdate.put(queuesConfig, null);
// Unset Ordering Policy of Leaf Queue converted from
// Parent Queue after removeQueue
String queueOrderingPolicy = getOrderingPolicyConfig(parentPath);
proposedConf.unset(queueOrderingPolicy);
confUpdate.put(queueOrderingPolicy, null);
} else {
confUpdate.put(queuesConfig, Joiner.on(',').join(siblingQueues));
}
for (Map.Entry<String, String> confRemove : proposedConf.getValByRegex(
".*" + queuePath.getFullPath() + "\\..*")
.entrySet()) {
proposedConf.unset(confRemove.getKey());
confUpdate.put(confRemove.getKey(), null);
}
}
private static void addQueue(
QueueConfigInfo addInfo, CapacitySchedulerConfiguration proposedConf,
Map<String, String> confUpdate) throws IOException {
if (addInfo == null) {
return;
}
QueuePath queuePath = new QueuePath(addInfo.getQueue());
String queueName = queuePath.getLeafName();
if (queuePath.isRoot() || queuePath.isInvalid()) {
throw new IOException("Can't add invalid queue " + queuePath);
} else if (getSiblingQueues(queuePath, proposedConf).contains(
queueName)) {
throw new IOException("Can't add existing queue " + queuePath);
}
QueuePath parentPath = queuePath.getParentObject();
List<String> siblingQueues = proposedConf.getQueues(parentPath);
siblingQueues.add(queueName);
proposedConf.setQueues(parentPath,
siblingQueues.toArray(new String[0]));
confUpdate.put(getQueuesConfig(parentPath),
Joiner.on(',').join(siblingQueues));
String keyPrefix = QueuePrefixes.getQueuePrefix(queuePath);
for (Map.Entry<String, String> kv : addInfo.getParams().entrySet()) {
String keyValue = kv.getValue();
if (keyValue == null || keyValue.isEmpty()) {
proposedConf.unset(keyPrefix + kv.getKey());
confUpdate.put(keyPrefix + kv.getKey(), null);
} else {
proposedConf.set(keyPrefix + kv.getKey(), keyValue);
confUpdate.put(keyPrefix + kv.getKey(), keyValue);
}
}
// Unset Ordering Policy of Parent Queue converted from
// Leaf Queue after addQueue
String queueOrderingPolicy = getOrderingPolicyConfig(parentPath);
if (siblingQueues.size() == 1) {
proposedConf.unset(queueOrderingPolicy);
confUpdate.put(queueOrderingPolicy, null);
}
}
private static void updateQueue(QueueConfigInfo updateInfo,
CapacitySchedulerConfiguration proposedConf,
Map<String, String> confUpdate) {
if (updateInfo == null) {
return;
}
QueuePath queuePath = new QueuePath(updateInfo.getQueue());
String keyPrefix = QueuePrefixes.getQueuePrefix(queuePath);
for (Map.Entry<String, String> kv : updateInfo.getParams().entrySet()) {
String keyValue = kv.getValue();
if (keyValue == null || keyValue.isEmpty()) {
proposedConf.unset(keyPrefix + kv.getKey());
confUpdate.put(keyPrefix + kv.getKey(), null);
} else {
proposedConf.set(keyPrefix + kv.getKey(), keyValue);
confUpdate.put(keyPrefix + kv.getKey(), keyValue);
}
}
}
private static List<String> getSiblingQueues(QueuePath queuePath, Configuration conf) {
String childQueuesKey = getQueuesConfig(queuePath.getParentObject());
return new ArrayList<>(conf.getTrimmedStringCollection(childQueuesKey));
}
private static String getQueuesConfig(QueuePath queuePath) {
return QueuePrefixes.getQueuePrefix(queuePath) + CapacitySchedulerConfiguration.QUEUES;
}
private static String getOrderingPolicyConfig(QueuePath queuePath) {
return QueuePrefixes.getQueuePrefix(queuePath) + CapacitySchedulerConfiguration.ORDERING_POLICY;
}
}