TestDecayRpcScheduler.java

/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.ipc;

import static java.lang.Thread.sleep;

import java.util.Map;
import org.eclipse.jetty.util.ajax.JSON;
import org.junit.Test;

import static org.apache.hadoop.ipc.DecayRpcScheduler.IPC_DECAYSCHEDULER_THRESHOLDS_KEY;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

import org.apache.hadoop.fs.CommonConfigurationKeys;
import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.conf.Configuration;

import javax.management.MBeanServer;
import javax.management.ObjectName;

import java.io.ByteArrayOutputStream;
import java.io.PrintStream;
import java.lang.management.ManagementFactory;
import java.util.concurrent.TimeUnit;

public class TestDecayRpcScheduler {
  private Schedulable mockCall(String id) {
    Schedulable mockCall = mock(Schedulable.class);
    UserGroupInformation ugi = UserGroupInformation.createRemoteUser(id);

    when(mockCall.getUserGroupInformation()).thenReturn(ugi);

    return mockCall;
  }

  private static class TestIdentityProvider implements IdentityProvider {
    public String makeIdentity(Schedulable obj) {
      UserGroupInformation ugi = obj.getUserGroupInformation();
      if (ugi == null) {
        return null;
      }
      return ugi.getShortUserName();
    }
  }

  private static class TestCostProvider implements CostProvider {

    @Override
    public void init(String namespace, Configuration conf) {
      // No-op
    }

    @Override
    public long getCost(ProcessingDetails details) {
      return 1;
    }
  }

  private DecayRpcScheduler scheduler;

  @Test(expected=IllegalArgumentException.class)
  public void testNegativeScheduler() {
    scheduler = new DecayRpcScheduler(-1, "", new Configuration());
  }

  @Test(expected=IllegalArgumentException.class)
  public void testZeroScheduler() {
    scheduler = new DecayRpcScheduler(0, "", new Configuration());
  }

  @Test
  @SuppressWarnings("deprecation")
  public void testParsePeriod() {
    // By default
    scheduler = new DecayRpcScheduler(1, "ipc.1", new Configuration());
    assertEquals(DecayRpcScheduler.IPC_SCHEDULER_DECAYSCHEDULER_PERIOD_DEFAULT,
      scheduler.getDecayPeriodMillis());

    // Custom
    Configuration conf = new Configuration();
    conf.setLong("ipc.2." + DecayRpcScheduler.IPC_FCQ_DECAYSCHEDULER_PERIOD_KEY,
      1058);
    scheduler = new DecayRpcScheduler(1, "ipc.2", conf);
    assertEquals(1058L, scheduler.getDecayPeriodMillis());
  }

  @Test
  @SuppressWarnings("deprecation")
  public void testParsePeriodWithPortLessIdentityProvider() {
    // By default
    scheduler = new DecayRpcScheduler(1, "ipc.50", new Configuration());
    assertEquals(DecayRpcScheduler.IPC_SCHEDULER_DECAYSCHEDULER_PERIOD_DEFAULT,
        scheduler.getDecayPeriodMillis());

    // Custom
    Configuration conf = new Configuration();
    conf.setLong("ipc.51." + DecayRpcScheduler.IPC_FCQ_DECAYSCHEDULER_PERIOD_KEY,
        1058);
    conf.unset("ipc.51." + CommonConfigurationKeys.IPC_IDENTITY_PROVIDER_KEY);
    conf.set("ipc." + CommonConfigurationKeys.IPC_IDENTITY_PROVIDER_KEY,
        "org.apache.hadoop.ipc.TestDecayRpcScheduler$TestIdentityProvider");
    scheduler = new DecayRpcScheduler(1, "ipc.51", conf);
    assertEquals(1058L, scheduler.getDecayPeriodMillis());
  }

