ContainersMonitorImpl.java

/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor;

import org.apache.hadoop.classification.VisibleForTesting;
import org.apache.hadoop.util.Preconditions;
import org.apache.hadoop.util.Time;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources.CGroupElasticMemoryController;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources.ResourceHandlerModule;
import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.service.AbstractService;
import org.apache.hadoop.util.StringUtils.TraditionalBinaryPrefix;
import org.apache.hadoop.yarn.api.records.ContainerExitStatus;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceUtilization;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.AsyncDispatcher;
import org.apache.hadoop.yarn.event.Dispatcher;
import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor;
import org.apache.hadoop.yarn.server.nodemanager.Context;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerImpl;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerKillEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.runtime.ContainerExecutionException;
import org.apache.hadoop.yarn.server.nodemanager.timelineservice.NMTimelinePublisher;
import org.apache.hadoop.yarn.server.nodemanager.util.NodeManagerHardwareUtils;
import org.apache.hadoop.yarn.server.nodemanager.webapp.ContainerLogsUtils;
import org.apache.hadoop.yarn.util.ResourceCalculatorPlugin;
import org.apache.hadoop.yarn.util.ResourceCalculatorProcessTree;

import java.util.Arrays;
import java.io.File;
import java.util.Map;
import java.util.List;
import java.util.Map.Entry;
import java.util.concurrent.ConcurrentHashMap;

/**
 * Monitors containers collecting resource usage and preempting the container
 * if it exceeds its limits.
 */
