TestFileSystemNodeAttributeStore.java

/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.yarn.server.resourcemanager.nodelabels;

import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;

import org.apache.hadoop.thirdparty.com.google.common.collect.ImmutableSet;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.api.records.NodeAttribute;
import org.apache.hadoop.yarn.api.records.NodeAttributeType;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.InlineDispatcher;
import org.apache.hadoop.yarn.nodelabels.AttributeValue;
import org.apache.hadoop.yarn.nodelabels.NodeAttributeStore;
import org.apache.hadoop.yarn.server.resourcemanager.NodeAttributeTestUtils;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

/**
 * Test class for FileSystemNodeAttributeStore.
 */
public class TestFileSystemNodeAttributeStore {

  private MockNodeAttrbuteManager mgr = null;
  private Configuration conf = null;

  private static class MockNodeAttrbuteManager
      extends NodeAttributesManagerImpl {
    @Override
    protected void initDispatcher(Configuration conf) {
      super.dispatcher = new InlineDispatcher();
    }

    @Override
    protected void startDispatcher() {
      //Do nothing
    }

    @Override
    protected void stopDispatcher() {
      //Do nothing
    }
  }

  @BeforeEach
  public void before() throws IOException {
    mgr = new MockNodeAttrbuteManager();
    conf = new Configuration();
    conf.setClass(YarnConfiguration.FS_NODE_ATTRIBUTE_STORE_IMPL_CLASS,
        FileSystemNodeAttributeStore.class, NodeAttributeStore.class);
    conf = NodeAttributeTestUtils.getRandomDirConf(conf);
    mgr.init(conf);
    mgr.start();
  }

  @AfterEach
  public void after() throws IOException {
    FileSystemNodeAttributeStore fsStore =
        ((FileSystemNodeAttributeStore) mgr.store);
    fsStore.getFs().delete(fsStore.getFsWorkingPath(), true);
    mgr.stop();
  }

  @Test
  @Timeout(value = 10)
  public void testEmptyRecoverSkipInternalUdpate() throws Exception {
    // Stop manager
    mgr.stop();

    // Start new attribute manager with same path
    mgr = spy(new MockNodeAttrbuteManager());
    mgr.init(conf);
    mgr.start();

    verify(mgr, times(0))
        .internalUpdateAttributesOnNodes(any(), any(), any(), any());
  }

  @Test
  @Timeout(value = 10)
  public void testRecoverWithMirror() throws Exception {

    //------host0----
    // add       -GPU & FPGA
    // remove    -GPU
    // replace   -Docker
    //------host1----
    // add--GPU
    NodeAttribute docker = NodeAttribute
        .newInstance(NodeAttribute.PREFIX_CENTRALIZED, "DOCKER",
            NodeAttributeType.STRING, "docker-0");
    NodeAttribute gpu = NodeAttribute
        .newInstance(NodeAttribute.PREFIX_CENTRALIZED, "GPU",
            NodeAttributeType.STRING, "nvidia");
    NodeAttribute fpga = NodeAttribute
        .newInstance(NodeAttribute.PREFIX_CENTRALIZED, "FPGA",
            NodeAttributeType.STRING, "asus");

    Map<String, Set<NodeAttribute>> toAddAttributes = new HashMap<>();
    toAddAttributes.put("host0", ImmutableSet.of(gpu, fpga));
    toAddAttributes.put("host1", ImmutableSet.of(gpu));
    // Add node attribute
    mgr.addNodeAttributes(toAddAttributes);

    assertEquals(2, mgr.getAttributesForNode("host0").size(), "host0 size");
    // Add test to remove
    toAddAttributes.clear();
    toAddAttributes.put("host0", ImmutableSet.of(gpu));
    mgr.removeNodeAttributes(toAddAttributes);

    // replace nodeattribute
    toAddAttributes.clear();
    toAddAttributes.put("host0", ImmutableSet.of(docker));
    mgr.replaceNodeAttributes(NodeAttribute.PREFIX_CENTRALIZED,
        toAddAttributes);
    Map<NodeAttribute, AttributeValue> attrs =
        mgr.getAttributesForNode("host0");
    assertThat(attrs).hasSize(1);
    assertThat(attrs.keySet().toArray()[0]).isEqualTo(docker);
    mgr.stop();

    // Start new attribute manager with same path
    mgr = new MockNodeAttrbuteManager();
    mgr.init(conf);
    mgr.start();

    mgr.getAttributesForNode("host0");
    assertEquals(1, mgr.getAttributesForNode("host0").size(), "host0 size");
    assertEquals(1, mgr.getAttributesForNode("host1").size(), "host1 size");
    attrs = mgr.getAttributesForNode("host0");
    assertThat(attrs).hasSize(1);
    assertThat(attrs.keySet().toArray()[0]).isEqualTo(docker);
    //------host0----
    // current       - docker
    // replace       - gpu
    //----- host1----
    // current       - gpu
    // add           - docker
    toAddAttributes.clear();
    toAddAttributes.put("host0", ImmutableSet.of(gpu));
    mgr.replaceNodeAttributes(NodeAttribute.PREFIX_CENTRALIZED,
        toAddAttributes);

    toAddAttributes.clear();
    toAddAttributes.put("host1", ImmutableSet.of(docker));
    mgr.addNodeAttributes(toAddAttributes);
    // Recover from mirror and edit log
    mgr.stop();

    mgr = new MockNodeAttrbuteManager();
    mgr.init(conf);
    mgr.start();
    assertEquals(1, mgr.getAttributesForNode("host0").size(), "host0 size");
    assertEquals(2, mgr.getAttributesForNode("host1").size(), "host1 size");
    attrs = mgr.getAttributesForNode("host0");
    assertThat(attrs).hasSize(1);
    assertThat(attrs.keySet().toArray()[0]).isEqualTo(gpu);
    attrs = mgr.getAttributesForNode("host1");
    assertTrue(attrs.keySet().contains(docker));
    assertTrue(attrs.keySet().contains(gpu));
  }