  @Test
  @SuppressWarnings("deprecation")
  public void testParsePeriodWithPortLessCostProvider() {
    // By default
    scheduler = new DecayRpcScheduler(1, "ipc.52", new Configuration());
    assertEquals(DecayRpcScheduler.IPC_SCHEDULER_DECAYSCHEDULER_PERIOD_DEFAULT,
        scheduler.getDecayPeriodMillis());

    // Custom
    Configuration conf = new Configuration();
    conf.setLong("ipc.52." + DecayRpcScheduler.IPC_FCQ_DECAYSCHEDULER_PERIOD_KEY,
        1058);
    conf.unset("ipc.52." + CommonConfigurationKeys.IPC_COST_PROVIDER_KEY);
    conf.set("ipc." + CommonConfigurationKeys.IPC_COST_PROVIDER_KEY,
        "org.apache.hadoop.ipc.TestDecayRpcScheduler$TestCostProvider");
    scheduler = new DecayRpcScheduler(1, "ipc.52", conf);
    assertEquals(1058L, scheduler.getDecayPeriodMillis());
  }

  @Test
  @SuppressWarnings("deprecation")
  public void testParseFactor() {
    // Default
    scheduler = new DecayRpcScheduler(1, "ipc.3", new Configuration());
    assertEquals(DecayRpcScheduler.IPC_SCHEDULER_DECAYSCHEDULER_FACTOR_DEFAULT,
      scheduler.getDecayFactor(), 0.00001);

    // Custom
    Configuration conf = new Configuration();
    conf.set("ipc.4." + DecayRpcScheduler.IPC_FCQ_DECAYSCHEDULER_FACTOR_KEY,
      "0.125");
    scheduler = new DecayRpcScheduler(1, "ipc.4", conf);
    assertEquals(0.125, scheduler.getDecayFactor(), 0.00001);
  }

  public void assertEqualDecimalArrays(double[] a, double[] b) {
    assertEquals(a.length, b.length);
    for(int i = 0; i < a.length; i++) {
      assertEquals(a[i], b[i], 0.00001);
    }
  }

  @Test
  @SuppressWarnings("deprecation")
  public void testParseThresholds() {
    // Defaults vary by number of queues
    Configuration conf = new Configuration();
    scheduler = new DecayRpcScheduler(1, "ipc.5", conf);
    assertEqualDecimalArrays(new double[]{}, scheduler.getThresholds());

    scheduler = new DecayRpcScheduler(2, "ipc.6", conf);
    assertEqualDecimalArrays(new double[]{0.5}, scheduler.getThresholds());

    scheduler = new DecayRpcScheduler(3, "ipc.7", conf);
    assertEqualDecimalArrays(new double[]{0.25, 0.5}, scheduler.getThresholds());

    scheduler = new DecayRpcScheduler(4, "ipc.8", conf);
    assertEqualDecimalArrays(new double[]{0.125, 0.25, 0.5}, scheduler.getThresholds());

    // Custom
    conf = new Configuration();
    conf.set("ipc.9." + DecayRpcScheduler.IPC_FCQ_DECAYSCHEDULER_THRESHOLDS_KEY,
      "1, 10, 20, 50, 85");
    scheduler = new DecayRpcScheduler(6, "ipc.9", conf);
    assertEqualDecimalArrays(new double[]{0.01, 0.1, 0.2, 0.5, 0.85}, scheduler.getThresholds());
  }

  @Test
  @SuppressWarnings("deprecation")
  public void testAccumulate() {
    Configuration conf = new Configuration();
    conf.set("ipc.10." + DecayRpcScheduler.IPC_FCQ_DECAYSCHEDULER_PERIOD_KEY,
        "99999999"); // Never flush
    scheduler = new DecayRpcScheduler(1, "ipc.10", conf);

    assertEquals(0, scheduler.getCallCostSnapshot().size()); // empty first

    getPriorityIncrementCallCount("A");
    assertEquals(1, scheduler.getCallCostSnapshot().get("A").longValue());
    assertEquals(1, scheduler.getCallCostSnapshot().get("A").longValue());

    getPriorityIncrementCallCount("A");
    getPriorityIncrementCallCount("B");
    getPriorityIncrementCallCount("A");

    assertEquals(3, scheduler.getCallCostSnapshot().get("A").longValue());
    assertEquals(1, scheduler.getCallCostSnapshot().get("B").longValue());
  }