public class ContainersMonitorImpl extends AbstractService implements
    ContainersMonitor {

  /** General-purpose logger for this class. */
  private final static Logger LOG =
       LoggerFactory.getLogger(ContainersMonitorImpl.class);
  /**
   * Separate logger named "&lt;class name&gt;.audit"; the monitoring thread
   * writes per-container resource usage lines to it at DEBUG level.
   */
  private final static Logger AUDITLOG =
       LoggerFactory.getLogger(ContainersMonitorImpl.class.getName()+".audit");

  /** Milliseconds the monitoring thread sleeps between monitoring passes. */
  private long monitoringInterval;
  /** Thread that periodically measures and enforces container usage. */
  private MonitoringThread monitoringThread;
  /** Value of NM_CONTAINER_LOG_MON_INTERVAL_MS read in serviceInit. */
  private int logCheckInterval;
  /** Started in serviceStart only when the container log monitor is on. */
  private LogMonitorThread logMonitorThread;
  /** Container log directory size limit in bytes (from configuration). */
  private long logDirSizeLimit;
  /** Total container log size limit in bytes (from configuration). */
  private long logTotalSizeLimit;
  /** Elastic memory controller; non-null only when elastic control is on. */
  private CGroupElasticMemoryController oomListenerThread;
  /** Whether per-container metrics (ContainerMetrics) are recorded. */
  private boolean containerMetricsEnabled;
  /** Period passed to ContainerMetrics.forContainer, in ms. */
  private long containerMetricsPeriodMs;
  /** Unregister delay passed to ContainerMetrics.forContainer, in ms. */
  private long containerMetricsUnregisterDelayMs;

  /**
   * Containers currently being monitored, keyed by container id. An entry is
   * removed when its container is killed for exceeding a memory limit.
   * NOTE(review): concurrent map presumably because other threads add and
   * remove entries; those call sites are not visible here.
   */
  @VisibleForTesting
  final Map<ContainerId, ProcessTreeInfo> trackingContainers =
      new ConcurrentHashMap<>();

  private final ContainerExecutor containerExecutor;
  /** Dispatcher used to send ContainerKillEvents. */
  private final Dispatcher eventDispatcher;
  private final Context context;
  /** Node-level resource calculator; may be null if unavailable. */
  private ResourceCalculatorPlugin resourceCalculatorPlugin;
  private Configuration conf;
  /**
   * Configured virtual-to-physical memory ratio.
   * NOTE(review): static, yet assigned from an instance in serviceInit.
   */
  private static float vmemRatio;
  /** Process tree implementation class from configuration (may be null). */
  private Class<? extends ResourceCalculatorProcessTree> processTreeClass;

  /** Maximum virtual memory in bytes. */
  private long maxVmemAllottedForContainers = UNKNOWN_MEMORY_LIMIT;
  /** Maximum physical memory in bytes. */
  private long maxPmemAllottedForContainers = UNKNOWN_MEMORY_LIMIT;

  private boolean pmemCheckEnabled;
  private boolean vmemCheckEnabled;
  private boolean elasticMemoryEnforcement;
  private boolean strictMemoryEnforcement;
  /** True when the monitor flag is set and monitoringInterval > 0. */
  private boolean containersMonitorEnabled;
  private boolean logMonitorEnabled;

  private long maxVCoresAllottedForContainers;

  /** Sentinel meaning "memory limit / size could not be determined". */
  private static final long UNKNOWN_MEMORY_LIMIT = -1L;
  /** Node CPU percentage available to YARN; a divisor in recordUsage. */
  private int nodeCpuPercentageForYARN;

  /**
   * Type of container metric.
   */
  @Private
  public enum ContainerMetric {
    CPU, MEMORY
  }

  /**
   * Aggregated utilization of all tracked containers. Starts at zero in the
   * constructor and is refreshed after each monitoring pass (presumably by
   * setContainersUtilization, which is not shown here).
   */
  private ResourceUtilization containersUtilization;

  /** Set by serviceStop; volatile so the monitoring loop sees the write. */
  private volatile boolean stopped = false;

  /**
   * Creates the monitor service. The worker threads are only constructed
   * here; whether they run is decided by configuration in serviceInit and
   * they are launched in serviceStart.
   *
   * @param exec executor used to look up container pids, ips and ports
   * @param dispatcher dispatcher that receives container kill events
   * @param context NodeManager context holding containers and metrics
   */
  public ContainersMonitorImpl(ContainerExecutor exec,
      AsyncDispatcher dispatcher, Context context) {
    super("containers-monitor");
    this.containerExecutor = exec;
    this.eventDispatcher = dispatcher;
    this.context = context;
    this.monitoringThread = new MonitoringThread();
    this.logMonitorThread = new LogMonitorThread();
    // No container has been measured yet: start from zero utilization.
    this.containersUtilization =
        ResourceUtilization.newInstance(0, 0, 0.0f);
  }

  /**
   * Reads the container-monitoring configuration, validates it and prepares
   * (but does not start) the optional elastic memory controller. Threads are
   * started later in serviceStart.
   *
   * @param myConf configuration to read; it is also stored in {@code conf}
   * @throws YarnException if elastic memory control is enabled but
   *     {@link CGroupElasticMemoryController} is not available
   * @throws IllegalArgumentException if the vmem/pmem ratio is not above 0.99
   * @throws Exception anything thrown by the parent service initialization
   */
  @Override
  protected void serviceInit(Configuration myConf) throws Exception {
    this.conf = myConf;
    // The container-specific interval falls back to the generic resource
    // monitoring interval when it is not configured.
    this.monitoringInterval =
        this.conf.getLong(YarnConfiguration.NM_CONTAINER_MON_INTERVAL_MS,
            this.conf.getLong(YarnConfiguration.NM_RESOURCE_MON_INTERVAL_MS,
                YarnConfiguration.DEFAULT_NM_RESOURCE_MON_INTERVAL_MS));

    this.logCheckInterval =
        conf.getInt(YarnConfiguration.NM_CONTAINER_LOG_MON_INTERVAL_MS,
            YarnConfiguration.DEFAULT_NM_CONTAINER_LOG_MON_INTERVAL_MS);
    this.logDirSizeLimit =
        conf.getLong(YarnConfiguration.NM_CONTAINER_LOG_DIR_SIZE_LIMIT_BYTES,
            YarnConfiguration.DEFAULT_NM_CONTAINER_LOG_DIR_SIZE_LIMIT_BYTES);
    this.logTotalSizeLimit =
        conf.getLong(YarnConfiguration.NM_CONTAINER_LOG_TOTAL_SIZE_LIMIT_BYTES,
            YarnConfiguration.DEFAULT_NM_CONTAINER_LOG_TOTAL_SIZE_LIMIT_BYTES);

    this.resourceCalculatorPlugin =
        ResourceCalculatorPlugin.getContainersMonitorPlugin(this.conf);
    LOG.info("Using ResourceCalculatorPlugin: {}",
        this.resourceCalculatorPlugin);
    processTreeClass = this.conf.getClass(
            YarnConfiguration.NM_CONTAINER_MON_PROCESS_TREE, null,
            ResourceCalculatorProcessTree.class);
    LOG.info("Using ResourceCalculatorProcessTree: {}", this.processTreeClass);

    this.containerMetricsEnabled =
        this.conf.getBoolean(YarnConfiguration.NM_CONTAINER_METRICS_ENABLE,
            YarnConfiguration.DEFAULT_NM_CONTAINER_METRICS_ENABLE);
    this.containerMetricsPeriodMs =
        this.conf.getLong(YarnConfiguration.NM_CONTAINER_METRICS_PERIOD_MS,
            YarnConfiguration.DEFAULT_NM_CONTAINER_METRICS_PERIOD_MS);
    this.containerMetricsUnregisterDelayMs = this.conf.getLong(
        YarnConfiguration.NM_CONTAINER_METRICS_UNREGISTER_DELAY_MS,
        YarnConfiguration.DEFAULT_NM_CONTAINER_METRICS_UNREGISTER_DELAY_MS);

    long configuredPMemForContainers =
        NodeManagerHardwareUtils.getContainerMemoryMB(
            this.resourceCalculatorPlugin, this.conf);

    int configuredVCoresForContainers =
        NodeManagerHardwareUtils.getVCores(
            this.resourceCalculatorPlugin, this.conf);

    // ///////// Virtual memory configuration //////
    // NOTE(review): vmemRatio is a static field assigned per instance.
    vmemRatio = this.conf.getFloat(YarnConfiguration.NM_VMEM_PMEM_RATIO,
        YarnConfiguration.DEFAULT_NM_VMEM_PMEM_RATIO);
    Preconditions.checkArgument(vmemRatio > 0.99f,
        YarnConfiguration.NM_VMEM_PMEM_RATIO + " should be at least 1.0");

    // Setting these irrespective of whether checks are enabled.
    // Required in the UI.
    Resource resourcesForContainers = Resource.newInstance(
        configuredPMemForContainers, configuredVCoresForContainers);
    setAllocatedResourcesForContainers(resourcesForContainers);

    pmemCheckEnabled = this.conf.getBoolean(
        YarnConfiguration.NM_PMEM_CHECK_ENABLED,
        YarnConfiguration.DEFAULT_NM_PMEM_CHECK_ENABLED);
    vmemCheckEnabled = this.conf.getBoolean(
        YarnConfiguration.NM_VMEM_CHECK_ENABLED,
        YarnConfiguration.DEFAULT_NM_VMEM_CHECK_ENABLED);
    elasticMemoryEnforcement = this.conf.getBoolean(
        YarnConfiguration.NM_ELASTIC_MEMORY_CONTROL_ENABLED,
        YarnConfiguration.DEFAULT_NM_ELASTIC_MEMORY_CONTROL_ENABLED);
    strictMemoryEnforcement = conf.getBoolean(
        YarnConfiguration.NM_MEMORY_RESOURCE_ENFORCED,
        YarnConfiguration.DEFAULT_NM_MEMORY_RESOURCE_ENFORCED);
    LOG.info("Physical memory check enabled: {}", pmemCheckEnabled);
    LOG.info("Virtual memory check enabled: {}", vmemCheckEnabled);
    LOG.info("Elastic memory control enabled: {}", elasticMemoryEnforcement);
    LOG.info("Strict memory control enabled: {}", strictMemoryEnforcement);

    if (elasticMemoryEnforcement) {
      if (!CGroupElasticMemoryController.isAvailable()) {
        // Test for availability outside the constructor
        // to be able to write non-Linux unit tests for
        // CGroupElasticMemoryController
        throw new YarnException(
            "CGroup Elastic Memory controller enabled but " +
            "it is not available. Exiting.");
      } else {
        // The controller gets the pmem limit when the pmem check is enabled,
        // otherwise the vmem limit.
        this.oomListenerThread = new CGroupElasticMemoryController(
            conf,
            context,
            ResourceHandlerModule.getCGroupsHandler(),
            pmemCheckEnabled,
            vmemCheckEnabled,
            pmemCheckEnabled ?
                maxPmemAllottedForContainers : maxVmemAllottedForContainers
        );
      }
    }

    // Monitoring needs both the feature flag and a positive interval.
    containersMonitorEnabled =
        isContainerMonitorEnabled() && monitoringInterval > 0;
    LOG.info("ContainersMonitor enabled: {}", containersMonitorEnabled);

    logMonitorEnabled =
            conf.getBoolean(YarnConfiguration.NM_CONTAINER_LOG_MONITOR_ENABLED,
                    YarnConfiguration.DEFAULT_NM_CONTAINER_LOG_MONITOR_ENABLED);
    // Parameterized like every other log call in this class.
    LOG.info("Container Log Monitor Enabled: {}", logMonitorEnabled);

    nodeCpuPercentageForYARN =
        NodeManagerHardwareUtils.getNodeCpuPercentage(this.conf);

    if (pmemCheckEnabled) {
      // Logging if actual pmem cannot be determined.
      long totalPhysicalMemoryOnNM = UNKNOWN_MEMORY_LIMIT;
      if (this.resourceCalculatorPlugin != null) {
        totalPhysicalMemoryOnNM = this.resourceCalculatorPlugin
            .getPhysicalMemorySize();
        if (totalPhysicalMemoryOnNM <= 0) {
          LOG.warn("NodeManager's totalPmem could not be calculated. "
              + "Setting it to {}", UNKNOWN_MEMORY_LIMIT);
          totalPhysicalMemoryOnNM = UNKNOWN_MEMORY_LIMIT;
        }
      }

      // Warn (do not fail) when more than 80% of the node's physical memory
      // is handed to containers.
      if (totalPhysicalMemoryOnNM != UNKNOWN_MEMORY_LIMIT &&
          this.maxPmemAllottedForContainers > totalPhysicalMemoryOnNM * 0.80f) {
        LOG.warn(
            "NodeManager configured with {} physical memory allocated to " +
            "containers, which is more than 80% of the total physical memory " +
            "available ({}). Thrashing might happen.",
            TraditionalBinaryPrefix.long2String(
                maxPmemAllottedForContainers, "B", 1),
            TraditionalBinaryPrefix.long2String(
                totalPhysicalMemoryOnNM, "B", 1));
      }
    }
    super.serviceInit(this.conf);
  }

  /**
   * @return the configured NM_CONTAINER_MONITOR_ENABLED flag, falling back
   *     to its default when unset
   */
  private boolean isContainerMonitorEnabled() {
    final boolean fallback =
        YarnConfiguration.DEFAULT_NM_CONTAINER_MONITOR_ENABLED;
    return conf.getBoolean(
        YarnConfiguration.NM_CONTAINER_MONITOR_ENABLED, fallback);
  }

  /**
   * Builds a process tree calculator rooted at the given pid, using the
   * configured process tree class (if any) and this service's configuration.
   *
   * @param pId root process id of the tree
   * @return the calculator returned by
   *     {@link ResourceCalculatorProcessTree#getResourceCalculatorProcessTree};
   *     may be null when no calculator is available (callers check for null)
   */
  private ResourceCalculatorProcessTree
      getResourceCalculatorProcessTree(String pId) {
    final ResourceCalculatorProcessTree tree = ResourceCalculatorProcessTree
        .getResourceCalculatorProcessTree(pId, processTreeClass, conf);
    return tree;
  }

  /**
   * Checks that both the node-level plugin and a process tree calculator can
   * be obtained. Logs at INFO whichever one is missing.
   *
   * @return true only if both calculators are available
   */
  private boolean isResourceCalculatorAvailable() {
    final String self = this.getClass().getName();
    if (resourceCalculatorPlugin == null) {
      LOG.info("ResourceCalculatorPlugin is unavailable on this system. "
          + "{} is disabled.", self);
      return false;
    }
    // Probe with pid "1" just to see whether a tree calculator can be built.
    final boolean treeAvailable = getResourceCalculatorProcessTree("1") != null;
    if (!treeAvailable) {
      LOG.info("ResourceCalculatorProcessTree is unavailable on this system. "
          + "{} is disabled.", self);
    }
    return treeAvailable;
  }

  /**
   * Launches the background threads selected in serviceInit: the monitoring
   * thread, the elastic memory (OOM listener) controller if one was created,
   * and the log monitor thread.
   *
   * @throws Exception anything thrown by the parent service start
   */
  @Override
  protected void serviceStart() throws Exception {
    if (containersMonitorEnabled) {
      monitoringThread.start();
    }
    final CGroupElasticMemoryController oomListener = oomListenerThread;
    if (oomListener != null) {
      oomListener.start();
    }
    if (logMonitorEnabled) {
      logMonitorThread.start();
    }
    super.serviceStart();
  }

  /**
   * Stops and joins every background thread started by serviceStart.
   *
   * <p>The elastic memory (OOM listener) controller is stopped whenever it
   * exists. serviceStart launches it independently of
   * {@code containersMonitorEnabled}, so tying its shutdown to that flag
   * would leak the thread when monitoring is disabled. An interrupt received
   * while waiting for the monitor threads is remembered and re-asserted at
   * the end, so the remaining threads are still joined.
   *
   * @throws InterruptedException if interrupted while joining the OOM
   *     listener
   * @throws Exception anything thrown by the parent service stop
   */
  @Override
  protected void serviceStop() throws Exception {
    stopped = true;
    boolean interrupted = false;
    if (containersMonitorEnabled) {
      this.monitoringThread.interrupt();
      try {
        this.monitoringThread.join();
      } catch (InterruptedException e) {
        LOG.info("ContainersMonitorImpl monitoring thread interrupted");
        interrupted = true;
      }
    }
    if (this.oomListenerThread != null) {
      this.oomListenerThread.stopListening();
      try {
        this.oomListenerThread.join();
      } finally {
        this.oomListenerThread = null;
      }
    }
    if (logMonitorEnabled) {
      this.logMonitorThread.interrupt();
      try {
        this.logMonitorThread.join();
      } catch (InterruptedException e) {
        LOG.info("ContainersMonitorImpl log monitor thread interrupted");
        interrupted = true;
      }
    }
    try {
      super.serviceStop();
    } finally {
      if (interrupted) {
        // Restore the interrupt status that was consumed by join().
        Thread.currentThread().interrupt();
      }
    }
  }

  /**
   * Per-container monitoring record: the root process id, its process tree
   * calculator, and the resource limits enforced against it.
   *
   * <p>The limit accessors and {@link #setResourceLimit} share this object's
   * monitor, so the three limits are updated and read consistently. The pid
   * and process tree accessors are not synchronized.
   */
  public static class ProcessTreeInfo {
    private ContainerId containerId;
    private String pid;
    private ResourceCalculatorProcessTree pTree;
    private long vmemLimit;
    private long pmemLimit;
    private int cpuVcores;

    /**
     * @param id container this record belongs to
     * @param rootPid root process id, or null if not known yet
     * @param tree process tree calculator, or null if not built yet
     * @param vmem virtual memory limit in bytes
     * @param pmem physical memory limit in bytes
     * @param vcores number of cpu vcores assigned
     */
    public ProcessTreeInfo(ContainerId id, String rootPid,
        ResourceCalculatorProcessTree tree, long vmem, long pmem,
        int vcores) {
      containerId = id;
      pid = rootPid;
      pTree = tree;
      vmemLimit = vmem;
      pmemLimit = pmem;
      cpuVcores = vcores;
    }

    /** @return the container this record belongs to */
    public ContainerId getContainerId() {
      return containerId;
    }

    /** @return the root process id, or null if not yet known */
    public String getPID() {
      return pid;
    }

    /** @param newPid root process id of the container */
    public void setPid(String newPid) {
      pid = newPid;
    }

    ResourceCalculatorProcessTree getProcessTree() {
      return pTree;
    }

    void setProcessTree(ResourceCalculatorProcessTree newTree) {
      pTree = newTree;
    }

    /**
     * @return Virtual memory limit for the process tree in bytes
     */
    public synchronized long getVmemLimit() {
      return vmemLimit;
    }

    /**
     * @return Physical memory limit for the process tree in bytes
     */
    public synchronized long getPmemLimit() {
      return pmemLimit;
    }

    /**
     * @return Number of cpu vcores assigned
     */
    public synchronized int getCpuVcores() {
      return cpuVcores;
    }

    /**
     * Replaces all enforced limits atomically with respect to the getters.
     *
     * @param myPmemLimit
     *          Physical memory limit for the process tree in bytes
     * @param myVmemLimit
     *          Virtual memory limit for the process tree in bytes
     * @param myCpuVcores
     *          Number of cpu vcores assigned
     */
    synchronized void setResourceLimit(
        long myPmemLimit, long myVmemLimit, int myCpuVcores) {
      pmemLimit = myPmemLimit;
      vmemLimit = myVmemLimit;
      cpuVcores = myCpuVcores;
    }
  }

  /**
   * Decides whether a container's process tree is over its memory limit.
   *
   * <p>When a Java process execs a program, the JVM does fork()+exec(). At
   * fork time the child is a copy of the parent, so the tree can briefly
   * appear to use double its real memory. To avoid killing a container for
   * that transient spike, a heuristic is used:
   * <ul>
   *   <li>if the whole tree uses more than twice the limit, it is over the
   *       limit immediately;</li>
   *   <li>otherwise, it is over the limit only if processes older than one
   *       monitoring iteration alone exceed the limit.</li>
   * </ul>
   * Any other tree is given the benefit of the doubt until the next
   * iteration. Each positive verdict is logged at WARN.
   *
   * @param containerId
   *          Container Id for the container tree (used in log messages)
   * @param currentMemUsage
   *          Memory usage of a container tree
   * @param curMemUsageOfAgedProcesses
   *          Memory usage of processes older than an iteration in a container
   *          tree
   * @param memLimit
   *          The limit specified for the container
   * @return true if usage is more than twice the limit, or if processes older
   *         than one monitoring interval exceed the limit; false otherwise
   */
  private boolean isProcessTreeOverLimit(String containerId,
                                  long currentMemUsage,
                                  long curMemUsageOfAgedProcesses,
                                  long memLimit) {
    final long hardCeiling = 2 * memLimit;
    if (currentMemUsage > hardCeiling) {
      LOG.warn("Process tree for container: {} running over twice "
          + "the configured limit. Limit={}, current usage = {}",
          containerId, memLimit, currentMemUsage);
      return true;
    }
    if (curMemUsageOfAgedProcesses > memLimit) {
      LOG.warn("Process tree for container: {} has processes older than 1 "
          + "iteration running over the configured limit. "
          + "Limit={}, current usage = {}",
          containerId, memLimit, curMemUsageOfAgedProcesses);
      return true;
    }
    return false;
  }

  /**
   * Test hook: applies the over-limit heuristic to a process tree's virtual
   * memory usage.
   *
   * @param pTree process tree to measure
   * @param containerId container id used in log messages
   * @param limit virtual memory limit in bytes
   * @return whether the tree is considered over the limit
   */
  boolean isProcessTreeOverLimit(ResourceCalculatorProcessTree pTree,
      String containerId, long limit) {
    final long totalVmem = pTree.getVirtualMemorySize();
    // Processes start with age 1, so age > 1 means "survived at least one
    // full monitoring iteration".
    final long agedVmem = pTree.getVirtualMemorySize(1);
    return isProcessTreeOverLimit(containerId, totalVmem, agedVmem, limit);
  }

  private class MonitoringThread extends Thread {
    /** Creates the monitoring thread under the name "Container Monitor". */
    MonitoringThread() {
      super("Container Monitor");
    }

    /**
     * Monitoring loop. Until the service is stopped or this thread is
     * interrupted, each pass:
     * <ul>
     *   <li>refreshes every tracked container's process tree;</li>
     *   <li>records its usage and enforces its memory limits;</li>
     *   <li>publishes the aggregated container utilization;</li>
     *   <li>then sleeps for {@code monitoringInterval} ms.</li>
     * </ul>
     * A failure while handling one container is logged and does not stop the
     * pass.
     */
    @Override
    public void run() {

      while (!stopped && !Thread.currentThread().isInterrupted()) {
        long start = Time.monotonicNow();
        // Print the processTrees for debugging.
        if (LOG.isDebugEnabled()) {
          StringBuilder tmp = new StringBuilder("[ ");
          for (ProcessTreeInfo p : trackingContainers.values()) {
            tmp.append(p.getPID());
            tmp.append(" ");
          }
          tmp.append("]");
          LOG.debug("Current ProcessTree list : {}", tmp);
        }

        // Temporary structure to calculate the total resource utilization of
        // the containers
        ResourceUtilization trackedContainersUtilization  =
            ResourceUtilization.newInstance(0, 0, 0.0f);

        // Now do the monitoring for the trackingContainers
        // Check memory usage and kill any overflowing containers
        long vmemUsageByAllContainers = 0;
        long pmemByAllContainers = 0;
        // float, not long: "long += float" narrows after every addition and
        // drops each container's fractional CPU percentage (ten containers at
        // 0.9% would otherwise sum to 0).
        float cpuUsagePercentPerCoreByAllContainers = 0;
        for (Entry<ContainerId, ProcessTreeInfo> entry : trackingContainers
            .entrySet()) {
          ContainerId containerId = entry.getKey();
          ProcessTreeInfo ptInfo = entry.getValue();
          try {
            // Initialize uninitialized process trees
            initializeProcessTrees(entry);

            String pId = ptInfo.getPID();
            if (pId == null || !isResourceCalculatorAvailable()) {
              continue; // processTree cannot be tracked
            }
            LOG.debug(
                "Constructing ProcessTree for : PID = {} ContainerId = {}",
                pId, containerId);
            ResourceCalculatorProcessTree pTree = ptInfo.getProcessTree();
            pTree.updateProcessTree();    // update process-tree
            long currentVmemUsage = pTree.getVirtualMemorySize();
            long currentPmemUsage = pTree.getRssMemorySize();
            if (currentVmemUsage < 0 || currentPmemUsage < 0) {
              // YARN-6862/YARN-5021 If the container just exited or for
              // another reason the physical/virtual memory is UNAVAILABLE (-1)
              // the values shouldn't be aggregated.
              LOG.info("Skipping monitoring container {} because "
                  + "memory usage is not available.", containerId);
              continue;
            }

            // if machine has 6 cores and 3 are used,
            // cpuUsagePercentPerCore should be 300%
            float cpuUsagePercentPerCore = pTree.getCpuUsagePercent();
            if (cpuUsagePercentPerCore < 0) {
              // CPU usage is not available likely because the container just
              // started. Let us skip this turn and consider this container
              // in the next iteration.
              LOG.info("Skipping monitoring container {} since "
                  + "CPU usage is not yet available.", containerId);
              continue;
            }

            recordUsage(containerId, pId, pTree, ptInfo, currentVmemUsage,
                    currentPmemUsage, trackedContainersUtilization);

            checkLimit(containerId, pId, pTree, ptInfo,
                    currentVmemUsage, currentPmemUsage);

            // Accounting the total memory in usage for all containers
            vmemUsageByAllContainers += currentVmemUsage;
            pmemByAllContainers += currentPmemUsage;
            // Accounting the total cpu usage for all containers
            cpuUsagePercentPerCoreByAllContainers += cpuUsagePercentPerCore;

            reportResourceUsage(containerId, currentPmemUsage,
                    cpuUsagePercentPerCore);
          } catch (Exception e) {
            // Log the exception and proceed to the next container.
            LOG.warn("Uncaught exception in ContainersMonitorImpl "
                + "while monitoring resource of {}", containerId, e);
          }
        }
        LOG.debug("Total Resource Usage stats in NM by all containers : "
            + "Virtual Memory= {}, Physical Memory= {}, "
            + "Total CPU usage(% per core)= {}", vmemUsageByAllContainers,
            pmemByAllContainers, cpuUsagePercentPerCoreByAllContainers);


        // Save the aggregated utilization of the containers
        setContainersUtilization(trackedContainersUtilization);

        long duration = Time.monotonicNow() - start;
        LOG.debug("Finished monitoring container cost {} ms", duration);

        // Publish the container utilization metrics to node manager
        // metrics system.
        NodeManagerMetrics nmMetrics = context.getNodeManagerMetrics();
        if (nmMetrics != null) {
          nmMetrics.setContainerUsedMemGB(
              trackedContainersUtilization.getPhysicalMemory());
          nmMetrics.setContainerUsedVMemGB(
              trackedContainersUtilization.getVirtualMemory());
          nmMetrics.setContainerCpuUtilization(
              trackedContainersUtilization.getCPU());
          nmMetrics.addContainerMonitorCostTime(duration);
        }

        try {
          Thread.sleep(monitoringInterval);
        } catch (InterruptedException e) {
          LOG.warn("{} is interrupted. Exiting.",
              ContainersMonitorImpl.class.getName());
          break;
        }
      }
    }

    /**
     * Lazily initializes a tracked entry whose pid is not known yet.
     *
     * <p>Asks the executor for the container's pid. Once a pid is available,
     * this method:
     * <ul>
     *   <li>builds its process tree calculator;</li>
     *   <li>records the pid in the container metrics (when enabled);</li>
     *   <li>fills in the container's ip/hostname and exposed ports.</li>
     * </ul>
     * Entries that already have a pid are left untouched.
     *
     * @param entry process tree entry to fill in
     * @throws ContainerExecutionException propagated from the executor
     */
    private void initializeProcessTrees(
            Entry<ContainerId, ProcessTreeInfo> entry)
        throws ContainerExecutionException {
      final ContainerId containerId = entry.getKey();
      final ProcessTreeInfo ptInfo = entry.getValue();
      if (ptInfo.getPID() != null) {
        return; // already initialized
      }

      // The pid is null either if the container is not spawned yet or if
      // its pid was removed from the ContainerExecutor; retry next pass.
      final String pid =
          containerExecutor.getProcessId(ptInfo.getContainerId());
      if (pid == null) {
        return;
      }

      LOG.debug("Tracking ProcessTree {} for the first time", pid);
      final ResourceCalculatorProcessTree tree =
          getResourceCalculatorProcessTree(pid);
      ptInfo.setPid(pid);
      ptInfo.setProcessTree(tree);

      if (containerMetricsEnabled) {
        ContainerMetrics.forContainer(containerId, containerMetricsPeriodMs,
            containerMetricsUnregisterDelayMs).recordProcessId(pid);
      }

      final Container container = context.getContainers().get(containerId);
      if (container == null) {
        LOG.info("{} is missing. Not setting ip and hostname", containerId);
        return;
      }

      final String[] ipAndHost = containerExecutor.getIpAndHost(container);
      final boolean haveIpAndHost = ipAndHost != null
          && ipAndHost[0] != null && ipAndHost[1] != null;
      if (haveIpAndHost) {
        container.setIpAndHost(ipAndHost);
        LOG.info("{}'s ip = {}, and hostname = {}",
            containerId, ipAndHost[0], ipAndHost[1]);
      } else {
        LOG.info("Can not get both ip and hostname: {}",
            Arrays.toString(ipAndHost));
      }
      container.setExposedPorts(containerExecutor.getExposedPorts(container));
    }

    /**
     * Records one container's measured usage:
     * <ul>
     *   <li>an AUDITLOG debug line (when enabled);</li>
     *   <li>its share added to the pass-wide utilization accumulator;</li>
     *   <li>memory and CPU samples in the container metrics (when
     *       enabled).</li>
     * </ul>
     *
     * @param containerId container id
     * @param pId process id
     * @param pTree valid process tree entry with CPU measurement
     * @param ptInfo process tree info with limit information
     * @param currentVmemUsage virtual memory measurement, in bytes
     * @param currentPmemUsage physical memory measurement, in bytes
     * @param trackedContainersUtilization utilization tracker to update
     */
    private void recordUsage(ContainerId containerId, String pId,
                             ResourceCalculatorProcessTree pTree,
                             ProcessTreeInfo ptInfo,
                             long currentVmemUsage, long currentPmemUsage,
                             ResourceUtilization trackedContainersUtilization) {
      // Example: on a 6-core machine with 3 cores busy, the per-core figure
      // is 300% and the whole-node figure is 50%.
      final float perCorePercent = pTree.getCpuUsagePercent();
      final float wholeNodePercent =
          perCorePercent / resourceCalculatorPlugin.getNumProcessors();

      // Scaled by 1000 (milli-vcores) so the int conversion keeps precision.
      final int milliVcores = (int) (wholeNodePercent * 1000
          * maxVCoresAllottedForContainers / nodeCpuPercentageForYARN);
      final long vmemCap = ptInfo.getVmemLimit();
      final long pmemCap = ptInfo.getPmemLimit();

      if (AUDITLOG.isDebugEnabled()) {
        final int vcoreCap = ptInfo.getCpuVcores();
        final long cpuTimeMs = pTree.getCumulativeCpuTime();
        final String memoryUsage = formatUsageString(
            currentVmemUsage, vmemCap, currentPmemUsage, pmemCap);
        AUDITLOG.debug(
            "Resource usage of ProcessTree {} for container-id {}:" +
            " {} %CPU: {} %CPU-cores: {}" +
            " vCores-used: {} of {} Cumulative-CPU-ms: {}",
            pId, containerId, memoryUsage, perCorePercent, wholeNodePercent,
            milliVcores / 1000, vcoreCap, cpuTimeMs);
      }

      // Bytes -> MiB for the utilization and metrics records.
      final int pmemMiB = (int) (currentPmemUsage >> 20);
      final int vmemMiB = (int) (currentVmemUsage >> 20);
      trackedContainersUtilization.addTo(
          pmemMiB, vmemMiB, milliVcores / 1000.0f);

      if (!containerMetricsEnabled) {
        return;
      }
      ContainerMetrics.forContainer(containerId, containerMetricsPeriodMs,
          containerMetricsUnregisterDelayMs).recordMemoryUsage(pmemMiB);
      ContainerMetrics.forContainer(containerId, containerMetricsPeriodMs,
          containerMetricsUnregisterDelayMs)
          .recordCpuUsage((int) perCorePercent, milliVcores);
    }

    /**
     * Check resource limits and take actions if the limit is exceeded.
     *
     * <p>Returns immediately when strict (cgroup) memory enforcement is on
     * and elastic memory control is off. Otherwise, the virtual memory limit
     * is checked first (if enabled). The physical memory limit (if enabled)
     * is checked only when the virtual one was not exceeded. On a violation
     * the container is removed from {@code trackingContainers} and a
     * {@link ContainerKillEvent} carrying the matching exit status is
     * dispatched.
     *
     * @param containerId container id
     * @param pId process id
     * @param pTree valid process tree entry with CPU measurement
     * @param ptInfo process tree info with limit information
     * @param currentVmemUsage virtual memory measurement
     * @param currentPmemUsage physical memory measurement
     */
    // NOTE(review): presumably suppresses the raw EventHandler.handle() call.
    @SuppressWarnings("unchecked")
    private void checkLimit(ContainerId containerId, String pId,
                            ResourceCalculatorProcessTree pTree,
                            ProcessTreeInfo ptInfo,
                            long currentVmemUsage,
                            long currentPmemUsage) {
      if (strictMemoryEnforcement && !elasticMemoryEnforcement) {
        // When cgroup-based strict memory enforcement is used alone without
        // elastic memory control, the oom-kill would take care of it.
        // However, when elastic memory control is also enabled, the oom killer
        // would be disabled at the root yarn container cgroup level (all child
        // cgroups would inherit that setting). Hence, we fall back to the
        // polling-based mechanism.
        return;
      }
      boolean isMemoryOverLimit = false;
      String msg = "";
      int containerExitStatus = ContainerExitStatus.INVALID;

      long vmemLimit = ptInfo.getVmemLimit();
      long pmemLimit = ptInfo.getPmemLimit();
      // as processes begin with an age 1, we want to see if there
      // are processes more than 1 iteration old.
      long curMemUsageOfAgedProcesses = pTree.getVirtualMemorySize(1);
      long curRssMemUsageOfAgedProcesses = pTree.getRssMemorySize(1);
      if (isVmemCheckEnabled()
          && isProcessTreeOverLimit(containerId.toString(),
          currentVmemUsage, curMemUsageOfAgedProcesses, vmemLimit)) {
        // The current usage (age=0) is always higher than the aged usage. We
        // do not show the aged size in the message, base the delta on the
        // current usage
        long delta = currentVmemUsage - vmemLimit;
        // Container (the root process) is still alive and overflowing
        // memory.
        // Dump the process-tree and then clean it up.
        msg = formatErrorMessage("virtual",
            formatUsageString(currentVmemUsage, vmemLimit,
                currentPmemUsage, pmemLimit),
            pId, containerId, pTree, delta);
        isMemoryOverLimit = true;
        containerExitStatus = ContainerExitStatus.KILLED_EXCEEDED_VMEM;
      } else if (isPmemCheckEnabled()
          && isProcessTreeOverLimit(containerId.toString(),
          currentPmemUsage, curRssMemUsageOfAgedProcesses,
          pmemLimit)) {
        // The current usage (age=0) is always higher than the aged usage. We
        // do not show the aged size in the message, base the delta on the
        // current usage
        long delta = currentPmemUsage - pmemLimit;
        // Container (the root process) is still alive and overflowing
        // memory.
        // Dump the process-tree and then clean it up.
        msg = formatErrorMessage("physical",
            formatUsageString(currentVmemUsage, vmemLimit,
                currentPmemUsage, pmemLimit),
            pId, containerId, pTree, delta);
        isMemoryOverLimit = true;
        containerExitStatus = ContainerExitStatus.KILLED_EXCEEDED_PMEM;
      }

      // Only the caller whose remove() actually found the entry sends the
      // kill event, so a container is killed at most once from here.
      if (isMemoryOverLimit
          && trackingContainers.remove(containerId) != null) {
        // Virtual or physical memory over limit. Fail the container and
        // remove the corresponding process tree.
        LOG.warn(msg);
        // Log an error if the root process is not a process group leader.
        if (!pTree.checkPidPgrpidForMatch()) {
          LOG.error("Killed container process with PID {} "
              + "but it is not a process group leader.", pId);
        }
        // kill the container
        eventDispatcher.getEventHandler().handle(
                new ContainerKillEvent(containerId,
                      containerExitStatus, msg));
        LOG.info("Removed ProcessTree with root {}", pId);
      }
    }

    /**
     * Publish a container's latest resource usage through the container's
     * timeline publisher. Nothing is published when the container has no
     * publisher; an info message is logged when the container is not found
     * in the NM context.
     * @param containerId container whose usage is being reported
     * @param currentPmemUsage physical memory measurement
     * @param cpuUsagePercentPerCore CPU usage
     */
    private void reportResourceUsage(ContainerId containerId,
        long currentPmemUsage, float cpuUsagePercentPerCore) {
      final ContainerImpl target =
          (ContainerImpl) context.getContainers().get(containerId);
      if (target == null) {
        LOG.info("{} does not exist to report", containerId);
        return;
      }
      final NMTimelinePublisher publisher = target.getNMTimelinePublisher();
      if (publisher == null) {
        return;
      }
      publisher.reportContainerResourceUsage(target, currentPmemUsage,
          cpuUsagePercentPerCore);
    }

    /**
     * Build the diagnostic emitted when a container exceeds a memory limit.
     * The memory type is rendered upper-cased (the {@code %S} conversion),
     * followed by the usage summary and a dump of the process tree.
     * @param memTypeExceeded type of memory whose limit was exceeded
     * @param usageString general memory usage information string
     * @param pId process id of the container's root process
     * @param containerId container id
     * @param pTree process tree whose dump is appended to the message
     * @param delta number of bytes by which the usage exceeds the limit
     * @return formatted resource usage information
     */
    private String formatErrorMessage(String memTypeExceeded,
        String usageString, String pId, ContainerId containerId,
        ResourceCalculatorProcessTree pTree, long delta) {
      final String header = String.format(
          "Container [pid=%s,containerID=%s] is "
              + "running %dB beyond the '%S' memory limit. ",
          pId, containerId, delta, memTypeExceeded);
      final StringBuilder message = new StringBuilder(header);
      message.append("Current usage: ").append(usageString)
          .append(". Killing container.\n")
          .append("Dump of the process-tree for ").append(containerId)
          .append(" :\n")
          .append(pTree.getProcessTreeDump());
      return message.toString();
    }

    /**
     * Summarize physical and virtual memory usage against their limits. The
     * physical figures come first, and every number is abbreviated with
     * {@code TraditionalBinaryPrefix.long2String}.
     * @param currentVmemUsage virtual memory usage
     * @param vmemLimit virtual memory limit
     * @param currentPmemUsage physical memory usage
     * @param pmemLimit physical memory limit
     * @return formatted memory information
     */
    private String formatUsageString(long currentVmemUsage, long vmemLimit,
        long currentPmemUsage, long pmemLimit) {
      final String pmemUsed =
          TraditionalBinaryPrefix.long2String(currentPmemUsage, "", 1);
      final String pmemCap =
          TraditionalBinaryPrefix.long2String(pmemLimit, "", 1);
      final String vmemUsed =
          TraditionalBinaryPrefix.long2String(currentVmemUsage, "", 1);
      final String vmemCap =
          TraditionalBinaryPrefix.long2String(vmemLimit, "", 1);
      return String.format(
          "%sB of %sB physical memory used; %sB of %sB virtual memory used",
          pmemUsed, pmemCap, vmemUsed, vmemCap);
    }
  }

  /**
   * Background thread that periodically measures the on-disk size of each
   * tracked container's log directories. A container is killed with
   * {@link ContainerExitStatus#KILLED_FOR_EXCESS_LOGS} when a single log
   * directory exceeds {@code logDirSizeLimit}, or when the running total over
   * its log directories exceeds {@code logTotalSizeLimit}. The thread loops
   * until {@code stopped} is set or the thread is interrupted.
   */
  private class LogMonitorThread extends Thread {
    LogMonitorThread() {
      super("Container Log Monitor");
    }

    @Override
    public void run() {
      while (!stopped && !Thread.currentThread().isInterrupted()) {
        for (Entry<ContainerId, ProcessTreeInfo> entry :
            trackingContainers.entrySet()) {
          ContainerId containerId = entry.getKey();
          ProcessTreeInfo ptInfo = entry.getValue();
          Container container = context.getContainers().get(containerId);
          if (container == null) {
            // Not present in the NM context; nothing to check this round.
            continue;
          }
          try {
            List<File> logDirs = ContainerLogsUtils.getContainerLogDirs(
                containerId, container.getUser(), context);
            long totalLogDataBytes = 0;
            for (File dir : logDirs) {
              long currentDirSizeBytes = FileUtil.getDU(dir);
              totalLogDataBytes += currentDirSizeBytes;
              String killMsg = null;
              if (currentDirSizeBytes > logDirSizeLimit) {
                killMsg = String.format(
                    "Container [pid=%s,containerID=%s] is logging beyond "
                        + "the container single log directory limit.%n"
                        + "Limit: %d Log Directory Size: %d Log Directory: %s"
                        + "%nKilling container.%n",
                    ptInfo.getPID(), containerId, logDirSizeLimit,
                    currentDirSizeBytes, dir);
              } else if (totalLogDataBytes > logTotalSizeLimit) {
                // The total is only a lower bound ('>='): directories after
                // this one have not been measured yet.
                killMsg = String.format(
                    "Container [pid=%s,containerID=%s] is logging beyond "
                        + "the container total log limit.%n"
                        + "Limit: %d Total Size: >=%d"
                        + "%nKilling container.%n",
                    ptInfo.getPID(), containerId, logTotalSizeLimit,
                    totalLogDataBytes);
              }
              if (killMsg != null) {
                // Only the thread that actually removes the tracking entry
                // sends the kill, so a container already removed elsewhere
                // (e.g. by the memory check) is not killed twice.
                if (trackingContainers.remove(containerId) != null) {
                  LOG.warn(killMsg);
                  eventDispatcher.getEventHandler().handle(
                      new ContainerKillEvent(containerId,
                          ContainerExitStatus.KILLED_FOR_EXCESS_LOGS,
                          killMsg));
                  LOG.info("Removed ProcessTree with root {}",
                      ptInfo.getPID());
                }
                // A limit is already exceeded; measuring the remaining
                // directories cannot change the outcome.
                break;
              }
            }
          } catch (Exception e) {
            LOG.warn("Uncaught exception in ContainerMemoryManager "
                + "while monitoring log usage for {}", containerId, e);
          }
        }
        try {
          Thread.sleep(logCheckInterval);
        } catch (InterruptedException e) {
          LOG.info("Log monitor thread was interrupted. "
              + "Stopping container log monitoring.");
          // Catching InterruptedException clears the interrupt flag, so the
          // while-condition alone would keep the thread running despite the
          // message above. Restore the flag and actually stop.
          Thread.currentThread().interrupt();
          break;
        }
      }
    }
  }

  /**
   * Mirror a monitoring event into the per-container {@link ContainerMetrics}
   * when container metrics are enabled.
   *
   * <ul>
   *   <li>A start event records launch/localization durations and the
   *   container's limits (memory converted from bytes to MB).</li>
   *   <li>A resource-change event re-records the limits, deriving vmem from
   *   pmem via {@code vmemRatio}.</li>
   *   <li>A stop event marks existing metrics as finished.</li>
   * </ul>
   *
   * Null events and other event types are ignored.
   *
   * @param monitoringEvent the event to record; may be {@code null}
   */
  private void updateContainerMetrics(ContainersMonitorEvent monitoringEvent) {
    if (monitoringEvent == null || !containerMetricsEnabled) {
      return;
    }

    final ContainerId id = monitoringEvent.getContainerId();
    switch (monitoringEvent.getType()) {
    case START_MONITORING_CONTAINER: {
      ContainerMetrics metrics = ContainerMetrics.forContainer(id,
          containerMetricsPeriodMs, containerMetricsUnregisterDelayMs);
      ContainerStartMonitoringEvent start =
          (ContainerStartMonitoringEvent) monitoringEvent;
      metrics.recordStateChangeDurations(start.getLaunchDuration(),
          start.getLocalizationDuration());
      int vcores = start.getCpuVcores();
      int vmemMb = (int) (start.getVmemLimit() >> 20);
      int pmemMb = (int) (start.getPmemLimit() >> 20);
      metrics.recordResourceLimit(vmemMb, pmemMb, vcores);
      break;
    }
    case STOP_MONITORING_CONTAINER: {
      ContainerStopMonitoringEvent stop =
          (ContainerStopMonitoringEvent) monitoringEvent;
      ContainerMetrics existing = ContainerMetrics.getContainerMetrics(id);
      if (existing != null) {
        existing.finished(stop.isForReInit());
      }
      break;
    }
    case CHANGE_MONITORING_CONTAINER_RESOURCE: {
      ContainerMetrics metrics = ContainerMetrics.forContainer(id,
          containerMetricsPeriodMs, containerMetricsUnregisterDelayMs);
      Resource target =
          ((ChangeMonitoringContainerResourceEvent) monitoringEvent)
              .getResource();
      int pmemMb = (int) target.getMemorySize();
      // Ratio is applied to the already-narrowed int MB value.
      int vmemMb = (int) (pmemMb * vmemRatio);
      metrics.recordResourceLimit(vmemMb, pmemMb, target.getVirtualCores());
      break;
    }
    default:
      break;
    }
  }

  /**
   * Virtual memory budget for all containers, in bytes: the allocated
   * physical memory scaled by the vmem ratio when set through
   * {@link #setAllocatedResourcesForContainers(Resource)}.
   *
   * @return total virtual memory allotted to containers
   */
  @Override
  public long getVmemAllocatedForContainers() {
    return maxVmemAllottedForContainers;
  }

  /**
   * Report whether physical-memory limit checking is turned on.
   *
   * @return {@code true} when the physical memory check is enabled
   */
  @Override
  public boolean isPmemCheckEnabled() {
    return pmemCheckEnabled;
  }

  /**
   * Physical memory budget for all containers, in bytes, when set through
   * {@link #setAllocatedResourcesForContainers(Resource)}.
   *
   * @return total physical memory allotted to containers
   */
  @Override
  public long getPmemAllocatedForContainers() {
    return maxPmemAllottedForContainers;
  }

  /**
   * Number of virtual cores allotted to containers.
   *
   * @return vcores available to containers
   */
  @Override
  public long getVCoresAllocatedForContainers() {
    return maxVCoresAllottedForContainers;
  }

  /**
   * Replace the resource budget available to containers. The memory size of
   * {@code resource} (MB) is stored in bytes, and the virtual memory budget
   * is derived from it using {@link #getVmemRatio()}.
   *
   * @param resource new total resources for containers
   */
  @Override
  public void setAllocatedResourcesForContainers(final Resource resource) {
    LOG.info("Setting the resources allocated to containers to {}", resource);
    maxVCoresAllottedForContainers = resource.getVirtualCores();
    final long pmemBytes = convertMBytesToBytes(resource.getMemorySize());
    maxPmemAllottedForContainers = pmemBytes;
    maxVmemAllottedForContainers = (long) (getVmemRatio() * pmemBytes);
  }

  /**
   * Report whether virtual-memory limit checking is turned on.
   *
   * @return {@code true} when the virtual memory check is enabled
   */
  @Override
  public boolean isVmemCheckEnabled() {
    return vmemCheckEnabled;
  }

  /**
   * Most recently stored aggregate utilization of containers.
   *
   * @return the value last passed to the private utilization setter
   */
  @Override
  public ResourceUtilization getContainersUtilization() {
    return containersUtilization;
  }

  /**
   * Store the aggregate utilization returned by
   * {@link #getContainersUtilization()}.
   *
   * @param utilization new aggregate utilization
   */
  private void setContainersUtilization(ResourceUtilization utilization) {
    containersUtilization = utilization;
  }

  /**
   * Subtract the containers' allotted resources from {@code resourceUtil}.
   * Memory budgets are converted from bytes to MB (shift by 20) first.
   *
   * @param resourceUtil utilization to reduce in place
   */
  @Override
  public void subtractNodeResourcesFromResourceUtilization(
      ResourceUtilization resourceUtil) {
    final int pmemMb = (int) (getPmemAllocatedForContainers() >> 20);
    final int vmemMb = (int) (getVmemAllocatedForContainers() >> 20);
    resourceUtil.subtractFrom(pmemMb, vmemMb,
        getVCoresAllocatedForContainers());
  }

  /**
   * Ratio used in this class to derive virtual-memory limits from
   * physical-memory limits.
   *
   * @return the vmem-to-pmem ratio
   */
  @Override
  public float getVmemRatio() {
    return vmemRatio;
  }

  /**
   * Dispatch a containers-monitor event to the handler for its type: start,
   * stop, or resource change. Events of any other type are logged and
   * ignored instead of being dropped silently.
   *
   * @param monitoringEvent event to process
   */
  @Override
  public void handle(ContainersMonitorEvent monitoringEvent) {
    ContainerId containerId = monitoringEvent.getContainerId();

    switch (monitoringEvent.getType()) {
    case START_MONITORING_CONTAINER:
      onStartMonitoringContainer(monitoringEvent, containerId);
      break;
    case STOP_MONITORING_CONTAINER:
      onStopMonitoringContainer(monitoringEvent, containerId);
      break;
    case CHANGE_MONITORING_CONTAINER_RESOURCE:
      onChangeMonitoringContainerResource(monitoringEvent, containerId);
      break;
    default:
      // Previously a silent TODO; surface unexpected event types.
      LOG.warn("Ignoring unexpected containers-monitor event type {} for {}",
          monitoringEvent.getType(), containerId);
      break;
    }
  }

  /**
   * Apply a resource change to a tracked container's limits. This is a no-op
   * when container monitoring is disabled. A warning is logged, and nothing
   * else happens, when the container is no longer tracked.
   *
   * @param monitoringEvent a {@code ChangeMonitoringContainerResourceEvent}
   * @param containerId container whose limits change
   */
  private void onChangeMonitoringContainerResource(
      ContainersMonitorEvent monitoringEvent, ContainerId containerId) {
    final ChangeMonitoringContainerResourceEvent changeEvent =
        (ChangeMonitoringContainerResourceEvent) monitoringEvent;
    if (!containersMonitorEnabled) {
      return;
    }
    final ProcessTreeInfo tracked = trackingContainers.get(containerId);
    if (tracked == null) {
      LOG.warn("Failed to track container {}. It may have already completed.",
          containerId);
      return;
    }
    LOG.info("Changing resource-monitoring for {}", containerId);
    updateContainerMetrics(monitoringEvent);
    final Resource newResource = changeEvent.getResource();
    final long pmemBytes = convertMBytesToBytes(newResource.getMemorySize());
    tracked.setResourceLimit(pmemBytes, (long) (pmemBytes * vmemRatio),
        newResource.getVirtualCores());
  }

  /**
   * Stop tracking a container: record the stop in its metrics, then drop its
   * entry from {@code trackingContainers}.
   *
   * @param monitoringEvent the stop event
   * @param containerId container to stop monitoring
   */
  private void onStopMonitoringContainer(
      ContainersMonitorEvent monitoringEvent, ContainerId containerId) {
    LOG.info("Stopping resource-monitoring for {}", containerId);
    updateContainerMetrics(monitoringEvent);
    final ContainerId untracked = containerId;
    trackingContainers.remove(untracked);
  }

  /**
   * Begin tracking a container. Its metrics are recorded and a new
   * {@code ProcessTreeInfo} is registered, carrying the limits from the
   * start event and no pid or process tree yet.
   *
   * @param monitoringEvent a {@code ContainerStartMonitoringEvent}
   * @param containerId container to start monitoring
   */
  private void onStartMonitoringContainer(
      ContainersMonitorEvent monitoringEvent, ContainerId containerId) {
    final ContainerStartMonitoringEvent startEvent =
        (ContainerStartMonitoringEvent) monitoringEvent;
    LOG.info("Starting resource-monitoring for {}", containerId);
    updateContainerMetrics(monitoringEvent);
    final ProcessTreeInfo info = new ProcessTreeInfo(containerId, null, null,
        startEvent.getVmemLimit(), startEvent.getPmemLimit(),
        startEvent.getCpuVcores());
    trackingContainers.put(containerId, info);
  }

  /**
   * Convert a size in megabytes (2^20 bytes) to bytes.
   * @param mb MegaBytes (MB).
   * @return Bytes representing the input MB.
   */
  private static long convertMBytesToBytes(long mb) {
    // 1 MB = 2^20 bytes; the shift matches mb * 1024 * 1024 exactly,
    // including wrap-around on overflow.
    return mb << 20;
  }
}