TestEvents.java

/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.mapreduce.jobhistory;

import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Set;

import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.mapred.JobPriority;
import org.apache.hadoop.mapreduce.Counters;
import org.apache.hadoop.mapreduce.JobID;
import org.apache.hadoop.mapreduce.TaskAttemptID;
import org.apache.hadoop.mapreduce.TaskID;
import org.apache.hadoop.mapreduce.TaskType;
import org.apache.hadoop.mapreduce.v2.app.job.impl.JobImpl;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;

/**
 * Tests for the job history event classes: getter checks on a few concrete
 * events, plus a round trip that writes a list of events with
 * {@link EventWriter} and reads them back with {@link EventReader}.
 */
public class TestEvents {

  /** Task id stored in every task-attempt datum built by this class. */
  private static final String TASK_ID = "task_1_2_r_3";

  /** Attempt id stored in most task-attempt datums built by this class. */
  private static final String ATTEMPT_ID = "attempt_1_2_r3_4_5";

  /**
   * Tests the getters of {@link TaskAttemptFinishedEvent}: every value handed
   * to the constructor must be returned by the matching getter.
   *
   * @throws Exception if the event cannot be created
   */
  @Test
  @Timeout(value = 10)
  public void testTaskAttemptFinishedEvent() throws Exception {
    JobID jid = new JobID("001", 1);
    TaskID tid = new TaskID(jid, TaskType.REDUCE, 2);
    TaskAttemptID taskAttemptId = new TaskAttemptID(tid, 3);
    Counters counters = new Counters();

    TaskAttemptFinishedEvent test = new TaskAttemptFinishedEvent(taskAttemptId,
        TaskType.REDUCE, "TEST", 123L, "RAKNAME", "HOSTNAME", "STATUS",
        counters, 234);

    assertThat(test.getAttemptId().toString())
        .isEqualTo(taskAttemptId.toString());
    assertThat(test.getCounters()).isEqualTo(counters);
    assertThat(test.getFinishTime()).isEqualTo(123L);
    assertThat(test.getHostname()).isEqualTo("HOSTNAME");
    assertThat(test.getRackName()).isEqualTo("RAKNAME");
    assertThat(test.getState()).isEqualTo("STATUS");
    assertThat(test.getTaskId()).isEqualTo(tid);
    assertThat(test.getTaskStatus()).isEqualTo("TEST");
    assertThat(test.getTaskType()).isEqualTo(TaskType.REDUCE);
    assertThat(test.getStartTime()).isEqualTo(234L);
  }

  /**
   * Simple test of the {@link JobPriorityChangeEvent} getters.
   *
   * @throws Exception if the event cannot be created
   */
  @Test
  @Timeout(value = 10)
  public void testJobPriorityChange() throws Exception {
    JobID jid = new JobID("001", 1);
    JobPriorityChangeEvent test = new JobPriorityChangeEvent(jid,
        JobPriority.LOW);

    assertThat(test.getJobId().toString()).isEqualTo(jid.toString());
    assertThat(test.getPriority()).isEqualTo(JobPriority.LOW);
  }

  /**
   * Simple test of the {@link JobQueueChangeEvent} getters.
   *
   * @throws Exception if the event cannot be created
   */
  @Test
  @Timeout(value = 10)
  public void testJobQueueChange() throws Exception {
    JobID jid = new JobID("001", 1);
    JobQueueChangeEvent test = new JobQueueChangeEvent(jid, "newqueue");

    assertThat(test.getJobId().toString()).isEqualTo(jid.toString());
    assertThat(test.getJobQueueName()).isEqualTo("newqueue");
  }

  /**
   * Simple test of the {@link TaskUpdatedEvent} getters.
   *
   * @throws Exception if the event cannot be created
   */
  @Test
  @Timeout(value = 10)
  public void testTaskUpdated() throws Exception {
    JobID jid = new JobID("001", 1);
    TaskID tid = new TaskID(jid, TaskType.REDUCE, 2);
    TaskUpdatedEvent test = new TaskUpdatedEvent(tid, 1234L);

    assertThat(test.getTaskId().toString()).isEqualTo(tid.toString());
    assertThat(test.getFinishTime()).isEqualTo(1234L);
  }