  @Test
  @SuppressWarnings("deprecation")
  public void testDecay() throws Exception {
    Configuration conf = new Configuration();
    conf.setLong("ipc.11." // Never decay
        + DecayRpcScheduler.IPC_SCHEDULER_DECAYSCHEDULER_PERIOD_KEY, 999999999);
    conf.setDouble("ipc.11."
        + DecayRpcScheduler.IPC_SCHEDULER_DECAYSCHEDULER_FACTOR_KEY, 0.5);
    scheduler = new DecayRpcScheduler(1, "ipc.11", conf);

    assertEquals(0, scheduler.getTotalCallSnapshot());

    for (int i = 0; i < 4; i++) {
      getPriorityIncrementCallCount("A");
    }

    sleep(1000);

    for (int i = 0; i < 8; i++) {
      getPriorityIncrementCallCount("B");
    }

    assertEquals(12, scheduler.getTotalCallSnapshot());
    assertEquals(4, scheduler.getCallCostSnapshot().get("A").longValue());
    assertEquals(8, scheduler.getCallCostSnapshot().get("B").longValue());

    scheduler.forceDecay();

    assertEquals(6, scheduler.getTotalCallSnapshot());
    assertEquals(2, scheduler.getCallCostSnapshot().get("A").longValue());
    assertEquals(4, scheduler.getCallCostSnapshot().get("B").longValue());

    scheduler.forceDecay();

    assertEquals(3, scheduler.getTotalCallSnapshot());
    assertEquals(1, scheduler.getCallCostSnapshot().get("A").longValue());
    assertEquals(2, scheduler.getCallCostSnapshot().get("B").longValue());

    scheduler.forceDecay();

    assertEquals(1, scheduler.getTotalCallSnapshot());
    assertEquals(null, scheduler.getCallCostSnapshot().get("A"));
    assertEquals(1, scheduler.getCallCostSnapshot().get("B").longValue());

    scheduler.forceDecay();

    assertEquals(0, scheduler.getTotalCallSnapshot());
    assertEquals(null, scheduler.getCallCostSnapshot().get("A"));
    assertEquals(null, scheduler.getCallCostSnapshot().get("B"));
  }

  @Test
  @SuppressWarnings("deprecation")
  public void testPriority() throws Exception {
    Configuration conf = new Configuration();
    final String namespace = "ipc.12";
    conf.set(namespace + "." + DecayRpcScheduler
        .IPC_FCQ_DECAYSCHEDULER_PERIOD_KEY, "99999999"); // Never flush
    conf.set(namespace + "." + DecayRpcScheduler
        .IPC_FCQ_DECAYSCHEDULER_THRESHOLDS_KEY, "25, 50, 75");
    scheduler = new DecayRpcScheduler(4, namespace, conf);

    assertEquals(0, getPriorityIncrementCallCount("A")); // 0 out of 0 calls
    assertEquals(3, getPriorityIncrementCallCount("A")); // 1 out of 1 calls
    assertEquals(0, getPriorityIncrementCallCount("B")); // 0 out of 2 calls
    assertEquals(1, getPriorityIncrementCallCount("B")); // 1 out of 3 calls
    assertEquals(0, getPriorityIncrementCallCount("C")); // 0 out of 4 calls
    assertEquals(0, getPriorityIncrementCallCount("C")); // 1 out of 5 calls
    assertEquals(1, getPriorityIncrementCallCount("A")); // 2 out of 6 calls
    assertEquals(1, getPriorityIncrementCallCount("A")); // 3 out of 7 calls
    assertEquals(2, getPriorityIncrementCallCount("A")); // 4 out of 8 calls
    assertEquals(2, getPriorityIncrementCallCount("A")); // 5 out of 9 calls

    MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
    ObjectName mxbeanName = new ObjectName(
        "Hadoop:service="+ namespace + ",name=DecayRpcScheduler");

    String cvs1 = (String) mbs.getAttribute(mxbeanName, "CallVolumeSummary");
    assertTrue("Get expected JMX of CallVolumeSummary before decay",
        cvs1.equals("{\"A\":6,\"B\":2,\"C\":2}"));

    scheduler.forceDecay();

    String cvs2 = (String) mbs.getAttribute(mxbeanName, "CallVolumeSummary");
    assertTrue("Get expected JMX for CallVolumeSummary after decay",
        cvs2.equals("{\"A\":3,\"B\":1,\"C\":1}"));
  }

