TestConfigurationNodeAttributesProvider.java

/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.yarn.server.nodemanager.nodelabels;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileContext;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.yarn.api.records.NodeAttribute;
import org.apache.hadoop.yarn.api.records.NodeAttributeKey;
import org.apache.hadoop.yarn.api.records.NodeAttributeType;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.junit.BeforeClass;
import org.junit.Before;
import org.junit.AfterClass;
import org.junit.After;
import org.junit.Test;
import org.junit.Assert;

import org.mockito.Mockito;

import java.io.File;
import java.io.IOException;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Set;
import java.util.ArrayList;
import java.util.concurrent.TimeoutException;

/**
 * Test class for node configuration node attributes provider.
 */
public class TestConfigurationNodeAttributesProvider {

  private static File testRootDir = new File("target",
      TestConfigurationNodeAttributesProvider.class.getName() + "-localDir")
      .getAbsoluteFile();

  private ConfigurationNodeAttributesProvider nodeAttributesProvider;

  @BeforeClass
  public static void create() {
    testRootDir.mkdirs();
  }

  @Before
  public void setup() {
    nodeAttributesProvider = new ConfigurationNodeAttributesProvider();
  }

  @After
  public void tearDown() throws Exception {
    if (nodeAttributesProvider != null) {
      nodeAttributesProvider.close();
      nodeAttributesProvider.stop();
    }
  }

  @AfterClass
  public static void remove() throws Exception {
    if (testRootDir.exists()) {
      FileContext.getLocalFSFileContext()
          .delete(new Path(testRootDir.getAbsolutePath()), true);
    }
  }

  @Test(timeout=30000L)
  public void testNodeAttributesFetchInterval()
      throws IOException, InterruptedException {
    Set<NodeAttribute> expectedAttributes1 = new HashSet<>();
    expectedAttributes1.add(NodeAttribute
        .newInstance("test.io", "host",
            NodeAttributeType.STRING, "host1"));

    Configuration conf = new Configuration();
    // Set fetch interval to 1s for testing
    conf.setLong(
        YarnConfiguration.NM_NODE_ATTRIBUTES_PROVIDER_FETCH_INTERVAL_MS, 1000);
    ConfigurationNodeAttributesProvider spyProvider =
        Mockito.spy(nodeAttributesProvider);
    Mockito.when(spyProvider.parseAttributes(Mockito.any()))
        .thenReturn(expectedAttributes1);

    spyProvider.init(conf);
    spyProvider.start();

    // Verify init value is honored.
    Assert.assertEquals(expectedAttributes1, spyProvider.getDescriptors());

    // Configuration provider provides a different set of attributes.
    Set<NodeAttribute> expectedAttributes2 = new HashSet<>();
    expectedAttributes2.add(NodeAttribute
        .newInstance("test.io", "os",
            NodeAttributeType.STRING, "windows"));
    Mockito.when(spyProvider.parseAttributes(Mockito.any()))
        .thenReturn(expectedAttributes2);

    // Since we set fetch interval to 1s, it needs to wait for 1s until
    // the updated attributes is updated to the provider. So we are expecting
    // to see some old values for a short window.
    ArrayList<String> keysMet = new ArrayList<>();
    int numOfOldValue = 0;
    int numOfNewValue = 0;
    // Run 5 times in 500ms interval
    int times=5;
    while(times>0) {
      Set<NodeAttribute> current = spyProvider.getDescriptors();
      Assert.assertEquals(1, current.size());
      String attributeName =
          current.iterator().next().getAttributeKey().getAttributeName();
      if ("host".equals(attributeName)){
        numOfOldValue++;
      } else if ("os".equals(attributeName)) {
        numOfNewValue++;
      }
      Thread.sleep(500);
      times--;
    }
    // We should either see the old value or the new value.
    Assert.assertEquals(5, numOfNewValue + numOfOldValue);
    // Both values should be more than 0.
    Assert.assertTrue(numOfOldValue > 0);
    Assert.assertTrue(numOfNewValue > 0);
  }

