// ParseHandler.java

/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.tika.pipes.core.server;

import java.io.IOException;
import java.io.InputStream;
import java.nio.file.Files;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.xml.sax.ContentHandler;
import org.xml.sax.SAXException;

import org.apache.tika.detect.Detector;
import org.apache.tika.digest.Digester;
import org.apache.tika.digest.DigesterFactory;
import org.apache.tika.digest.SkipContainerDocumentDigest;
import org.apache.tika.exception.EncryptedDocumentException;
import org.apache.tika.exception.TikaConfigException;
import org.apache.tika.exception.WriteLimitReachedException;
import org.apache.tika.extractor.EmbeddedDocumentUtil;
import org.apache.tika.extractor.UnpackHandler;
import org.apache.tika.io.TikaInputStream;
import org.apache.tika.metadata.Metadata;
import org.apache.tika.metadata.TikaCoreProperties;
import org.apache.tika.mime.MediaType;
import org.apache.tika.parser.AutoDetectParser;
import org.apache.tika.parser.ParseContext;
import org.apache.tika.parser.ParseRecord;
import org.apache.tika.parser.ParsingIntent;
import org.apache.tika.parser.RecursiveParserWrapper;
import org.apache.tika.pipes.api.FetchEmitTuple;
import org.apache.tika.pipes.api.ParseMode;
import org.apache.tika.pipes.core.extractor.UnpackConfig;
import org.apache.tika.sax.AbstractRecursiveParserWrapperHandler;
import org.apache.tika.sax.ContentHandlerFactory;
import org.apache.tika.sax.RecursiveParserWrapperHandler;
import org.apache.tika.utils.ExceptionUtils;

/**
 * Executes the parse for a single {@link FetchEmitTuple} inside the pipes worker.
 * <p>
 * The {@link ParseMode} resolved from the {@link ParseContext} (falling back to the
 * configured default) selects the strategy:
 * <ul>
 *   <li>{@code NO_PARSE} -- digest (if configured) and type detection only</li>
 *   <li>{@code RMETA}/{@code UNPACK} -- recursive parse; UNPACK differs only in its
 *       setup in PipesWorker (mandatory byte extraction)</li>
 *   <li>all other modes (including {@code CONCATENATE} and {@code CONTENT_ONLY}) --
 *       single concatenated-content parse; CONTENT_ONLY differs only at emit time</li>
 * </ul>
 * Before parsing, the container metadata is handed off via {@code intermediateResult}
 * and the caller is awaited on {@code countDownLatch}.
 */
class ParseHandler {

    private static final Logger LOG = LoggerFactory.getLogger(ParseHandler.class);

    private final Detector detector;
    // hand-off queue for the container metadata; expected to be empty when offered to
    private final ArrayBlockingQueue<Metadata> intermediateResult;
    // released by the coordinating thread once it has consumed the intermediate result
    private final CountDownLatch countDownLatch;
    private final AutoDetectParser autoDetectParser;
    private final RecursiveParserWrapper recursiveParserWrapper;
    private final ContentHandlerFactory defaultContentHandlerFactory;
    private final ParseMode defaultParseMode;


    ParseHandler(Detector detector, ArrayBlockingQueue<Metadata> intermediateResult,
                 CountDownLatch countDownLatch, AutoDetectParser autoDetectParser,
                 RecursiveParserWrapper recursiveParserWrapper, ContentHandlerFactory defaultContentHandlerFactory,
                 ParseMode defaultParseMode) {
        this.detector = detector;
        this.intermediateResult = intermediateResult;
        this.countDownLatch = countDownLatch;
        this.autoDetectParser = autoDetectParser;
        this.recursiveParserWrapper = recursiveParserWrapper;
        this.defaultContentHandlerFactory = defaultContentHandlerFactory;
        this.defaultParseMode = defaultParseMode;
    }

    /**
     * Parses (or, for NO_PARSE, only detects) the given stream and wraps the
     * resulting metadata list, together with any {@link UnpackHandler} registered
     * in the context, into a {@link PipesWorker.ParseDataOrPipesResult}.
     *
     * @throws InterruptedException if interrupted while awaiting the coordinating thread
     */
    PipesWorker.ParseDataOrPipesResult parseWithStream(FetchEmitTuple fetchEmitTuple, TikaInputStream stream, Metadata metadata, ParseContext parseContext)
            throws TikaConfigException, InterruptedException {

        List<Metadata> metadataList;
        //this adds the EmbeddedDocumentByteStore to the parsecontext
        ParseMode parseMode = getParseMode(parseContext);
        ContentHandlerFactory contentHandlerFactory = getContentHandlerFactory(parseContext);
        if (parseMode == ParseMode.NO_PARSE) {
            metadataList = detectOnly(fetchEmitTuple, stream, metadata, parseContext);
        } else if (parseMode == ParseMode.RMETA || parseMode == ParseMode.UNPACK) {
            // UNPACK uses the same recursive parsing as RMETA
            // The difference is in setup (PipesWorker) - UNPACK has mandatory byte extraction
            metadataList =
                    parseRecursive(fetchEmitTuple, contentHandlerFactory, stream, metadata, parseContext);
        } else {
            // CONCATENATE, CONTENT_ONLY and any other mode share the same parse path.
            // CONTENT_ONLY parses identically to CONCATENATE; the difference is
            // at emit time where emitters write only the raw content string
            metadataList = parseConcatenated(fetchEmitTuple, contentHandlerFactory, stream, metadata,
                    parseContext);
        }

        return new PipesWorker.ParseDataOrPipesResult(new MetadataListAndEmbeddedBytes(metadataList,
                parseContext.get(UnpackHandler.class)), null);
    }