  @Test(timeout=2000)
  @SuppressWarnings("deprecation")
  public void testPeriodic() throws InterruptedException {
    Configuration conf = new Configuration();
    conf.set(
        "ipc.13." + DecayRpcScheduler.IPC_FCQ_DECAYSCHEDULER_PERIOD_KEY, "10");
    conf.set(
        "ipc.13." + DecayRpcScheduler.IPC_FCQ_DECAYSCHEDULER_FACTOR_KEY, "0.5");
    scheduler = new DecayRpcScheduler(1, "ipc.13", conf);

    assertEquals(10, scheduler.getDecayPeriodMillis());
    assertEquals(0, scheduler.getTotalCallSnapshot());

    for (int i = 0; i < 64; i++) {
      getPriorityIncrementCallCount("A");
    }

    // It should eventually decay to zero
    while (scheduler.getTotalCallSnapshot() > 0) {
      sleep(10);
    }
  }

  @Test(timeout=60000)
  public void testNPEatInitialization() throws InterruptedException {
    // redirect the LOG to and check if there is NPE message while initializing
    // the DecayRpcScheduler
    PrintStream output = System.out;
    try {
      ByteArrayOutputStream bytes = new ByteArrayOutputStream();
      System.setOut(new PrintStream(bytes));

      // initializing DefaultMetricsSystem here would set "monitoring" flag in
      // MetricsSystemImpl to true
      DefaultMetricsSystem.initialize("NameNode");
      Configuration conf = new Configuration();
      scheduler = new DecayRpcScheduler(1, "ipc.14", conf);
      // check if there is npe in log
      assertFalse(bytes.toString().contains("NullPointerException"));
    } finally {
      //set systout back
      System.setOut(output);
    }
  }

  @Test
  public void testUsingWeightedTimeCostProvider() {
    scheduler = getSchedulerWithWeightedTimeCostProvider(3, "ipc.15");

    // 3 details in increasing order of cost. Although medium has a longer
    // duration, the shared lock is weighted less than the exclusive lock
    ProcessingDetails callDetailsLow =
        new ProcessingDetails(TimeUnit.MILLISECONDS);
    callDetailsLow.set(ProcessingDetails.Timing.LOCKFREE, 1);
    ProcessingDetails callDetailsMedium =
        new ProcessingDetails(TimeUnit.MILLISECONDS);
    callDetailsMedium.set(ProcessingDetails.Timing.LOCKSHARED, 500);
    ProcessingDetails callDetailsHigh =
        new ProcessingDetails(TimeUnit.MILLISECONDS);
    callDetailsHigh.set(ProcessingDetails.Timing.LOCKEXCLUSIVE, 100);

    for (int i = 0; i < 10; i++) {
      scheduler.addResponseTime("ignored", mockCall("LOW"), callDetailsLow);
    }
    scheduler.addResponseTime("ignored", mockCall("MED"), callDetailsMedium);
    scheduler.addResponseTime("ignored", mockCall("HIGH"), callDetailsHigh);

    assertEquals(0, scheduler.getPriorityLevel(mockCall("LOW")));
    assertEquals(1, scheduler.getPriorityLevel(mockCall("MED")));
    assertEquals(2, scheduler.getPriorityLevel(mockCall("HIGH")));

    assertEquals(3, scheduler.getUniqueIdentityCount());
    long totalCallInitial = scheduler.getTotalRawCallVolume();
    assertEquals(totalCallInitial, scheduler.getTotalCallVolume());

    scheduler.forceDecay();

    // Relative priorities should stay the same after a single decay
    assertEquals(0, scheduler.getPriorityLevel(mockCall("LOW")));
    assertEquals(1, scheduler.getPriorityLevel(mockCall("MED")));
    assertEquals(2, scheduler.getPriorityLevel(mockCall("HIGH")));

    assertEquals(3, scheduler.getUniqueIdentityCount());
    assertEquals(totalCallInitial, scheduler.getTotalRawCallVolume());
    assertTrue(scheduler.getTotalCallVolume() < totalCallInitial);

    for (int i = 0; i < 100; i++) {
      scheduler.forceDecay();
    }
    // After enough decay cycles, all callers should be high priority again
    assertEquals(0, scheduler.getPriorityLevel(mockCall("LOW")));
    assertEquals(0, scheduler.getPriorityLevel(mockCall("MED")));
    assertEquals(0, scheduler.getPriorityLevel(mockCall("HIGH")));
  }

