FileSystemStatusReporter.java
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.tika.pipes.reporter.fs;
import java.io.IOException;
import java.io.Writer;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.LongAdder;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.SerializationFeature;
import com.fasterxml.jackson.databind.json.JsonMapper;
import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.tika.exception.TikaConfigException;
import org.apache.tika.pipes.api.FetchEmitTuple;
import org.apache.tika.pipes.api.PipesResult;
import org.apache.tika.pipes.api.pipesiterator.TotalCountResult;
import org.apache.tika.pipes.api.reporter.PipesReporter;
import org.apache.tika.plugins.AbstractTikaExtension;
import org.apache.tika.plugins.ExtensionConfig;
import org.apache.tika.utils.ExceptionUtils;
/**
* This is intended to write summary statistics to disk
* periodically.
*
* As of the 2.5.0 release, this is ALPHA version. There may be breaking changes
* in the future.
*
* Because {@link AsyncStatus} uses {@link java.time.Instant}, if you are deserializing
* with jackson-databind, you'll need to add jackson-datatype-jsr310. See
* the unit tests for how to deserialize AsyncStatus.
*
*/
public class FileSystemStatusReporter extends AbstractTikaExtension implements PipesReporter {

    private static final Logger LOG = LoggerFactory.getLogger(FileSystemStatusReporter.class);

    /**
     * Builds a reporter from the plugin configuration and starts its background
     * reporting thread.
     *
     * @param pluginConfig extension configuration; its JSON is parsed into a
     *                     {@link FileSystemReporterConfig}
     * @return a configured reporter whose reporting thread is already running
     * @throws TikaConfigException if no status file is configured or the directory
     *                             for the status file cannot be created
     * @throws IOException declared by this factory; presumably raised while loading
     *                     the configuration -- confirm against FileSystemReporterConfig
     */
    public static FileSystemStatusReporter build(ExtensionConfig pluginConfig)
            throws TikaConfigException, IOException {
        FileSystemReporterConfig config = FileSystemReporterConfig.load(pluginConfig.json());
        FileSystemStatusReporter reporter = new FileSystemStatusReporter(pluginConfig, config);
        reporter.configure();
        return reporter;
    }

    // Both fields are assigned in configure(). They keep their original package-private
    // visibility because other classes in this package may rely on it.
    ObjectMapper objectMapper;
    Thread reporterThread;

    // Kept locally so getExtensionConfig() can hand back the configuration this
    // reporter was built with.
    private final ExtensionConfig extensionConfig;
    private final FileSystemReporterConfig config;

    // Per-status counters; incremented without locking by report(FetchEmitTuple, ...).
    private final ConcurrentHashMap<PipesResult.RESULT_STATUS, LongAdder> counts =
            new ConcurrentHashMap<>();

    // Mutated and serialized only while holding this object's monitor.
    private final AsyncStatus asyncStatus = new AsyncStatus();

    // Set by error(...); read by close() to decide whether to write the final report.
    private volatile boolean crashed = false;

    // Read and written only while holding this object's monitor.
    private TotalCountResult totalCountResult =
            new TotalCountResult(0, TotalCountResult.STATUS.NOT_COMPLETED);

    private FileSystemStatusReporter(ExtensionConfig pluginConfig,
                                     FileSystemReporterConfig config) {
        super(pluginConfig);
        this.extensionConfig = pluginConfig;
        this.config = config;
    }

    /**
     * Validates the configuration, makes sure the status file's directory exists,
     * creates the JSON mapper and starts the daemon thread that periodically writes
     * the status file.
     *
     * @throws TikaConfigException if no status file is configured or its parent
     *                             directory cannot be created
     */
    private void configure() throws TikaConfigException {
        if (config.statusFile() == null) {
            throw new TikaConfigException("must initialize 'statusFile'");
        }
        // Path.getParent() returns null for a bare file name such as "status.json".
        // There is no directory to create in that case, and passing null on to
        // Files.isDirectory would fail with a NullPointerException.
        Path parent = config.statusFile().getParent();
        if (parent != null && !Files.isDirectory(parent)) {
            try {
                Files.createDirectories(parent);
            } catch (IOException e) {
                throw new TikaConfigException("couldn't create directory for status file", e);
            }
        }
        // JavaTimeModule is registered and timestamp output is disabled for the
        // java.time values that are serialized as part of the status.
        objectMapper = JsonMapper.builder()
                .addModule(new JavaTimeModule())
                .disable(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS)
                .build();
        reporterThread = new Thread(this::reportPeriodically);
        reporterThread.setDaemon(true);
        reporterThread.start();
    }