  @Test
  public void testDisableFetchNodeAttributes() throws IOException,
      InterruptedException {
    Set<NodeAttribute> expectedAttributes1 = new HashSet<>();
    expectedAttributes1.add(NodeAttribute
        .newInstance("test.io", "host",
            NodeAttributeType.STRING, "host1"));

    Configuration conf = new Configuration();
    // Set fetch interval to -1 to disable refresh.
    conf.setLong(
        YarnConfiguration.NM_NODE_ATTRIBUTES_PROVIDER_FETCH_INTERVAL_MS, -1);
    ConfigurationNodeAttributesProvider spyProvider =
        Mockito.spy(nodeAttributesProvider);
    Mockito.when(spyProvider.parseAttributes(Mockito.any()))
        .thenReturn(expectedAttributes1);
    spyProvider.init(conf);
    spyProvider.start();
    Assert.assertEquals(expectedAttributes1,
        spyProvider.getDescriptors());

    // The configuration added another attribute,
    // as we disabled the fetch interval, this value cannot be
    // updated to the provider.
    Set<NodeAttribute> expectedAttributes2 = new HashSet<>();
    expectedAttributes2.add(NodeAttribute
        .newInstance("test.io", "os",
            NodeAttributeType.STRING, "windows"));
    Mockito.when(spyProvider.parseAttributes(Mockito.anyString()))
        .thenReturn(expectedAttributes2);

    // Wait a few seconds until we get the value update, expecting a failure.
    try {
      GenericTestUtils.waitFor(() -> {
        Set<NodeAttribute> attributes = spyProvider.getDescriptors();
        return "os".equalsIgnoreCase(attributes
            .iterator().next().getAttributeKey().getAttributeName());
      }, 500, 1000);
    } catch (Exception e) {
      // Make sure we get the timeout exception.
      Assert.assertTrue(e instanceof TimeoutException);
      return;
    }

    Assert.fail("Expecting a failure in previous check!");
  }

  @Test
  public void testFetchAttributesFromConfiguration() {
    Configuration conf = new Configuration();
    // Set fetch interval to -1 to disable refresh.
    conf.setLong(
        YarnConfiguration.NM_NODE_ATTRIBUTES_PROVIDER_FETCH_INTERVAL_MS, -1);
    conf.setStrings(
        YarnConfiguration.NM_PROVIDER_CONFIGURED_NODE_ATTRIBUTES, "");
  }

  @Test
  public void testParseConfiguration() throws IOException {
    // ATTRIBUTE_NAME,ATTRIBUTE_TYPE,ATTRIBUTE_VALUE
    String attributesStr = "hostname,STRING,host1234:uptime,STRING,321543";
    Set<NodeAttribute> attributes = nodeAttributesProvider
        .parseAttributes(attributesStr);
    Assert.assertEquals(2, attributes.size());
    Iterator<NodeAttribute> ait = attributes.iterator();

    while(ait.hasNext()) {
      NodeAttribute attr = ait.next();
      NodeAttributeKey at = attr.getAttributeKey();
      if (at.getAttributeName().equals("hostname")) {
        Assert.assertEquals("hostname", at.getAttributeName());
        Assert.assertEquals(NodeAttribute.PREFIX_DISTRIBUTED,
            at.getAttributePrefix());
        Assert.assertEquals(NodeAttributeType.STRING,
            attr.getAttributeType());
        Assert.assertEquals("host1234", attr.getAttributeValue());
      } else if (at.getAttributeName().equals("uptime")) {
        Assert.assertEquals("uptime", at.getAttributeName());
        Assert.assertEquals(NodeAttribute.PREFIX_DISTRIBUTED,
            at.getAttributePrefix());
        Assert.assertEquals(NodeAttributeType.STRING,
            attr.getAttributeType());
        Assert.assertEquals("321543", attr.getAttributeValue());
      } else {
        Assert.fail("Unexpected attribute");
      }
    }
    // Missing type
    attributesStr = "hostname,host1234";
    try {
      nodeAttributesProvider.parseAttributes(attributesStr);
      Assert.fail("Expecting a parsing failure");
    } catch (IOException e) {
      Assert.assertNotNull(e);
      Assert.assertTrue(e.getMessage().contains("Invalid value"));
    }

    // Extra prefix
    attributesStr = "prefix/hostname,STRING,host1234";
    try {
      nodeAttributesProvider.parseAttributes(attributesStr);
      Assert.fail("Expecting a parsing failure");
    } catch (IOException e) {
      Assert.assertNotNull(e);
      Assert.assertTrue(e.getMessage()
          .contains("should not contain any prefix."));
    }

    // Invalid type
    attributesStr = "hostname,T,host1234";
    try {
      nodeAttributesProvider.parseAttributes(attributesStr);
      Assert.fail("Expecting a parsing failure");
    } catch (IOException e) {
      e.printStackTrace();
      Assert.assertNotNull(e);
      Assert.assertTrue(e.getMessage()
          .contains("Invalid node attribute type"));
    }
  }
}