  /**
   * Tests {@link EventReader}: it must read back the events produced by
   * {@link #getEvents()} in order, each one carrying the datum class that
   * belongs to its event type.
   *
   * <p>The setup and cleanup attempt events are written with their own event
   * types but are expected to be read back as {@code REDUCE_ATTEMPT_*}, and
   * the "failed" attempts are expected as {@code REDUCE_ATTEMPT_KILLED}.
   * NOTE(review): presumably the reader derives the type from the reduce
   * task id and the non-failed status stored in the datum; confirm against
   * the event classes.
   *
   * @throws Exception if the events cannot be written or read
   */
  @Test
  @Timeout(value = 10)
  public void testEvents() throws Exception {
    // try-with-resources closes the reader even when an assertion fails.
    try (EventReader reader = new EventReader(new DataInputStream(
        new ByteArrayInputStream(getEvents())))) {
      assertEquals("ID", nextDatum(reader, EventType.JOB_PRIORITY_CHANGED,
          JobPriorityChange.class).getJobid().toString());
      assertEquals("ID", nextDatum(reader, EventType.JOB_STATUS_CHANGED,
          JobStatusChanged.class).getJobid().toString());
      assertEquals("ID", nextDatum(reader, EventType.TASK_UPDATED,
          TaskUpdated.class).getTaskid().toString());
      assertEquals(TASK_ID, nextDatum(reader, EventType.REDUCE_ATTEMPT_KILLED,
          TaskAttemptUnsuccessfulCompletion.class).getTaskid().toString());
      assertEquals("ID", nextDatum(reader, EventType.JOB_KILLED,
          JobUnsuccessfulCompletion.class).getJobid().toString());

      // Written as SETUP_ATTEMPT_{STARTED, FINISHED, FAILED, KILLED}.
      assertEquals(TASK_ID, nextDatum(reader, EventType.REDUCE_ATTEMPT_STARTED,
          TaskAttemptStarted.class).getTaskid().toString());
      assertEquals(TASK_ID, nextDatum(reader, EventType.REDUCE_ATTEMPT_FINISHED,
          TaskAttemptFinished.class).getTaskid().toString());
      assertEquals(TASK_ID, nextDatum(reader, EventType.REDUCE_ATTEMPT_KILLED,
          TaskAttemptUnsuccessfulCompletion.class).getTaskid().toString());
      assertEquals(TASK_ID, nextDatum(reader, EventType.REDUCE_ATTEMPT_KILLED,
          TaskAttemptUnsuccessfulCompletion.class).getTaskid().toString());

      // Written as CLEANUP_ATTEMPT_{STARTED, FINISHED, FAILED, KILLED}.
      assertEquals(TASK_ID, nextDatum(reader, EventType.REDUCE_ATTEMPT_STARTED,
          TaskAttemptStarted.class).getTaskid().toString());
      assertEquals(TASK_ID, nextDatum(reader, EventType.REDUCE_ATTEMPT_FINISHED,
          TaskAttemptFinished.class).getTaskid().toString());
      assertEquals(TASK_ID, nextDatum(reader, EventType.REDUCE_ATTEMPT_KILLED,
          TaskAttemptUnsuccessfulCompletion.class).getTaskid().toString());
      assertEquals(TASK_ID, nextDatum(reader, EventType.REDUCE_ATTEMPT_KILLED,
          TaskAttemptUnsuccessfulCompletion.class).getTaskid().toString());
    }
  }

  /**
   * Reads the next event from {@code reader}, checks its type and returns its
   * datum cast to {@code datumClass}.
   *
   * @param reader the reader to pull the next event from
   * @param expectedType the event type the next event must report
   * @param datumClass the class the event's datum must be an instance of
   * @param <T> the datum type
   * @return the datum of the event that was read
   * @throws Exception if the reader fails to read the next event
   */
  private static <T> T nextDatum(EventReader reader, EventType expectedType,
      Class<T> datumClass) throws Exception {
    HistoryEvent event = reader.getNextEvent();
    assertTrue(event != null,
        "no event left in the stream, expected " + expectedType);
    assertEquals(expectedType, event.getEventType());

    Object datum = event.getDatum();
    assertTrue(datumClass.isInstance(datum),
        "datum of " + expectedType + " should be a "
            + datumClass.getSimpleName() + " but was " + datum);
    return datumClass.cast(datum);
  }

