FileSystemPipesIterator.java
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.tika.pipes.iterator.fs;
import java.io.Closeable;
import java.io.IOException;
import java.nio.file.FileVisitResult;
import java.nio.file.FileVisitor;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.attribute.BasicFileAttributes;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicLong;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.tika.config.ConfigValidator;
import org.apache.tika.exception.TikaConfigException;
import org.apache.tika.metadata.Metadata;
import org.apache.tika.parser.ParseContext;
import org.apache.tika.pipes.api.FetchEmitTuple;
import org.apache.tika.pipes.api.emitter.EmitKey;
import org.apache.tika.pipes.api.fetcher.FetchKey;
import org.apache.tika.pipes.api.pipesiterator.TotalCountResult;
import org.apache.tika.pipes.api.pipesiterator.TotalCounter;
import org.apache.tika.pipes.pipesiterator.PipesIteratorBase;
import org.apache.tika.plugins.ExtensionConfig;
public class FileSystemPipesIterator extends PipesIteratorBase implements TotalCounter, Closeable {
public static FileSystemPipesIterator build(ExtensionConfig pluginConfig) throws TikaConfigException, IOException {
FileSystemPipesIterator pipesIterator = new FileSystemPipesIterator(pluginConfig);
pipesIterator.configure();
return pipesIterator;
}
private FileSystemPipesIteratorConfig config;
private void configure() throws IOException, TikaConfigException {
config = FileSystemPipesIteratorConfig.load(pluginConfig.json());
checkConfig(config);
if (config.isCountTotal()) {
fileCountWorker = new FileCountWorker(config.getBasePath());
}
}
private static final Logger LOG = LoggerFactory.getLogger(FileSystemPipesIterator.class);
private FileCountWorker fileCountWorker;
private FileSystemPipesIterator(ExtensionConfig pluginConfig) {
super(pluginConfig);
}
@Override
protected void enqueue() throws InterruptedException, IOException, TimeoutException {
if (!Files.isDirectory(config.getBasePath())) {
throw new IllegalArgumentException(
"\"basePath\" directory does not exist: " + config
.getBasePath().toAbsolutePath());
}
try {
Files.walkFileTree(config.getBasePath(), new FSFileVisitor(config.getFetcherId(), config.getEmitterId()));
} catch (IOException e) {
Throwable cause = e.getCause();
if (cause != null && cause instanceof TimeoutException) {
throw (TimeoutException) cause;
}
throw e;
}
}
public void checkConfig(FileSystemPipesIteratorConfig config)
throws TikaConfigException {
//these should all be fatal
ConfigValidator.mustNotBeEmpty("basePath", config.getBasePath());
}
@Override
public void startTotalCount() {
if (!config.isCountTotal()) {
return;
}
fileCountWorker.startTotalCount();
}
@Override
public TotalCountResult getTotalCount() {
if (!config.isCountTotal()) {
return TotalCountResult.UNSUPPORTED;
}
return fileCountWorker.getTotalCount();
}
@Override
public void close() throws IOException {
if (fileCountWorker != null) {
fileCountWorker.close();
}
}
private class FSFileVisitor implements FileVisitor<Path> {
private final String fetcherId;
private final String emitterId;
private FSFileVisitor(String fetcherId, String emitterId) {
this.fetcherId = fetcherId;
this.emitterId = emitterId;
}
@Override
public FileVisitResult preVisitDirectory(Path dir, BasicFileAttributes attrs)
throws IOException {
return FileVisitResult.CONTINUE;
}
@Override
public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) throws IOException {
String relPath = FileSystemPipesIterator.this.config
.getBasePath().relativize(file).toString();
try {
ParseContext parseContext = new ParseContext();
// ContentHandlerFactory, ParseMode, and onParseException come from PipesConfig loaded via TikaLoader
tryToAdd(new FetchEmitTuple(relPath, new FetchKey(fetcherId, relPath),
new EmitKey(emitterId, relPath), new Metadata(), parseContext,
FetchEmitTuple.ON_PARSE_EXCEPTION.EMIT));
} catch (TimeoutException e) {
throw new IOException(e);
} catch (InterruptedException e) {
return FileVisitResult.TERMINATE;
}
return FileVisitResult.CONTINUE;
}
@Override
public FileVisitResult visitFileFailed(Path file, IOException exc) throws IOException {
return FileVisitResult.CONTINUE;
}
@Override
public FileVisitResult postVisitDirectory(Path dir, IOException exc) throws IOException {
return FileVisitResult.CONTINUE;
}
}
private static class FileCountWorker implements TotalCounter, Closeable {
private Thread totalCounterThread;
private final AtomicLong totalCount = new AtomicLong(0);
private TotalCountResult.STATUS status;
private TotalCountResult finalResult;
private final Path basePath;
public FileCountWorker(Path basePath) {
this.basePath = basePath;
this.status = TotalCountResult.STATUS.NOT_COMPLETED;
}
@Override
public void startTotalCount() {
totalCounterThread = new Thread(() -> {
try {
Files.walkFileTree(basePath, new FSFileCounter(totalCount));
status = TotalCountResult.STATUS.COMPLETED;
finalResult = new TotalCountResult(totalCount.get(), status);
} catch (IOException e) {
LOG.warn("problem counting files", e);
status = TotalCountResult.STATUS.EXCEPTION;
finalResult = new TotalCountResult(totalCount.get(), status);
}
});
totalCounterThread.setDaemon(true);
totalCounterThread.start();
}
@Override
public TotalCountResult getTotalCount() {
if (finalResult != null) {
return finalResult;
}
return new TotalCountResult(totalCount.get(), status);
}
@Override
public void close() throws IOException {
totalCounterThread.interrupt();
}
private static class FSFileCounter implements FileVisitor<Path> {
private final AtomicLong count;
private FSFileCounter(AtomicLong count) {
this.count = count;
}
@Override
public FileVisitResult preVisitDirectory(Path dir, BasicFileAttributes attrs)
throws IOException {
return FileVisitResult.CONTINUE;
}
@Override
public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) throws IOException {
count.incrementAndGet();
return FileVisitResult.CONTINUE;
}
@Override
public FileVisitResult visitFileFailed(Path file, IOException exc) throws IOException {
return FileVisitResult.CONTINUE;
}
@Override
public FileVisitResult postVisitDirectory(Path dir, IOException exc) throws IOException {
return FileVisitResult.CONTINUE;
}
}
}
}