// TestRollingWindowManager.java

/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hdfs.server.namenode.top.window;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Random;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.server.namenode.top.TopConf;

import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

import static org.apache.hadoop.hdfs.server.namenode.top.window.RollingWindowManager.Op;
import static org.apache.hadoop.hdfs.server.namenode.top.window.RollingWindowManager.TopWindow;
import static org.apache.hadoop.hdfs.server.namenode.top.window.RollingWindowManager.User;
import static org.junit.Assert.assertEquals;

/**
 * Tests for {@link RollingWindowManager}: records per-user metrics at chosen
 * timestamps, takes snapshots, and verifies the reported per-op top users,
 * per-op totals, and the aggregate {@link TopConf#ALL_CMDS} op.
 */
public class TestRollingWindowManager {

  Configuration conf;
  RollingWindowManager manager;
  String[] users;
  /** Number of milliseconds in one minute. */
  final static int MIN_2_MS = 60000;

  /** Length of the rolling window used by {@link #manager}. */
  static final int WINDOW_LEN_MS = 1 * MIN_2_MS;
  /** Number of buckets the window of {@link #manager} is divided into. */
  static final int BUCKET_CNT = 10;
  /** Number of top users requested per op. */
  static final int N_TOP_USERS = 10;
  /** Length of a single bucket, in milliseconds. */
  static final int BUCKET_LEN = WINDOW_LEN_MS / BUCKET_CNT;

  /**
   * Builds a manager configured with {@link #BUCKET_CNT} buckets and
   * {@link #N_TOP_USERS} top users, and creates twice as many user names as
   * there are top-user slots so that some users must be cut from the top list.
   */
  @Before
  public void init() {
    conf = new Configuration();
    conf.setInt(DFSConfigKeys.NNTOP_BUCKETS_PER_WINDOW_KEY, BUCKET_CNT);
    conf.setInt(DFSConfigKeys.NNTOP_NUM_USERS_KEY, N_TOP_USERS);
    manager = new RollingWindowManager(conf, WINDOW_LEN_MS);
    users = new String[2 * N_TOP_USERS];
    for (int i = 0; i < users.length; i++) {
      users[i] = "user" + i;
    }
  }

  /**
   * Records "open" and "close" metrics with per-user counts that grow with
   * the user index, then checks the ordering and totals of the top users,
   * both while "open" is inside the window and after the window has moved
   * past it.
   */
  @Test
  public void testTops() throws Exception {
    long time = WINDOW_LEN_MS + BUCKET_LEN * 3 / 2;
    for (int i = 0; i < users.length; i++) {
      manager.recordMetric(time, "open", users[i], (i + 1) * 2);
    }
    time++;
    for (int i = 0; i < users.length; i++) {
      manager.recordMetric(time, "close", users[i], i + 1);
    }
    time++;
    TopWindow tops = manager.snapshot(time);

    // "open", "close" and the synthesized aggregate op.
    assertEquals("Unexpected number of ops", 3, tops.getOps().size());
    assertEquals(TopConf.ALL_CMDS, tops.getOps().get(0).getOpType());
    for (Op op : tops.getOps()) {
      final List<User> topUsers = op.getTopUsers();
      assertEquals("Unexpected number of users", N_TOP_USERS, topUsers.size());
      if (op.getOpType().equals("open")) {
        // The highest-indexed user recorded the largest count, so the top
        // list is expected in descending count order.
        for (int i = 0; i < topUsers.size(); i++) {
          User user = topUsers.get(i);
          assertEquals("Unexpected count for user " + user.getUser(),
              (users.length - i) * 2, user.getCount());
        }
        // Closed form of sum(range(2,42,2))
        assertEquals("Unexpected total count for op",
            (2 + (users.length * 2)) * (users.length / 2),
            op.getTotalCount());
      }
    }

    // move the window forward not to see the "open" results
    time += WINDOW_LEN_MS - 2;
    tops = manager.snapshot(time);
    assertEquals("Unexpected number of ops", 2, tops.getOps().size());
    assertEquals(TopConf.ALL_CMDS, tops.getOps().get(0).getOpType());
    final Op op = tops.getOps().get(1);
    assertEquals("Should only see close ops", "close", op.getOpType());
    final List<User> topUsers = op.getTopUsers();
    for (int i = 0; i < topUsers.size(); i++) {
      User user = topUsers.get(i);
      assertEquals("Unexpected count for user " + user.getUser(),
          (users.length - i), user.getCount());
    }
    // Closed form of sum(range(1,21))
    assertEquals("Unexpected total count for op",
        (1 + users.length) * (users.length / 2), op.getTotalCount());
  }