  /**
   * Serializes the list of history events checked by {@link #testEvents()}
   * with an {@link EventWriter} in JSON mode.
   *
   * @return the bytes produced by the writer
   * @throws Exception if an event cannot be written
   */
  private byte[] getEvents() throws Exception {
    ByteArrayOutputStream output = new ByteArrayOutputStream();
    FSDataOutputStream fsOutput = new FSDataOutputStream(output,
        new FileSystem.Statistics("scheme"));
    EventWriter writer = new EventWriter(fsOutput,
        EventWriter.WriteMode.JSON);

    // The order here must match the order asserted in testEvents().
    writer.write(getJobPriorityChangedEvent());
    writer.write(getJobStatusChangedEvent());
    writer.write(getTaskUpdatedEvent());
    writer.write(getReduceAttemptKilledEvent());
    writer.write(getJobKilledEvent());
    writer.write(getSetupAttemptStartedEvent());
    writer.write(getSetupAttemptFinishedEvent());
    writer.write(getSetupAttemptFailedEvent());
    writer.write(getSetupAttemptKilledEvent());
    writer.write(getCleanupAttemptStartedEvent());
    writer.write(getCleanupAttemptFinishedEvent());
    writer.write(getCleanupAttemptFailedEvent());
    writer.write(getCleanupAttemptKilledEvent());

    writer.flush();
    writer.close();

    return output.toByteArray();
  }

  /** Builds a {@link FakeEvent} of the given type carrying the given datum. */
  private static FakeEvent fakeEvent(EventType type, Object datum) {
    FakeEvent event = new FakeEvent(type);
    event.setDatum(datum);
    return event;
  }

  /** @return a JOB_PRIORITY_CHANGED event for job "ID". */
  private FakeEvent getJobPriorityChangedEvent() {
    JobPriorityChange datum = new JobPriorityChange();
    datum.setJobid("ID");
    datum.setPriority("priority");
    return fakeEvent(EventType.JOB_PRIORITY_CHANGED, datum);
  }

  /** @return a JOB_STATUS_CHANGED event for job "ID". */
  private FakeEvent getJobStatusChangedEvent() {
    JobStatusChanged datum = new JobStatusChanged();
    datum.setJobid("ID");
    datum.setJobStatus("newStatus");
    return fakeEvent(EventType.JOB_STATUS_CHANGED, datum);
  }

  /** @return a TASK_UPDATED event for task "ID". */
  private FakeEvent getTaskUpdatedEvent() {
    TaskUpdated datum = new TaskUpdated();
    datum.setFinishTime(2L);
    datum.setTaskid("ID");
    return fakeEvent(EventType.TASK_UPDATED, datum);
  }

  /** @return a JOB_KILLED event for job "ID". */
  private FakeEvent getJobKilledEvent() {
    JobUnsuccessfulCompletion datum = new JobUnsuccessfulCompletion();
    datum.setFinishedMaps(1);
    datum.setFinishedReduces(2);
    datum.setFinishTime(3L);
    datum.setJobid("ID");
    datum.setJobStatus("STATUS");
    datum.setDiagnostics(JobImpl.JOB_KILLED_DIAG);
    return fakeEvent(EventType.JOB_KILLED, datum);
  }

  /** @return a REDUCE_ATTEMPT_KILLED event with an unsuccessful datum. */
  private FakeEvent getReduceAttemptKilledEvent() {
    return fakeEvent(EventType.REDUCE_ATTEMPT_KILLED,
        getTaskAttemptUnsuccessfulCompletion());
  }

  /** @return a SETUP_ATTEMPT_STARTED event. */
  private FakeEvent getSetupAttemptStartedEvent() {
    return fakeEvent(EventType.SETUP_ATTEMPT_STARTED,
        newTaskAttemptStarted("ID", "avataar"));
  }

  /** @return a SETUP_ATTEMPT_FINISHED event. */
  private FakeEvent getSetupAttemptFinishedEvent() {
    return fakeEvent(EventType.SETUP_ATTEMPT_FINISHED,
        newTaskAttemptFinished("rackname"));
  }

  /** @return a SETUP_ATTEMPT_FAILED event with an unsuccessful datum. */
  private FakeEvent getSetupAttemptFailedEvent() {
    return fakeEvent(EventType.SETUP_ATTEMPT_FAILED,
        getTaskAttemptUnsuccessfulCompletion());
  }

  /** @return a SETUP_ATTEMPT_KILLED event with an unsuccessful datum. */
  private FakeEvent getSetupAttemptKilledEvent() {
    return fakeEvent(EventType.SETUP_ATTEMPT_KILLED,
        getTaskAttemptUnsuccessfulCompletion());
  }

  /** @return a CLEANUP_ATTEMPT_STARTED event. */
  private FakeEvent getCleanupAttemptStartedEvent() {
    return fakeEvent(EventType.CLEANUP_ATTEMPT_STARTED,
        newTaskAttemptStarted(ATTEMPT_ID, "avatar"));
  }

