TestComponent.java

/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.yarn.service.component;

import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.service.MockRunningServiceContext;
import org.apache.hadoop.yarn.service.ServiceContext;
import org.apache.hadoop.yarn.service.ServiceTestUtils;
import org.apache.hadoop.yarn.service.TestServiceManager;
import org.apache.hadoop.yarn.service.api.records.ComponentState;
import org.apache.hadoop.yarn.service.api.records.Service;
import org.apache.hadoop.yarn.service.api.records.ServiceState;
import org.apache.hadoop.yarn.service.component.instance.ComponentInstance;
import org.apache.hadoop.yarn.service.component.instance.ComponentInstanceEvent;
import org.apache.hadoop.yarn.service.component.instance.ComponentInstanceEventType;
import org.apache.log4j.Logger;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;

import java.util.Iterator;

import static org.apache.hadoop.yarn.service.component.instance.ComponentInstanceEventType.BECOME_READY;
import static org.apache.hadoop.yarn.service.component.instance.ComponentInstanceEventType.START;
import static org.apache.hadoop.yarn.service.component.instance.ComponentInstanceEventType.STOP;

import static org.apache.hadoop.yarn.service.conf.YarnServiceConstants
    .CONTAINER_STATE_REPORT_AS_SERVICE_STATE;

/**
 * Tests for {@link Component}.
 */
public class TestComponent {

  static final Logger LOG = Logger.getLogger(TestComponent.class);

  @Rule
  public ServiceTestUtils.ServiceFSWatcher rule =
      new ServiceTestUtils.ServiceFSWatcher();

  @Test
  public void testComponentUpgrade() throws Exception {
    ServiceContext context = createTestContext(rule, "testComponentUpgrade");
    Component comp = context.scheduler.getAllComponents().entrySet().iterator()
        .next().getValue();

    ComponentEvent upgradeEvent = new ComponentEvent(comp.getName(),
        ComponentEventType.UPGRADE);
    comp.handle(upgradeEvent);
    Assert.assertEquals("component not in need upgrade state",
        ComponentState.NEEDS_UPGRADE, comp.getComponentSpec().getState());
  }

  @Test
  public void testCheckState() throws Exception {
    String serviceName = "testCheckState";
    ServiceContext context = createTestContext(rule, serviceName);
    Component comp = context.scheduler.getAllComponents().entrySet().iterator()
        .next().getValue();

    comp.handle(new ComponentEvent(comp.getName(), ComponentEventType.UPGRADE)
        .setTargetSpec(createSpecWithEnv(serviceName, comp.getName(), "key1",
            "val1")).setUpgradeVersion("v2"));

    // one instance finished upgrading
    comp.getUpgradeStatus().decContainersThatNeedUpgrade();
    comp.handle(new ComponentEvent(comp.getName(),
        ComponentEventType.CHECK_STABLE));
    Assert.assertEquals("component not in need upgrade state",
        ComponentState.NEEDS_UPGRADE, comp.getComponentSpec().getState());

    // second instance finished upgrading
    comp.getUpgradeStatus().decContainersThatNeedUpgrade();
    comp.handle(new ComponentEvent(comp.getName(),
        ComponentEventType.CHECK_STABLE));

    Assert.assertEquals("component not in stable state",
        ComponentState.STABLE, comp.getComponentSpec().getState());
    Assert.assertEquals("component did not upgrade successfully", "val1",
        comp.getComponentSpec().getConfiguration().getEnv("key1"));
  }

