TestNodeStatusUpdaterForAttributes.java

/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.yarn.server.nodemanager;

import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;

import java.io.IOException;
import java.lang.Thread.State;
import java.nio.ByteBuffer;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
import java.util.TimerTask;
import java.util.concurrent.TimeoutException;

import org.apache.hadoop.thirdparty.com.google.common.collect.ImmutableSet;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.net.ServerSocketUtil;
import org.apache.hadoop.service.ServiceOperations;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.yarn.api.records.NodeAttribute;
import org.apache.hadoop.yarn.api.records.NodeAttributeType;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.Dispatcher;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.nodelabels.NodeLabelTestBase;
import org.apache.hadoop.yarn.nodelabels.NodeLabelUtil;
import org.apache.hadoop.yarn.server.api.ResourceTracker;
import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatRequest;
import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse;
import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerRequest;
import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerResponse;
import org.apache.hadoop.yarn.server.api.protocolrecords.UnRegisterNodeManagerRequest;
import org.apache.hadoop.yarn.server.api.protocolrecords.UnRegisterNodeManagerResponse;
import org.apache.hadoop.yarn.server.api.records.MasterKey;
import org.apache.hadoop.yarn.server.api.records.NodeAction;
import org.apache.hadoop.yarn.server.api.records.NodeStatus;
import org.apache.hadoop.yarn.server.api.records.impl.pb.MasterKeyPBImpl;
import org.apache.hadoop.yarn.server.nodemanager.health.NodeHealthCheckerService;
import org.apache.hadoop.yarn.server.nodemanager.nodelabels.NodeAttributesProvider;
import org.apache.hadoop.yarn.server.utils.YarnServerBuilderUtils;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

/**
 * Test NodeStatusUpdater for node attributes.
 */
/**
 * Test NodeStatusUpdater for node attributes.
 *
 * <p>Each test wires a {@link NodeManager} to an in-process
 * {@link ResourceTracker} stub and a {@link DummyNodeAttributesProvider}
 * whose descriptors the test sets directly, then triggers out-of-band
 * heartbeats and inspects which attributes reached the stub.
 */
public class TestNodeStatusUpdaterForAttributes extends NodeLabelTestBase {
  private static final RecordFactory RECORD_FACTORY =
      RecordFactoryProvider.getRecordFactory(null);

  private NodeManager nm;
  private DummyNodeAttributesProvider dummyAttributesProviderRef;

  @Before
  public void setup() {
    dummyAttributesProviderRef = new DummyNodeAttributesProvider();
  }

  @After
  public void tearDown() {
    if (null != nm) {
      ServiceOperations.stop(nm);
    }
  }