  @Test
  public void testUsingWeightedTimeCostProviderWithZeroCostCalls() {
    scheduler = getSchedulerWithWeightedTimeCostProvider(2, "ipc.16");

    ProcessingDetails emptyDetails =
        new ProcessingDetails(TimeUnit.MILLISECONDS);

    for (int i = 0; i < 1000; i++) {
      scheduler.addResponseTime("ignored", mockCall("MANY"), emptyDetails);
    }
    scheduler.addResponseTime("ignored", mockCall("FEW"), emptyDetails);

    // Since the calls are all "free", they should have the same priority
    assertEquals(0, scheduler.getPriorityLevel(mockCall("MANY")));
    assertEquals(0, scheduler.getPriorityLevel(mockCall("FEW")));
  }

  @Test
  public void testUsingWeightedTimeCostProviderNoRequests() {
    scheduler = getSchedulerWithWeightedTimeCostProvider(2, "ipc.18");

    assertEquals(0, scheduler.getPriorityLevel(mockCall("A")));
  }

  /**
   * Get a scheduler that uses {@link WeightedTimeCostProvider} and has
   * normal decaying disabled.
   */
  private static DecayRpcScheduler getSchedulerWithWeightedTimeCostProvider(
      int priorityLevels, String ns) {
    Configuration conf = new Configuration();
    conf.setClass(ns + "." + CommonConfigurationKeys.IPC_COST_PROVIDER_KEY,
        WeightedTimeCostProvider.class, CostProvider.class);
    conf.setLong(ns + "."
        + DecayRpcScheduler.IPC_SCHEDULER_DECAYSCHEDULER_PERIOD_KEY, 999999);
    return new DecayRpcScheduler(priorityLevels, ns, conf);
  }

  /**
   * Get the priority and increment the call count, assuming that
   * {@link DefaultCostProvider} is in use.
   */
  private int getPriorityIncrementCallCount(String callId) {
    Schedulable mockCall = mockCall(callId);
    int priority = scheduler.getPriorityLevel(mockCall);
    // The DefaultCostProvider uses a cost of 1 for all calls, ignoring
    // the processing details, so an empty one is fine
    ProcessingDetails emptyProcessingDetails =
        new ProcessingDetails(TimeUnit.MILLISECONDS);
    scheduler.addResponseTime("ignored", mockCall, emptyProcessingDetails);
    return priority;
  }

