TikaAsyncCLI.java

/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.tika.async.cli;

import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Iterator;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.TimeoutException;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.CommandLineParser;
import org.apache.commons.cli.DefaultParser;
import org.apache.commons.cli.Options;
import org.apache.commons.cli.ParseException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.tika.config.EmbeddedLimits;
import org.apache.tika.config.loader.TikaJsonConfig;
import org.apache.tika.exception.TikaConfigException;
import org.apache.tika.parser.ParseContext;
import org.apache.tika.pipes.api.FetchEmitTuple;
import org.apache.tika.pipes.api.ParseMode;
import org.apache.tika.pipes.api.emitter.EmitKey;
import org.apache.tika.pipes.api.fetcher.FetchKey;
import org.apache.tika.pipes.api.pipesiterator.PipesIterator;
import org.apache.tika.pipes.core.async.AsyncProcessor;
import org.apache.tika.pipes.core.extractor.UnpackConfig;
import org.apache.tika.pipes.core.pipesiterator.PipesIteratorManager;
import org.apache.tika.plugins.ExtensionConfig;
import org.apache.tika.plugins.TikaPluginManager;
import org.apache.tika.sax.BasicContentHandlerFactory;
import org.apache.tika.sax.ContentHandlerFactory;
import org.apache.tika.utils.StringUtils;

public class TikaAsyncCLI {

    // Maximum wait, passed to AsyncProcessor#offer, when handing a tuple to the processor;
    // presumably milliseconds, per the constant's name.
    private static final long TIMEOUT_MS = 600_000;
    // Class-wide SLF4J logger.
    private static final Logger LOG = LoggerFactory.getLogger(TikaAsyncCLI.class);

    /**
     * Builds the command line option definitions understood by this CLI.
     * Options without a short name are registered with a {@code null} short option
     * and are addressed by their long name only.
     *
     * @return a freshly created {@link Options} instance
     */
    private static Options getOptions() {
        return new Options()
                // input/output and process sizing
                .addOption("i", "inputDir", true, "input directory")
                .addOption("o", "outputDir", true, "output directory")
                .addOption("n", "numClients", true, "number of forked clients")
                .addOption(null, "Xmx", true, "heap for the forked clients, e.g. --Xmx 1g")
                .addOption("h", "help", false, "this help message")
                .addOption("T", "timeoutMs", true, "timeout for each parse in milliseconds")
                .addOption(null, "handler", true,
                        "handler type: t=text, h=html, x=xml, m=markdown, b=body, i=ignore")
                .addOption("p", "pluginsDir", true, "plugins directory")
                .addOption("l", "fileList", true,
                        "file containing one path per line (relative to inputDir or absolute)")
                .addOption("c", "config", true, "tikaConfig.json")
                // embedded byte extraction
                .addOption("z", "unzipShallow", false,
                        "extract raw bytes from direct attachments only (depth=1)")
                .addOption("Z", "unzipRecursive", false,
                        "extract raw bytes from all attachments recursively")
                // parse mode
                .addOption(null, "concatenate", false,
                        "concatenate content from all embedded documents into a single content field")
                .addOption(null, "content-only", false,
                        "output only extracted content (no metadata, no JSON wrapper); implies --concatenate")
                // unpack output
                .addOption(null, "unpack-format", true,
                        "output format for unpacking: REGULAR (default) or FRICTIONLESS")
                .addOption(null, "unpack-mode", true,
                        "output mode for unpacking: ZIPPED (default) or DIRECTORY")
                .addOption(null, "unpack-include-metadata", false,
                        "include metadata.json in Frictionless output");
    }

    /**
     * CLI entry point. With no arguments the usage text is printed; otherwise the
     * arguments are handed to {@code processCommandLine}.
     *
     * @param args raw command line arguments
     * @throws Exception if processing fails
     */
    public static void main(String[] args) throws Exception {
        if (args.length > 0) {
            processCommandLine(args);
            return;
        }
        usage(getOptions());
    }

