LowMemoryMonitor.java
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.facebook.presto.memory;
import com.facebook.airlift.log.Logger;
import com.facebook.presto.execution.TaskManagerConfig;
import com.facebook.presto.execution.executor.TaskExecutor;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import javax.inject.Inject;
import java.lang.management.ManagementFactory;
import java.lang.management.MemoryMXBean;
import java.lang.management.MemoryUsage;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import static com.facebook.airlift.concurrent.Threads.daemonThreadsNamed;
import static java.util.Objects.requireNonNull;
import static java.util.concurrent.Executors.newScheduledThreadPool;
public class LowMemoryMonitor
{
private static final Logger log = Logger.get(LowMemoryMonitor.class);
private final ScheduledExecutorService lowMemoryExecutor = newScheduledThreadPool(1, daemonThreadsNamed("low-memory-monitor-executor"));
private final TaskExecutor taskExecutor;
private final double threshold;
private static final MemoryMXBean memoryMXBean = ManagementFactory.getMemoryMXBean();
@Inject
public LowMemoryMonitor(TaskExecutor taskExecutor, TaskManagerConfig taskManagerConfig)
{
this.taskExecutor = requireNonNull(taskExecutor, "taskExecutor is null");
this.threshold = taskManagerConfig.getMemoryBasedSlowDownThreshold();
}
@PostConstruct
public void start()
{
if (threshold < 1.0) {
lowMemoryExecutor.scheduleWithFixedDelay(() -> checkLowMemory(), 1, 1, TimeUnit.SECONDS);
}
}
@PreDestroy
public void stop()
{
lowMemoryExecutor.shutdown();
}
private void checkLowMemory()
{
MemoryUsage memoryUsage = memoryMXBean.getHeapMemoryUsage();
long usedMemory = memoryUsage.getUsed();
long maxMemory = memoryUsage.getMax();
long memoryThreshold = (long) (maxMemory * threshold);
if (usedMemory > memoryThreshold) {
if (!taskExecutor.isLowMemory()) {
log.debug("Enabling Low Memory: Used: %s Max: %s Threshold: %s", usedMemory, maxMemory, memoryThreshold);
taskExecutor.setLowMemory(true);
}
}
else {
if (taskExecutor.isLowMemory()) {
log.debug("Disabling Low Memory: Used: %s Max: %s Threshold: %s", usedMemory, maxMemory, memoryThreshold);
taskExecutor.setLowMemory(false);
}
}
}
}