  /**
   * Test computing priorities and priority cache of users and service-users.
   */
  @Test
  public void testServiceUsersCase1() {
    Configuration conf = new Configuration();
    conf.setLong("ipc.19."
        + DecayRpcScheduler.IPC_SCHEDULER_DECAYSCHEDULER_PERIOD_KEY, 999999);
    conf.set("ipc.19." + DecayRpcScheduler.IPC_DECAYSCHEDULER_SERVICE_USERS_KEY,
        "service1,service2");
    scheduler = new DecayRpcScheduler(4, "ipc.19", conf);

    assertTrue(scheduler.getServiceUserNames().contains("service1"));
    assertTrue(scheduler.getServiceUserNames().contains("service2"));

    for (int i = 0; i < 10; i++) {
      getPriorityIncrementCallCount("user1");
      getPriorityIncrementCallCount("service1");
      getPriorityIncrementCallCount("service2");
    }

    assertNotEquals(0, scheduler.getPriorityLevel(mockCall("user1")));
    // The priorities of service users should be always 0.
    assertEquals(0, scheduler.getPriorityLevel(mockCall("service1")));
    assertEquals(0, scheduler.getPriorityLevel(mockCall("service2")));

    // DecayRpcScheduler caches priorities after decay
    scheduler.forceDecay();
    // Check priorities on cache
    String summary = scheduler.getSchedulingDecisionSummary();
    Map<String, Object> summaryMap = (Map<String, Object>) JSON.parse(summary);
    assertNotEquals(0L, summaryMap.get("user1"));
    assertEquals(0L, summaryMap.get("service1"));
    assertEquals(0L, summaryMap.get("service2"));
  }

  /**
   * Test the service users' calls are not included when computing user's call
   * priority.
   */
  @Test
  public void testServiceUsersCase2() {
    final int level = 4;
    Configuration conf = new Configuration();
    conf.setLong("ipc.20."
        + DecayRpcScheduler.IPC_SCHEDULER_DECAYSCHEDULER_PERIOD_KEY, 999999);
    conf.set("ipc.20." + DecayRpcScheduler.IPC_DECAYSCHEDULER_SERVICE_USERS_KEY,
        "service");
    conf.set(IPC_DECAYSCHEDULER_THRESHOLDS_KEY, "0.125,0.25,0.5");
    scheduler = new DecayRpcScheduler(level, "ipc.20", conf);

    // test total costs.
    for (int i = 0; i < 10; i++) {
      getPriorityIncrementCallCount("user1");
    }
    for (int i = 0; i < 50; i++) {
      getPriorityIncrementCallCount("service");
    }
    assertEquals(10, scheduler.getTotalCallVolume());
    assertEquals(10, scheduler.getTotalRawCallVolume());
    assertEquals(50, scheduler.getTotalServiceUserCallVolume());
    assertEquals(50, scheduler.getTotalServiceUserRawCallVolume());
    // test priority of normal user.
    assertEquals(level - 1, scheduler.getPriorityLevel(mockCall("user1")));

    // test total costs after decay.
    scheduler.forceDecay();
    assertEquals(5, scheduler.getTotalCallVolume());
    assertEquals(10, scheduler.getTotalRawCallVolume());
    assertEquals(25, scheduler.getTotalServiceUserCallVolume());
    assertEquals(50, scheduler.getTotalServiceUserRawCallVolume());
    // test priority of normal user.
    assertEquals(level - 1, scheduler.getPriorityLevel(mockCall("user1")));

    // test total costs again.
    for (int i = 0; i < 10; i++) {
      getPriorityIncrementCallCount("user1");
    }
    for (int i = 0; i < 50; i++) {
      getPriorityIncrementCallCount("service");
    }
    assertEquals(15, scheduler.getTotalCallVolume());
    assertEquals(20, scheduler.getTotalRawCallVolume());
    assertEquals(75, scheduler.getTotalServiceUserCallVolume());
    assertEquals(100, scheduler.getTotalServiceUserRawCallVolume());
    // test priority of normal user.
    assertEquals(level - 1, scheduler.getPriorityLevel(mockCall("user1")));
  }
}