    /**
     * Runs the async processor for the given command line.
     * <p>
     * If the only argument ends in {@code .json} it is loaded as the tika config; that
     * config must yield a pipes iterator, and processing runs without a
     * {@link SimpleAsyncConfig}. Otherwise the arguments are parsed into a
     * {@link SimpleAsyncConfig}, a temporary config file is written via
     * {@link PluginsWriter}, and processing runs against that temporary file, which is
     * deleted afterwards regardless of the outcome.
     *
     * @param args raw command line arguments (at least one)
     * @throws IllegalArgumentException if a single .json config does not define a pipes iterator
     * @throws Exception if parsing, configuration loading or processing fails
     */
    private static void processCommandLine(String[] args) throws Exception {
        // Diagnostic trace only; not something to surface at WARN on every run.
        LOG.debug("processing {} args", args.length);

        if (args.length == 1 && args[0].endsWith(".json")) {
            Path configPath = Paths.get(args[0]);
            TikaJsonConfig tikaJsonConfig = TikaJsonConfig.load(configPath);
            Optional<PipesIterator> pipesIteratorOpt =
                    PipesIteratorManager.load(TikaPluginManager.load(tikaJsonConfig), tikaJsonConfig);
            if (pipesIteratorOpt.isEmpty()) {
                throw new IllegalArgumentException("Must specify a pipes iterator if supplying a .json file");
            }
            // No SimpleAsyncConfig here: per-tuple handler/unpack settings are left untouched.
            processWithTikaConfig(pipesIteratorOpt.get(), configPath, null);
            return;
        }

        SimpleAsyncConfig simpleAsyncConfig = parseCommandLine(args);

        Path tmpTikaConfig = Files.createTempFile("tika-async-tmp-", ".json");
        try {
            PluginsWriter pluginsWriter = new PluginsWriter(simpleAsyncConfig, tmpTikaConfig);
            pluginsWriter.write(tmpTikaConfig);

            PipesIterator pipesIterator = buildPipesIterator(tmpTikaConfig, simpleAsyncConfig);
            processWithTikaConfig(pipesIterator, tmpTikaConfig, simpleAsyncConfig);
        } finally {
            // Always remove the generated config, even when processing fails.
            Files.deleteIfExists(tmpTikaConfig);
        }
    }


    /**
     * Chooses the pipes iterator for a command-line driven run.
     * <ol>
     *     <li>If a file list is configured, a {@link FileListPipesIterator} is returned,
     *     with the input directory (when set) as its base path.</li>
     *     <li>If the input path is set and is a regular file, an iterator over that
     *     single file name is returned.</li>
     *     <li>Otherwise the iterator is loaded from the given config file.</li>
     * </ol>
     *
     * @param pluginsConfig path of the config file to load
     * @param simpleAsyncConfig the parsed command line settings
     * @return the iterator to drive processing
     * @throws TikaConfigException if the config does not provide a pipes iterator
     * @throws IOException declared for the config and iterator loading calls
     */
    private static PipesIterator buildPipesIterator(Path pluginsConfig, SimpleAsyncConfig simpleAsyncConfig) throws TikaConfigException, IOException {
        TikaJsonConfig tikaJsonConfig = TikaJsonConfig.load(pluginsConfig);
        String inputDirString = simpleAsyncConfig.getInputDir();
        boolean hasInput = !StringUtils.isBlank(inputDirString);

        // An explicit file list takes precedence over everything else
        String fileList = simpleAsyncConfig.getFileList();
        if (!StringUtils.isBlank(fileList)) {
            Path basePath = hasInput ? Paths.get(inputDirString) : null;
            return new FileListPipesIterator(Paths.get(fileList), basePath);
        }

        // A single input file gets its own one-element iterator
        if (hasInput) {
            Path input = Paths.get(simpleAsyncConfig.getInputDir());
            if (Files.isRegularFile(input)) {
                return new SingleFilePipesIterator(input.getFileName().toString());
            }
        }

        // No input, or the input is not a regular file: defer to the config
        Optional<PipesIterator> fromConfig =
                PipesIteratorManager.load(TikaPluginManager.load(tikaJsonConfig), tikaJsonConfig);
        if (fromConfig.isPresent()) {
            return fromConfig.get();
        }
        throw new TikaConfigException("something went wrong loading: pipesIterator from the tika configs");
    }

