CSQueueStore.java

/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;

import org.apache.hadoop.classification.VisibleForTesting;
import org.apache.hadoop.thirdparty.com.google.common.collect.ImmutableList;
import org.apache.hadoop.thirdparty.com.google.common.collect.ImmutableMap;

import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.stream.Collectors;

public class CSQueueStore {
  //This map is the single source of truth, this will store ALL queues
  //using the queue path as the key
  private final Map<String, CSQueue> fullNameQueues = new HashMap<>();

  //this map will contain all short names and the paths they can be derived from
  //this set is required for remove operation to properly set the short name
  //mapping when the ambiguity is resolved.
  private final Map<String, Set<String>> shortNameToLongNames = new HashMap<>();

  //This map will store the result to the get calls to prevent unnecessary
  //checks, this will be updated on queue add / remove
  private final Map<String, CSQueue> getMap = new HashMap<>();

  //this lock will be used to make sure isAmbiguous can be called parallel
  //it will be only blocked during add / remove operations.
  private ReadWriteLock modificationLock = new ReentrantReadWriteLock();

  /**
   * This getter method will return an immutable map with all the queues with
   * queue path as the key.
   * @return Map containing all queues and having path as key
   */
  Map<String, CSQueue> getFullNameQueues() {
    return ImmutableMap.copyOf(fullNameQueues);
  }

  /**
   * This getter method will return an immutable map with all queues
   * which can be disambiguously referenced by short name, using short name
   * as the key.
   * @return Map containing queues and having short name as key
   */
  @VisibleForTesting
  Map<String, CSQueue> getShortNameQueues() {
    //this is not the most efficient way to create a short named list
    //but this method is only used in tests
    try {
      modificationLock.readLock().lock();
      return ImmutableMap.copyOf(
          fullNameQueues
              //getting all queues from path->queue map
              .entrySet()
              .stream()
              //filtering the list to contain only disambiguous short names
              .filter(
                  //keeping queues where get(queueShortname) == queue
                  //these are the ambigous references
                  entry -> getMap.get(entry.getValue().getQueueShortName())
                      == entry.getValue())
              //making a map from the stream
              .collect(
                  Collectors.toMap(
                      //using the queue's short name as key
                      entry->entry.getValue().getQueueShortName(),
                      //using the queue as value
                      entry->entry.getValue()))
      );
    } finally {
      modificationLock.readLock().unlock();
    }
  }

  /**
   * This method will update the getMap for the short name provided, depending
   * on how many queues are present with the same shortname.
   * @param shortName The short name of the queue to be updated
   */
  private void updateGetMapForShortName(String shortName) {
    //we protect the root, since root can be both a full path and a short name
    //we simply deny adding root as a shortname to the getMap.
    if (shortName.equals(CapacitySchedulerConfiguration.ROOT)) {
      return;
    }
    //getting all queues with the same short name
    Set<String> fullNames = this.shortNameToLongNames.get(shortName);

    //if there is only one queue we add it to the getMap
    if (fullNames != null && fullNames.size() == 1) {
      getMap.put(shortName,
          fullNameQueues.get(fullNames.iterator().next()));
    } else {
      //in all other cases using only shortName cannot disambigously identifiy
      //a queue
      getMap.remove(shortName);
    }
  }

  /**
   * Method for adding a queue to the store.
   * @param queue Queue to be added
   */
  public void add(CSQueue queue) {
    String fullName = queue.getQueuePath();
    String shortName = queue.getQueueShortName();

    try {
      modificationLock.writeLock().lock();

      fullNameQueues.put(fullName, queue);
      getMap.put(fullName, queue);

      //we only update short queue name ambiguity for non root queues
      if (!shortName.equals(CapacitySchedulerConfiguration.ROOT)) {
        //getting or creating the ambiguity set for the current queue
        Set<String> fullNamesSet =
            this.shortNameToLongNames.getOrDefault(shortName, new HashSet<>());

        //adding the full name to the queue
        fullNamesSet.add(fullName);
        this.shortNameToLongNames.put(shortName, fullNamesSet);
      }

      //updating the getMap references for the queue
      updateGetMapForShortName(shortName);
    } finally {
      modificationLock.writeLock().unlock();
    }
  }

  /**
   * Method for removing a queue from the store.
   * @param queue The queue to be removed
   */
  public void remove(CSQueue queue) {
    //if no queue is specified, we can consider it already removed,
    //also consistent with hashmap behaviour
    if (queue == null) {
      return;
    }
    try {
      modificationLock.writeLock().lock();

      String fullName = queue.getQueuePath();
      String shortName = queue.getQueueShortName();

      fullNameQueues.remove(fullName);
      getMap.remove(fullName);

      //we only update short queue name ambiguity for non root queues
      if (!shortName.equals(CapacitySchedulerConfiguration.ROOT)) {
        Set<String> fullNamesSet = this.shortNameToLongNames.get(shortName);
        fullNamesSet.remove(fullName);
        //if there are no more queues with the current short name, we simply
        //remove the set to free up some memory
        if (fullNamesSet.size() == 0) {
          this.shortNameToLongNames.remove(shortName);
        }
      }

      //updating the getMap references for the queue
      updateGetMapForShortName(shortName);

    } finally {
      modificationLock.writeLock().unlock();
    }
  }

  /**
   * Method for removing a queue from the store by name.
   * @param name A deterministic name for the queue to be removed
   */
  public void remove(String name) {
    CSQueue queue = get(name);
    if (queue != null) {
      remove(queue);
    }
  }

  /**
   * Returns a queue by looking it up by its fully qualified name.
   * @param fullName The full name/path of the queue
   * @return The queue or null if none found
   */
  CSQueue getByFullName(String fullName) {
    if (fullName == null) {
      return null;
    }

    try {
      modificationLock.readLock().lock();
      return fullNameQueues.getOrDefault(fullName, null);
    } finally {
      modificationLock.readLock().unlock();
    }
  }

  /**
   * Check for name ambiguity returns true, if there are at least two queues
   * with the same short name. Queue named "root" is protected, and it will
   * always return the root queue regardless of ambiguity.
   * @param shortName The short name to be checked for ambiguity
   * @return true if there are at least two queues found false otherwise
   */
  boolean isAmbiguous(String shortName) {
    if (shortName == null) {
      return false;
    }

    boolean ret = true;
    try {
      modificationLock.readLock().lock();
      Set<String> fullNamesSet = this.shortNameToLongNames.get(shortName);

      if (fullNamesSet == null || fullNamesSet.size() <= 1) {
        ret = false;
      }
    } finally {
      modificationLock.readLock().unlock();
    }

    return ret;
  }

  /**
   * Getter method for the queue it can find queues by both full and
   * short names.
   * @param name Full or short name of the queue
   * @return the queue
   */
  public CSQueue get(String name) {
    if (name == null) {
      return null;
    }
    try {
      modificationLock.readLock().lock();
      return getMap.getOrDefault(name, null);
    } finally {
      modificationLock.readLock().unlock();
    }
  }

  /**
   * Clears the store, removes all queue references.
   */
  public void clear() {
    try {
      modificationLock.writeLock().lock();
      fullNameQueues.clear();
      shortNameToLongNames.clear();
      getMap.clear();
    } finally {
      modificationLock.writeLock().unlock();
    }
  }

  /**
   * Returns all queues as a list.
   * @return List containing all the queues
   */
  public Collection<CSQueue> getQueues() {
    try {
      modificationLock.readLock().lock();
      return ImmutableList.copyOf(fullNameQueues.values());
    } finally {
      modificationLock.readLock().unlock();
    }
  }
}