ThreadPools.java
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.tosfs.common;
import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.hadoop.util.concurrent.HadoopExecutors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.lang.Thread.UncaughtExceptionHandler;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
/**
 * Static factory and lifecycle helpers for the executor services and daemon
 * threads used by tosfs.
 * <p>
 * Copied from Apache Iceberg.
 */
public final class ThreadPools {

  // Declared first on purpose: poolSize() logs during static initialization of
  // WORKER_THREAD_POOL_SIZE below, so the logger must already be assigned.
  private static final Logger LOG = LoggerFactory.getLogger(ThreadPools.class);

  private ThreadPools() {
    // Utility class; not meant to be instantiated.
  }

  /**
   * Name of the JVM system property that overrides the default worker pool size.
   */
  public static final String WORKER_THREAD_POOL_SIZE_PROP = "tos.worker.num-threads";

  /**
   * Size used for worker pools created without an explicit size. Taken from the
   * {@link #WORKER_THREAD_POOL_SIZE_PROP} system property when that holds a
   * positive integer; otherwise the larger of 2 and the number of available
   * processors. Evaluated once, when this class is initialized.
   */
  public static final int WORKER_THREAD_POOL_SIZE =
      poolSize(Math.max(2, Runtime.getRuntime().availableProcessors()));

  private static final ExecutorService WORKER_POOL = newWorkerPool("tos-default-worker-pool");

  /**
   * Returns the shared default worker pool.
   *
   * @return the same pool instance on every call
   */
  public static ExecutorService defaultWorkerPool() {
    return WORKER_POOL;
  }

  /**
   * Creates a fixed-size pool of daemon threads with
   * {@link #WORKER_THREAD_POOL_SIZE} threads.
   *
   * @param namePrefix prefix of the worker thread names
   * @return a new executor service
   */
  public static ExecutorService newWorkerPool(String namePrefix) {
    return newWorkerPool(namePrefix, WORKER_THREAD_POOL_SIZE);
  }

  /**
   * Creates a fixed-size pool of daemon threads.
   *
   * @param namePrefix prefix of the worker thread names
   * @param poolSize number of threads in the pool
   * @return a new executor service
   * @throws IllegalArgumentException if {@code poolSize <= 0}
   */
  public static ExecutorService newWorkerPool(String namePrefix, int poolSize) {
    return Executors.newFixedThreadPool(poolSize, newDaemonThreadFactory(namePrefix));
  }

  /**
   * Creates a scheduled pool of daemon threads.
   *
   * @param namePrefix prefix of the worker thread names
   * @param poolSize number of core threads kept in the pool
   * @return a new scheduled executor service
   * @throws IllegalArgumentException if {@code poolSize < 0}
   */
  public static ScheduledExecutorService newScheduleWorkerPool(String namePrefix, int poolSize) {
    return Executors.newScheduledThreadPool(poolSize, newDaemonThreadFactory(namePrefix));
  }

  /**
   * Helper routine to shutdown a {@link ExecutorService}. Will wait up to a
   * certain timeout for the ExecutorService to gracefully shutdown. If the
   * ExecutorService did not shutdown and there are still tasks unfinished after
   * the timeout period, the ExecutorService will be notified to forcibly shut
   * down. Another timeout period will be waited before giving up. So, at most,
   * a shutdown will be allowed to wait up to twice the timeout value before
   * giving up.
   * <p>
   * If the calling thread is interrupted while waiting, the executor is
   * forcibly shut down and the thread's interrupt status is restored before
   * this method returns.
   * <p>
   * This method is copied from
   * {@link HadoopExecutors#shutdown(ExecutorService, Logger, long, TimeUnit)}.
   *
   * @param executorService ExecutorService to shutdown; {@code null} is a no-op
   * @param timeout the maximum time to wait
   * @param unit the time unit of the timeout argument
   */
  public static void shutdown(ExecutorService executorService, long timeout, TimeUnit unit) {
    if (executorService == null) {
      return;
    }

    try {
      executorService.shutdown();

      LOG.debug("Gracefully shutting down executor service. Waiting max {} {}", timeout, unit);
      if (!executorService.awaitTermination(timeout, unit)) {
        LOG.debug("Executor service has not shutdown yet. Forcing. Will wait up to an additional"
            + " {} {} for shutdown", timeout, unit);
        executorService.shutdownNow();
      }

      // Returns immediately when the graceful phase above already terminated the pool.
      if (executorService.awaitTermination(timeout, unit)) {
        LOG.debug("Succesfully shutdown executor service");
      } else {
        LOG.error("Unable to shutdown executor service after timeout {} {}", (2 * timeout), unit);
      }
    } catch (InterruptedException e) {
      LOG.error("Interrupted while attempting to shutdown", e);
      executorService.shutdownNow();
      // Catching InterruptedException clears the flag; set it again so callers
      // further up the stack can still observe the interruption.
      Thread.currentThread().interrupt();
    } catch (Exception e) {
      LOG.warn("Exception closing executor service {}", e.getMessage());
      LOG.debug("Exception closing executor service", e);
      throw e;
    }
  }

  /**
   * Resolves the worker pool size from the {@link #WORKER_THREAD_POOL_SIZE_PROP}
   * system property.
   *
   * @param defaultSize value to use when the property is unset or unusable
   * @return the configured size if it is a positive integer, otherwise
   *         {@code defaultSize}
   */
  private static int poolSize(int defaultSize) {
    String value = System.getProperty(WORKER_THREAD_POOL_SIZE_PROP);
    if (value == null) {
      return defaultSize;
    }

    try {
      int size = Integer.parseUnsignedInt(value);
      // parseUnsignedInt accepts "0" and wraps values above Integer.MAX_VALUE to
      // a negative int; neither is a valid fixed thread pool size, and passing
      // one on would fail static initialization of this class.
      if (size > 0) {
        return size;
      }
      LOG.warn("Ignoring out-of-range value '{}' for {}, using default {}",
          value, WORKER_THREAD_POOL_SIZE_PROP, defaultSize);
    } catch (NumberFormatException e) {
      LOG.warn("Ignoring non-numeric value '{}' for {}, using default {}",
          value, WORKER_THREAD_POOL_SIZE_PROP, defaultSize);
    }
    return defaultSize;
  }

  /**
   * Creates a factory producing daemon threads named {@code namePrefix-N}.
   * Exceptions that escape a thread's task are logged at error level.
   *
   * @param namePrefix prefix of the generated thread names
   * @return a new thread factory
   */
  public static ThreadFactory newDaemonThreadFactory(String namePrefix) {
    return new ThreadFactoryBuilder()
        .setDaemon(true)
        .setNameFormat(namePrefix + "-%d")
        .setUncaughtExceptionHandler(
            (t, e) -> LOG.error("Thread {} encounter uncaught exception", t, e))
        .build();
  }

  /**
   * Creates a daemon thread; the returned thread is not started.
   *
   * @param name name of the thread
   * @param runnable task the thread will run
   * @param handler uncaught exception handler to install, or {@code null} to
   *        leave the thread's handler unset
   * @return the new, unstarted daemon thread
   */
  public static Thread newDaemonThread(String name, Runnable runnable,
      UncaughtExceptionHandler handler) {
    Thread t = new Thread(runnable);
    t.setName(name);
    t.setDaemon(true);
    if (handler != null) {
      t.setUncaughtExceptionHandler(handler);
    }
    return t;
  }
}