  /**
   * With a single bucket and a window of two time units, a value recorded at
   * time 0 is expected to be visible at times 0 and 1 and gone at time 2.
   */
  @Test
  public void windowReset() throws Exception {
    Configuration config = new Configuration();
    config.setInt(DFSConfigKeys.NNTOP_BUCKETS_PER_WINDOW_KEY, 1);
    config.setInt(DFSConfigKeys.NNTOP_NUM_USERS_KEY, N_TOP_USERS);
    int period = 2;
    RollingWindowManager rollingWindowManager =
        new RollingWindowManager(config, period);
    rollingWindowManager.recordMetric(0, "op1", users[0], 3);
    checkValues(rollingWindowManager, 0, "op1", 3, 3);
    checkValues(rollingWindowManager, period - 1, "op1", 3, 3);
    checkValues(rollingWindowManager, period, "op1", 0, 0);
  }

  /**
   * Interleaves two ops for the same user across several window periods and
   * checks both the per-op counts and the aggregate total after each step.
   */
  @Test
  public void testTotal() throws Exception {
    Configuration config = new Configuration();
    config.setInt(DFSConfigKeys.NNTOP_BUCKETS_PER_WINDOW_KEY, 1);
    config.setInt(DFSConfigKeys.NNTOP_NUM_USERS_KEY, N_TOP_USERS);
    int period = 10;
    RollingWindowManager rollingWindowManager =
        new RollingWindowManager(config, period);

    long t = 0;
    rollingWindowManager.recordMetric(t, "op1", users[0], 3);
    checkValues(rollingWindowManager, t, "op1", 3, 3);

    // both should have a value.
    t = (long)(period * .5);
    rollingWindowManager.recordMetric(t, "op2", users[0], 4);
    checkValues(rollingWindowManager, t, "op1", 3, 7);
    checkValues(rollingWindowManager, t, "op2", 4, 7);

    // neither should reset.
    t = period - 1;
    checkValues(rollingWindowManager, t, "op1", 3, 7);
    checkValues(rollingWindowManager, t, "op2", 4, 7);

    // op1 should reset in its next period, but not op2.
    t = period;
    rollingWindowManager.recordMetric(t, "op1", users[0], 10);
    checkValues(rollingWindowManager, t, "op1", 10, 14);
    checkValues(rollingWindowManager, t, "op2", 4, 14);

    // neither should reset.
    t = (long)(period * 1.25);
    rollingWindowManager.recordMetric(t, "op2", users[0], 7);
    checkValues(rollingWindowManager, t, "op1", 10, 21);
    checkValues(rollingWindowManager, t, "op2", 11, 21);

    // op2 should reset.
    t = (long)(period * 1.5);
    rollingWindowManager.recordMetric(t, "op2", users[0], 13);
    checkValues(rollingWindowManager, t, "op1", 10, 23);
    checkValues(rollingWindowManager, t, "op2", 13, 23);
  }

  /**
   * Records randomly chosen ops, users and counts at consecutive timestamps
   * and verifies after every record that the snapshot totals are consistent.
   * The random seed is reported on failure so the run can be reproduced.
   */
  @Test
  public void testWithFuzzing() throws Exception {
    Configuration config = new Configuration();
    config.setInt(DFSConfigKeys.NNTOP_BUCKETS_PER_WINDOW_KEY, 1);
    config.setInt(DFSConfigKeys.NNTOP_NUM_USERS_KEY, N_TOP_USERS);
    int period = users.length / 2;
    RollingWindowManager rollingWindowManager =
        new RollingWindowManager(config, period);

    String[] ops = {"op1", "op2", "op3", "op4"};
    // Keep the seed so that a failing sequence can be replayed.
    final long seed = System.nanoTime();
    Random rand = new Random(seed);
    for (int i = 0; i < 10000; i++) {
      rollingWindowManager.recordMetric(i, ops[rand.nextInt(ops.length)],
          users[rand.nextInt(users.length)],
          rand.nextInt(100));
      TopWindow window = rollingWindowManager.snapshot(i);
      try {
        checkTotal(window);
      } catch (AssertionError e) {
        throw new AssertionError("Fuzzing failed at iteration " + i
            + " with seed " + seed, e);
      }
    }
  }