    /**
     * Parses the command line into a {@link SimpleAsyncConfig}.
     * Not private for testing purposes.
     * <p>
     * Two forms are accepted:
     * <ul>
     *     <li>Exactly two arguments where the first does not start with {@code -}:
     *     treated as {@code <input> <output>} with built-in defaults.</li>
     *     <li>Options as defined by {@code getOptions()}, optionally followed by one or
     *     two positional arguments (input, then output). A positional input must be an
     *     existing file or directory. With a single positional input and no {@code -o},
     *     the output defaults to the current directory for a file input and to
     *     {@code output} for a directory input. A file list without an output
     *     directory also defaults to {@code output}.</li>
     * </ul>
     * {@code -Z} takes precedence over {@code -z}; {@code --content-only} implies
     * {@code --concatenate}.
     *
     * @param args raw command line arguments
     * @return the parsed configuration
     * @throws TikaConfigException on conflicting, unknown or invalid arguments
     * @throws ParseException if commons-cli cannot parse the options
     * @throws IOException declared because printing the usage text may fail
     * @throws NumberFormatException if {@code -T} or {@code -n} is not a valid number
     */
    static SimpleAsyncConfig parseCommandLine(String[] args) throws TikaConfigException, ParseException, IOException {
        // Shortcut form: "<input> <output>" with defaults
        if (args.length == 2 && ! args[0].startsWith("-")) {
            return new SimpleAsyncConfig(args[0], args[1], 1,
                    30000L, "-Xmx1g", null, null,
                    BasicContentHandlerFactory.HANDLER_TYPE.TEXT,
                    SimpleAsyncConfig.ExtractBytesMode.NONE, null);
        }

        Options options = getOptions();
        CommandLineParser cliParser = new DefaultParser();
        // stopAtNonOption=true: parsing stops at the first unrecognized token and the
        // remainder is exposed through getArgList()
        CommandLine line = cliParser.parse(options, args, true);
        if (line.hasOption("help")) {
            usage(options);
        }

        // getOptionValue returns null when the option was not supplied
        String inputDir = line.getOptionValue("i");
        String outputDir = line.getOptionValue("o");
        String xmx = line.getOptionValue("Xmx");
        String fileList = line.getOptionValue("l");
        String tikaConfig = line.getOptionValue("c");
        String pluginsDir = line.getOptionValue("p");

        Long timeoutMs = null;
        if (line.hasOption("T")) {
            timeoutMs = Long.parseLong(line.getOptionValue("T"));
        }
        Integer numClients = null;
        if (line.hasOption("n")) {
            numClients = Integer.parseInt(line.getOptionValue("n"));
        }

        // Recursive extraction wins if both -Z and -z are given
        SimpleAsyncConfig.ExtractBytesMode extractBytesMode = SimpleAsyncConfig.ExtractBytesMode.NONE;
        if (line.hasOption("Z")) {
            extractBytesMode = SimpleAsyncConfig.ExtractBytesMode.RECURSIVE;
        } else if (line.hasOption("z")) {
            extractBytesMode = SimpleAsyncConfig.ExtractBytesMode.SHALLOW;
        }

        BasicContentHandlerFactory.HANDLER_TYPE handlerType = BasicContentHandlerFactory.HANDLER_TYPE.TEXT;
        if (line.hasOption("handler")) {
            handlerType = getHandlerType(line.getOptionValue("handler"));
        }

        // Parse mode options
        boolean contentOnly = line.hasOption("content-only");
        boolean concatenate = line.hasOption("concatenate") || contentOnly;

        // Frictionless Data Package options; values are upper-cased here and resolved
        // against the UnpackConfig enums later
        String unpackFormat = null;
        String unpackMode = null;
        if (line.hasOption("unpack-format")) {
            unpackFormat = line.getOptionValue("unpack-format").toUpperCase(java.util.Locale.ROOT);
        }
        if (line.hasOption("unpack-mode")) {
            unpackMode = line.getOptionValue("unpack-mode").toUpperCase(java.util.Locale.ROOT);
        }
        boolean unpackIncludeMetadata = line.hasOption("unpack-include-metadata");

        List<String> positional = line.getArgList();
        if (positional.size() > 2) {
            throw new TikaConfigException("Can't have more than 2 unknown args: " + positional);
        }

        if (positional.size() == 2) {
            if (inputDir != null || outputDir != null) {
                throw new TikaConfigException("Can only set inputDir and outputDir once. Extra args: " + positional);
            }
            String inString = positional.get(0);
            String outString = positional.get(1);
            if (inString.startsWith("-") || outString.startsWith("-")) {
                throw new TikaConfigException("Found an unknown arg in one of the last two args: " + positional);
            }
            Path p = Paths.get(inString);
            if (! Files.isDirectory(p) && ! Files.isRegularFile(p)) {
                throw new TikaConfigException("Input file/dir must exist: " + p);
            }
            inputDir = inString;
            outputDir = outString;
        } else if (positional.size() == 1) {
            if (inputDir != null) {
                throw new TikaConfigException("Can only set inputDir once. Extra args: " + positional);
            }
            String inString = positional.get(0);
            if (inString.startsWith("-")) {
                throw new TikaConfigException("Found an unknown arg in one of the last arg: " + inString);
            }
            Path inputPath = Paths.get(inString);
            if (! Files.isDirectory(inputPath) && ! Files.isRegularFile(inputPath)) {
                throw new TikaConfigException("Input file/dir must exist: " + inputPath);
            }
            inputDir = inString;
            // Only set default outputDir if not already specified via -o
            if (outputDir == null) {
                if (Files.isRegularFile(inputPath)) {
                    outputDir = Paths.get(".").toAbsolutePath().toString();
                } else {
                    outputDir = Paths.get("output").toAbsolutePath().toString();
                }
            }
        }

        // If fileList is provided without an outputDir, default to "output"
        if (fileList != null && outputDir == null) {
            outputDir = Paths.get("output").toAbsolutePath().toString();
        }

        return new SimpleAsyncConfig(inputDir, outputDir,
                numClients, timeoutMs, xmx, fileList, tikaConfig, handlerType,
                extractBytesMode, pluginsDir, concatenate, contentOnly,
                unpackFormat, unpackMode, unpackIncludeMetadata);
    }

