ClusterNode.java

/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.yarn.server.resourcemanager.scheduler.distributed;

import java.util.Collection;
import java.util.HashSet;
import java.util.concurrent.locks.ReentrantReadWriteLock;

import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
import org.apache.hadoop.yarn.util.resource.Resources;

/**
 * Represents a node in the cluster from the NodeQueueLoadMonitor's perspective
 */
public class ClusterNode {
  /**
   * Properties class used to initialize/change fields in ClusterNode.
   */
  public static final class Properties {
    private int queueLength = 0;
    private int queueWaitTime = -1;
    private long timestamp;
    private int queueCapacity = 0;
    private boolean queueCapacityIsSet = false;
    private final HashSet<String> labels;
    private Resource capability = null;
    private Resource allocatedResource = null;

    public static Properties newInstance() {
      return new Properties();
    }

    Properties setQueueLength(int qLength) {
      this.queueLength = qLength;
      return this;
    }

    Properties setQueueWaitTime(int wTime) {
      this.queueWaitTime = wTime;
      return this;
    }

    Properties updateTimestamp() {
      this.timestamp = System.currentTimeMillis();
      return this;
    }

    Properties setQueueCapacity(int capacity) {
      this.queueCapacity = capacity;
      this.queueCapacityIsSet = true;
      return this;
    }

    Properties setNodeLabels(Collection<String> labelsToAdd) {
      labels.clear();
      labels.addAll(labelsToAdd);
      return this;
    }

    Properties setCapability(Resource nodeCapability) {
      this.capability = nodeCapability;
      return this;
    }

    Properties setAllocatedResource(Resource allocResource) {
      this.allocatedResource = allocResource;
      return this;
    }

    private Properties() {
      labels = new HashSet<>();
    }
  }

  private int queueLength = 0;
  private int queueWaitTime = -1;
  private long timestamp;
  final NodeId nodeId;
  private int queueCapacity = 0;
  private final HashSet<String> labels;
  private Resource capability = Resources.none();
  private Resource allocatedResource = Resources.none();
  private final ReentrantReadWriteLock.WriteLock writeLock;
  private final ReentrantReadWriteLock.ReadLock readLock;

  public ClusterNode(NodeId nodeId) {
    this.nodeId = nodeId;
    this.labels = new HashSet<>();
    final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
    this.writeLock = lock.writeLock();
    this.readLock = lock.readLock();
    this.timestamp = System.currentTimeMillis();
  }

  public ClusterNode setProperties(final Properties properties) {
    writeLock.lock();
    try {
      if (properties.capability == null) {
        this.capability = Resources.none();
      } else {
        this.capability = properties.capability;
      }

      if (properties.allocatedResource == null) {
        this.allocatedResource = Resources.none();
      } else {
        this.allocatedResource = properties.allocatedResource;
      }

      this.queueLength = properties.queueLength;
      this.queueWaitTime = properties.queueWaitTime;
      this.timestamp = properties.timestamp;
      if (properties.queueCapacityIsSet) {
        // queue capacity is only set on node add, not on node updates
        this.queueCapacity = properties.queueCapacity;
      }
      this.labels.clear();
      this.labels.addAll(properties.labels);
      return this;
    } finally {
      writeLock.unlock();
    }
  }

  public Resource getAllocatedResource() {
    readLock.lock();
    try {
      return this.allocatedResource;
    } finally {
      readLock.unlock();
    }
  }

  public Resource getAvailableResource() {
    readLock.lock();
    try {
      return Resources.subtractNonNegative(capability, allocatedResource);
    } finally {
      readLock.unlock();
    }
  }

  public Resource getCapability() {
    readLock.lock();
    try {
      return this.capability;
    } finally {
      readLock.unlock();
    }
  }

  public boolean hasLabel(String label) {
    readLock.lock();
    try {
      return this.labels.contains(label);
    } finally {
      readLock.unlock();
    }
  }

  public long getTimestamp() {
    readLock.lock();
    try {
      return this.timestamp;
    } finally {
      readLock.unlock();
    }
  }

  public int getQueueLength() {
    readLock.lock();
    try {
      return this.queueLength;
    } finally {
      readLock.unlock();
    }
  }

  public int getQueueWaitTime() {
    readLock.lock();
    try {
      return this.queueWaitTime;
    } finally {
      readLock.unlock();
    }
  }

  public int getQueueCapacity() {
    readLock.lock();
    try {
      return this.queueCapacity;
    } finally {
      readLock.unlock();
    }
  }

  public boolean compareAndIncrementAllocation(
      final int incrementQLen,
      final ResourceCalculator resourceCalculator,
      final Resource requested) {
    writeLock.lock();
    try {
      final Resource currAvailable = Resources.subtractNonNegative(
          capability, allocatedResource);
      if (resourceCalculator.fitsIn(requested, currAvailable)) {
        allocatedResource = Resources.add(allocatedResource, requested);
        return true;
      }

      if (!resourceCalculator.fitsIn(requested, capability)) {
        // If does not fit at all, do not allocate
        return false;
      }

      return compareAndIncrementAllocation(incrementQLen);
    } finally {
      writeLock.unlock();
    }
  }

  public boolean compareAndIncrementAllocation(final int incrementQLen) {
    writeLock.lock();
    try {
      final int added = queueLength + incrementQLen;
      if (added <= queueCapacity) {
        queueLength = added;
        return true;
      }
      return false;
    } finally {
      writeLock.unlock();
    }
  }

  public boolean isQueueFull() {
    readLock.lock();
    try {
      return this.queueCapacity > 0 &&
          this.queueLength >= this.queueCapacity;
    } finally {
      readLock.unlock();
    }
  }
}