TestReencryptionHandler.java

/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 * <p>
 * http://www.apache.org/licenses/LICENSE-2.0
 * <p>
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hdfs.server.namenode;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.crypto.key.JavaKeyStoreProvider;
import org.apache.hadoop.crypto.key.KeyProvider;
import org.apache.hadoop.crypto.key.KeyProviderCryptoExtension;
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
import org.apache.hadoop.fs.FileSystemTestHelper;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.util.KMSUtil;
import org.apache.hadoop.util.StopWatch;
import org.apache.hadoop.test.Whitebox;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.Timeout;
import org.mockito.Mockito;
import org.slf4j.LoggerFactory;
import org.slf4j.event.Level;

import java.io.File;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

import static org.junit.Assert.assertTrue;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_REENCRYPT_THROTTLE_LIMIT_HANDLER_RATIO_KEY;
import static org.junit.Assert.fail;

/**
 * Test class for ReencryptionHandler.
 */
public class TestReencryptionHandler {

  protected static final org.slf4j.Logger LOG =
      LoggerFactory.getLogger(TestReencryptionHandler.class);

  @Rule
  public Timeout globalTimeout = new Timeout(180 * 1000);

  @Before
  public void setup() {
    GenericTestUtils.setLogLevel(ReencryptionHandler.LOG, Level.TRACE);
  }