  @Test
  public void testContainerCompletedWhenUpgrading() throws Exception {
    String serviceName = "testContainerCompletedWhenUpgrading";
    MockRunningServiceContext context = createTestContext(rule, serviceName);
    Component comp = context.scheduler.getAllComponents().entrySet().iterator()
        .next().getValue();

    comp.handle(new ComponentEvent(comp.getName(), ComponentEventType.UPGRADE)
        .setTargetSpec(createSpecWithEnv(serviceName, comp.getName(), "key1",
            "val1")).setUpgradeVersion("v2"));
    comp.getAllComponentInstances().forEach(instance ->
        instance.handle(new ComponentInstanceEvent(
        instance.getContainer().getId(), ComponentInstanceEventType.UPGRADE)));

    // reinitialization of a container failed
    for(ComponentInstance instance : comp.getAllComponentInstances()) {
      ComponentEvent stopEvent = new ComponentEvent(comp.getName(),
          ComponentEventType.CONTAINER_COMPLETED)
          .setInstance(instance)
          .setContainerId(instance.getContainer().getId());
      comp.handle(stopEvent);
      instance.handle(new ComponentInstanceEvent(
          instance.getContainer().getId(), STOP));
    }
    comp.handle(new ComponentEvent(comp.getName(),
        ComponentEventType.CHECK_STABLE));

    Assert.assertEquals("component not in needs upgrade state",
        ComponentState.NEEDS_UPGRADE, comp.getComponentSpec().getState());
  }

  @Test
  public void testCancelUpgrade() throws Exception {
    ServiceContext context = createTestContext(rule, "testCancelUpgrade");
    Component comp = context.scheduler.getAllComponents().entrySet().iterator()
        .next().getValue();

    ComponentEvent upgradeEvent = new ComponentEvent(comp.getName(),
        ComponentEventType.CANCEL_UPGRADE);
    comp.handle(upgradeEvent);
    Assert.assertEquals("component not in need upgrade state",
        ComponentState.NEEDS_UPGRADE, comp.getComponentSpec().getState());

    Assert.assertEquals(
        org.apache.hadoop.yarn.service.component.ComponentState
            .CANCEL_UPGRADING, comp.getState());
  }

  @Test
  public void testContainerCompletedCancelUpgrade() throws Exception {
    String serviceName = "testContainerCompletedCancelUpgrade";
    MockRunningServiceContext context = createTestContext(rule, serviceName);
    Component comp = context.scheduler.getAllComponents().entrySet().iterator()
        .next().getValue();

    // upgrade completes
    comp.handle(new ComponentEvent(comp.getName(), ComponentEventType.UPGRADE)
        .setTargetSpec(createSpecWithEnv(serviceName, comp.getName(), "key1",
            "val1")).setUpgradeVersion("v2"));
    comp.getAllComponentInstances().forEach(instance ->
        instance.handle(new ComponentInstanceEvent(
            instance.getContainer().getId(),
            ComponentInstanceEventType.UPGRADE)));

    // reinitialization of a container done
    for(ComponentInstance instance : comp.getAllComponentInstances()) {
      instance.handle(new ComponentInstanceEvent(
          instance.getContainer().getId(), START));
      instance.handle(new ComponentInstanceEvent(
          instance.getContainer().getId(), BECOME_READY));
    }

    comp.handle(new ComponentEvent(comp.getName(),
        ComponentEventType.CANCEL_UPGRADE)
        .setTargetSpec(createSpecWithEnv(serviceName, comp.getName(), "key1",
            "val0")).setUpgradeVersion("v1"));
    comp.getAllComponentInstances().forEach(instance ->
        instance.handle(new ComponentInstanceEvent(
            instance.getContainer().getId(),
            ComponentInstanceEventType.CANCEL_UPGRADE)));

    Iterator<ComponentInstance> iter = comp.getAllComponentInstances()
        .iterator();

    // cancel upgrade failed of a container
    ComponentInstance instance1 = iter.next();
    ComponentEvent stopEvent = new ComponentEvent(comp.getName(),
        ComponentEventType.CONTAINER_COMPLETED)
        .setInstance(instance1)
        .setContainerId(instance1.getContainer().getId());
    comp.handle(stopEvent);
    instance1.handle(new ComponentInstanceEvent(
        instance1.getContainer().getId(), STOP));
    Assert.assertEquals(
        org.apache.hadoop.yarn.service.component.ComponentState
            .CANCEL_UPGRADING, comp.getState());

    comp.handle(new ComponentEvent(comp.getName(),
        ComponentEventType.CHECK_STABLE));

    Assert.assertEquals("component not in needs upgrade state",
        ComponentState.NEEDS_UPGRADE, comp.getComponentSpec().getState());
    Assert.assertEquals(
        org.apache.hadoop.yarn.service.component.ComponentState
            .CANCEL_UPGRADING, comp.getState());

    // second instance finished upgrading
    ComponentInstance instance2 = iter.next();
    instance2.handle(new ComponentInstanceEvent(
        instance2.getContainer().getId(), ComponentInstanceEventType.START));
    instance2.handle(new ComponentInstanceEvent(
        instance2.getContainer().getId(),
        ComponentInstanceEventType.BECOME_READY));

    comp.handle(new ComponentEvent(comp.getName(),
        ComponentEventType.CHECK_STABLE));

    Assert.assertEquals("component not in flexing state",
        ComponentState.FLEXING, comp.getComponentSpec().getState());
    // new container get allocated
    context.assignNewContainer(context.attemptId, 10, comp);

    comp.handle(new ComponentEvent(comp.getName(),
            ComponentEventType.CHECK_STABLE));

    Assert.assertEquals("component not in stable state",
        ComponentState.STABLE, comp.getComponentSpec().getState());
    Assert.assertEquals("cancel upgrade failed", "val0",
        comp.getComponentSpec().getConfiguration().getEnv("key1"));
  }

