MockRunningServiceContext.java

/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.yarn.service;

import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.Futures;
import org.apache.hadoop.registry.client.api.RegistryOperations;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.client.api.NMClient;
import org.apache.hadoop.yarn.client.api.async.NMClientAsync;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.service.api.records.Service;
import org.apache.hadoop.yarn.service.component.Component;
import org.apache.hadoop.yarn.service.component.ComponentEvent;
import org.apache.hadoop.yarn.service.component.ComponentEventType;
import org.apache.hadoop.yarn.service.component.instance.ComponentInstance;
import org.apache.hadoop.yarn.service.component.instance.ComponentInstanceEvent;
import org.apache.hadoop.yarn.service.component.instance.ComponentInstanceEventType;
import org.apache.hadoop.yarn.service.containerlaunch.AbstractLauncher;
import org.apache.hadoop.yarn.service.containerlaunch.ContainerLaunchService;
import org.apache.hadoop.yarn.service.exceptions.SliderException;
import org.apache.hadoop.yarn.service.provider.ProviderService;
import org.apache.hadoop.yarn.service.provider.ProviderUtils;
import org.apache.hadoop.yarn.service.registry.YarnRegistryViewForProviders;
import org.apache.hadoop.yarn.service.utils.ServiceUtils;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.Future;

import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

/**
 * Mocked service context for a running service.
 */
public class MockRunningServiceContext extends ServiceContext {

  public MockRunningServiceContext(ServiceTestUtils.ServiceFSWatcher fsWatcher,
      Service serviceDef) throws Exception {
    super();
    this.service = serviceDef;
    this.fs = fsWatcher.getFs();

    ContainerLaunchService mockLaunchService = mock(
        ContainerLaunchService.class);

    this.scheduler = new ServiceScheduler(this) {
      @Override
      protected YarnRegistryViewForProviders
      createYarnRegistryOperations(
          ServiceContext context, RegistryOperations registryClient) {
        return mock(YarnRegistryViewForProviders.class);
      }

      @Override
      public NMClientAsync createNMClient() {
        NMClientAsync nmClientAsync = super.createNMClient();
        NMClient nmClient = mock(NMClient.class);
        try {
          when(nmClient.getContainerStatus(any(), any()))
              .thenAnswer(
                  (Answer<ContainerStatus>) invocation -> ContainerStatus
                      .newInstance((ContainerId) invocation.getArguments()[0],
                          org.apache.hadoop.yarn.api.records.ContainerState
                              .RUNNING,
                          "", 0));
        } catch (YarnException | IOException e) {
          throw new RuntimeException(e);
        }
        nmClientAsync.setClient(nmClient);
        return nmClientAsync;
      }

      @Override
      public ContainerLaunchService getContainerLaunchService() {
        return mockLaunchService;
      }

      @Override
      public ServiceUtils.ProcessTerminationHandler getTerminationHandler() {
        return new
            ServiceUtils.ProcessTerminationHandler() {
              public void terminate(int exitCode) {
              }
            };
      }

      @Override
      protected ServiceManager createServiceManager() {
        return ServiceTestUtils.createServiceManager(
            MockRunningServiceContext.this);
      }
    };


    this.scheduler.init(fsWatcher.getConf());
    when(mockLaunchService.launchCompInstance(any(), any(),
        any(), any())).thenAnswer(
        (Answer<Future<ProviderService.ResolvedLaunchParams>>)
            this::launchAndReinitHelper);

    when(mockLaunchService.reInitCompInstance(any(), any(),
        any(), any())).thenAnswer((
        Answer<Future<ProviderService.ResolvedLaunchParams>>)
        this::launchAndReinitHelper);
    stabilizeComponents(this);
  }

  private Future<ProviderService.ResolvedLaunchParams> launchAndReinitHelper(
      InvocationOnMock invocation) throws IOException, SliderException {
    AbstractLauncher launcher = new AbstractLauncher(
        scheduler.getContext());
    ComponentInstance instance = (ComponentInstance)
        invocation.getArguments()[1];
    Container container = (Container) invocation.getArguments()[2];
    ContainerLaunchService.ComponentLaunchContext clc =
        (ContainerLaunchService.ComponentLaunchContext)
            invocation.getArguments()[3];

    ProviderService.ResolvedLaunchParams resolvedParams =
        new ProviderService.ResolvedLaunchParams();
    ProviderUtils.createConfigFileAndAddLocalResource(launcher, fs, clc,
        new HashMap<>(), instance, scheduler.getContext(), resolvedParams);
    ProviderUtils.handleStaticFilesForLocalization(launcher, fs, clc,
        resolvedParams);
    return Futures.immediateFuture(resolvedParams);
  }

  private void stabilizeComponents(ServiceContext context) {

    ApplicationId appId = ApplicationId.fromString(context.service.getId());
    ApplicationAttemptId attemptId = ApplicationAttemptId.newInstance(appId, 1);
    context.attemptId = attemptId;
    Map<String, Component>
        componentState = context.scheduler.getAllComponents();

    int counter = 0;
    for (org.apache.hadoop.yarn.service.api.records.Component componentSpec :
        context.service.getComponents()) {
      Component component = new org.apache.hadoop.yarn.service.component.
          Component(componentSpec, 1L, context);
      componentState.put(component.getName(), component);
      component.handle(
          new ComponentEvent(component.getName(), ComponentEventType.FLEX)
              .setDesired(
                  component.getComponentSpec().getNumberOfContainers()));

      for (int i = 0; i < componentSpec.getNumberOfContainers(); i++) {
        counter++;
        assignNewContainer(attemptId, counter, component);
      }

      component.handle(new ComponentEvent(component.getName(),
          ComponentEventType.CHECK_STABLE));
    }
  }

  public void assignNewContainer(ApplicationAttemptId attemptId,
      long containerNum, Component component) {

    Container container = org.apache.hadoop.yarn.api.records.Container
        .newInstance(ContainerId.newContainerId(attemptId, containerNum),
            NODE_ID, "localhost", null, null,
            null);
    component.handle(new ComponentEvent(component.getName(),
        ComponentEventType.CONTAINER_ALLOCATED)
        .setContainer(container).setContainerId(container.getId()));
    ComponentInstance instance = this.scheduler.getLiveInstances().get(
        container.getId());
    ComponentInstanceEvent startEvent = new ComponentInstanceEvent(
        container.getId(), ComponentInstanceEventType.START);
    instance.handle(startEvent);

    ComponentInstanceEvent readyEvent = new ComponentInstanceEvent(
        container.getId(), ComponentInstanceEventType.BECOME_READY);
    instance.handle(readyEvent);
  }

  private static final NodeId NODE_ID = NodeId.fromString("localhost:0");
}