MemoryPlacementConstraintManager.java

/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint;

import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.List;
import java.util.ArrayList;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.resource.PlacementConstraint;
import org.apache.hadoop.yarn.api.resource.PlacementConstraints;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * In memory implementation of the {@link PlacementConstraintManagerService}.
 *
 * <p>Global and per-application constraints are kept in plain
 * {@link HashMap}s. Every access to those maps goes through a single
 * {@link ReentrantReadWriteLock}: lookups hold the read lock, mutations hold
 * the write lock.
 */
@InterfaceAudience.Private
@InterfaceStability.Unstable
public class MemoryPlacementConstraintManager
    extends PlacementConstraintManagerService {

  private static final Logger LOG =
      LoggerFactory.getLogger(MemoryPlacementConstraintManager.class);

  /** Read side of the lock guarding both constraint maps. */
  private final ReentrantReadWriteLock.ReadLock readLock;
  /** Write side of the lock guarding both constraint maps. */
  private final ReentrantReadWriteLock.WriteLock writeLock;

  /**
   * Stores the global constraints that will be manipulated by the cluster
   * admin. The key of each entry is the tag that will enable the corresponding
   * constraint.
   */
  private final Map<String, PlacementConstraint> globalConstraints;
  /**
   * Stores the constraints for each application, along with the allocation tags
   * that will enable each of the constraints for a given application.
   */
  private final Map<ApplicationId, Map<String, PlacementConstraint>>
      appConstraints;

  /**
   * Creates an empty manager with no global constraints and no registered
   * applications.
   */
  public MemoryPlacementConstraintManager() {
    this.globalConstraints = new HashMap<>();
    this.appConstraints = new HashMap<>();
    ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
    this.readLock = lock.readLock();
    this.writeLock = lock.writeLock();
  }

  @Override
  protected void serviceInit(Configuration conf) throws Exception {
    super.serviceInit(conf);
  }

  /**
   * Registers an application together with its initial constraints.
   *
   * <p>If the application is already registered, a warning is logged and the
   * existing registration is left untouched. Entries of {@code constraintMap}
   * that fail {@code validateConstraint} are skipped. A {@code null}
   * {@code constraintMap} is treated as "no constraints".
   *
   * <p>The existence check, the validation and the insertion all happen under
   * the write lock, so two concurrent registrations of the same application
   * cannot both succeed and overwrite each other.
   *
   * @param appId the application to register
   * @param constraintMap source tags mapped to the constraint they enable;
   *          may be {@code null}
   */
  @Override
  public void registerApplication(ApplicationId appId,
      Map<Set<String>, PlacementConstraint> constraintMap) {
    writeLock.lock();
    try {
      if (appConstraints.get(appId) != null) {
        LOG.warn("Application {} has already been registered.", appId);
        return;
      }

      // Go over each sourceTag-constraint pair, validate it, and add it to the
      // constraint map for this app.
      Map<String, PlacementConstraint> constraintsForApp = new HashMap<>();
      if (constraintMap != null) {
        for (Map.Entry<Set<String>, PlacementConstraint> entry : constraintMap
            .entrySet()) {
          Set<String> sourceTags = entry.getKey();
          PlacementConstraint constraint = entry.getValue();
          if (validateConstraint(sourceTags, constraint)) {
            constraintsForApp.put(getValidSourceTag(sourceTags), constraint);
          }
        }
      }

      if (constraintsForApp.isEmpty()) {
        LOG.info(
            "Application {} was registered, but no constraints were added.",
            appId);
      }
      appConstraints.put(appId, constraintsForApp);
    } finally {
      writeLock.unlock();
    }
  }

  /**
   * Adds a constraint to an already registered application. If the
   * application is not registered, the call only logs a message.
   *
   * @param appId the application the constraint belongs to
   * @param sourceTags the source tags that will enable this constraint
   * @param placementConstraint the constraint to add
   * @param replace if true, an existing constraint for these source tags is
   *          replaced; if false, the existing constraint is kept
   */
  @Override
  public void addConstraint(ApplicationId appId, Set<String> sourceTags,
      PlacementConstraint placementConstraint, boolean replace) {
    writeLock.lock();
    try {
      Map<String, PlacementConstraint> constraintsForApp =
          appConstraints.get(appId);
      if (constraintsForApp == null) {
        LOG.info("Cannot add constraint to application {}, as it has not "
            + "been registered yet.", appId);
        return;
      }

      addConstraintToMap(constraintsForApp, sourceTags, placementConstraint,
          replace);
    } finally {
      writeLock.unlock();
    }
  }

  /**
   * Adds a constraint to the global constraint map.
   *
   * @param sourceTags the source tags that will enable this constraint
   * @param placementConstraint the constraint to add
   * @param replace if true, an existing global constraint for these source
   *          tags is replaced; if false, the existing constraint is kept
   */
  @Override
  public void addGlobalConstraint(Set<String> sourceTags,
      PlacementConstraint placementConstraint, boolean replace) {
    writeLock.lock();
    try {
      addConstraintToMap(globalConstraints, sourceTags, placementConstraint,
          replace);
    } finally {
      writeLock.unlock();
    }
  }

  /**
   * Helper method that adds a constraint to a map for a given source tag.
   * Assumes there is already a lock on the constraint map.
   *
   * <p>Nothing is added when {@code validateConstraint} rejects the input, or
   * when a constraint already exists for the tag and {@code replace} is false.
   *
   * @param constraintMap constraint map to which the constraint will be added
   * @param sourceTags the source tags that will enable this constraint
   * @param placementConstraint the new constraint to be added
   * @param replace if true, an existing constraint for these sourceTags will be
   *          replaced with the new one
   */
  private void addConstraintToMap(
      Map<String, PlacementConstraint> constraintMap, Set<String> sourceTags,
      PlacementConstraint placementConstraint, boolean replace) {
    if (!validateConstraint(sourceTags, placementConstraint)) {
      return;
    }

    String sourceTag = getValidSourceTag(sourceTags);
    boolean alreadyPresent = constraintMap.get(sourceTag) != null;
    if (alreadyPresent && !replace) {
      LOG.info("Constraint {} will not be added. There is already a "
              + "constraint associated with tag {}.",
          placementConstraint, sourceTag);
      return;
    }

    // Only report a replacement when something is actually being replaced.
    if (alreadyPresent) {
      LOG.info("Replacing the constraint associated with tag {} with {}.",
          sourceTag, placementConstraint);
    }
    constraintMap.put(sourceTag, placementConstraint);
  }

  /**
   * Returns a snapshot of all constraints of an application.
   *
   * @param appId the application to look up
   * @return an unmodifiable copy of the application's constraints, where each
   *         key is a set holding a single source tag, or {@code null} if the
   *         application is not registered
   */
  @Override
  public Map<Set<String>, PlacementConstraint> getConstraints(
      ApplicationId appId) {
    readLock.lock();
    try {
      Map<String, PlacementConstraint> constraintsForApp =
          appConstraints.get(appId);
      if (constraintsForApp == null) {
        LOG.debug("Application {} is not registered in the Placement "
            + "Constraint Manager.", appId);
        return null;
      }

      // Copy to a new map and return an unmodifiable version of it.
      // Each key of the map is a set with a single source tag.
      Map<Set<String>, PlacementConstraint> constraintMap =
          constraintsForApp.entrySet().stream()
              .collect(Collectors.toMap(
                  e -> Stream.of(e.getKey()).collect(Collectors.toSet()),
                  Map.Entry::getValue));

      return Collections.unmodifiableMap(constraintMap);
    } finally {
      readLock.unlock();
    }
  }

  /**
   * Returns the application-level constraint enabled by the given source tags.
   *
   * @param appId the application to look up
   * @param sourceTags the source tags identifying the constraint
   * @return the matching constraint, or {@code null} if the source tags fail
   *         {@code validateSourceTags}, the application is not registered, or
   *         no constraint is stored for the tag
   */
  @Override
  public PlacementConstraint getConstraint(ApplicationId appId,
      Set<String> sourceTags) {
    if (!validateSourceTags(sourceTags)) {
      return null;
    }
    String sourceTag = getValidSourceTag(sourceTags);
    readLock.lock();
    try {
      Map<String, PlacementConstraint> constraintsForApp =
          appConstraints.get(appId);
      if (constraintsForApp == null) {
        LOG.debug("Application {} is not registered in the Placement "
            + "Constraint Manager.", appId);
        return null;
      }
      // TODO: Merge this constraint with the global one for this tag, if one
      // exists.
      return constraintsForApp.get(sourceTag);
    } finally {
      readLock.unlock();
    }
  }

  /**
   * Returns the global constraint enabled by the given source tags.
   *
   * @param sourceTags the source tags identifying the constraint
   * @return the matching global constraint, or {@code null} if the source tags
   *         fail {@code validateSourceTags} or no constraint is stored for the
   *         tag
   */
  @Override
  public PlacementConstraint getGlobalConstraint(Set<String> sourceTags) {
    if (!validateSourceTags(sourceTags)) {
      return null;
    }
    String sourceTag = getValidSourceTag(sourceTags);
    readLock.lock();
    try {
      return globalConstraints.get(sourceTag);
    } finally {
      readLock.unlock();
    }
  }

  /**
   * Combines the scheduling-request, application and global level constraints
   * that apply to the given source tags into a single AND constraint.
   *
   * <p>Application and global constraints are only looked up when
   * {@code sourceTags} is non-empty; the application constraint additionally
   * requires a non-null {@code appId}. Missing constraints, constraints
   * without an expression, and duplicate expressions are dropped before the
   * AND is composed.
   *
   * @param appId the application whose constraint should be included; may be
   *          {@code null}
   * @param sourceTags the source tags used to look up the application and
   *          global constraints; may be {@code null} or empty
   * @param schedulingRequestConstraint the request-level constraint; may be
   *          {@code null}
   * @return the composite AND constraint built from the collected expressions
   */
  @Override
  public PlacementConstraint getMultilevelConstraint(ApplicationId appId,
      Set<String> sourceTags, PlacementConstraint schedulingRequestConstraint) {
    List<PlacementConstraint> constraints = new ArrayList<>();
    // Add scheduling request-level constraint.
    if (schedulingRequestConstraint != null) {
      constraints.add(schedulingRequestConstraint);
    }
    if (sourceTags != null && !sourceTags.isEmpty()) {
      // Add app-level constraint if appId is given.
      if (appId != null) {
        constraints.add(getConstraint(appId, sourceTags));
      }
      // Add global constraint.
      constraints.add(getGlobalConstraint(sourceTags));
    }

    // Remove all null or duplicate constraints.
    List<PlacementConstraint.AbstractConstraint> allConstraints =
        constraints.stream()
            .filter(placementConstraint -> placementConstraint != null
                && placementConstraint.getConstraintExpr() != null)
            .map(PlacementConstraint::getConstraintExpr)
            .distinct()
            .collect(Collectors.toList());

    // Compose an AND constraint
    // When merge request(RC), app(AC) and global constraint(GC),
    // we do a merge on them with CC=AND(GC, AC, RC) and returns a
    // composite AND constraint. Subsequently we check if CC could
    // be satisfied. This ensures that every level of constraint
    // is satisfied.
    PlacementConstraint.And andConstraint = PlacementConstraints.and(
        allConstraints.toArray(new PlacementConstraint.AbstractConstraint[0]));
    return andConstraint.build();
  }

  /**
   * Removes an application and all of its constraints. Does nothing if the
   * application is not registered.
   *
   * @param appId the application to remove
   */
  @Override
  public void unregisterApplication(ApplicationId appId) {
    writeLock.lock();
    try {
      appConstraints.remove(appId);
    } finally {
      writeLock.unlock();
    }
  }

  /**
   * Removes the global constraint enabled by the given source tags. Does
   * nothing if the source tags fail {@code validateSourceTags} or no such
   * constraint exists.
   *
   * @param sourceTags the source tags identifying the constraint
   */
  @Override
  public void removeGlobalConstraint(Set<String> sourceTags) {
    if (!validateSourceTags(sourceTags)) {
      return;
    }
    String sourceTag = getValidSourceTag(sourceTags);
    writeLock.lock();
    try {
      globalConstraints.remove(sourceTag);
    } finally {
      writeLock.unlock();
    }
  }

  /**
   * @return the number of currently registered applications
   */
  @Override
  public int getNumRegisteredApplications() {
    readLock.lock();
    try {
      return appConstraints.size();
    } finally {
      readLock.unlock();
    }
  }

  /**
   * @return the number of global constraints currently stored
   */
  @Override
  public int getNumGlobalConstraints() {
    readLock.lock();
    try {
      return globalConstraints.size();
    } finally {
      readLock.unlock();
    }
  }
}