  @Test
  public void testCancelUpgradeSuccessWhileUpgrading() throws Exception {
    String serviceName = "testCancelUpgradeWhileUpgrading";
    MockRunningServiceContext context = createTestContext(rule, serviceName);
    Component comp = context.scheduler.getAllComponents().entrySet().iterator()
        .next().getValue();
    cancelUpgradeWhileUpgrading(context, comp);

    // cancel upgrade successful for both instances
    for(ComponentInstance instance : comp.getAllComponentInstances()) {
      instance.handle(new ComponentInstanceEvent(
          instance.getContainer().getId(),
          ComponentInstanceEventType.START));
      instance.handle(new ComponentInstanceEvent(
          instance.getContainer().getId(),
          ComponentInstanceEventType.BECOME_READY));
    }

    comp.handle(new ComponentEvent(comp.getName(),
        ComponentEventType.CHECK_STABLE));

    Assert.assertEquals("component not in stable state",
        ComponentState.STABLE, comp.getComponentSpec().getState());
    Assert.assertEquals("cancel upgrade failed", "val0",
        comp.getComponentSpec().getConfiguration().getEnv("key1"));
  }

  @Test
  public void testCancelUpgradeFailureWhileUpgrading() throws Exception {
    String serviceName = "testCancelUpgradeFailureWhileUpgrading";
    MockRunningServiceContext context = createTestContext(rule, serviceName);
    Component comp = context.scheduler.getAllComponents().entrySet().iterator()
        .next().getValue();
    cancelUpgradeWhileUpgrading(context, comp);

    // cancel upgrade failed for both instances
    for(ComponentInstance instance : comp.getAllComponentInstances()) {
      instance.handle(new ComponentInstanceEvent(
          instance.getContainer().getId(),
          ComponentInstanceEventType.STOP));
    }
    comp.handle(new ComponentEvent(comp.getName(),
        ComponentEventType.CHECK_STABLE));

    Assert.assertEquals("component not in flexing state",
        ComponentState.FLEXING, comp.getComponentSpec().getState());

    for (ComponentInstance instance : comp.getAllComponentInstances()) {
      // new container get allocated
      context.assignNewContainer(context.attemptId, 10, comp);
    }

    comp.handle(new ComponentEvent(comp.getName(),
        ComponentEventType.CHECK_STABLE));

    Assert.assertEquals("component not in stable state",
        ComponentState.STABLE, comp.getComponentSpec().getState());
    Assert.assertEquals("cancel upgrade failed", "val0",
        comp.getComponentSpec().getConfiguration().getEnv("key1"));
  }

