TestMRAppComponentDependencies.java

/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.mapreduce.v2.app;

import java.io.IOException;

import org.junit.Assert;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.jobhistory.JobHistoryEvent;
import org.apache.hadoop.mapreduce.jobhistory.JobHistoryEventHandler;
import org.apache.hadoop.mapreduce.v2.api.records.JobState;
import org.apache.hadoop.mapreduce.v2.app.client.ClientService;
import org.apache.hadoop.mapreduce.v2.app.client.MRClientService;
import org.apache.hadoop.mapreduce.v2.app.job.Job;
import org.apache.hadoop.mapreduce.v2.app.job.JobStateInternal;
import org.apache.hadoop.mapreduce.v2.app.job.event.JobFinishEvent;
import org.apache.hadoop.mapreduce.v2.app.job.impl.JobImpl;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.junit.Test;

/**
 * Verifies the order in which an {@link MRApp} stops two of its components
 * once a job has succeeded: the job history event handler must record its
 * stop before the client service does.
 */
public class TestMRAppComponentDependencies {

  /**
   * Runs a one-map/one-reduce job to {@link JobState#SUCCEEDED}, waits until
   * both instrumented components have reported {@code serviceStop()}, and
   * asserts that the job history event handler was stopped first and the
   * client service second.
   *
   * @throws Exception if submitting the job, waiting for it, or sleeping
   *           while polling fails
   */
  @Test(timeout = 20000)
  public void testComponentStopOrder() throws Exception {
    @SuppressWarnings("resource")
    TestMRApp app = new TestMRApp(1, 1, true, this.getClass().getName(), true);
    JobImpl job = (JobImpl) app.submit(new Configuration());
    app.waitForState(job, JobState.SUCCEEDED);
    app.verifyCompleted();

    // The stop callbacks are not guaranteed to have run by the time the job
    // reports SUCCEEDED, so poll the (volatile) stop counter until both
    // components have been stopped or the wait budget is exhausted.
    int waitTime = 20 * 1000;
    while (waitTime > 0 && app.numStops < 2) {
      Thread.sleep(100);
      waitTime -= 100;
    }

    // assert JobHistoryEventHandler stopped first and then the client service
    Assert.assertEquals("JobHistoryEventHandler should be stopped first", 1,
        app.jobHistoryEventHandlerStopped);
    Assert.assertEquals("ClientService should be stopped second", 2,
        app.clientServiceStopped);
  }

  /**
   * An {@link MRApp} whose client service and job history event handler
   * record the ordinal position (1-based) at which their
   * {@code serviceStop()} was invoked.
   */
  private final class TestMRApp extends MRApp {
    // These counters are written inside the serviceStop() overrides and read
    // by the polling test thread; volatile guarantees that the reader sees
    // the most recent values. The increments themselves are not atomic --
    // NOTE(review): this relies on the two stops not running concurrently,
    // which is exactly the ordering the test asserts.

    /** Value of {@link #numStops} when the history handler stopped; 0 if not yet. */
    volatile int jobHistoryEventHandlerStopped;
    /** Value of {@link #numStops} when the client service stopped; 0 if not yet. */
    volatile int clientServiceStopped;
    /** Number of instrumented components that have been stopped so far. */
    volatile int numStops;

    /**
     * Creates the application; all arguments are forwarded unchanged to the
     * {@link MRApp} constructor. The stop counters start at zero.
     */
    public TestMRApp(int maps, int reduces, boolean autoComplete,
        String testName, boolean cleanOnStart) {
      super(maps, reduces, autoComplete, testName, cleanOnStart);
    }

    /**
     * Builds a {@code TestJob} for the current user, publishes it in the
     * application context's job map and registers the job-finish event
     * handler with the dispatcher.
     *
     * @throws YarnRuntimeException if the current user cannot be determined
     */
    @Override
    protected Job createJob(Configuration conf, JobStateInternal forcedState,
        String diagnostic) {
      final UserGroupInformation currentUser;
      try {
        currentUser = UserGroupInformation.getCurrentUser();
      } catch (IOException e) {
        throw new YarnRuntimeException(e);
      }
      Job newJob =
          new TestJob(getJobId(), getAttemptID(), conf, getDispatcher()
            .getEventHandler(), getTaskAttemptListener(), getContext()
            .getClock(), getCommitter(), isNewApiCommitter(),
            currentUser.getUserName(), getContext(), forcedState, diagnostic);
      ((AppContext) getContext()).getAllJobs().put(newJob.getID(), newJob);

      getDispatcher().register(JobFinishEvent.Type.class,
        createJobFinishEventHandler());

      return newJob;
    }

    /**
     * Returns an {@link MRClientService} that records its stop position in
     * {@link #clientServiceStopped} before delegating to the real stop logic.
     */
    @Override
    protected ClientService createClientService(AppContext context) {
      return new MRClientService(context) {
        @Override
        public void serviceStop() throws Exception {
          numStops++;
          clientServiceStopped = numStops;
          super.serviceStop();
        }
      };
    }

    /**
     * Returns a {@link JobHistoryEventHandler} that records its stop position
     * in {@link #jobHistoryEventHandlerStopped} before delegating to the real
     * stop logic.
     */
    @Override
    protected EventHandler<JobHistoryEvent> createJobHistoryHandler(
        AppContext context) {
      return new JobHistoryEventHandler(context, getStartCount()) {
        @Override
        public void serviceStop() throws Exception {
          numStops++;
          jobHistoryEventHandlerStopped = numStops;
          super.serviceStop();
        }
      };
    }
  }
}