LocalComputationManagerTest.java
/**
* Copyright (c) 2017, RTE (http://www.rte-france.com)
* This Source Code Form is subject to the terms of the Mozilla Public
* License, v. 2.0. If a copy of the MPL was not distributed with this
* file, You can obtain one at http://mozilla.org/MPL/2.0/.
* SPDX-License-Identifier: MPL-2.0
*/
package com.powsybl.computation.local;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.jimfs.Configuration;
import com.google.common.jimfs.Jimfs;
import com.powsybl.computation.*;
import org.apache.commons.lang3.mutable.MutableBoolean;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.file.FileSystem;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.*;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.zip.GZIPOutputStream;
import java.util.zip.ZipEntry;
import java.util.zip.ZipOutputStream;
import static com.google.common.util.concurrent.Uninterruptibles.awaitUninterruptibly;
import static org.junit.jupiter.api.Assertions.*;
/**
* @author Geoffroy Jamgotchian {@literal <geoffroy.jamgotchian at rte-france.com>}
*/
class LocalComputationManagerTest {
private static final String PREFIX = "test_";
private FileSystem fileSystem;
private Path localDir;
private LocalComputationConfig config;
@BeforeEach
void setUp() throws Exception {
fileSystem = Jimfs.newFileSystem(Configuration.unix());
localDir = fileSystem.getPath("/tmp");
config = new LocalComputationConfig(localDir, 1);
}
@AfterEach
void tearDown() throws Exception {
fileSystem.close();
}
@Test
void testVersion() throws Exception {
assertEquals("none (local mode)", new LocalComputationManager(config).getVersion());
}
@Test
void testLocalDir() throws Exception {
assertEquals(localDir, new LocalComputationManager(config).getLocalDir());
}
@Test
void test1() throws Exception {
LocalCommandExecutor localCommandExecutor = new AbstractLocalCommandExecutor() {
@Override
void nonZeroLog(List<String> cmdLs, int exitCode) {
}
@Override
public int execute(String program, List<String> args, Path outFile, Path errFile, Path workingDir, Map<String, String> env) throws IOException, InterruptedException {
// check command line is correct
assertEquals("prog1", program);
assertEquals(ImmutableList.of("file1", "file2", "file3"), args);
assertEquals(ImmutableMap.of("var1", "val1"), env);
// check working directory exists, contains inout files and standard output file
assertTrue(Files.exists(workingDir));
assertEquals(workingDir.resolve("prog1_cmd_0.out").toString(), outFile.toString());
assertEquals(workingDir.resolve("prog1_cmd_0.err").toString(), errFile.toString());
assertTrue(Files.exists(workingDir.resolve("file1")));
assertTrue(Files.exists(workingDir.resolve("file2")));
assertTrue(Files.exists(workingDir.resolve("file3")));
// command exits badly
return 1;
}
};
try (ComputationManager computationManager = new LocalComputationManager(config, localCommandExecutor, ForkJoinPool.commonPool())) {
computationManager.execute(new ExecutionEnvironment(ImmutableMap.of("var1", "val1"), PREFIX, false),
new AbstractExecutionHandler<Object>() {
@Override
public List<CommandExecution> before(Path workingDir) throws IOException {
// create files in the working directory
Files.createFile(workingDir.resolve("file1"));
try (OutputStream os = new GZIPOutputStream(Files.newOutputStream(workingDir.resolve("file2.gz")))) {
}
try (ZipOutputStream os = new ZipOutputStream(Files.newOutputStream(workingDir.resolve("file3.zip")))) {
os.putNextEntry(new ZipEntry("file3"));
os.closeEntry();
}
// run the command
Command command = new SimpleCommandBuilder()
.id("prog1_cmd")
.program("prog1")
.args("file1", "file2", "file3")
.timeout(60)
.inputFiles(new InputFile("file1"),
new InputFile("file2.gz", FilePreProcessor.FILE_GUNZIP),
new InputFile("file3.zip", FilePreProcessor.ARCHIVE_UNZIP))
.build();
return Collections.singletonList(new CommandExecution(command, 1));
}
@Override
public Object after(Path workingDir, ExecutionReport report) throws IOException {
// check command exits with an error
assertEquals(1, report.getErrors().size());
assertEquals("prog1_cmd", report.getErrors().get(0).getCommand().getId());
assertEquals(0, report.getErrors().get(0).getIndex());
assertEquals(1, report.getErrors().get(0).getExitCode());
return null;
}
}).join();
}
}
@Test
void test2() throws Exception {
LocalCommandExecutor localCommandExecutor = new AbstractLocalCommandExecutor() {
@Override
void nonZeroLog(List<String> cmdLs, int exitCode) {
}
@Override
public int execute(String program, List<String> args, Path outFile, Path errFile, Path workingDir, Map<String, String> env) throws IOException, InterruptedException {
// check working directory exists and standard output file
assertTrue(Files.exists(workingDir));
assertEquals(workingDir.resolve("prog2_cmd_0.out").toString(), outFile.toString());
assertEquals(workingDir.resolve("prog2_cmd_0.err").toString(), errFile.toString());
switch (program) {
case "prog2_1":
// check command line is correct
assertEquals(ImmutableList.of(), args);
break;
case "prog2_2":
// check command line is correct
assertEquals(ImmutableList.of("file1"), args);
// check input file exists
assertTrue(Files.exists(workingDir.resolve("file1")));
// create output files
Files.createFile(workingDir.resolve("outFile1"));
Files.createFile(workingDir.resolve("outFile2"));
break;
default:
fail();
}
// command is ok
return 0;
}
};
try (ComputationManager computationManager = new LocalComputationManager(config, localCommandExecutor, ForkJoinPool.commonPool())) {
// create file1 as a common file
computationManager.newCommonFile("file1").close();
computationManager.execute(new ExecutionEnvironment(ImmutableMap.of(), PREFIX, false),
new AbstractExecutionHandler<Object>() {
@Override
public List<CommandExecution> before(Path workingDir) throws IOException {
// run the group command
Command command = new GroupCommandBuilder()
.id("prog2_cmd")
.subCommand()
.program("prog2_1")
.timeout(60)
.add()
.subCommand()
.program("prog2_2")
.args("file1")
.add()
.inputFiles(new InputFile("file1"))
.outputFiles(new OutputFile("outFile1"),
new OutputFile("outFile2", FilePostProcessor.FILE_GZIP))
.build();
return Collections.singletonList(new CommandExecution(command, 1));
}
@Override
public Object after(Path workingDir, ExecutionReport report) throws IOException {
// check command exits correctly
assertTrue(report.getErrors().isEmpty());
assertTrue(Files.exists(workingDir.resolve("outFile1")));
assertTrue(Files.exists(workingDir.resolve("outFile2.gz")));
return null;
}
}).join();
}
}
@Test
void hangingIssue() throws Exception {
LocalCommandExecutor localCommandExecutor = new AbstractLocalCommandExecutor() {
@Override
void nonZeroLog(List<String> cmdLs, int exitCode) {
}
@Override
public int execute(String program, List<String> args, Path outFile, Path errFile, Path workingDir, Map<String, String> env) throws IOException, InterruptedException {
return 0;
}
};
try (ComputationManager computationManager = new LocalComputationManager(config, localCommandExecutor, ForkJoinPool.commonPool())) {
CompletableFuture<Object> result = computationManager.execute(ExecutionEnvironment.createDefault(), new AbstractExecutionHandler<Object>() {
@Override
public List<CommandExecution> before(Path workingDir) {
throw new IllegalStateException("Oups");
}
});
// check that code is not hanging anymore when a java.lang.Error is thrown inside before
assertThrows(ExecutionException.class, () -> result.get(100, TimeUnit.MILLISECONDS));
}
}
private static List<CommandExecution> dummyExecutions() {
// run the command
Command command = new SimpleCommandBuilder()
.id("prog1_cmd")
.program("prog1")
.args("arg1", "arg2")
.build();
return Collections.singletonList(new CommandExecution(command, 1));
}
@Test
void cancelBeforeExecutionShouldThrowAndNotExecute() throws Exception {
LocalCommandExecutor localCommandExecutor = new AbstractLocalCommandExecutor() {
@Override
public void nonZeroLog(List<String> cmdLs, int exitCode) {
}
@Override
public int execute(String program, List<String> args, Path outFile, Path errFile, Path workingDir, Map<String, String> env) throws IOException, InterruptedException {
fail();
return 0;
}
};
CountDownLatch waitForBefore = new CountDownLatch(1);
CountDownLatch waitForCancel = new CountDownLatch(1);
try (ComputationManager computationManager = new LocalComputationManager(config, localCommandExecutor, ForkJoinPool.commonPool())) {
Lock lock = new ReentrantLock();
lock.lock();
CompletableFuture<Object> result = computationManager.execute(ExecutionEnvironment.createDefault(), new AbstractExecutionHandler<Object>() {
@Override
public List<CommandExecution> before(Path workingDir) {
waitForBefore.countDown();
awaitUninterruptibly(waitForCancel);
return dummyExecutions();
}
@Override
public Object after(Path workingDir, ExecutionReport report) throws IOException {
fail();
return super.after(workingDir, report);
}
});
waitForBefore.await();
result.cancel(true);
waitForCancel.countDown();
assertThrows(CancellationException.class, result::get);
}
}
@Test
void cancelDuringExecutionShouldThrowAndEventuallyStopExecution() throws InterruptedException, ExecutionException, IOException {
CountDownLatch waitForExecution = new CountDownLatch(1);
CountDownLatch execution = new CountDownLatch(1); // Will be interrupted, not decremented
CountDownLatch waitForInterruption = new CountDownLatch(1);
MutableBoolean stopped = new MutableBoolean(false);
LocalCommandExecutor localCommandExecutor = new AbstractLocalCommandExecutor() {
@Override
void nonZeroLog(List<String> cmdLs, int exitCode) {
}
@Override
public int execute(String program, List<String> args, Path outFile, Path errFile, Path workingDir, Map<String, String> env) throws IOException, InterruptedException {
waitForExecution.countDown();
execution.await(); //Simulates process running
return 0;
}
@Override
public void stop(Path workingDir) {
stopped.setTrue();
waitForInterruption.countDown();
}
@Override
public void stopForcibly(Path workingDir) {
stopped.setTrue();
waitForInterruption.countDown();
}
};
try (ComputationManager computationManager = new LocalComputationManager(config, localCommandExecutor, ForkJoinPool.commonPool())) {
CompletableFuture<Object> result = computationManager.execute(ExecutionEnvironment.createDefault(), new AbstractExecutionHandler<Object>() {
@Override
public List<CommandExecution> before(Path workingDir) {
return dummyExecutions();
}
@Override
public Object after(Path workingDir, ExecutionReport report) throws IOException {
fail();
return super.after(workingDir, report);
}
});
waitForExecution.await();
result.cancel(true);
result.get();
fail("Should not happen: result has been cancelled");
} catch (CancellationException exc) {
//OK
}
waitForInterruption.await(10, TimeUnit.SECONDS);
assertTrue(stopped.isTrue());
}
}