  private void cancelUpgradeWhileUpgrading(
      MockRunningServiceContext context, Component comp)
      throws Exception {

    comp.handle(new ComponentEvent(comp.getName(), ComponentEventType.UPGRADE)
        .setTargetSpec(createSpecWithEnv(context.service.getName(),
            comp.getName(), "key1", "val1")).setUpgradeVersion("v0"));

    Iterator<ComponentInstance> iter = comp.getAllComponentInstances()
        .iterator();

    ComponentInstance instance1 = iter.next();

    // instance1 is triggered to upgrade
    instance1.handle(new ComponentInstanceEvent(
        instance1.getContainer().getId(), ComponentInstanceEventType.UPGRADE));

    // component upgrade is cancelled
    comp.handle(new ComponentEvent(comp.getName(),
        ComponentEventType.CANCEL_UPGRADE)
        .setTargetSpec(createSpecWithEnv(context.service.getName(),
            comp.getName(), "key1",
            "val0")).setUpgradeVersion("v0"));

    // all instances upgrade is cancelled.
    comp.getAllComponentInstances().forEach(instance ->
        instance.handle(new ComponentInstanceEvent(
            instance.getContainer().getId(),
            ComponentInstanceEventType.CANCEL_UPGRADE)));

    // regular upgrade failed for instance 1
    comp.handle(new ComponentEvent(comp.getName(),
        ComponentEventType.CONTAINER_COMPLETED).setInstance(instance1)
        .setContainerId(instance1.getContainer().getId()));
    instance1.handle(new ComponentInstanceEvent(
        instance1.getContainer().getId(), STOP));

    // component should be in cancel upgrade
    Assert.assertEquals(
        org.apache.hadoop.yarn.service.component.ComponentState
            .CANCEL_UPGRADING, comp.getState());

    comp.handle(new ComponentEvent(comp.getName(),
        ComponentEventType.CHECK_STABLE));

    Assert.assertEquals("component not in needs upgrade state",
        ComponentState.NEEDS_UPGRADE, comp.getComponentSpec().getState());
    Assert.assertEquals(
        org.apache.hadoop.yarn.service.component.ComponentState
            .CANCEL_UPGRADING, comp.getState());
  }

  @Test
  public void testComponentStateReachesStableStateWithTerminatingComponents()
      throws
      Exception {
    final String serviceName =
        "testComponentStateUpdatesWithTerminatingComponents";

    Service testService = ServiceTestUtils.createTerminatingJobExample(
        serviceName);
    TestServiceManager.createDef(serviceName, testService);

    ServiceContext context = new MockRunningServiceContext(rule, testService);

    for (Component comp : context.scheduler.getAllComponents().values()) {

      Iterator<ComponentInstance> instanceIter = comp.
          getAllComponentInstances().iterator();

      ComponentInstance componentInstance = instanceIter.next();
      Container instanceContainer = componentInstance.getContainer();

      Assert.assertEquals(0, comp.getNumSucceededInstances());
      Assert.assertEquals(0, comp.getNumFailedInstances());
      Assert.assertEquals(2, comp.getNumRunningInstances());
      Assert.assertEquals(2, comp.getNumReadyInstances());
      Assert.assertEquals(0, comp.getPendingInstances().size());

      //stop 1 container
      ContainerStatus containerStatus = ContainerStatus.newInstance(
          instanceContainer.getId(),
          org.apache.hadoop.yarn.api.records.ContainerState.COMPLETE,
          "successful", 0);
      comp.handle(new ComponentEvent(comp.getName(),
          ComponentEventType.CONTAINER_COMPLETED).setStatus(containerStatus)
          .setContainerId(instanceContainer.getId()));
      componentInstance.handle(
          new ComponentInstanceEvent(componentInstance.getContainer().getId(),
              ComponentInstanceEventType.STOP).setStatus(containerStatus));

      Assert.assertEquals(1, comp.getNumSucceededInstances());
      Assert.assertEquals(0, comp.getNumFailedInstances());
      Assert.assertEquals(1, comp.getNumRunningInstances());
      Assert.assertEquals(1, comp.getNumReadyInstances());
      Assert.assertEquals(0, comp.getPendingInstances().size());

      org.apache.hadoop.yarn.service.component.ComponentState componentState =
          Component.checkIfStable(comp);
      Assert.assertEquals(
          org.apache.hadoop.yarn.service.component.ComponentState.STABLE,
          componentState);
    }
  }

