TestBlockPoolManager.java

/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hdfs.server.datanode;

import static org.junit.Assert.assertEquals;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

import org.apache.hadoop.net.MockDomainNameResolver;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DFSUtil;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;


public class TestBlockPoolManager {
  private static final Logger LOG =
      LoggerFactory.getLogger(TestBlockPoolManager.class);
  private final DataNode mockDN = Mockito.mock(DataNode.class);
  private BlockPoolManager bpm;
  private final StringBuilder log = new StringBuilder();
  private int mockIdx = 1;
  
  @Before
  public void setupBPM() {
    bpm = new BlockPoolManager(mockDN){

      @Override
      protected BPOfferService createBPOS(
          final String nameserviceId,
          List<String> nnIds,
          List<InetSocketAddress> nnAddrs,
          List<InetSocketAddress> lifelineNnAddrs) {
        final int idx = mockIdx++;
        doLog("create #" + idx);
        final BPOfferService bpos = Mockito.mock(BPOfferService.class);
        Mockito.doReturn("Mock BPOS #" + idx).when(bpos).toString();
        List<BPServiceActor> bpsa = new ArrayList<>(nnIds.size());
        for (int i = 0; i < nnIds.size(); i++) {
          BPServiceActor actor = Mockito.mock(BPServiceActor.class);
          Mockito.doReturn(nnIds.get(i)).when(actor).getNnId();
          Mockito.doReturn(nnAddrs.get(i)).when(actor).getNNSocketAddress();
          bpsa.add(actor);
        }
        Mockito.doReturn(bpsa).when(bpos).getBPServiceActors();
        // Log refreshes
        try {
          Mockito.doAnswer(
              new Answer<Void>() {
                @Override
                public Void answer(InvocationOnMock invocation) throws Throwable {
                  doLog("refresh #" + idx);
                  return null;
                }
              }).when(bpos).refreshNNList(Mockito.anyString(),
                  Mockito.<List<String>>any(),
                  Mockito.<ArrayList<InetSocketAddress>>any(),
                  Mockito.<ArrayList<InetSocketAddress>>any());
        } catch (IOException e) {
          throw new RuntimeException(e);
        }
        // Log stops
        Mockito.doAnswer(
            new Answer<Void>() {
              @Override
              public Void answer(InvocationOnMock invocation) throws Throwable {
                doLog("stop #" + idx);
                bpm.remove(bpos);
                return null;
              }
            }).when(bpos).stop();
        return bpos;
      }
    };
  }
  
  private void doLog(String string) {
    synchronized(log) {
      LOG.info(string);
      log.append(string).append("\n");
    }
  }

  @Test
  public void testSimpleSingleNS() throws Exception {
    Configuration conf = new Configuration();
    conf.set(DFSConfigKeys.FS_DEFAULT_NAME_KEY,
        "hdfs://mock1:8020");
    bpm.refreshNamenodes(conf);
    assertEquals("create #1\n", log.toString());
  }

  @Test
  public void testFederationRefresh() throws Exception {
    Configuration conf = new Configuration();
    conf.set(DFSConfigKeys.DFS_NAMESERVICES,
        "ns1,ns2");
    addNN(conf, "ns1", "mock1:8020");
    addNN(conf, "ns2", "mock1:8020");
    bpm.refreshNamenodes(conf);
    assertEquals(
        "create #1\n" +
        "create #2\n", log.toString());
    log.setLength(0);

    // Remove the first NS
    conf.set(DFSConfigKeys.DFS_NAMESERVICES,
        "ns2");
    bpm.refreshNamenodes(conf);
    assertEquals(
        "stop #1\n" +
        "refresh #2\n", log.toString());
    log.setLength(0);
    
    // Add back an NS -- this creates a new BPOS since the old
    // one for ns2 should have been previously retired
    conf.set(DFSConfigKeys.DFS_NAMESERVICES,
        "ns1,ns2");
    bpm.refreshNamenodes(conf);
    assertEquals(
        "create #3\n" +
        "refresh #2\n", log.toString());
  }

  @Test
  public void testInternalNameService() throws Exception {
    Configuration conf = new Configuration();
    conf.set(DFSConfigKeys.DFS_NAMESERVICES, "ns1,ns2,ns3");
    addNN(conf, "ns1", "mock1:8020");
    addNN(conf, "ns2", "mock1:8020");
    addNN(conf, "ns3", "mock1:8020");
    conf.set(DFSConfigKeys.DFS_INTERNAL_NAMESERVICES_KEY, "ns1");
    bpm.refreshNamenodes(conf);
    assertEquals("create #1\n", log.toString());
    Map<String, BPOfferService> map = bpm.getBpByNameserviceId();
    Assert.assertFalse(map.containsKey("ns2"));
    Assert.assertFalse(map.containsKey("ns3"));
    Assert.assertTrue(map.containsKey("ns1"));
    log.setLength(0);
  }

  @Test
  public void testNameServiceNeedToBeResolved() throws Exception {
    Configuration conf = new Configuration();
    conf.set(DFSConfigKeys.DFS_NAMESERVICES, "ns1,ns2,ns3");
    addNN(conf, "ns1", "mock1:8020");
    addNN(conf, "ns2", "mock1:8020");
    addNN(conf, "ns3", MockDomainNameResolver.DOMAIN + ":8020");
    addDNSSettings(conf, "ns3");
    bpm.refreshNamenodes(conf);
    assertEquals(
        "create #1\n" +
            "create #2\n" +
            "create #3\n", log.toString());
    Map<String, BPOfferService> map = bpm.getBpByNameserviceId();
    Assert.assertTrue(map.containsKey("ns1"));
    Assert.assertTrue(map.containsKey("ns2"));
    Assert.assertTrue(map.containsKey("ns3"));
    Assert.assertEquals(2, map.get("ns3").getBPServiceActors().size());
    Assert.assertEquals("ns3-" +  MockDomainNameResolver.FQDN_1 + "-8020",
        map.get("ns3").getBPServiceActors().get(0).getNnId());
    Assert.assertEquals("ns3-" +  MockDomainNameResolver.FQDN_2 + "-8020",
        map.get("ns3").getBPServiceActors().get(1).getNnId());
    Assert.assertEquals(
        new InetSocketAddress(MockDomainNameResolver.FQDN_1, 8020),
        map.get("ns3").getBPServiceActors().get(0).getNNSocketAddress());
    Assert.assertEquals(
        new InetSocketAddress(MockDomainNameResolver.FQDN_2, 8020),
        map.get("ns3").getBPServiceActors().get(1).getNNSocketAddress());
    log.setLength(0);
  }


  /**
   * Add more DNS related settings to the passed in configuration.
   * @param config Configuration file to add settings to.
   */
  private void addDNSSettings(Configuration config,
      String nameservice) {
    config.setBoolean(
        DFSConfigKeys.DFS_NAMESERVICES_RESOLUTION_ENABLED + "."
            + nameservice, true);
    config.set(
        DFSConfigKeys.DFS_NAMESERVICES_RESOLVER_IMPL + "." + nameservice,
        MockDomainNameResolver.class.getName());
  }

  private static void addNN(Configuration conf, String ns, String addr) {
    String key = DFSUtil.addKeySuffixes(
        DFSConfigKeys.DFS_NAMENODE_RPC_ADDRESS_KEY, ns);
    conf.set(key, addr);
  }
}