    /**
     * Maps the single-letter value of the {@code --handler} option to a handler type.
     *
     * @param t one of {@code x}, {@code h}, {@code m}, {@code b}, {@code i}, {@code t}
     * @return the matching handler type
     * @throws TikaConfigException if the value is not one of the supported letters
     */
    private static BasicContentHandlerFactory.HANDLER_TYPE getHandlerType(String t) throws TikaConfigException {
        switch (t) {
            case "t":
                return BasicContentHandlerFactory.HANDLER_TYPE.TEXT;
            case "i":
                return BasicContentHandlerFactory.HANDLER_TYPE.IGNORE;
            case "b":
                return BasicContentHandlerFactory.HANDLER_TYPE.BODY;
            case "m":
                return BasicContentHandlerFactory.HANDLER_TYPE.MARKDOWN;
            case "h":
                return BasicContentHandlerFactory.HANDLER_TYPE.HTML;
            case "x":
                return BasicContentHandlerFactory.HANDLER_TYPE.XML;
            default:
                throw new TikaConfigException("Can't understand " + t + " as a handler type. Must be one of: x(ml), h(tml), m(arkdown), b(ody), i(gnore), t(ext)");
        }
    }


    /**
     * Feeds every tuple from the iterator to an {@link AsyncProcessor} and waits for
     * the processor to go idle.
     * <p>
     * Each tuple is first adjusted by {@code configureExtractBytes} and
     * {@code configureHandler} (both are no-ops when {@code asyncConfig} is null).
     * After all tuples are offered the processor is told it is finished and is polled
     * every 500 ms until it reports it is no longer active.
     *
     * @param pipesIterator source of the tuples to process
     * @param tikaConfigPath config file used to load the processor
     * @param asyncConfig command line settings, or null when running from a config file only
     * @throws TimeoutException if a tuple could not be offered within {@code TIMEOUT_MS}
     * @throws Exception on any other processing failure
     */
    private static void processWithTikaConfig(PipesIterator pipesIterator, Path tikaConfigPath, SimpleAsyncConfig asyncConfig) throws Exception {
        final long startedAt = System.currentTimeMillis();
        try (AsyncProcessor processor = AsyncProcessor.load(tikaConfigPath, pipesIterator)) {
            Iterator<FetchEmitTuple> tuples = pipesIterator.iterator();
            while (tuples.hasNext()) {
                FetchEmitTuple tuple = tuples.next();
                configureExtractBytes(tuple, asyncConfig);
                configureHandler(tuple, asyncConfig);
                if (!processor.offer(tuple, TIMEOUT_MS)) {
                    throw new TimeoutException("timed out waiting to add a fetch emit tuple");
                }
            }
            processor.finished();

            // Poll until the processor has drained its work
            while (processor.checkActive()) {
                Thread.sleep(500);
            }

            long elapsed = System.currentTimeMillis() - startedAt;
            LOG.info("Successfully finished processing {} files in {} ms", processor.getTotalProcessed(), elapsed);
        }
    }

