TikaAsyncCLI.java
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.tika.async.cli;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Iterator;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.TimeoutException;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.CommandLineParser;
import org.apache.commons.cli.DefaultParser;
import org.apache.commons.cli.Options;
import org.apache.commons.cli.ParseException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.tika.config.EmbeddedLimits;
import org.apache.tika.config.loader.TikaJsonConfig;
import org.apache.tika.exception.TikaConfigException;
import org.apache.tika.parser.ParseContext;
import org.apache.tika.pipes.api.FetchEmitTuple;
import org.apache.tika.pipes.api.ParseMode;
import org.apache.tika.pipes.api.emitter.EmitKey;
import org.apache.tika.pipes.api.fetcher.FetchKey;
import org.apache.tika.pipes.api.pipesiterator.PipesIterator;
import org.apache.tika.pipes.core.async.AsyncProcessor;
import org.apache.tika.pipes.core.extractor.UnpackConfig;
import org.apache.tika.pipes.core.pipesiterator.PipesIteratorManager;
import org.apache.tika.plugins.ExtensionConfig;
import org.apache.tika.plugins.TikaPluginManager;
import org.apache.tika.sax.BasicContentHandlerFactory;
import org.apache.tika.sax.ContentHandlerFactory;
import org.apache.tika.utils.StringUtils;
public class TikaAsyncCLI {
// Upper bound, in milliseconds, passed to AsyncProcessor#offer when queueing a tuple;
// processWithTikaConfig throws a TimeoutException if the offer is not accepted in that time.
private static final long TIMEOUT_MS = 600_000;
// Class-wide slf4j logger.
private static final Logger LOG = LoggerFactory.getLogger(TikaAsyncCLI.class);
/**
 * Builds the set of command line options understood by this CLI.
 *
 * @return a freshly constructed {@link Options} holding every supported flag
 */
private static Options getOptions() {
    return new Options()
            // input / output and forked-client settings
            .addOption("i", "inputDir", true, "input directory")
            .addOption("o", "outputDir", true, "output directory")
            .addOption("n", "numClients", true, "number of forked clients")
            .addOption(null, "Xmx", true, "heap for the forked clients, e.g. --Xmx 1g")
            .addOption("h", "help", false, "this help message")
            .addOption("T", "timeoutMs", true, "timeout for each parse in milliseconds")
            .addOption(null, "handler", true,
                    "handler type: t=text, h=html, x=xml, m=markdown, b=body, i=ignore")
            .addOption("p", "pluginsDir", true, "plugins directory")
            .addOption("l", "fileList", true,
                    "file containing one path per line (relative to inputDir or absolute)")
            .addOption("c", "config", true, "tikaConfig.json")
            // embedded-bytes extraction
            .addOption("z", "unzipShallow", false,
                    "extract raw bytes from direct attachments only (depth=1)")
            .addOption("Z", "unzipRecursive", false,
                    "extract raw bytes from all attachments recursively")
            // parse-mode switches
            .addOption(null, "concatenate", false,
                    "concatenate content from all embedded documents into a single content field")
            .addOption(null, "content-only", false,
                    "output only extracted content (no metadata, no JSON wrapper); implies --concatenate")
            // unpack output settings
            .addOption(null, "unpack-format", true,
                    "output format for unpacking: REGULAR (default) or FRICTIONLESS")
            .addOption(null, "unpack-mode", true,
                    "output mode for unpacking: ZIPPED (default) or DIRECTORY")
            .addOption(null, "unpack-include-metadata", false,
                    "include metadata.json in Frictionless output");
}
/**
 * CLI entry point. Prints the usage message when invoked without arguments;
 * otherwise hands the arguments to {@code processCommandLine}.
 *
 * @param args raw command line arguments
 * @throws Exception if argument handling or processing fails
 */
public static void main(String[] args) throws Exception {
    if (args.length > 0) {
        processCommandLine(args);
        return;
    }
    usage(getOptions());
}
/**
 * Dispatches on the shape of the command line.
 * <p>
 * A single argument ending in {@code .json} is loaded as a tika config that must
 * yield a pipes iterator; that config is then used as-is. Any other argument list is
 * parsed via {@link #parseCommandLine(String[])}; a temporary config file is created,
 * handed to {@code PluginsWriter}, used for the run, and deleted afterwards.
 *
 * @param args raw command line arguments
 * @throws IllegalArgumentException if a lone .json config does not provide a pipes iterator
 * @throws Exception on any parsing, configuration or processing failure
 */
private static void processCommandLine(String[] args) throws Exception {
    // Routine progress information: debug level, parameterized (was WARN with concatenation).
    LOG.debug("processing {} args", args.length);
    if (args.length == 1 && args[0].endsWith(".json")) {
        Path configPath = Paths.get(args[0]);
        LOG.debug("processing json config {}", configPath);
        TikaJsonConfig tikaJsonConfig = TikaJsonConfig.load(configPath);
        Optional<PipesIterator> pipesIteratorOpt =
                PipesIteratorManager.load(TikaPluginManager.load(tikaJsonConfig), tikaJsonConfig);
        if (pipesIteratorOpt.isEmpty()) {
            throw new IllegalArgumentException("Must specify a pipes iterator if supplying a .json file");
        }
        // No SimpleAsyncConfig in this mode: per-tuple handler/unpack tweaks are skipped.
        processWithTikaConfig(pipesIteratorOpt.get(), configPath, null);
        return;
    }
    SimpleAsyncConfig simpleAsyncConfig = parseCommandLine(args);
    Path tmpTikaConfig = Files.createTempFile("tika-async-tmp-", ".json");
    try {
        PluginsWriter pluginsWriter = new PluginsWriter(simpleAsyncConfig, tmpTikaConfig);
        pluginsWriter.write(tmpTikaConfig);
        PipesIterator pipesIterator = buildPipesIterator(tmpTikaConfig, simpleAsyncConfig);
        processWithTikaConfig(pipesIterator, tmpTikaConfig, simpleAsyncConfig);
    } finally {
        // Always remove the generated config, even when processing fails.
        Files.deleteIfExists(tmpTikaConfig);
    }
}
/**
 * Chooses the pipes iterator for a command-line driven run.
 * <ol>
 *   <li>If a file list is configured, a {@code FileListPipesIterator} over it is returned
 *       (with the input dir as base path when one was given).</li>
 *   <li>If the input is an existing regular file, a single-file iterator is returned.</li>
 *   <li>Otherwise the iterator is loaded from the supplied config.</li>
 * </ol>
 *
 * @param pluginsConfig     path to the tika json config to load
 * @param simpleAsyncConfig the parsed command line settings
 * @return the iterator to drive processing
 * @throws TikaConfigException if the config does not yield a pipes iterator
 * @throws IOException         declared by this method; source is not visible in this file
 */
private static PipesIterator buildPipesIterator(Path pluginsConfig, SimpleAsyncConfig simpleAsyncConfig) throws TikaConfigException, IOException {
    // The config is loaded up front in every case, exactly as before.
    TikaJsonConfig tikaJsonConfig = TikaJsonConfig.load(pluginsConfig);
    String inputDirString = simpleAsyncConfig.getInputDir();
    boolean hasInput = !StringUtils.isBlank(inputDirString);

    String fileList = simpleAsyncConfig.getFileList();
    if (!StringUtils.isBlank(fileList)) {
        Path listPath = Paths.get(fileList);
        Path base = hasInput ? Paths.get(inputDirString) : null;
        return new FileListPipesIterator(listPath, base);
    }

    if (hasInput) {
        Path input = Paths.get(inputDirString);
        if (Files.isRegularFile(input)) {
            // Only the file name is used as the fetch/emit key.
            return new SingleFilePipesIterator(input.getFileName().toString());
        }
    }

    // No input at all, or input is not a regular file: defer to the configured iterator.
    return PipesIteratorManager
            .load(TikaPluginManager.load(tikaJsonConfig), tikaJsonConfig)
            .orElseThrow(() -> new TikaConfigException(
                    "something went wrong loading: pipesIterator from the tika configs"));
}
/**
 * Parses the command line into a {@link SimpleAsyncConfig}.
 * <p>
 * Shortcut: exactly two arguments where the first does not start with {@code -} are
 * taken as {@code <input> <output>} with fixed defaults (1 client, 30000 ms timeout,
 * {@code -Xmx1g}, text handler, no byte extraction).
 * <p>
 * Otherwise the arguments are parsed against {@link #getOptions()}. Up to two leftover
 * positional arguments are accepted as input (and output); they may not be combined
 * with {@code -i}/{@code -o} as described by the thrown messages, and the input must
 * exist. With a single positional input and no {@code -o}, the output defaults to the
 * current directory for a regular file or to {@code ./output} for a directory.
 * A file list without an output dir also defaults to {@code ./output}.
 *
 * @param args raw command line arguments
 * @return the populated configuration
 * @throws TikaConfigException on inconsistent or invalid arguments, including
 *                             non-numeric values for {@code -T} or {@code -n}
 * @throws ParseException      if commons-cli cannot parse the arguments
 * @throws IOException         if printing the help message fails
 */
//not private for testing purposes
static SimpleAsyncConfig parseCommandLine(String[] args) throws TikaConfigException, ParseException, IOException {
    if (args.length == 2 && ! args[0].startsWith("-")) {
        return new SimpleAsyncConfig(args[0], args[1], 1,
                30000L, "-Xmx1g", null, null,
                BasicContentHandlerFactory.HANDLER_TYPE.TEXT,
                SimpleAsyncConfig.ExtractBytesMode.NONE, null);
    }
    Options options = getOptions();
    CommandLineParser cliParser = new DefaultParser();
    // stopAtNonOption=true: unrecognized tokens end up in getArgList() and are validated below.
    CommandLine line = cliParser.parse(options, args, true);
    if (line.hasOption("help")) {
        usage(options);
    }

    // getOptionValue returns null when an option is absent, which is the desired default.
    String inputDir = line.getOptionValue("i");
    String outputDir = line.getOptionValue("o");
    String xmx = line.getOptionValue("Xmx");
    String fileList = line.getOptionValue("l");
    String tikaConfig = line.getOptionValue("c");
    String pluginsDir = line.getOptionValue("p");
    Long timeoutMs = line.hasOption("T") ? parseLongOption("timeoutMs", line.getOptionValue("T")) : null;
    Integer numClients = line.hasOption("n") ? parseIntOption("numClients", line.getOptionValue("n")) : null;

    BasicContentHandlerFactory.HANDLER_TYPE handlerType = BasicContentHandlerFactory.HANDLER_TYPE.TEXT;
    if (line.hasOption("handler")) {
        handlerType = getHandlerType(line.getOptionValue("handler"));
    }

    // -Z (recursive) wins over -z (shallow) when both are given.
    SimpleAsyncConfig.ExtractBytesMode extractBytesMode = SimpleAsyncConfig.ExtractBytesMode.NONE;
    if (line.hasOption("Z")) {
        extractBytesMode = SimpleAsyncConfig.ExtractBytesMode.RECURSIVE;
    } else if (line.hasOption("z")) {
        extractBytesMode = SimpleAsyncConfig.ExtractBytesMode.SHALLOW;
    }

    // Parse mode options: --content-only implies --concatenate.
    boolean contentOnly = line.hasOption("content-only");
    boolean concatenate = line.hasOption("concatenate") || contentOnly;

    // Unpack options are upper-cased here; they are later resolved with Enum.valueOf.
    String unpackFormat = null;
    String unpackMode = null;
    if (line.hasOption("unpack-format")) {
        unpackFormat = line.getOptionValue("unpack-format").toUpperCase(java.util.Locale.ROOT);
    }
    if (line.hasOption("unpack-mode")) {
        unpackMode = line.getOptionValue("unpack-mode").toUpperCase(java.util.Locale.ROOT);
    }
    boolean unpackIncludeMetadata = line.hasOption("unpack-include-metadata");

    if (line.getArgList().size() > 2) {
        throw new TikaConfigException("Can't have more than 2 unknown args: " + line.getArgList());
    }
    if (line.getArgList().size() == 2) {
        if (inputDir != null || outputDir != null) {
            throw new TikaConfigException("Can only set inputDir and outputDir once. Extra args: " + line.getArgList());
        }
        String inString = line.getArgList().get(0);
        String outString = line.getArgList().get(1);
        if (inString.startsWith("-") || outString.startsWith("-")) {
            throw new TikaConfigException("Found an unknown arg in one of the last two args: " + line.getArgList());
        }
        Path p = Paths.get(inString);
        if (! Files.isDirectory(p) && ! Files.isRegularFile(p)) {
            throw new TikaConfigException("Input file/dir must exist: " + p);
        }
        inputDir = inString;
        outputDir = outString;
    } else if (line.getArgList().size() == 1) {
        if (inputDir != null) {
            throw new TikaConfigException("Can only set inputDir once. Extra args: " + line.getArgList());
        }
        String inString = line.getArgList().get(0);
        if (inString.startsWith("-")) {
            throw new TikaConfigException("Found an unknown arg in one of the last arg: " + inString);
        }
        Path inputPath = Paths.get(inString);
        if (! Files.isDirectory(inputPath) && ! Files.isRegularFile(inputPath)) {
            throw new TikaConfigException("Input file/dir must exist: " + inputPath);
        }
        inputDir = inString;
        // Only set default outputDir if not already specified via -o
        if (outputDir == null) {
            if (Files.isRegularFile(inputPath)) {
                outputDir = Paths.get(".").toAbsolutePath().toString();
            } else {
                outputDir = Paths.get("output").toAbsolutePath().toString();
            }
        }
    }
    // If fileList is provided without an outputDir, default to "output"
    if (fileList != null && outputDir == null) {
        outputDir = Paths.get("output").toAbsolutePath().toString();
    }
    return new SimpleAsyncConfig(inputDir, outputDir,
            numClients, timeoutMs, xmx, fileList, tikaConfig, handlerType,
            extractBytesMode, pluginsDir, concatenate, contentOnly,
            unpackFormat, unpackMode, unpackIncludeMetadata);
}

/** Parses a long-valued option, reporting the option name when the value is not numeric. */
private static Long parseLongOption(String optionName, String value) throws TikaConfigException {
    try {
        return Long.parseLong(value);
    } catch (NumberFormatException e) {
        throw new TikaConfigException("Can't parse '" + value + "' as a number for option " + optionName, e);
    }
}

/** Parses an int-valued option, reporting the option name when the value is not numeric. */
private static Integer parseIntOption(String optionName, String value) throws TikaConfigException {
    try {
        return Integer.parseInt(value);
    } catch (NumberFormatException e) {
        throw new TikaConfigException("Can't parse '" + value + "' as a number for option " + optionName, e);
    }
}
/**
 * Maps the single-letter {@code --handler} value to a handler type.
 *
 * @param t one of {@code t}, {@code h}, {@code x}, {@code m}, {@code b}, {@code i}
 * @return the matching {@link BasicContentHandlerFactory.HANDLER_TYPE}
 * @throws TikaConfigException if {@code t} is not one of the supported letters
 */
private static BasicContentHandlerFactory.HANDLER_TYPE getHandlerType(String t) throws TikaConfigException {
    switch (t) {
        case "t":
            return BasicContentHandlerFactory.HANDLER_TYPE.TEXT;
        case "h":
            return BasicContentHandlerFactory.HANDLER_TYPE.HTML;
        case "x":
            return BasicContentHandlerFactory.HANDLER_TYPE.XML;
        case "m":
            return BasicContentHandlerFactory.HANDLER_TYPE.MARKDOWN;
        case "b":
            return BasicContentHandlerFactory.HANDLER_TYPE.BODY;
        case "i":
            return BasicContentHandlerFactory.HANDLER_TYPE.IGNORE;
        default:
            throw new TikaConfigException("Can't understand " + t + " as a handler type. Must be one of: x(ml), h(tml), m(arkdown), b(ody), i(gnore), t(ext)");
    }
}
/**
 * Feeds every tuple from the iterator to an {@link AsyncProcessor} and waits for it to drain.
 *
 * @param pipesIterator  source of the tuples to process
 * @param tikaConfigPath config used to load the processor
 * @param asyncConfig    command line settings applied to each tuple; may be {@code null},
 *                       in which case tuples are offered unmodified
 * @throws TimeoutException if a tuple is not accepted within {@code TIMEOUT_MS}
 * @throws Exception        on any other processing failure
 */
private static void processWithTikaConfig(PipesIterator pipesIterator, Path tikaConfigPath, SimpleAsyncConfig asyncConfig) throws Exception {
    final long startedAt = System.currentTimeMillis();
    try (AsyncProcessor processor = AsyncProcessor.load(tikaConfigPath, pipesIterator)) {
        for (FetchEmitTuple tuple : pipesIterator) {
            configureExtractBytes(tuple, asyncConfig);
            configureHandler(tuple, asyncConfig);
            if (!processor.offer(tuple, TIMEOUT_MS)) {
                throw new TimeoutException("timed out waiting to add a fetch emit tuple");
            }
        }
        // Signal that no more tuples are coming, then poll until the processor goes idle.
        processor.finished();
        while (processor.checkActive()) {
            Thread.sleep(500);
        }
        long elapsed = System.currentTimeMillis() - startedAt;
        LOG.info("Successfully finished processing {} files in {} ms", processor.getTotalProcessed(), elapsed);
    }
}
/**
 * Installs a {@link ContentHandlerFactory} on the tuple's parse context when a
 * non-text handler type was requested. Nothing is set when there is no command
 * line config or when the handler type is TEXT.
 *
 * @param t           the tuple whose parse context may be modified
 * @param asyncConfig command line settings; may be {@code null}
 */
private static void configureHandler(FetchEmitTuple t, SimpleAsyncConfig asyncConfig) {
    if (asyncConfig == null) {
        return;
    }
    BasicContentHandlerFactory.HANDLER_TYPE requested = asyncConfig.getHandlerType();
    if (requested != BasicContentHandlerFactory.HANDLER_TYPE.TEXT) {
        // -1 is passed as the factory's second argument (presumably "no write limit" -- confirm).
        ContentHandlerFactory handlerFactory = new BasicContentHandlerFactory(requested, -1);
        t.getParseContext().set(ContentHandlerFactory.class, handlerFactory);
    }
}
/**
 * Configures the tuple's parse context for embedded-bytes extraction when
 * {@code -z} or {@code -Z} was requested. Does nothing when there is no command
 * line config or when the extract mode is NONE.
 *
 * @param t           the tuple whose parse context may be modified
 * @param asyncConfig command line settings; may be {@code null}
 * @throws IllegalArgumentException if the configured unpack format or mode is not
 *                                  a constant of the corresponding enum
 */
private static void configureExtractBytes(FetchEmitTuple t, SimpleAsyncConfig asyncConfig) {
    SimpleAsyncConfig.ExtractBytesMode mode =
            asyncConfig == null ? null : asyncConfig.getExtractBytesMode();
    if (asyncConfig == null || mode == SimpleAsyncConfig.ExtractBytesMode.NONE) {
        return;
    }

    ParseContext context = t.getParseContext();
    // Embedded byte extraction is driven by the UNPACK parse mode.
    context.set(ParseMode.class, ParseMode.UNPACK);

    // SHALLOW (-z): cap the depth at 2 without throwing when the cap is hit.
    // Per the original author's note, depth accounting in the parser chain adds levels
    // (container = 1, first-level embedded = 2-3 depending on wrappers), so 2 admits
    // direct children while blocking deeper recursion.
    // RECURSIVE (-Z): no limits are set here, leaving the defaults in place.
    if (mode == SimpleAsyncConfig.ExtractBytesMode.SHALLOW) {
        EmbeddedLimits shallowLimits = new EmbeddedLimits();
        shallowLimits.setMaxDepth(2);
        shallowLimits.setThrowOnMaxDepth(false);
        context.set(EmbeddedLimits.class, shallowLimits);
    }

    UnpackConfig unpack = new UnpackConfig();
    unpack.setEmitter(TikaConfigAsyncWriter.EMITTER_NAME);
    unpack.setIncludeOriginal(false);
    unpack.setSuffixStrategy(UnpackConfig.SUFFIX_STRATEGY.DETECTED);
    unpack.setEmbeddedIdPrefix("-");
    unpack.setZeroPadName(8);
    unpack.setKeyBaseStrategy(UnpackConfig.KEY_BASE_STRATEGY.DEFAULT);

    // Optional unpack output settings from the command line.
    String format = asyncConfig.getUnpackFormat();
    if (format != null) {
        unpack.setOutputFormat(UnpackConfig.OUTPUT_FORMAT.valueOf(format));
    }
    String outputMode = asyncConfig.getUnpackMode();
    if (outputMode != null) {
        unpack.setOutputMode(UnpackConfig.OUTPUT_MODE.valueOf(outputMode));
    }
    if (asyncConfig.isUnpackIncludeMetadata()) {
        unpack.setIncludeFullMetadata(true);
    }
    context.set(UnpackConfig.class, unpack);
}
private static final String DEFAULT_PLUGINS_DIR = "plugins";
/**
 * Resolves the default plugins directory.
 * <p>
 * First looks for a "plugins" directory that is a sibling of the directory containing
 * this class's code source (e.g. {@code <root>/lib/app.jar} -> {@code <root>/plugins}),
 * then for a "plugins" directory under the current working directory.
 * Resolution relative to the code source is best effort: any failure there is logged
 * at debug level and the working-directory lookup is used instead.
 *
 * @return the absolute path of the first existing candidate, or the relative
 *         string "plugins" if neither location exists
 */
static String resolveDefaultPluginsDir() {
    try {
        // getCodeSource() and getLocation() may both be null; check rather than rely on an NPE.
        java.security.CodeSource codeSource =
                TikaAsyncCLI.class.getProtectionDomain().getCodeSource();
        if (codeSource != null && codeSource.getLocation() != null) {
            Path jarPath = Paths.get(codeSource.getLocation().toURI());
            Path jarDir = jarPath.getParent();
            // The jar is typically in lib/, so look for plugins/ as a sibling of lib/
            Path installDir = jarDir == null ? null : jarDir.getParent();
            if (installDir != null) {
                Path pluginsDir = installDir.resolve(DEFAULT_PLUGINS_DIR);
                if (Files.isDirectory(pluginsDir)) {
                    return pluginsDir.toAbsolutePath().toString();
                }
            }
        }
    } catch (Exception e) {
        // Deliberately broad: URI/path/security problems must not prevent the cwd fallback.
        LOG.debug("Couldn't resolve a plugins directory relative to the code source; "
                + "falling back to the working directory", e);
    }
    Path cwdPlugins = Paths.get(DEFAULT_PLUGINS_DIR);
    if (Files.isDirectory(cwdPlugins)) {
        return cwdPlugins.toAbsolutePath().toString();
    }
    return DEFAULT_PLUGINS_DIR;
}
/**
 * Ensures plugin-roots is set in the config. If missing, creates a merged config
 * with a default plugin-roots value.
 * <p>
 * The merged config is written to a temp file that is registered for deletion on JVM exit.
 * When {@code pluginsDir} is given and is an existing directory its absolute path is used;
 * when given but not a directory it is used verbatim; when blank the value comes from
 * {@link #resolveDefaultPluginsDir()}.
 *
 * @param originalConfigPath the user's config file path
 * @param pluginsDir optional plugins directory from command line (may be null)
 * @return the config path to use (original if plugin-roots exists, or a new merged config)
 * @throws IOException if the config cannot be read, its root is not a JSON object,
 *                     or the merged config cannot be written
 */
static Path ensurePluginRoots(Path originalConfigPath, String pluginsDir) throws IOException {
    ObjectMapper mapper = new ObjectMapper();
    JsonNode rootNode = mapper.readTree(originalConfigPath.toFile());
    // Guard the ObjectNode cast below: an empty file or a non-object root would
    // otherwise surface as an opaque ClassCastException.
    if (rootNode == null || !rootNode.isObject()) {
        throw new IOException("Expected a JSON object at the root of the config: " + originalConfigPath);
    }
    if (rootNode.has("plugin-roots")) {
        // plugin-roots already set, use original config
        return originalConfigPath;
    }
    // Need to add plugin-roots
    ObjectNode mutableRoot = (ObjectNode) rootNode;
    String pluginString;
    if (!StringUtils.isBlank(pluginsDir)) {
        Path plugins = Paths.get(pluginsDir);
        pluginString = Files.isDirectory(plugins) ?
                plugins.toAbsolutePath().toString() : pluginsDir;
    } else {
        pluginString = resolveDefaultPluginsDir();
    }
    mutableRoot.put("plugin-roots", pluginString);
    // Write merged config to temp file
    Path mergedConfig = Files.createTempFile("tika-async-merged-config-", ".json");
    // Register cleanup before writing so a failed write doesn't leave the temp file behind.
    mergedConfig.toFile().deleteOnExit();
    mapper.writerWithDefaultPrettyPrinter().writeValue(mergedConfig.toFile(), mutableRoot);
    LOG.info("Added default plugin-roots to config: {}", pluginString);
    return mergedConfig;
}
/**
 * Prints the usage message for both invocation styles and terminates the JVM
 * with exit status 1. This method does not return.
 *
 * @param options the options to render in the help output
 * @throws IOException if the help formatter fails to write
 */
private static void usage(Options options) throws IOException {
    System.out.println("Two primary options:");
    // The single-argument mode only recognizes a config file ending in .json.
    System.out.println("\t1. Specify a tika-config.json on the commandline that includes the definitions for async");
    System.out.println("\t2. Commandline:");
    org.apache.commons.cli.help.HelpFormatter helpFormatter = org.apache.commons.cli.help.HelpFormatter.builder().get();
    helpFormatter.printHelp("tikaAsyncCli", null, options, null, true);
    System.exit(1);
}
/**
 * A {@link PipesIterator} that yields exactly one tuple for a single file name.
 * The same name is used as both the fetch key and the emit key, paired with the
 * fetcher and emitter names defined by {@code TikaConfigAsyncWriter}.
 */
private static class SingleFilePipesIterator implements PipesIterator {
    private final String fileName;

    public SingleFilePipesIterator(String fName) {
        this.fileName = fName;
    }

    /** Returns an iterator over the single tuple (id "0") for the configured file name. */
    @Override
    public Iterator<FetchEmitTuple> iterator() {
        FetchKey fetchKey = new FetchKey(TikaConfigAsyncWriter.FETCHER_NAME, fileName);
        EmitKey emitKey = new EmitKey(TikaConfigAsyncWriter.EMITTER_NAME, fileName);
        return List.of(new FetchEmitTuple("0", fetchKey, emitKey)).iterator();
    }

    /** Always returns 1. */
    @Override
    public Integer call() throws Exception {
        return 1;
    }

    /** This iterator has no extension config; always returns {@code null}. */
    @Override
    public ExtensionConfig getExtensionConfig() {
        return null;
    }
}
}