  @Test
  public void testComponentStateUpdatesWithTerminatingComponents()
      throws
      Exception {
    final String serviceName =
        "testComponentStateUpdatesWithTerminatingComponents";

    Service testService = ServiceTestUtils.createTerminatingJobExample(
        serviceName);
    TestServiceManager.createDef(serviceName, testService);

    ServiceContext context = new MockRunningServiceContext(rule, testService);

    for (Component comp : context.scheduler.getAllComponents().values()) {
      Iterator<ComponentInstance> instanceIter = comp.
          getAllComponentInstances().iterator();

      while (instanceIter.hasNext()) {

        ComponentInstance componentInstance = instanceIter.next();
        Container instanceContainer = componentInstance.getContainer();

        //stop 1 container
        ContainerStatus containerStatus = ContainerStatus.newInstance(
            instanceContainer.getId(),
            org.apache.hadoop.yarn.api.records.ContainerState.COMPLETE,
            "successful", 0);
        comp.handle(new ComponentEvent(comp.getName(),
            ComponentEventType.CONTAINER_COMPLETED).setStatus(containerStatus)
            .setContainerId(instanceContainer.getId()));
        componentInstance.handle(
            new ComponentInstanceEvent(componentInstance.getContainer().getId(),
                ComponentInstanceEventType.STOP).setStatus(containerStatus));
      }

      ComponentState componentState =
          comp.getComponentSpec().getState();
      Assert.assertEquals(
          ComponentState.SUCCEEDED,
          componentState);
    }

    ServiceState serviceState =
        testService.getState();
    Assert.assertEquals(
        ServiceState.SUCCEEDED,
        serviceState);
  }

  @Test
  public void testComponentStateUpdatesWithTerminatingDominantComponents()
      throws Exception {
    final String serviceName =
        "testComponentStateUpdatesWithTerminatingServiceStateComponents";

    Service testService =
        ServiceTestUtils.createTerminatingDominantComponentJobExample(
            serviceName);
    TestServiceManager.createDef(serviceName, testService);

    ServiceContext context = new MockRunningServiceContext(rule, testService);

    for (Component comp : context.scheduler.getAllComponents().values()) {
      boolean componentIsDominant = comp.getComponentSpec()
          .getConfiguration().getPropertyBool(
              CONTAINER_STATE_REPORT_AS_SERVICE_STATE, false);
      if (componentIsDominant) {
        Iterator<ComponentInstance> instanceIter = comp.
            getAllComponentInstances().iterator();

        while (instanceIter.hasNext()) {

          ComponentInstance componentInstance = instanceIter.next();
          Container instanceContainer = componentInstance.getContainer();

          //stop 1 container
          ContainerStatus containerStatus = ContainerStatus.newInstance(
              instanceContainer.getId(),
              org.apache.hadoop.yarn.api.records.ContainerState.COMPLETE,
              "successful", 0);
          comp.handle(new ComponentEvent(comp.getName(),
              ComponentEventType.CONTAINER_COMPLETED).setStatus(containerStatus)
              .setContainerId(instanceContainer.getId()));
          componentInstance.handle(
              new ComponentInstanceEvent(componentInstance.getContainer().
                  getId(), ComponentInstanceEventType.STOP).
                  setStatus(containerStatus));
        }
        ComponentState componentState =
            comp.getComponentSpec().getState();
        Assert.assertEquals(
            ComponentState.SUCCEEDED,
            componentState);
      }
    }

    ServiceState serviceState =
        testService.getState();
    Assert.assertEquals(
        ServiceState.SUCCEEDED,
        serviceState);
  }

  private static org.apache.hadoop.yarn.service.api.records.Component
      createSpecWithEnv(String serviceName, String compName, String key,
      String val) {
    Service service = TestServiceManager.createBaseDef(serviceName);
    org.apache.hadoop.yarn.service.api.records.Component spec =
        service.getComponent(compName);
    spec.getConfiguration().getEnv().put(key, val);
    return spec;
  }

  public static MockRunningServiceContext createTestContext(
      ServiceTestUtils.ServiceFSWatcher fsWatcher, String serviceName)
      throws Exception {
    return new MockRunningServiceContext(fsWatcher,
        TestServiceManager.createBaseDef(serviceName));
  }
}