  @Test
  @Timeout(value = 10)
  public void testRecoverFromEditLog() throws Exception {
    NodeAttribute docker = NodeAttribute
        .newInstance(NodeAttribute.PREFIX_CENTRALIZED, "DOCKER",
            NodeAttributeType.STRING, "docker-0");
    NodeAttribute gpu = NodeAttribute
        .newInstance(NodeAttribute.PREFIX_CENTRALIZED, "GPU",
            NodeAttributeType.STRING, "nvidia");
    NodeAttribute fpga = NodeAttribute
        .newInstance(NodeAttribute.PREFIX_CENTRALIZED, "FPGA",
            NodeAttributeType.STRING, "asus");

    Map<String, Set<NodeAttribute>> toAddAttributes = new HashMap<>();
    toAddAttributes.put("host0", ImmutableSet.of(gpu, fpga));
    toAddAttributes.put("host1", ImmutableSet.of(docker));

    // Add node attribute
    mgr.addNodeAttributes(toAddAttributes);

    assertEquals(2, mgr.getAttributesForNode("host0").size(), "host0 size");

    //  Increase editlog operation
    for (int i = 0; i < 5; i++) {
      // Add gpu host1
      toAddAttributes.clear();
      toAddAttributes.put("host0", ImmutableSet.of(gpu));
      mgr.removeNodeAttributes(toAddAttributes);

      // Add gpu host1
      toAddAttributes.clear();
      toAddAttributes.put("host1", ImmutableSet.of(docker));
      mgr.addNodeAttributes(toAddAttributes);

      // Remove GPU replace
      toAddAttributes.clear();
      toAddAttributes.put("host0", ImmutableSet.of(gpu));
      mgr.replaceNodeAttributes(NodeAttribute.PREFIX_CENTRALIZED,
          toAddAttributes);

      // Add fgpa host1
      toAddAttributes.clear();
      toAddAttributes.put("host1", ImmutableSet.of(gpu));
      mgr.addNodeAttributes(toAddAttributes);
    }
    mgr.stop();

    // Start new attribute manager with same path
    mgr = new MockNodeAttrbuteManager();
    mgr.init(conf);
    mgr.start();

    assertEquals(1, mgr.getAttributesForNode("host0").size(), "host0 size");
    assertEquals(2, mgr.getAttributesForNode("host1").size(), "host1 size");

    toAddAttributes.clear();
    NodeAttribute replaced =
        NodeAttribute.newInstance("GPU2", NodeAttributeType.STRING, "nvidia2");
    toAddAttributes.put("host0", ImmutableSet.of(replaced));
    mgr.replaceNodeAttributes(NodeAttribute.PREFIX_CENTRALIZED,
        toAddAttributes);
    mgr.stop();

    mgr = new MockNodeAttrbuteManager();
    mgr.init(conf);
    mgr.start();
    Map<NodeAttribute, AttributeValue> valueMap =
        mgr.getAttributesForNode("host0");
    Map.Entry<NodeAttribute, AttributeValue> entry =
        valueMap.entrySet().iterator().next();
    NodeAttribute attribute = entry.getKey();
    assertEquals(1, mgr.getAttributesForNode("host0").size(), "host0 size");
    assertEquals(2, mgr.getAttributesForNode("host1").size(), "host1 size");
    checkNodeAttributeEqual(replaced, attribute);
  }

  public void checkNodeAttributeEqual(NodeAttribute atr1, NodeAttribute atr2) {
    assertEquals(atr1.getAttributeType(), atr2.getAttributeType());
    assertEquals(atr1.getAttributeKey().getAttributeName(),
        atr2.getAttributeKey().getAttributeName());
    assertEquals(atr1.getAttributeKey().getAttributePrefix(),
        atr2.getAttributeKey().getAttributePrefix());
    assertEquals(atr1.getAttributeValue(), atr2.getAttributeValue());
  }
}