    /**
     * Registers a content handler factory on the tuple's parse context when the
     * command line asked for a handler type other than TEXT. Nothing is changed when
     * there is no command line config or the requested type is TEXT.
     *
     * @param t the tuple whose parse context may be updated
     * @param asyncConfig command line settings; may be null
     */
    private static void configureHandler(FetchEmitTuple t, SimpleAsyncConfig asyncConfig) {
        boolean nothingToDo = asyncConfig == null
                || asyncConfig.getHandlerType() == BasicContentHandlerFactory.HANDLER_TYPE.TEXT;
        if (nothingToDo) {
            return;
        }
        // -1 is passed through as the factory's second constructor argument, as before
        ContentHandlerFactory handlerFactory =
                new BasicContentHandlerFactory(asyncConfig.getHandlerType(), -1);
        t.getParseContext().set(ContentHandlerFactory.class, handlerFactory);
    }

    /**
     * Configures the tuple's parse context for embedded byte extraction when the
     * command line requested it ({@code -z} or {@code -Z}).
     * <p>
     * Sets the parse mode to {@link ParseMode#UNPACK}, applies a depth limit for the
     * shallow mode, and registers an {@link UnpackConfig} built from fixed defaults
     * plus the optional unpack format/mode/metadata settings. Nothing is changed when
     * there is no command line config or the mode is NONE.
     *
     * @param t the tuple whose parse context may be updated
     * @param asyncConfig command line settings; may be null
     * @throws IllegalArgumentException if the configured unpack format or mode does
     *         not name a constant of the corresponding {@link UnpackConfig} enum
     */
    private static void configureExtractBytes(FetchEmitTuple t, SimpleAsyncConfig asyncConfig) {
        if (asyncConfig == null) {
            return;
        }
        final SimpleAsyncConfig.ExtractBytesMode requestedMode = asyncConfig.getExtractBytesMode();
        if (requestedMode == SimpleAsyncConfig.ExtractBytesMode.NONE) {
            return;
        }

        final ParseContext context = t.getParseContext();
        // Embedded byte extraction is driven by the UNPACK parse mode
        context.set(ParseMode.class, ParseMode.UNPACK);

        // SHALLOW (-z): cap the depth at 2 without throwing when the cap is hit.
        // Per the original author's note, the parser chain's depth accounting puts the
        // container at depth 1, so a max depth of 2 admits first-level embedded
        // documents while blocking deeper recursion.
        // RECURSIVE (-Z): no limits are registered here.
        if (requestedMode == SimpleAsyncConfig.ExtractBytesMode.SHALLOW) {
            EmbeddedLimits shallowLimits = new EmbeddedLimits();
            shallowLimits.setMaxDepth(2);
            shallowLimits.setThrowOnMaxDepth(false);
            context.set(EmbeddedLimits.class, shallowLimits);
        }

        // Fixed defaults for how extracted bytes are emitted and named
        final UnpackConfig unpack = new UnpackConfig();
        unpack.setEmitter(TikaConfigAsyncWriter.EMITTER_NAME);
        unpack.setIncludeOriginal(false);
        unpack.setSuffixStrategy(UnpackConfig.SUFFIX_STRATEGY.DETECTED);
        unpack.setEmbeddedIdPrefix("-");
        unpack.setZeroPadName(8);
        unpack.setKeyBaseStrategy(UnpackConfig.KEY_BASE_STRATEGY.DEFAULT);

        // Optional Frictionless Data Package settings from the command line
        final String format = asyncConfig.getUnpackFormat();
        if (format != null) {
            unpack.setOutputFormat(UnpackConfig.OUTPUT_FORMAT.valueOf(format));
        }
        final String outputMode = asyncConfig.getUnpackMode();
        if (outputMode != null) {
            unpack.setOutputMode(UnpackConfig.OUTPUT_MODE.valueOf(outputMode));
        }
        if (asyncConfig.isUnpackIncludeMetadata()) {
            unpack.setIncludeFullMetadata(true);
        }

        context.set(UnpackConfig.class, unpack);
    }

