TestServiceManager.java
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.service;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.service.api.records.Artifact;
import org.apache.hadoop.yarn.service.api.records.ComponentState;
import org.apache.hadoop.yarn.service.api.records.ContainerState;
import org.apache.hadoop.yarn.service.api.records.Service;
import org.apache.hadoop.yarn.service.api.records.ServiceState;
import org.apache.hadoop.yarn.service.component.Component;
import org.apache.hadoop.yarn.service.component.instance.ComponentInstance;
import org.apache.hadoop.yarn.service.component.instance.ComponentInstanceEvent;
import org.apache.hadoop.yarn.service.component.instance.ComponentInstanceEventType;
import org.apache.hadoop.yarn.service.exceptions.SliderException;
import org.apache.hadoop.yarn.service.utils.ServiceApiUtil;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
import org.junit.jupiter.api.extension.RegisterExtension;
import java.io.IOException;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.TimeoutException;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;
/**
* Tests for {@link ServiceManager}.
*/
public class TestServiceManager {
@RegisterExtension
private ServiceTestUtils.ServiceFSWatcher rule =
new ServiceTestUtils.ServiceFSWatcher();
@Test
@Timeout(value = TIMEOUT)
public void testUpgrade() throws Exception {
ServiceContext context = createServiceContext("testUpgrade");
initUpgrade(context, "v2", false, false, false);
assertEquals(ServiceState.UPGRADING,
context.getServiceManager().getServiceSpec().getState(), "service not upgraded");
}
@Test
@Timeout(value = TIMEOUT)
public void testRestartNothingToUpgrade()
throws Exception {
ServiceContext context = createServiceContext(
"testRestartNothingToUpgrade");
initUpgrade(context, "v2", false, false, false);
ServiceManager manager = context.getServiceManager();
//make components stable by upgrading all instances
upgradeAndReadyAllInstances(context);
context.scheduler.getDispatcher().getEventHandler().handle(
new ServiceEvent(ServiceEventType.START));
GenericTestUtils.waitFor(()->
context.service.getState().equals(ServiceState.STABLE),
CHECK_EVERY_MILLIS, TIMEOUT);
assertEquals(ServiceState.STABLE,
manager.getServiceSpec().getState(), "service not re-started");
}
@Test
@Timeout(value = TIMEOUT)
public void testAutoFinalizeNothingToUpgrade() throws Exception {
ServiceContext context = createServiceContext(
"testAutoFinalizeNothingToUpgrade");
initUpgrade(context, "v2", false, true, false);
ServiceManager manager = context.getServiceManager();
//make components stable by upgrading all instances
upgradeAndReadyAllInstances(context);
GenericTestUtils.waitFor(()->
context.service.getState().equals(ServiceState.STABLE),
CHECK_EVERY_MILLIS, TIMEOUT);
assertEquals(ServiceState.STABLE,
manager.getServiceSpec().getState(), "service stable");
}
@Test
@Timeout(value = TIMEOUT)
public void testRestartWithPendingUpgrade()
throws Exception {
ServiceContext context = createServiceContext("testRestart");
initUpgrade(context, "v2", true, false, false);
ServiceManager manager = context.getServiceManager();
context.scheduler.getDispatcher().getEventHandler().handle(
new ServiceEvent(ServiceEventType.START));
context.scheduler.getDispatcher().stop();
assertEquals(ServiceState.UPGRADING,
manager.getServiceSpec().getState(), "service should still be upgrading");
}
@Test
@Timeout(value = TIMEOUT)
public void testFinalize() throws Exception {
ServiceContext context = createServiceContext("testCheckState");
initUpgrade(context, "v2", true, false, false);
ServiceManager manager = context.getServiceManager();
assertEquals(ServiceState.UPGRADING,
manager.getServiceSpec().getState(), "service not upgrading");
//make components stable by upgrading all instances
upgradeAndReadyAllInstances(context);
// finalize service
context.scheduler.getDispatcher().getEventHandler().handle(
new ServiceEvent(ServiceEventType.START));
GenericTestUtils.waitFor(()->
context.service.getState().equals(ServiceState.STABLE),
CHECK_EVERY_MILLIS, TIMEOUT);
assertEquals(ServiceState.STABLE,
manager.getServiceSpec().getState(), "service not re-started");
validateUpgradeFinalization(manager.getName(), "v2");
}
@Test
@Timeout(value = TIMEOUT)
public void testAutoFinalize() throws Exception {
ServiceContext context = createServiceContext("testCheckStateAutoFinalize");
ServiceManager manager = context.getServiceManager();
manager.getServiceSpec().setState(
ServiceState.UPGRADING_AUTO_FINALIZE);
initUpgrade(context, "v2", true, true, false);
// make components stable
upgradeAndReadyAllInstances(context);
GenericTestUtils.waitFor(() ->
context.service.getState().equals(ServiceState.STABLE),
CHECK_EVERY_MILLIS, TIMEOUT);
assertEquals(ServiceState.STABLE,
manager.getServiceSpec().getState(), "service not stable");
validateUpgradeFinalization(manager.getName(), "v2");
}
@Test
public void testInvalidUpgrade() throws Exception {
ServiceContext serviceContext = createServiceContext("testInvalidUpgrade");
ServiceManager manager = serviceContext.getServiceManager();
manager.getServiceSpec().setState(
ServiceState.UPGRADING_AUTO_FINALIZE);
Service upgradedDef = ServiceTestUtils.createExampleApplication();
upgradedDef.setName(manager.getName());
upgradedDef.setVersion("v2");
upgradedDef.setLifetime(2L);
writeUpgradedDef(upgradedDef);
try {
manager.processUpgradeRequest("v2", true, false);
} catch (Exception ex) {
assertTrue(ex instanceof UnsupportedOperationException);
return;
}
fail();
}
@Test
@Timeout(value = TIMEOUT)
public void testExpressUpgrade() throws Exception {
ServiceContext context = createServiceContext("testExpressUpgrade");
ServiceManager manager = context.getServiceManager();
manager.getServiceSpec().setState(ServiceState.EXPRESS_UPGRADING);
initUpgrade(context, "v2", true, true, true);
List<String> comps = ServiceApiUtil.resolveCompsDependency(context.service);
// wait till instances of first component are upgraded and ready
String compA = comps.get(0);
makeInstancesReadyAfterUpgrade(context, compA);
// wait till instances of second component are upgraded and ready
String compB = comps.get(1);
makeInstancesReadyAfterUpgrade(context, compB);
GenericTestUtils.waitFor(() ->
context.service.getState().equals(ServiceState.STABLE),
CHECK_EVERY_MILLIS, TIMEOUT);
assertEquals(ServiceState.STABLE, manager.getServiceSpec().getState(),
"service not stable");
validateUpgradeFinalization(manager.getName(), "v2");
}
@Test
@Timeout(value = TIMEOUT)
public void testCancelUpgrade() throws Exception {
ServiceContext context = createServiceContext("testCancelUpgrade");
writeInitialDef(context.service);
initUpgrade(context, "v2", true, false, false);
ServiceManager manager = context.getServiceManager();
assertEquals(ServiceState.UPGRADING,
manager.getServiceSpec().getState(), "service not upgrading");
List<String> comps = ServiceApiUtil.resolveCompsDependency(context.service);
// wait till instances of first component are upgraded and ready
String compA = comps.get(0);
// upgrade the instances
upgradeInstances(context, compA);
makeInstancesReadyAfterUpgrade(context, compA);
// cancel upgrade
context.scheduler.getDispatcher().getEventHandler().handle(
new ServiceEvent(ServiceEventType.CANCEL_UPGRADE));
makeInstancesReadyAfterUpgrade(context, compA);
GenericTestUtils.waitFor(()->
context.service.getState().equals(ServiceState.STABLE),
CHECK_EVERY_MILLIS, TIMEOUT);
assertEquals(ServiceState.STABLE,
manager.getServiceSpec().getState(), "service upgrade not cancelled");
validateUpgradeFinalization(manager.getName(), "v1");
}
@Test
@Timeout(value = TIMEOUT)
public void testCancelUpgradeAfterInitiate() throws Exception {
ServiceContext context = createServiceContext("testCancelUpgrade");
writeInitialDef(context.service);
initUpgrade(context, "v2", true, false, false);
ServiceManager manager = context.getServiceManager();
assertEquals(ServiceState.UPGRADING,
manager.getServiceSpec().getState(), "service not upgrading");
// cancel upgrade
context.scheduler.getDispatcher().getEventHandler().handle(
new ServiceEvent(ServiceEventType.CANCEL_UPGRADE));
GenericTestUtils.waitFor(()->
context.service.getState().equals(ServiceState.STABLE),
CHECK_EVERY_MILLIS, TIMEOUT);
assertEquals(ServiceState.STABLE,
manager.getServiceSpec().getState(), "service upgrade not cancelled");
validateUpgradeFinalization(manager.getName(), "v1");
}
private void validateUpgradeFinalization(String serviceName,
String expectedVersion) throws IOException {
Service savedSpec = ServiceApiUtil.loadService(rule.getFs(), serviceName);
assertEquals(expectedVersion,
savedSpec.getVersion(), "service def not re-written");
assertNotNull(savedSpec.getId(), "app id not present");
assertEquals(ServiceState.STABLE,
savedSpec.getState(), "state not stable");
savedSpec.getComponents().forEach(compSpec ->
assertEquals(ComponentState.STABLE,
compSpec.getState(), "comp not stable"));
}
private void initUpgrade(ServiceContext context, String version,
boolean upgradeArtifact, boolean autoFinalize, boolean expressUpgrade)
throws IOException, SliderException, TimeoutException,
InterruptedException {
ServiceManager serviceManager = context.getServiceManager();
Service upgradedDef = ServiceTestUtils.createExampleApplication();
upgradedDef.setName(serviceManager.getName());
upgradedDef.setVersion(version);
if (upgradeArtifact) {
Artifact upgradedArtifact = createTestArtifact("2");
upgradedDef.getComponents().forEach(component -> {
component.setArtifact(upgradedArtifact);
});
}
writeUpgradedDef(upgradedDef);
serviceManager.processUpgradeRequest(version, autoFinalize, expressUpgrade);
GenericTestUtils.waitFor(() -> {
for (Component comp : context.scheduler.getAllComponents().values()) {
if (!comp.getComponentSpec().getState().equals(
ComponentState.NEEDS_UPGRADE)) {
return false;
}
}
return true;
}, CHECK_EVERY_MILLIS, TIMEOUT);
}
private void upgradeAndReadyAllInstances(ServiceContext context) throws
TimeoutException, InterruptedException {
upgradeAllInstances(context);
makeAllInstancesReady(context);
}
private void upgradeAllInstances(ServiceContext context) throws
TimeoutException, InterruptedException {
// upgrade the instances
context.scheduler.getLiveInstances().forEach(((containerId, instance) -> {
ComponentInstanceEvent event = new ComponentInstanceEvent(containerId,
ComponentInstanceEventType.UPGRADE);
context.scheduler.getDispatcher().getEventHandler().handle(event);
}));
}
private void makeAllInstancesReady(ServiceContext context)
throws TimeoutException, InterruptedException {
context.scheduler.getLiveInstances().forEach(((containerId, instance) -> {
ComponentInstanceEvent startEvent = new ComponentInstanceEvent(
containerId, ComponentInstanceEventType.START);
context.scheduler.getDispatcher().getEventHandler().handle(startEvent);
ComponentInstanceEvent becomeReadyEvent = new ComponentInstanceEvent(
containerId, ComponentInstanceEventType.BECOME_READY);
context.scheduler.getDispatcher().getEventHandler().handle(
becomeReadyEvent);
}));
GenericTestUtils.waitFor(()-> {
for (ComponentInstance instance:
context.scheduler.getLiveInstances().values()) {
if (!instance.getContainerState().equals(ContainerState.READY)) {
return false;
}
}
return true;
}, CHECK_EVERY_MILLIS, TIMEOUT);
}
private void upgradeInstances(ServiceContext context, String compName) {
Collection<ComponentInstance> compInstances = context.scheduler
.getAllComponents().get(compName).getAllComponentInstances();
compInstances.forEach(instance -> {
ComponentInstanceEvent event = new ComponentInstanceEvent(
instance.getContainer().getId(),
ComponentInstanceEventType.UPGRADE);
context.scheduler.getDispatcher().getEventHandler().handle(event);
});
}
private void makeInstancesReadyAfterUpgrade(ServiceContext context,
String compName)
throws TimeoutException, InterruptedException {
Collection<ComponentInstance> compInstances = context.scheduler
.getAllComponents().get(compName).getAllComponentInstances();
GenericTestUtils.waitFor(() -> {
for (ComponentInstance instance : compInstances) {
if (!instance.getContainerState().equals(ContainerState.UPGRADING)) {
return false;
}
}
return true;
}, CHECK_EVERY_MILLIS, TIMEOUT);
// instances of comp1 get upgraded and become ready event is triggered
// become ready
compInstances.forEach(instance -> {
ComponentInstanceEvent startEvent = new ComponentInstanceEvent(
instance.getContainer().getId(),
ComponentInstanceEventType.START);
context.scheduler.getDispatcher().getEventHandler().handle(startEvent);
ComponentInstanceEvent becomeReadyEvent = new ComponentInstanceEvent(
instance.getContainer().getId(),
ComponentInstanceEventType.BECOME_READY);
context.scheduler.getDispatcher().getEventHandler().handle(
becomeReadyEvent);
});
GenericTestUtils.waitFor(() -> {
for (ComponentInstance instance : compInstances) {
if (!instance.getContainerState().equals(ContainerState.READY)) {
return false;
}
}
return true;
}, CHECK_EVERY_MILLIS, TIMEOUT);
}
private ServiceContext createServiceContext(String name)
throws Exception {
Service service = createBaseDef(name);
ServiceContext context = new MockRunningServiceContext(rule,
service);
context.scheduler.getDispatcher().setDrainEventsOnStop();
context.scheduler.getDispatcher().start();
return context;
}
public static Service createBaseDef(String name) {
return createDef(name, ServiceTestUtils.createExampleApplication());
}
public static Service createDef(String name, Service serviceDef) {
ApplicationId applicationId = ApplicationId.newInstance(
System.currentTimeMillis(), 1);
serviceDef.setId(applicationId.toString());
serviceDef.setName(name);
serviceDef.setState(ServiceState.STARTED);
Artifact artifact = createTestArtifact("1");
serviceDef.getComponents().forEach(component ->
component.setArtifact(artifact));
return serviceDef;
}
static Artifact createTestArtifact(String artifactId) {
Artifact artifact = new Artifact();
artifact.setId(artifactId);
artifact.setType(Artifact.TypeEnum.TARBALL);
return artifact;
}
private void writeInitialDef(Service service)
throws IOException, SliderException {
Path servicePath = rule.getFs().buildClusterDirPath(
service.getName());
ServiceApiUtil.createDirAndPersistApp(rule.getFs(), servicePath,
service);
}
private void writeUpgradedDef(Service upgradedDef)
throws IOException, SliderException {
Path upgradePath = rule.getFs().buildClusterUpgradeDirPath(
upgradedDef.getName(), upgradedDef.getVersion());
ServiceApiUtil.createDirAndPersistApp(rule.getFs(), upgradePath,
upgradedDef);
}
private static final int TIMEOUT = 10000;
private static final int CHECK_EVERY_MILLIS = 100;
}