    /**
     * Body of the reporter thread: sleeps for the configured interval, writes a
     * STARTED report, and repeats until the thread is interrupted.
     */
    private void reportPeriodically() {
        try {
            while (true) {
                Thread.sleep(config.reportUpdateMs());
                report(AsyncStatus.ASYNC_STATUS.STARTED);
            }
        } catch (InterruptedException e) {
            // Interruption is how close()/error(...) stop this thread; the thread
            // simply ends here.
        }
    }

    /**
     * Takes a snapshot of the current counters, updates the status object with the
     * snapshot, the latest total count and the given state, and writes it to disk.
     *
     * @param status the overall state to record in the report
     */
    private synchronized void report(AsyncStatus.ASYNC_STATUS status) {
        Map<PipesResult.RESULT_STATUS, Long> snapshot = new HashMap<>();
        counts.forEach((resultStatus, adder) -> snapshot.put(resultStatus, adder.longValue()));
        asyncStatus.update(snapshot, totalCountResult, status);
        writeStatus();
    }

    /**
     * Records a crash message on the status object and writes it to disk.
     *
     * @param crashMessage description of the failure (a message or a stack trace)
     */
    private synchronized void crash(String crashMessage) {
        asyncStatus.updateCrash(crashMessage);
        writeStatus();
    }

    /**
     * Serializes the current status to the configured status file. Callers hold this
     * object's monitor. A failed write is logged and otherwise ignored.
     */
    private void writeStatus() {
        try (Writer writer =
                     Files.newBufferedWriter(config.statusFile(), StandardCharsets.UTF_8)) {
            objectMapper.writeValue(writer, asyncStatus);
        } catch (IOException e) {
            LOG.warn("couldn't write report", e);
        }
    }

    /**
     * Stops the reporter thread and, unless an error was reported earlier, writes a
     * final COMPLETED report.
     */
    @Override
    public void close() throws IOException {
        LOG.debug("finishing and writing last report");
        interruptThread();
        if (!crashed) {
            report(AsyncStatus.ASYNC_STATUS.COMPLETED);
        }
    }

    /**
     * Interrupts the reporter thread and waits up to one second for it to finish.
     */
    private void interruptThread() {
        reporterThread.interrupt();
        try {
            reporterThread.join(1000);
        } catch (InterruptedException e) {
            // NOTE(review): the caller's interrupt status is not restored. Callers
            // write the status file right after this returns, and a pending interrupt
            // may make the NIO-backed write fail (ClosedByInterruptException) --
            // confirm before changing this to re-interrupt.
        }
    }

    /**
     * Marks the run as crashed, stops the reporter thread and writes the stack trace
     * of {@code t} as the crash message.
     *
     * @param t the failure to report
     */
    @Override
    public void error(Throwable t) {
        crashed = true;
        interruptThread();
        crash(ExceptionUtils.getStackTrace(t));
    }

    /**
     * Marks the run as crashed, stops the reporter thread and writes {@code msg} as
     * the crash message.
     *
     * @param msg the failure description to report
     */
    @Override
    public void error(String msg) {
        crashed = true;
        interruptThread();
        crash(msg);
    }

    /**
     * Counts one result under its status. The tuple and the elapsed time are not
     * used by this reporter.
     */
    @Override
    public void report(FetchEmitTuple t, PipesResult result, long elapsed) {
        counts.computeIfAbsent(result.status(), k -> new LongAdder()).increment();
    }

    /**
     * Stores the latest total count so that it is included in the next written report.
     *
     * @param totalCountResult the most recent total count
     */
    @Override
    public void report(TotalCountResult totalCountResult) {
        // Same monitor as report(ASYNC_STATUS), which reads this field.
        synchronized (this) {
            this.totalCountResult = totalCountResult;
        }
    }

    @Override
    public boolean supportsTotalCount() {
        return true;
    }

    /**
     * @return the extension configuration this reporter was built with
     */
    @Override
    public ExtensionConfig getExtensionConfig() {
        return extensionConfig;
    }
}