  /**
   * Records three ops with three users each while only two top users per op
   * are requested, then checks that the aggregate op lists the top users of
   * every individual op.
   */
  @Test
  public void testOpTotal() throws Exception {
    int numTopUsers = 2;
    Configuration config = new Configuration();
    config.setInt(DFSConfigKeys.NNTOP_BUCKETS_PER_WINDOW_KEY, 1);
    config.setInt(DFSConfigKeys.NNTOP_NUM_USERS_KEY, numTopUsers);
    int period = users.length / 2;
    RollingWindowManager rollingWindowManager =
        new RollingWindowManager(config, period);

    int numOps = 3;
    rollingWindowManager.recordMetric(0, "op1", "user1", 10);
    rollingWindowManager.recordMetric(0, "op1", "user2", 20);
    rollingWindowManager.recordMetric(0, "op1", "user3", 30);

    rollingWindowManager.recordMetric(0, "op2", "user1", 1);
    rollingWindowManager.recordMetric(0, "op2", "user4", 40);
    rollingWindowManager.recordMetric(0, "op2", "user5", 50);

    rollingWindowManager.recordMetric(0, "op3", "user6", 1);
    rollingWindowManager.recordMetric(0, "op3", "user7", 11);
    rollingWindowManager.recordMetric(0, "op3", "user8", 1);

    TopWindow window = rollingWindowManager.snapshot(0);
    // The recorded ops plus the aggregate op.
    Assert.assertEquals(numOps + 1, window.getOps().size());

    Op allOp = window.getOps().get(0);
    Assert.assertEquals(TopConf.ALL_CMDS, allOp.getOpType());
    List<User> topUsers = allOp.getTopUsers();
    Assert.assertEquals(numTopUsers * numOps, topUsers.size());
    // ensure all the top users for each op are present in the total op.
    // Index 0 is the aggregate op, so the recorded ops occupy 1..numOps
    // inclusive.
    for (int i = 1; i <= numOps; i++) {
      Op op = window.getOps().get(i);
      Assert.assertTrue(
          "Top users of " + op.getOpType() + " missing from the total op",
          topUsers.containsAll(op.getTopUsers()));
    }
  }

  /**
   * Takes a snapshot at {@code time} and verifies the total count of one op
   * as well as the aggregate total across all ops.
   *
   * An op that does not appear in the snapshot is treated as having a total
   * count of zero, so an unexpectedly missing op fails the check instead of
   * being silently skipped.
   * NOTE(review): presumably the manager omits ops whose counts have fully
   * expired, which is why a zero expectation must accept an absent op -
   * confirm against RollingWindowManager.
   *
   * @param rwManager manager to snapshot
   * @param time timestamp passed to the snapshot
   * @param opType op whose total count is checked
   * @param value expected total count of {@code opType}
   * @param expectedTotal expected sum of the totals of all non-aggregate ops
   */
  private void checkValues(RollingWindowManager rwManager, long time,
      String opType, long value, long expectedTotal) throws Exception {
    TopWindow window = rwManager.snapshot(time);
    long actualValue = 0;
    for (Op windowOp : window.getOps()) {
      if (opType.equals(windowOp.getOpType())) {
        actualValue = windowOp.getTotalCount();
        break;
      }
    }
    assertEquals("Unexpected total count for op " + opType,
        value, actualValue);
    assertEquals(expectedTotal, checkTotal(window));
  }

  /**
   * Verifies the internal consistency of a snapshot: the total of the
   * aggregate op must equal the sum of the totals of all other ops, and for
   * every known user the count reported by the aggregate op must equal the
   * sum of that user's counts across the other ops.
   *
   * @param window snapshot to verify
   * @return the sum of the total counts of all non-aggregate ops
   */
  private long checkTotal(TopWindow window) throws Exception {
    long allOpTotal = 0;
    long computedOpTotal = 0;

    Map<String, User> userOpTally = new HashMap<>();
    for (String user : users) {
      userOpTally.put(user, new User(user, 0));
    }
    for (Op windowOp : window.getOps()) {
      // Aggregate counts are subtracted and per-op counts are added, so a
      // consistent snapshot leaves every user's tally at zero.
      int multiplier;
      if (TopConf.ALL_CMDS.equals(windowOp.getOpType())) {
        multiplier = -1;
        allOpTotal += windowOp.getTotalCount();
      } else {
        multiplier = 1;
        computedOpTotal += windowOp.getTotalCount();
      }
      for (User user : windowOp.getAllUsers()) {
        userOpTally.get(user.getUser()).add((int)(multiplier*user.getCount()));
      }
    }
    assertEquals(allOpTotal, computedOpTotal);
    for (Map.Entry<String, User> tally : userOpTally.entrySet()) {
      assertEquals("Inconsistent op tally for user " + tally.getKey(),
          0, tally.getValue().getCount());
    }
    return computedOpTotal;
  }
}