  /**
   * Stub RM resource tracker that records the node attributes carried by the
   * most recent register or heartbeat request.
   *
   * <p>The request handlers are invoked by the NodeStatusUpdater while the
   * test thread polls the received flags and reads {@link #attributes}, so
   * those fields are {@code volatile} to guarantee cross-thread visibility.
   * Each handler writes {@link #attributes} before setting its flag, so a
   * reader that observes the flag also observes the attributes.
   */
  private static class ResourceTrackerForAttributes
      implements ResourceTracker {
    /** Only read and written inside {@link #nodeHeartbeat}. */
    private int heartbeatID = 0;
    /** Attributes from the latest request; null when none were sent. */
    private volatile Set<NodeAttribute> attributes;

    private volatile boolean receivedNMHeartbeat = false;
    private volatile boolean receivedNMRegister = false;

    private MasterKey createMasterKey() {
      MasterKey masterKey = new MasterKeyPBImpl();
      masterKey.setKeyId(123);
      masterKey.setBytes(ByteBuffer.wrap(new byte[] {(byte) 123}));
      return masterKey;
    }

    @Override
    public RegisterNodeManagerResponse registerNodeManager(
        RegisterNodeManagerRequest request) throws YarnException, IOException {
      Set<NodeAttribute> requested = request.getNodeAttributes();
      attributes = requested;
      RegisterNodeManagerResponse response =
          RECORD_FACTORY.newRecordInstance(RegisterNodeManagerResponse.class);
      response.setNodeAction(NodeAction.NORMAL);
      response.setContainerTokenMasterKey(createMasterKey());
      response.setNMTokenMasterKey(createMasterKey());
      response.setAreNodeAttributesAcceptedByRM(requested != null);
      // Written last so this volatile write publishes the attributes above.
      receivedNMRegister = true;
      return response;
    }

    /**
     * Waits until a heartbeat has been received since the last
     * {@link #resetNMHeartbeatReceiveFlag()}.
     *
     * @throws TimeoutException if GenericTestUtils.waitFor gives up after
     *     30 seconds
     */
    public void waitTillHeartbeat()
        throws InterruptedException, TimeoutException {
      GenericTestUtils.waitFor(() -> receivedNMHeartbeat, 100, 30000);
      if (!receivedNMHeartbeat) {
        Assert.fail("Heartbeat is not received even after waiting");
      }
    }

    /**
     * Waits until the NodeManager has registered with this tracker.
     *
     * @throws TimeoutException if GenericTestUtils.waitFor gives up after
     *     30 seconds
     */
    public void waitTillRegister()
        throws InterruptedException, TimeoutException {
      GenericTestUtils.waitFor(() -> receivedNMRegister, 100, 30000);
      if (!receivedNMRegister) {
        Assert.fail("Registration is not received even after waiting");
      }
    }

    /**
     * Clears the heartbeat-received flag so that the next
     * {@link #waitTillHeartbeat()} waits for a fresh heartbeat.
     */
    public void resetNMHeartbeatReceiveFlag() {
      receivedNMHeartbeat = false;
    }

    @Override
    public NodeHeartbeatResponse nodeHeartbeat(
        NodeHeartbeatRequest request) {
      Set<NodeAttribute> requested = request.getNodeAttributes();
      attributes = requested;
      NodeStatus nodeStatus = request.getNodeStatus();
      nodeStatus.setResponseId(heartbeatID++);

      NodeHeartbeatResponse nhResponse = YarnServerBuilderUtils
          .newNodeHeartbeatResponse(heartbeatID, NodeAction.NORMAL, null, null,
              null, null, 1000L);

      // Ensure heartbeats are only sent when the test triggers one.
      nhResponse.setNextHeartBeatInterval(Long.MAX_VALUE);
      nhResponse.setAreNodeAttributesAcceptedByRM(requested != null);

      // Written last so this volatile write publishes the attributes above.
      receivedNMHeartbeat = true;
      return nhResponse;
    }

    @Override
    public UnRegisterNodeManagerResponse unRegisterNodeManager(
        UnRegisterNodeManagerRequest request) {
      return null;
    }
  }

  /**
   * A dummy NodeAttributesProvider for tests. Its periodic fetch is disabled,
   * so descriptors only change when a test calls {@code setDescriptors}.
   */
  public static class DummyNodeAttributesProvider
      extends NodeAttributesProvider {

    public DummyNodeAttributesProvider() {
      super("DummyNodeAttributesProvider");
      // disable the fetch timer.
      setIntervalTime(-1);
    }

    @Override
    protected void cleanUp() throws Exception {
      // fake implementation, nothing to cleanup
    }

    @Override
    public TimerTask createTimerTask() {
      return new TimerTask() {
        @Override
        public void run() {
          setDescriptors(Collections.unmodifiableSet(new HashSet<>(0)));
        }
      };
    }
  }

  private YarnConfiguration createNMConfigForDistributeNodeAttributes() {
    YarnConfiguration conf = new YarnConfiguration();
    return conf;
  }