    // Directory name used when no plugins directory is given explicitly.
    private static final String DEFAULT_PLUGINS_DIR = "plugins";

    /**
     * Resolves the default plugins directory.
     * <p>
     * First looks for a {@code plugins} directory that is a sibling of the directory
     * containing this class's code source (the jar is typically in {@code lib/}, so
     * this is {@code lib/../plugins}). If that cannot be determined or does not exist,
     * falls back to a {@code plugins} directory under the current working directory.
     * Failure to locate the code source is never fatal.
     *
     * @return the absolute path of the first existing candidate, or the relative name
     *         {@code "plugins"} if neither location exists
     */
    static String resolveDefaultPluginsDir() {
        try {
            // getCodeSource() and its location may legitimately be null; check
            // explicitly instead of relying on a swallowed NullPointerException.
            java.security.CodeSource codeSource =
                    TikaAsyncCLI.class.getProtectionDomain().getCodeSource();
            if (codeSource != null && codeSource.getLocation() != null) {
                Path jarPath = Paths.get(codeSource.getLocation().toURI());
                Path jarDir = jarPath.getParent();
                // The jar is typically in lib/, so look for plugins/ as a sibling of lib/
                Path installRoot = jarDir == null ? null : jarDir.getParent();
                if (installRoot != null) {
                    Path pluginsDir = installRoot.resolve(DEFAULT_PLUGINS_DIR);
                    if (Files.isDirectory(pluginsDir)) {
                        return pluginsDir.toAbsolutePath().toString();
                    }
                }
            }
        } catch (Exception e) {
            // Best effort only (e.g. non-file code source, security restrictions):
            // fall through to the cwd-relative lookup
            LOG.debug("Couldn't resolve plugins dir relative to the code source; trying working directory", e);
        }
        Path cwdPlugins = Paths.get(DEFAULT_PLUGINS_DIR);
        if (Files.isDirectory(cwdPlugins)) {
            return cwdPlugins.toAbsolutePath().toString();
        }
        return DEFAULT_PLUGINS_DIR;
    }