  /** @return a CLEANUP_ATTEMPT_FINISHED event. */
  private FakeEvent getCleanupAttemptFinishedEvent() {
    return fakeEvent(EventType.CLEANUP_ATTEMPT_FINISHED,
        newTaskAttemptFinished("rackName"));
  }

  /** @return a CLEANUP_ATTEMPT_FAILED event with an unsuccessful datum. */
  private FakeEvent getCleanupAttemptFailedEvent() {
    return fakeEvent(EventType.CLEANUP_ATTEMPT_FAILED,
        getTaskAttemptUnsuccessfulCompletion());
  }

  /** @return a CLEANUP_ATTEMPT_KILLED event with an unsuccessful datum. */
  private FakeEvent getCleanupAttemptKilledEvent() {
    return fakeEvent(EventType.CLEANUP_ATTEMPT_KILLED,
        getTaskAttemptUnsuccessfulCompletion());
  }

  /**
   * Builds a {@link TaskAttemptStarted} datum for {@link #TASK_ID}; only the
   * attempt id and the avataar differ between the callers.
   */
  private static TaskAttemptStarted newTaskAttemptStarted(String attemptId,
      String avataar) {
    TaskAttemptStarted datum = new TaskAttemptStarted();
    datum.setAttemptId(attemptId);
    datum.setAvataar(avataar);
    datum.setContainerId("containerId");
    datum.setHttpPort(10000);
    datum.setLocality("locality");
    datum.setShufflePort(10001);
    datum.setStartTime(1L);
    datum.setTaskid(TASK_ID);
    datum.setTaskType("taskType");
    datum.setTrackerName("trackerName");
    return datum;
  }

  /**
   * Builds a {@link TaskAttemptFinished} datum for {@link #TASK_ID}; only
   * the rack name differs between the callers.
   */
  private static TaskAttemptFinished newTaskAttemptFinished(String rackname) {
    TaskAttemptFinished datum = new TaskAttemptFinished();
    datum.setAttemptId(ATTEMPT_ID);
    datum.setCounters(getCounters());
    datum.setFinishTime(2L);
    datum.setHostname("hostname");
    datum.setRackname(rackname);
    datum.setState("state");
    datum.setTaskid(TASK_ID);
    datum.setTaskStatus("taskStatus");
    datum.setTaskType("REDUCE");
    return datum;
  }

  /**
   * Builds the {@link TaskAttemptUnsuccessfulCompletion} datum shared by all
   * failed and killed attempt events; a new instance is returned on each
   * call.
   */
  private static TaskAttemptUnsuccessfulCompletion
      getTaskAttemptUnsuccessfulCompletion() {
    TaskAttemptUnsuccessfulCompletion datum =
        new TaskAttemptUnsuccessfulCompletion();
    datum.setAttemptId(ATTEMPT_ID);
    datum.setClockSplits(Arrays.asList(1, 2, 3));
    datum.setCpuUsages(Arrays.asList(100, 200, 300));
    datum.setError("Error");
    datum.setFinishTime(2L);
    datum.setHostname("hostname");
    datum.setRackname("rackname");
    datum.setPhysMemKbytes(Arrays.asList(1000, 2000, 3000));
    datum.setTaskid(TASK_ID);
    datum.setPort(1000);
    datum.setTaskType("REDUCE");
    datum.setStatus("STATUS");
    datum.setCounters(getCounters());
    datum.setVMemKbytes(Arrays.asList(1000, 2000, 3000));
    return datum;
  }

  /** @return counters named "name" with an empty list of groups. */
  private static JhCounters getCounters() {
    JhCounters counters = new JhCounters();
    counters.setGroups(new ArrayList<JhCounterGroup>(0));
    counters.setName("name");
    return counters;
  }

  /**
   * Minimal {@link HistoryEvent} that only stores an event type and a datum.
   * It is a static nested class because it needs no state from the
   * enclosing test instance.
   */
  private static final class FakeEvent implements HistoryEvent {
    private final EventType eventType;
    private Object datum;

    FakeEvent(EventType eventType) {
      this.eventType = eventType;
    }

    @Override
    public EventType getEventType() {
      return eventType;
    }

    @Override
    public Object getDatum() {
      return datum;
    }

    @Override
    public void setDatum(Object datum) {
      this.datum = datum;
    }

    /** Always {@code null}; no test in this class calls it directly. */
    @Override
    public TimelineEvent toTimelineEvent() {
      return null;
    }

    /** Always {@code null}; no test in this class calls it directly. */
    @Override
    public Set<TimelineMetric> getTimelineMetrics() {
      return null;
    }
  }

}