  @Test(timeout = 20000)
  public void testNodeStatusUpdaterForNodeAttributes()
      throws InterruptedException, IOException, TimeoutException {
    final ResourceTrackerForAttributes resourceTracker =
        new ResourceTrackerForAttributes();
    nm = new NodeManager() {
      @Override
      protected NodeAttributesProvider createNodeAttributesProvider(
          Configuration conf) throws IOException {
        return dummyAttributesProviderRef;
      }

      @Override
      protected NodeStatusUpdater createNodeStatusUpdater(
          Context context, Dispatcher dispatcher,
          NodeHealthCheckerService healthChecker) {

        return new NodeStatusUpdaterImpl(context, dispatcher, healthChecker,
            metrics) {
          @Override
          protected ResourceTracker getRMClient() {
            return resourceTracker;
          }

          @Override
          protected void stopRMProxy() {
            return;
          }
        };
      }
    };

    YarnConfiguration conf = createNMConfigForDistributeNodeAttributes();
    conf.setLong(YarnConfiguration.NM_NODE_ATTRIBUTES_RESYNC_INTERVAL, 2000);
    conf.set(YarnConfiguration.NM_LOCALIZER_ADDRESS,
        "0.0.0.0:" + ServerSocketUtil.getPort(8040, 10));

    nm.init(conf);
    resourceTracker.resetNMHeartbeatReceiveFlag();
    nm.start();
    resourceTracker.waitTillRegister();
    assertTrue(NodeLabelUtil
        .isNodeAttributesEquals(dummyAttributesProviderRef.getDescriptors(),
            resourceTracker.attributes));

    resourceTracker.waitTillHeartbeat(); // wait till the first heartbeat
    resourceTracker.resetNMHeartbeatReceiveFlag();

    // heartbeat with updated attributes
    NodeAttribute attribute1 = NodeAttribute
        .newInstance(NodeAttribute.PREFIX_DISTRIBUTED, "Attr1",
            NodeAttributeType.STRING, "V1");
    dummyAttributesProviderRef.setDescriptors(ImmutableSet.of(attribute1));

    sendOutofBandHeartBeat();
    resourceTracker.waitTillHeartbeat();
    assertTrue(NodeLabelUtil
        .isNodeAttributesEquals(dummyAttributesProviderRef.getDescriptors(),
            resourceTracker.attributes));
    resourceTracker.resetNMHeartbeatReceiveFlag();

    // heartbeat without updating attributes
    sendOutofBandHeartBeat();
    resourceTracker.waitTillHeartbeat();
    resourceTracker.resetNMHeartbeatReceiveFlag();
    assertNull("If no change in attributes"
            + " then null should be sent as part of request",
        resourceTracker.attributes);

    // provider return with null attributes
    dummyAttributesProviderRef.setDescriptors(null);
    sendOutofBandHeartBeat();
    resourceTracker.waitTillHeartbeat();
    assertNotNull("If provider sends null"
            + " then empty attribute set should be sent and not null",
        resourceTracker.attributes);
    assertTrue("If provider sends null then empty attributes should be sent",
        resourceTracker.attributes.isEmpty());
    resourceTracker.resetNMHeartbeatReceiveFlag();
    // The resync interval is 2 sec and each iteration below sleeps 1 sec, so
    // roughly every other heartbeat resends the (unchanged) attributes while
    // the remaining heartbeats carry null. Expect more than one of each.
    int nullAttributes = 0;
    int nonNullAttributes = 0;
    dummyAttributesProviderRef.setDescriptors(ImmutableSet.of(attribute1));
    for (int i = 0; i < 5; i++) {
      sendOutofBandHeartBeat();
      resourceTracker.waitTillHeartbeat();
      if (null == resourceTracker.attributes) {
        nullAttributes++;
      } else {
        Assert.assertTrue("In heartbeat PI attributes should be send",
            NodeLabelUtil.isNodeAttributesEquals(ImmutableSet.of(attribute1),
                resourceTracker.attributes));
        nonNullAttributes++;
      }
      resourceTracker.resetNMHeartbeatReceiveFlag();
      Thread.sleep(1000);
    }
    Assert.assertTrue("More than one heartbeat with empty attributes expected",
        nullAttributes > 1);
    Assert.assertTrue("More than one heartbeat with attributes expected",
        nonNullAttributes > 1);
    nm.stop();
  }