    /**
     * Ensures plugin-roots is set in the config. If missing, creates a merged config
     * with a default plugin-roots value.
     * <p>
     * When {@code pluginsDir} is given it is used for the new value (made absolute if
     * it is an existing directory); otherwise {@link #resolveDefaultPluginsDir()} is
     * used. The merged config is written to a temporary file that is registered for
     * deletion on JVM exit.
     *
     * @param originalConfigPath the user's config file path
     * @param pluginsDir optional plugins directory from command line (may be null)
     * @return the config path to use (original if plugin-roots exists, or a new merged config)
     * @throws IOException if the config cannot be read or parsed, if its root is not a
     *         JSON object, or if the merged config cannot be written
     */
    static Path ensurePluginRoots(Path originalConfigPath, String pluginsDir) throws IOException {
        ObjectMapper mapper = new ObjectMapper();
        JsonNode rootNode = mapper.readTree(originalConfigPath.toFile());

        // An empty file or a non-object root (array, scalar) cannot carry plugin-roots;
        // fail with a clear message rather than a ClassCastException/NullPointerException
        if (!(rootNode instanceof ObjectNode)) {
            throw new IOException("Expected a JSON object at the root of the config file: " + originalConfigPath);
        }
        ObjectNode mutableRoot = (ObjectNode) rootNode;

        if (mutableRoot.has("plugin-roots")) {
            // plugin-roots already set, use original config
            return originalConfigPath;
        }

        // Need to add plugin-roots
        String pluginString;
        if (!StringUtils.isBlank(pluginsDir)) {
            Path plugins = Paths.get(pluginsDir);
            pluginString = Files.isDirectory(plugins) ?
                    plugins.toAbsolutePath().toString() : pluginsDir;
        } else {
            pluginString = resolveDefaultPluginsDir();
        }
        mutableRoot.put("plugin-roots", pluginString);

        // Write merged config to temp file. Register cleanup before writing so the
        // file is not leaked if serialization fails.
        Path mergedConfig = Files.createTempFile("tika-async-merged-config-", ".json");
        mergedConfig.toFile().deleteOnExit();
        mapper.writerWithDefaultPrettyPrinter().writeValue(mergedConfig.toFile(), mutableRoot);

        LOG.info("Added default plugin-roots to config: {}", pluginString);
        return mergedConfig;
    }

    /**
     * Prints the usage text and the option help to standard out, then terminates the
     * JVM with exit status 1. This method does not return normally.
     *
     * @param options the option definitions to describe
     * @throws IOException if the help formatter fails to write its output
     */
    private static void usage(Options options) throws IOException {
        System.out.println("Two primary options:");
        // The single-argument form is only recognized for files ending in .json
        System.out.println("\t1. Specify a tika-config.json on the commandline that includes the definitions for async");
        System.out.println("\t2. Commandline:");
        org.apache.commons.cli.help.HelpFormatter helpFormatter = org.apache.commons.cli.help.HelpFormatter.builder().get();
        helpFormatter.printHelp("tikaAsyncCli", null, options, null, true);
        System.exit(1);
    }

    /**
     * Pipes iterator that yields exactly one tuple for a single file name. The same
     * name is used as both the fetch key and the emit key, bound to the fetcher and
     * emitter names defined by {@link TikaConfigAsyncWriter}.
     */
    private static class SingleFilePipesIterator implements PipesIterator {
        private final String fileName;

        public SingleFilePipesIterator(String fName) {
            this.fileName = fName;
        }

        /**
         * @return an iterator over a single tuple with id "0"
         */
        @Override
        public Iterator<FetchEmitTuple> iterator() {
            FetchKey fetchKey = new FetchKey(TikaConfigAsyncWriter.FETCHER_NAME, fileName);
            EmitKey emitKey = new EmitKey(TikaConfigAsyncWriter.EMITTER_NAME, fileName);
            List<FetchEmitTuple> single = List.of(new FetchEmitTuple("0", fetchKey, emitKey));
            return single.iterator();
        }

        /**
         * @return always 1
         */
        @Override
        public Integer call() throws Exception {
            return Integer.valueOf(1);
        }

        /**
         * @return always null; this iterator has no extension config
         */
        @Override
        public ExtensionConfig getExtensionConfig() {
            return null;
        }
    }
}