    /**
     * Returns the {@link ParseMode} from the context, or the configured default
     * (loaded from TikaLoader) if none is set.
     */
    private ParseMode getParseMode(ParseContext parseContext) {
        ParseMode mode = parseContext.get(ParseMode.class);
        if (mode != null) {
            return mode;
        }
        // Fall back to default loaded from TikaLoader
        return defaultParseMode;
    }

    /**
     * Returns the {@link ContentHandlerFactory} from the context, or the configured
     * default (loaded from TikaLoader) if none is set.
     */
    private ContentHandlerFactory getContentHandlerFactory(ParseContext parseContext) {
        ContentHandlerFactory factory = parseContext.get(ContentHandlerFactory.class);
        if (factory != null) {
            return factory;
        }
        // Fall back to default loaded from TikaLoader
        return defaultContentHandlerFactory;
    }



    /**
     * Shared pre-parse steps: digest the container document (if a
     * {@link DigesterFactory} is configured and container digest is not skipped),
     * run type detection, and -- when {@link UnpackConfig#isIncludeOriginal()} is
     * set -- copy the original bytes into the {@link UnpackHandler}.
     * Failures here are logged and swallowed so that parsing can still proceed.
     */
    private void preParse(FetchEmitTuple t, TikaInputStream tis, Metadata metadata,
                          ParseContext parseContext) {
        // Get DigesterFactory from ParseContext (configured via parse-context)
        DigesterFactory digesterFactory = parseContext.get(DigesterFactory.class);
        if (digesterFactory != null && !digesterFactory.isSkipContainerDocumentDigest()) {
            try {
                Digester digester = digesterFactory.build();
                digester.digest(tis, metadata, parseContext);
                // Mark that we've already digested the container document so AutoDetectParser
                // won't re-digest it during parsing
                parseContext.set(SkipContainerDocumentDigest.class,
                        SkipContainerDocumentDigest.INSTANCE);
            } catch (IOException e) {
                LOG.warn("problem digesting: {}", t.getId(), e);
            }
        }
        // Signal to detectors that parsing will follow, so they can prepare
        // resources (e.g., ZipSalvager for truncated zips)
        parseContext.set(ParsingIntent.class, ParsingIntent.WILL_PARSE);
        try {
            MediaType mt = detector.detect(tis, metadata, parseContext);
            metadata.set(Metadata.CONTENT_TYPE,
                    EmbeddedDocumentUtil.normalizeMediaType(mt.toString()));
            metadata.set(TikaCoreProperties.CONTENT_TYPE_PARSER_OVERRIDE, mt.toString());
        } catch (IOException e) {
            LOG.warn("problem detecting: {}", t.getId(), e);
        }
        UnpackConfig unpackConfig = parseContext.get(UnpackConfig.class);
        if (unpackConfig != null &&
                unpackConfig.isIncludeOriginal()) {
            UnpackHandler unpackHandler = parseContext.get(UnpackHandler.class);
            if (unpackHandler == null) {
                // guard: includeOriginal was requested but no handler was registered;
                // the original code would have thrown an NPE here
                LOG.warn("includeOriginal requested but no UnpackHandler in ParseContext: {}",
                        t.getId());
            } else {
                try (InputStream is = Files.newInputStream(tis.getPath())) {
                    unpackHandler.add(0, metadata, is);
                } catch (IOException e) {
                    LOG.warn("problem reading source file into embedded document byte store", e);
                }
            }
        }
    }

    /**
     * Performs digest (if configured) and content type detection only, without parsing.
     */
    private List<Metadata> detectOnly(FetchEmitTuple fetchEmitTuple, TikaInputStream stream,
                                      Metadata metadata, ParseContext parseContext) {
        preParse(fetchEmitTuple, stream, metadata, parseContext);
        return Collections.singletonList(metadata);
    }