  @Test(timeout = 20000)
  public void testInvalidNodeAttributesFromProvider()
      throws InterruptedException, IOException, TimeoutException {
    final ResourceTrackerForAttributes resourceTracker =
        new ResourceTrackerForAttributes();
    nm = new NodeManager() {
      @Override protected NodeAttributesProvider createNodeAttributesProvider(
          Configuration conf) throws IOException {
        return dummyAttributesProviderRef;
      }

      @Override protected NodeStatusUpdater createNodeStatusUpdater(
          Context context, Dispatcher dispatcher,
          NodeHealthCheckerService healthChecker) {

        return new NodeStatusUpdaterImpl(context, dispatcher, healthChecker,
            metrics) {
          @Override protected ResourceTracker getRMClient() {
            return resourceTracker;
          }

          @Override protected void stopRMProxy() {
            return;
          }
        };
      }
    };

    YarnConfiguration conf = createNMConfigForDistributeNodeAttributes();
    conf.set(YarnConfiguration.NM_LOCALIZER_ADDRESS,
        "0.0.0.0:" + ServerSocketUtil.getPort(8040, 10));
    nm.init(conf);
    resourceTracker.resetNMHeartbeatReceiveFlag();
    nm.start();
    resourceTracker.waitTillRegister();
    assertTrue(NodeLabelUtil
        .isNodeAttributesEquals(dummyAttributesProviderRef.getDescriptors(),
            resourceTracker.attributes));

    resourceTracker.waitTillHeartbeat(); // wait till the first heartbeat
    resourceTracker.resetNMHeartbeatReceiveFlag();

    // update attribute1
    NodeAttribute attribute1 = NodeAttribute
        .newInstance(NodeAttribute.PREFIX_DISTRIBUTED, "Attr1",
            NodeAttributeType.STRING, "V1");
    dummyAttributesProviderRef.setDescriptors(ImmutableSet.of(attribute1));
    sendOutofBandHeartBeat();
    resourceTracker.waitTillHeartbeat();
    assertTrue(NodeLabelUtil.isNodeAttributesEquals(ImmutableSet.of(attribute1),
        resourceTracker.attributes));
    resourceTracker.resetNMHeartbeatReceiveFlag();

    // update attribute2
    NodeAttribute attribute2 = NodeAttribute
        .newInstance(NodeAttribute.PREFIX_DISTRIBUTED, "Attr2",
            NodeAttributeType.STRING, "V2");
    dummyAttributesProviderRef.setDescriptors(ImmutableSet.of(attribute2));
    sendOutofBandHeartBeat();
    resourceTracker.waitTillHeartbeat();
    assertTrue(NodeLabelUtil.isNodeAttributesEquals(ImmutableSet.of(attribute2),
        resourceTracker.attributes));
    resourceTracker.resetNMHeartbeatReceiveFlag();

    // update attribute1 & attribute2
    dummyAttributesProviderRef
        .setDescriptors(ImmutableSet.of(attribute1, attribute2));
    sendOutofBandHeartBeat();
    resourceTracker.waitTillHeartbeat();
    assertTrue(NodeLabelUtil
        .isNodeAttributesEquals(ImmutableSet.of(attribute1, attribute2),
            resourceTracker.attributes));
    resourceTracker.resetNMHeartbeatReceiveFlag();

    // heartbeat with invalid attributes (malformed prefix "_.P")
    NodeAttribute invalidAttribute = NodeAttribute
        .newInstance("_.P", "Attr1", NodeAttributeType.STRING, "V1");
    dummyAttributesProviderRef
        .setDescriptors(ImmutableSet.of(invalidAttribute));
    sendOutofBandHeartBeat();
    resourceTracker.waitTillHeartbeat();
    assertNull("On Invalid Attributes we need to retain earlier attributes, HB"
        + " needs to send null", resourceTracker.attributes);
    resourceTracker.resetNMHeartbeatReceiveFlag();

    // On the next heartbeat the provider returns the same invalid attributes;
    // they must again be rejected and must not be sent to the RM.
    sendOutofBandHeartBeat();
    resourceTracker.waitTillHeartbeat();
    assertNull("NodeStatusUpdater need not send repeatedly empty attributes on"
        + " invalid attributes from provider ", resourceTracker.attributes);
    resourceTracker.resetNMHeartbeatReceiveFlag();
  }

  /**
   * Triggers an out-of-band heartbeat once the status updater thread is
   * blocked in WAITING or TIMED_WAITING state.
   *
   * <p>This avoids a race: after {@code resourceTracker.waitTillHeartbeat()}
   * returns, the heartbeat thread may still be processing the response and
   * not yet waiting. A trigger sent at that moment could be lost, so this
   * method polls the thread state first.
   *
   * @throws InterruptedException if interrupted while polling
   * @throws IOException if the thread is still not waiting after 10 polls
   *     spaced 50 ms apart
   */
  private void sendOutofBandHeartBeat()
      throws InterruptedException, IOException {
    int i = 0;
    do {
      State statusUpdaterThreadState =
          ((NodeStatusUpdaterImpl) nm.getNodeStatusUpdater())
              .getStatusUpdaterThreadState();
      if (statusUpdaterThreadState.equals(Thread.State.TIMED_WAITING)
          || statusUpdaterThreadState.equals(Thread.State.WAITING)) {
        nm.getNodeStatusUpdater().sendOutofBandHeartBeat();
        break;
      }
      if (++i <= 10) {
        Thread.sleep(50);
      } else {
        throw new IOException("Waited for 500 ms"
            + " but NodeStatusUpdaterThread not in waiting state");
      }
    } while (true);
  }
}