TestFileSystemStatusReporter.java
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.tika.pipes.reporter.fs;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.json.JsonMapper;
import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;
import org.apache.tika.pipes.api.PipesResult;
import org.apache.tika.pipes.api.pipesiterator.PipesIterator;
import org.apache.tika.pipes.api.pipesiterator.TotalCountResult;
import org.apache.tika.pipes.api.reporter.PipesReporter;
import org.apache.tika.plugins.ExtensionConfig;
public class TestFileSystemStatusReporter {
private static final ObjectMapper MAPPER = new ObjectMapper();
private static String JSON_TEMPLATE = """
{
"statusFile": "STATUS_FILE",
"reportUpdateMs": 100
}
""";
@Test
public void testBasic(@TempDir Path tmpDir) throws Exception {
Path path = Files.createTempFile(tmpDir, "tika-fssr-", ".xml");
String jsonStr = JSON_TEMPLATE.replace("STATUS_FILE", path.toAbsolutePath().toString()
.replace("\\", "/"));
FileSystemStatusReporter reporter = new FileSystemReporterFactory().buildExtension(
new ExtensionConfig("test-fs-reporter", "fs-status-reporter", jsonStr));
final ObjectMapper objectMapper = JsonMapper.builder()
.addModule(new JavaTimeModule())
.build();
Thread readerThread = new Thread(new Runnable() {
@Override
public void run() {
while (true) {
try {
AsyncStatus asyncStatus =
objectMapper.readValue(path.toFile(), AsyncStatus.class);
assertEquals(TotalCountResult.STATUS.NOT_COMPLETED,
asyncStatus.getTotalCountResult().getStatus());
} catch (IOException e) {
//there will be problems reading from the file
//before it is originally written, etc. Ignore these
//problems
}
try {
Thread.sleep(50);
} catch (InterruptedException e) {
return;
}
}
}
});
readerThread.start();
Map<PipesResult.RESULT_STATUS, Long> total = runBatch(reporter, 10, 200);
readerThread.interrupt();
readerThread.join(1000);
reporter.report(new TotalCountResult(30000, TotalCountResult.STATUS.COMPLETED));
reporter.close();
AsyncStatus asyncStatus = objectMapper.readValue(path.toFile(), AsyncStatus.class);
Map<PipesResult.RESULT_STATUS, Long> map = asyncStatus.getStatusCounts();
assertEquals(total.size(), map.size());
for (Map.Entry<PipesResult.RESULT_STATUS, Long> e : total.entrySet()) {
assertTrue(map.containsKey(e.getKey()), e.getKey().toString());
assertEquals(e.getValue(), map.get(e.getKey()), e.getKey().toString());
}
assertEquals(AsyncStatus.ASYNC_STATUS.COMPLETED, asyncStatus.getAsyncStatus());
assertEquals(30000, asyncStatus.getTotalCountResult().getTotalCount());
assertEquals(TotalCountResult.STATUS.COMPLETED, asyncStatus.getTotalCountResult().getStatus());
}
private Map<PipesResult.RESULT_STATUS, Long> runBatch(FileSystemStatusReporter reporter,
int numThreads,
int numIterations)
throws ExecutionException, InterruptedException {
ExecutorService executorService = Executors.newFixedThreadPool(numThreads);
ExecutorCompletionService executorCompletionService =
new ExecutorCompletionService(executorService);
List<ReportWorker> workerList = new ArrayList<>();
for (int i = 0; i < numThreads; i++) {
ReportWorker reportWorker = new ReportWorker(reporter, numIterations);
workerList.add(reportWorker);
executorCompletionService.submit(reportWorker);
}
Map<PipesResult.RESULT_STATUS, Long> total = new HashMap<>();
int finished = 0;
while (finished < numThreads) {
Future<Integer> future = executorCompletionService.poll();
if (future != null) {
future.get();
finished++;
}
}
for (ReportWorker r : workerList) {
Map<PipesResult.RESULT_STATUS, Long> local = r.getWritten();
for (Map.Entry<PipesResult.RESULT_STATUS, Long> e : local.entrySet()) {
Long t = total.get(e.getKey());
if (t == null) {
t = e.getValue();
} else {
t += e.getValue();
}
total.put(e.getKey(), t);
}
}
return total;
}
private class ReportWorker implements Callable<Integer> {
Map<PipesResult.RESULT_STATUS, Long> written = new HashMap<>();
private final PipesReporter reporter;
private final int numIterations;
private ReportWorker(PipesReporter reporter, int numIterations) {
this.reporter = reporter;
this.numIterations = numIterations;
}
@Override
public Integer call() throws Exception {
PipesResult.RESULT_STATUS[] statuses = PipesResult.RESULT_STATUS.values();
Random random = new Random();
for (int i = 0; i < numIterations; i++) {
PipesResult.RESULT_STATUS status = statuses[random.nextInt(statuses.length)];
PipesResult pipesResult = new PipesResult(status);
reporter.report(PipesIterator.COMPLETED_SEMAPHORE, pipesResult, 100l);
Long cnt = written.get(status);
if (cnt == null) {
written.put(status, 1l);
} else {
cnt++;
written.put(status, cnt);
}
if (i % 100 == 0) {
Thread.sleep(94);
reporter.report(new TotalCountResult(Math.round((100 + (double) i / (double) 1000)),
TotalCountResult.STATUS.NOT_COMPLETED));
}
}
return 1;
}
Map<PipesResult.RESULT_STATUS, Long> getWritten() {
return written;
}
}
}