    /**
     * Recursive (RMETA-style) parse: returns one {@link Metadata} per document,
     * container first. Non-security exceptions are logged and swallowed so that
     * whatever metadata was gathered is still returned; {@link SecurityException}
     * is rethrown.
     *
     * @throws InterruptedException if interrupted while awaiting the coordinating thread
     */
    public List<Metadata> parseRecursive(FetchEmitTuple fetchEmitTuple,
                                              ContentHandlerFactory contentHandlerFactory, TikaInputStream stream,
                                              Metadata metadata, ParseContext parseContext) throws InterruptedException {
        //Intentionally do not add the metadata filter here!
        //We need to let stacktraces percolate
        // Embedded limits are now configured via EmbeddedLimits in ParseContext
        RecursiveParserWrapperHandler handler = new RecursiveParserWrapperHandler(
                contentHandlerFactory);

        long start = System.currentTimeMillis();

        preParse(fetchEmitTuple, stream, metadata, parseContext);
        //queue better be empty. we deserve an exception if not
        intermediateResult.add(metadata);
        countDownLatch.await();
        try {
            recursiveParserWrapper.parse(stream, handler, metadata, parseContext);
        } catch (SAXException e) {
            LOG.warn("sax problem: {}", fetchEmitTuple.getId(), e);
        } catch (EncryptedDocumentException e) {
            LOG.warn("encrypted document: {}", fetchEmitTuple.getId(), e);
        } catch (SecurityException e) {
            LOG.warn("security exception: {}", fetchEmitTuple.getId(), e);
            throw e;
        } catch (Exception e) {
            LOG.warn("parse exception: {}", fetchEmitTuple.getId(), e);
        } finally {
            if (LOG.isTraceEnabled()) {
                LOG.trace("timer -- parse only time: {} ms", System.currentTimeMillis() - start);
            }
        }
        return handler.getMetadataList();
    }

    /**
     * Single-document parse that concatenates all content (container plus embedded)
     * into one handler. Returns a singleton list containing the container
     * {@link Metadata}, with the extracted content, handler type, any container
     * exception stacktrace, and limit-reached flags set on it.
     * {@link SecurityException} is rethrown; other exceptions are recorded in the
     * metadata and swallowed.
     *
     * @throws InterruptedException if interrupted while awaiting the coordinating thread
     */
    public List<Metadata> parseConcatenated(FetchEmitTuple fetchEmitTuple,
                                             ContentHandlerFactory contentHandlerFactory, TikaInputStream stream,
                                             Metadata metadata, ParseContext parseContext) throws InterruptedException {

        ContentHandler handler = contentHandlerFactory.createHandler();

        // Configure ParseRecord for embedded document limits
        // ParseRecord.newInstance reads from EmbeddedLimits in ParseContext
        ParseRecord parseRecord = parseContext.get(ParseRecord.class);
        if (parseRecord == null) {
            parseRecord = ParseRecord.newInstance(parseContext);
            parseContext.set(ParseRecord.class, parseRecord);
        }

        String containerException = null;
        long start = System.currentTimeMillis();
        preParse(fetchEmitTuple, stream, metadata, parseContext);
        //queue better be empty. we deserve an exception if not
        intermediateResult.add(metadata);
        countDownLatch.await();
        boolean writeLimitReached = false;
        try {
            autoDetectParser.parse(stream, handler, metadata, parseContext);
        } catch (SAXException e) {
            containerException = ExceptionUtils.getStackTrace(e);
            LOG.warn("sax problem: {}", fetchEmitTuple.getId(), e);
            if (WriteLimitReachedException.isWriteLimitReached(e)) {
                writeLimitReached = true;
            }
        } catch (EncryptedDocumentException e) {
            containerException = ExceptionUtils.getStackTrace(e);
            LOG.warn("encrypted document: {}", fetchEmitTuple.getId(), e);
        } catch (SecurityException e) {
            LOG.warn("security exception: {}", fetchEmitTuple.getId(), e);
            throw e;
        } catch (Exception e) {
            containerException = ExceptionUtils.getStackTrace(e);
            LOG.warn("parse exception: {}", fetchEmitTuple.getId(), e);
        } finally {
            // Always record whatever content was extracted, even on failure
            metadata.add(TikaCoreProperties.TIKA_CONTENT, handler.toString());
            metadata.set(TikaCoreProperties.TIKA_CONTENT_HANDLER_TYPE,
                    contentHandlerFactory.handlerTypeName());
            if (containerException != null) {
                metadata.add(TikaCoreProperties.CONTAINER_EXCEPTION, containerException);
            }
            if (writeLimitReached) {
                metadata.set(TikaCoreProperties.WRITE_LIMIT_REACHED, true);
            }
            // Set limit reached flags from ParseRecord
            if (parseRecord.isEmbeddedCountLimitReached()) {
                metadata.set(AbstractRecursiveParserWrapperHandler.EMBEDDED_RESOURCE_LIMIT_REACHED, true);
            }
            if (parseRecord.isEmbeddedDepthLimitReached()) {
                metadata.set(AbstractRecursiveParserWrapperHandler.EMBEDDED_DEPTH_LIMIT_REACHED, true);
            }
            if (LOG.isTraceEnabled()) {
                LOG.trace("timer -- parse only time: {} ms", System.currentTimeMillis() - start);
            }
        }
        return Collections.singletonList(metadata);
    }

}