Processes.java
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.facebook.presto.spark.testing;
import com.facebook.airlift.log.Logger;
import com.google.common.base.Joiner;
import com.google.common.collect.ImmutableList;
import com.google.common.io.CharStreams;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.io.UncheckedIOException;
import java.util.List;
import static com.google.common.base.Preconditions.checkState;
import static com.google.common.io.ByteStreams.copy;
import static com.google.common.util.concurrent.Uninterruptibles.sleepUninterruptibly;
import static java.nio.charset.StandardCharsets.UTF_8;
import static java.util.concurrent.TimeUnit.SECONDS;
public class Processes
{
private static final Logger log = Logger.get(Processes.class);
private Processes() {}
public static String executeForOutput(String... command)
throws InterruptedException
{
return executeForOutput(ImmutableList.copyOf(command));
}
public static String executeForOutput(List<String> command)
throws InterruptedException
{
String commandString = Joiner.on(" ").join(command);
log.info("Running: %s", commandString);
ProcessBuilder processBuilder = new ProcessBuilder(command);
Process process = startProcess(processBuilder);
closeInput(process);
pipeStderr(commandString, process);
String result;
try (InputStream inputStream = process.getInputStream()) {
result = CharStreams.toString(new InputStreamReader(inputStream, UTF_8));
}
catch (IOException e) {
throw new UncheckedIOException(e);
}
int exitCode = waitForProcess(process);
checkState(exitCode == 0, "[%s] process existed with status: %s", commandString, exitCode);
return result;
}
public static int execute(String... command)
throws InterruptedException
{
return execute(ImmutableList.copyOf(command));
}
public static int execute(List<String> command)
throws InterruptedException
{
log.info("Running: %s", Joiner.on(" ").join(command));
return waitForProcess(startProcess(command));
}
public static int waitForProcess(Process process)
throws InterruptedException
{
try {
return process.waitFor();
}
catch (InterruptedException e) {
destroyProcess(process);
throw e;
}
}
public static Process startProcess(List<String> command)
{
String commandString = Joiner.on(" ").join(command);
log.info("Starting: %s", commandString);
ProcessBuilder processBuilder = new ProcessBuilder(command);
Process process = startProcess(processBuilder);
closeInput(process);
pipeStdout(commandString, process);
pipeStderr(commandString, process);
return process;
}
public static Process startProcess(ProcessBuilder processBuilder)
{
try {
return processBuilder.start();
}
catch (IOException e) {
throw new UncheckedIOException(e);
}
}
public static void destroyProcess(Process process)
{
if (!process.isAlive()) {
return;
}
// stop
process.destroy();
// wait for 5 seconds
for (int i = 0; i < 5; i++) {
if (!process.isAlive()) {
break;
}
sleepUninterruptibly(1, SECONDS);
}
// kill
process.destroyForcibly();
}
private static void pipeStdout(String command, Process process)
{
pipe("PIPE STDOUT: " + command, process.getInputStream(), System.out);
}
private static void pipeStderr(String command, Process process)
{
pipe("PIPE STDERR: " + command, process.getErrorStream(), System.err);
}
private static void closeInput(Process process)
{
try {
process.getOutputStream().close();
}
catch (IOException e) {
throw new UncheckedIOException(e);
}
}
private static void pipe(String name, InputStream input, OutputStream output)
{
Thread thread = new Thread(() -> {
try {
copy(input, output);
}
catch (IOException ignored) {
}
});
thread.setName(name);
thread.start();
}
}