  private ReencryptionHandler mockReencryptionhandler(final Configuration conf)
      throws IOException {
    // mock stuff to create a mocked ReencryptionHandler
    FileSystemTestHelper helper = new FileSystemTestHelper();
    Path targetFile = new Path(new File(helper.getTestRootDir())
        .getAbsolutePath(), "test.jks");
    conf.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_KEY_PROVIDER_PATH,
        JavaKeyStoreProvider.SCHEME_NAME + "://file" + targetFile.toUri());
    final EncryptionZoneManager ezm = Mockito.mock(EncryptionZoneManager.class);
    final KeyProvider kp = KMSUtil.createKeyProvider(conf,
        CommonConfigurationKeysPublic.HADOOP_SECURITY_KEY_PROVIDER_PATH);
    Mockito.when(ezm.getProvider()).thenReturn(
        KeyProviderCryptoExtension.createKeyProviderCryptoExtension(kp));
    FSDirectory fsd = Mockito.mock(FSDirectory.class);
    FSNamesystem fns = Mockito.mock(FSNamesystem.class);
    Mockito.when(fsd.getFSNamesystem()).thenReturn(fns);
    Mockito.when(ezm.getFSDirectory()).thenReturn(fsd);
    return new ReencryptionHandler(ezm, conf);
  }

  @Test
  public void testThrottle() throws Exception {
    final Configuration conf = new Configuration();
    conf.setDouble(DFS_NAMENODE_REENCRYPT_THROTTLE_LIMIT_HANDLER_RATIO_KEY,
        0.5);
    final ReencryptionHandler rh = mockReencryptionhandler(conf);

    // mock StopWatches so all = 30s, locked = 20s. With ratio = .5, throttle
    // should wait for 30 * 0.5 - 20 = 5s.
    final StopWatch mockAll = Mockito.mock(StopWatch.class);
    Mockito.when(mockAll.now(TimeUnit.MILLISECONDS)).thenReturn((long) 30000);
    Mockito.when(mockAll.reset()).thenReturn(mockAll);
    final StopWatch mockLocked = Mockito.mock(StopWatch.class);
    Mockito.when(mockLocked.now(TimeUnit.MILLISECONDS))
        .thenReturn((long) 20000);
    Mockito.when(mockLocked.reset()).thenReturn(mockLocked);
    final BlockingQueue<Runnable> queue = new LinkedBlockingQueue<>();
    Whitebox.setInternalState(rh, "throttleTimerAll", mockAll);
    Whitebox.setInternalState(rh, "throttleTimerLocked", mockLocked);
    Whitebox.setInternalState(rh, "taskQueue", queue);
    final StopWatch sw = new StopWatch().start();
    rh.getTraverser().throttle();
    sw.stop();
    assertTrue("should have throttled for at least 8 second",
        sw.now(TimeUnit.MILLISECONDS) > 8000);
    assertTrue("should have throttled for at most 12 second",
        sw.now(TimeUnit.MILLISECONDS) < 12000);
  }

  @Test
  public void testThrottleNoOp() throws Exception {
    final Configuration conf = new Configuration();
    conf.setDouble(DFS_NAMENODE_REENCRYPT_THROTTLE_LIMIT_HANDLER_RATIO_KEY,
        0.5);
    final ReencryptionHandler rh = mockReencryptionhandler(conf);

    // mock StopWatches so all = 30s, locked = 10s. With ratio = .5, throttle
    // should not happen.
    StopWatch mockAll = Mockito.mock(StopWatch.class);
    Mockito.when(mockAll.now()).thenReturn(new Long(30000));
    Mockito.when(mockAll.reset()).thenReturn(mockAll);
    StopWatch mockLocked = Mockito.mock(StopWatch.class);
    Mockito.when(mockLocked.now()).thenReturn(new Long(10000));
    Mockito.when(mockLocked.reset()).thenReturn(mockLocked);
    final BlockingQueue<Runnable> queue = new LinkedBlockingQueue<>();
    Whitebox.setInternalState(rh, "throttleTimerAll", mockAll);
    Whitebox.setInternalState(rh, "throttleTimerLocked", mockLocked);
    Whitebox.setInternalState(rh, "taskQueue", queue);
    final Map<Long, ReencryptionUpdater.ZoneSubmissionTracker>
        submissions = new HashMap<>();
    Whitebox.setInternalState(rh, "submissions", submissions);
    StopWatch sw = new StopWatch().start();
    rh.getTraverser().throttle();
    sw.stop();
    assertTrue("should not have throttled",
        sw.now(TimeUnit.MILLISECONDS) < 1000);
  }

  @Test
  public void testThrottleConfigs() throws Exception {
    final Configuration conf = new Configuration();
    conf.setDouble(DFS_NAMENODE_REENCRYPT_THROTTLE_LIMIT_HANDLER_RATIO_KEY,
        -1.0);
    try {
      mockReencryptionhandler(conf);
      fail("Should not be able to init");
    } catch (IllegalArgumentException e) {
      GenericTestUtils.assertExceptionContains(" is not positive", e);
    }

    conf.setDouble(DFS_NAMENODE_REENCRYPT_THROTTLE_LIMIT_HANDLER_RATIO_KEY,
        0.0);
    try {
      mockReencryptionhandler(conf);
      fail("Should not be able to init");
    } catch (IllegalArgumentException e) {
      GenericTestUtils.assertExceptionContains(" is not positive", e);
    }
  }

  @Test
  public void testThrottleAccumulatingTasks() throws Exception {
    final Configuration conf = new Configuration();
    final ReencryptionHandler rh = mockReencryptionhandler(conf);

    // mock tasks piling up
    final Map<Long, ReencryptionUpdater.ZoneSubmissionTracker>
        submissions = new HashMap<>();
    final ReencryptionUpdater.ZoneSubmissionTracker zst =
        new ReencryptionUpdater.ZoneSubmissionTracker();
    submissions.put(new Long(1), zst);
    Future mock = Mockito.mock(Future.class);
    for (int i = 0; i < Runtime.getRuntime().availableProcessors() * 3; ++i) {
      zst.addTask(mock);
    }

    Thread removeTaskThread = new Thread() {
      public void run() {
        try {
          Thread.sleep(3000);
        } catch (InterruptedException ie) {
          LOG.info("removeTaskThread interrupted.");
          Thread.currentThread().interrupt();
        }
        zst.getTasks().clear();
      }
    };

    Whitebox.setInternalState(rh, "submissions", submissions);
    final StopWatch sw = new StopWatch().start();
    removeTaskThread.start();
    rh.getTraverser().throttle();
    sw.stop();
    LOG.info("Throttle completed, consumed {}", sw.now(TimeUnit.MILLISECONDS));
    assertTrue("should have throttled for at least 3 second",
        sw.now(TimeUnit.MILLISECONDS) >= 3000);
  }
}