BaseContainerSchedulerTest.java
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.nodemanager.containermanager;
import org.apache.hadoop.fs.UnsupportedFileSystemException;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.ConfigurationException;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.security.NMTokenIdentifier;
import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor;
import org.apache.hadoop.yarn.server.nodemanager.ContainerStateTransitionListener;
import org.apache.hadoop.yarn.server.nodemanager.Context;
import org.apache.hadoop.yarn.server.nodemanager.DefaultContainerExecutor;
import org.apache.hadoop.yarn.server.nodemanager.DeletionService;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerEventType;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerImpl;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerState;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitor;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorImpl;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.scheduler.ContainerScheduler;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.scheduler.TestContainerSchedulerQueuing;
import org.apache.hadoop.yarn.server.nodemanager.executor.ContainerStartContext;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import static org.mockito.Mockito.spy;
/**
* Base test class that overrides the behavior of
* {@link ContainerStateTransitionListener} for testing
* the {@link ContainerScheduler}.
*/
public class BaseContainerSchedulerTest extends BaseContainerManagerTest {
private static final long TWO_GB = 2048 * 1024 * 1024L;
public BaseContainerSchedulerTest() throws UnsupportedFileSystemException {
super();
}
static {
LOG = LoggerFactory.getLogger(TestContainerSchedulerQueuing.class);
}
public static class Listener implements ContainerStateTransitionListener {
private final Map<ContainerId, List<ContainerState>> states =
new HashMap<>();
private final Map<ContainerId, List<ContainerEventType>> events =
new HashMap<>();
public Map<ContainerId, List<ContainerEventType>> getEvents() {
return events;
}
public Map<ContainerId, List<ContainerState>> getStates() {
return states;
}
@Override
public void init(Context context) {}
@Override
public void preTransition(ContainerImpl op,
ContainerState beforeState,
ContainerEvent eventToBeProcessed) {
if (!states.containsKey(op.getContainerId())) {
states.put(op.getContainerId(), new ArrayList<>());
states.get(op.getContainerId()).add(beforeState);
events.put(op.getContainerId(), new ArrayList<>());
}
}
@Override
public void postTransition(ContainerImpl op, ContainerState beforeState,
ContainerState afterState, ContainerEvent processedEvent) {
states.get(op.getContainerId()).add(afterState);
events.get(op.getContainerId()).add(processedEvent.getType());
}
}
private boolean delayContainers = true;
protected void setDelayContainers(final boolean delayContainersParam) {
this.delayContainers = delayContainersParam;
}
@Override
protected ContainerManagerImpl createContainerManager(
DeletionService delSrvc) {
return new ContainerManagerImpl(context, exec, delSrvc,
getNodeStatusUpdater(), metrics, dirsHandler) {
@Override
protected UserGroupInformation getRemoteUgi() throws YarnException {
ApplicationId appId = ApplicationId.newInstance(0, 0);
ApplicationAttemptId appAttemptId =
ApplicationAttemptId.newInstance(appId, 1);
UserGroupInformation ugi =
UserGroupInformation.createRemoteUser(appAttemptId.toString());
ugi.addTokenIdentifier(new NMTokenIdentifier(appAttemptId, context
.getNodeId(), user, context.getNMTokenSecretManager().getCurrentKey()
.getKeyId()));
return ugi;
}
@Override
protected ContainersMonitor createContainersMonitor(
ContainerExecutor exec) {
return new ContainersMonitorImpl(exec, getDispatcher(), this.context) {
// Define resources available for containers to be executed.
@Override
public long getPmemAllocatedForContainers() {
return TWO_GB;
}
@Override
public long getVmemAllocatedForContainers() {
float pmemRatio = getConfig().getFloat(
YarnConfiguration.NM_VMEM_PMEM_RATIO,
YarnConfiguration.DEFAULT_NM_VMEM_PMEM_RATIO);
return (long) (pmemRatio * getPmemAllocatedForContainers());
}
@Override
public long getVCoresAllocatedForContainers() {
return 4;
}
};
}
};
}
@Override
protected ContainerExecutor createContainerExecutor() {
DefaultContainerExecutor exec = new DefaultContainerExecutor() {
ConcurrentMap<String, Boolean> oversleepMap =
new ConcurrentHashMap<String, Boolean>();
/**
* Launches the container.
* If delayContainers is turned on, then we sleep a while before
* starting the container.
*/
@Override
public int launchContainer(ContainerStartContext ctx)
throws IOException, ConfigurationException {
final String containerId =
ctx.getContainer().getContainerId().toString();
oversleepMap.put(containerId, false);
if (delayContainers) {
try {
Thread.sleep(10000);
if (oversleepMap.get(containerId)) {
Thread.sleep(10000);
}
} catch (InterruptedException e) {
// Nothing..
}
}
return super.launchContainer(ctx);
}
@Override
public void pauseContainer(Container container) {
// To mimic pausing we force the container to be in the PAUSED state
// a little longer by oversleeping.
oversleepMap.put(container.getContainerId().toString(), true);
LOG.info("Container was paused");
}
@Override
public void resumeContainer(Container container) {
LOG.info("Container was resumed");
}
};
exec.setConf(conf);
return spy(exec);
}
}