EmitHandler.java

/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.tika.pipes.core.server;


import static org.apache.tika.pipes.core.server.PipesWorker.metadataIsEmpty;

import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Set;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.tika.exception.TikaException;
import org.apache.tika.metadata.Metadata;
import org.apache.tika.metadata.TikaCoreProperties;
import org.apache.tika.metadata.filter.IncludeFieldMetadataFilter;
import org.apache.tika.metadata.filter.MetadataFilter;
import org.apache.tika.metadata.filter.NoOpFilter;
import org.apache.tika.parser.ParseContext;
import org.apache.tika.pipes.api.FetchEmitTuple;
import org.apache.tika.pipes.api.ParseMode;
import org.apache.tika.pipes.api.PipesResult;
import org.apache.tika.pipes.api.emitter.EmitKey;
import org.apache.tika.pipes.api.emitter.Emitter;
import org.apache.tika.pipes.api.emitter.StreamEmitter;
import org.apache.tika.pipes.core.EmitStrategy;
import org.apache.tika.pipes.core.EmitStrategyConfig;
import org.apache.tika.pipes.core.PassbackFilter;
import org.apache.tika.pipes.core.emitter.EmitDataImpl;
import org.apache.tika.pipes.core.emitter.EmitterManager;
import org.apache.tika.utils.ExceptionUtils;
import org.apache.tika.utils.StringUtils;

/**
 * Decides what happens to the output of a parse: it is either handed to an
 * {@link Emitter}, or wrapped in a {@link PipesResult} so that the caller can
 * pass it back, depending on the configured (or per-request) {@link EmitStrategy}.
 */
class EmitHandler {

    private static final Logger LOG = LoggerFactory.getLogger(EmitHandler.class);

    /** Filter applied when the {@link ParseContext} carries none and the mode is not CONTENT_ONLY. */
    private final MetadataFilter defaultMetadataFilter;
    /** Strategy used unless an {@link EmitStrategyConfig} in the {@link ParseContext} overrides it. */
    private final EmitStrategy emitStrategy;
    /** Source of emitters, looked up by the emitter id of the {@link EmitKey}. */
    private final EmitterManager emitterManager;
    /** Size threshold used by {@link EmitStrategy#DYNAMIC} unless overridden per request. */
    private final long directEmitThresholdBytes;


    /**
     * @param defaultMetadataFilter    filter used when the parse context supplies none
     * @param emitStrategy             default emit strategy
     * @param emitterManager           manager used to resolve emitters by id
     * @param directEmitThresholdBytes default size threshold for the DYNAMIC strategy
     */
    public EmitHandler(MetadataFilter defaultMetadataFilter, EmitStrategy emitStrategy, EmitterManager emitterManager, long directEmitThresholdBytes) {
        this.defaultMetadataFilter = defaultMetadataFilter;
        this.emitStrategy = emitStrategy;
        this.emitterManager = emitterManager;
        this.directEmitThresholdBytes = directEmitThresholdBytes;
    }

    /**
     * Filters the parse output, merges in the user metadata from the tuple and then either
     * emits the result or returns it for pass back.
     * <p>
     * If the container document recorded an exception and the tuple's on-parse-exception
     * policy is not {@code EMIT}, nothing is emitted and the stack trace is returned with
     * {@code PARSE_EXCEPTION_NO_EMIT}. If the tuple's emit key is blank, the fetch key is
     * used instead and the tuple is updated with the new emit key.
     *
     * @param t            the tuple describing the task
     * @param parseData    the metadata list (and embedded bytes) produced by the parse
     * @param parseContext per-request context; may carry a {@link MetadataFilter},
     *                     {@link ParseMode}, {@link EmitStrategyConfig} and {@link PassbackFilter}
     * @return the result describing what was done with the parse output
     */
    public PipesResult emitParseData(FetchEmitTuple t, MetadataListAndEmbeddedBytes parseData, ParseContext parseContext) {
        String stack = getContainerStacktrace(t, parseData.getMetadataList());
        //we need to apply the metadata filter after we pull out the stacktrace
        filterMetadata(parseData, parseContext);
        FetchEmitTuple.ON_PARSE_EXCEPTION onParseException = t.getOnParseException();
        boolean parseFailed = !StringUtils.isBlank(stack);
        if (parseFailed && onParseException != FetchEmitTuple.ON_PARSE_EXCEPTION.EMIT) {
            return new PipesResult(PipesResult.RESULT_STATUS.PARSE_EXCEPTION_NO_EMIT, stack);
        }

        injectUserMetadata(t.getMetadata(), parseData.getMetadataList());
        EmitKey emitKey = t.getEmitKey();
        if (StringUtils.isBlank(emitKey.getEmitKey())) {
            //no explicit emit key: fall back to the fetch key
            emitKey = new EmitKey(emitKey.getEmitterId(), t.getFetchKey().getFetchKey());
            t.setEmitKey(emitKey);
        }
        EmitDataImpl emitDataTuple = new EmitDataImpl(t.getEmitKey().getEmitKey(), parseData.getMetadataList(), stack);
        ParseMode parseMode = parseContext.get(ParseMode.class);
        if (shouldEmit(parseMode, parseData, emitDataTuple, parseContext)) {
            return emit(t.getId(), emitKey, parseMode == ParseMode.UNPACK,
                    parseData, stack, parseContext);
        }
        //not emitting: hand the data back to the caller
        if (parseFailed) {
            return new PipesResult(PipesResult.RESULT_STATUS.PARSE_SUCCESS_WITH_EXCEPTION, emitDataTuple);
        }
        return new PipesResult(PipesResult.RESULT_STATUS.PARSE_SUCCESS, emitDataTuple);
    }

    /**
     * Resolves the emitter for {@code emitKey} and sends the parse output to it.
     * <p>
     * Emitter lookup failures and {@link IOException}s thrown while emitting are converted
     * into the corresponding failure {@link PipesResult}. If a {@link PassbackFilter} is
     * present in the parse context, the (filtered) metadata list is also included in the
     * success result.
     *
     * @param taskId                 id of the task, used in log messages
     * @param emitKey                emitter id and key to emit to
     * @param isExtractEmbeddedBytes whether embedded bytes were requested (UNPACK mode)
     * @param parseData              the parse output
     * @param parseExceptionStack    stack trace of the container parse exception, or blank
     * @param parseContext           per-request context
     * @return the result of the emit attempt
     */
    private PipesResult emit(String taskId, EmitKey emitKey,
                      boolean isExtractEmbeddedBytes, MetadataListAndEmbeddedBytes parseData,
                      String parseExceptionStack, ParseContext parseContext) {
        Emitter emitter = null;

        try {
            emitter = emitterManager.getEmitter(emitKey.getEmitterId());
        } catch (org.apache.tika.pipes.api.emitter.EmitterNotFoundException e) {
            //report the id of the emitter that was looked up, not the task id
            String noEmitterMsg = getNoEmitterMsg(emitKey.getEmitterId());
            LOG.warn(noEmitterMsg);
            return new PipesResult(PipesResult.RESULT_STATUS.EMITTER_NOT_FOUND, noEmitterMsg);
        } catch (IOException | TikaException e) {
            LOG.warn("Couldn't initialize emitter for task id '{}'", taskId, e);
            return new PipesResult(PipesResult.RESULT_STATUS.EMITTER_INITIALIZATION_EXCEPTION, ExceptionUtils.getStackTrace(e));
        }
        try {
            ParseMode parseMode = parseContext.get(ParseMode.class);
            if (parseMode == ParseMode.CONTENT_ONLY && emitter instanceof StreamEmitter) {
                emitContentOnly((StreamEmitter) emitter, emitKey, parseData, parseContext);
            } else if (isExtractEmbeddedBytes &&
                    parseData.toBePackagedForStreamEmitter()) {
                emitContentsAndBytes(emitter, emitKey, parseData);
            } else {
                emitter.emit(emitKey.getEmitKey(), parseData.getMetadataList(), parseContext);
            }
        } catch (IOException e) {
            LOG.warn("emit exception", e);
            String msg = ExceptionUtils.getStackTrace(e);
            //for now, we're hiding the parse exception if there was also an emit exception
            return new PipesResult(PipesResult.RESULT_STATUS.EMIT_EXCEPTION, msg);
        }
        boolean parseFailed = !StringUtils.isBlank(parseExceptionStack);
        PassbackFilter passbackFilter = parseContext.get(PassbackFilter.class);
        if (passbackFilter != null) {
            try {
                passbackFilter.filter(parseData.metadataList);
            } catch (TikaException e) {
                //best effort: still pass back whatever is in the list
                LOG.warn("problem filtering for pass back", e);
            }
            if (parseFailed) {
                return new PipesResult(PipesResult.RESULT_STATUS.EMIT_SUCCESS_PARSE_EXCEPTION, new EmitDataImpl(emitKey.getEmitKey(), parseData.metadataList), parseExceptionStack);
            }
            return new PipesResult(PipesResult.RESULT_STATUS.EMIT_SUCCESS_PASSBACK, new EmitDataImpl(emitKey.getEmitKey(), parseData.metadataList));
        }
        if (parseFailed) {
            return new PipesResult(PipesResult.RESULT_STATUS.EMIT_SUCCESS_PARSE_EXCEPTION, parseExceptionStack);
        }
        return new PipesResult(PipesResult.RESULT_STATUS.EMIT_SUCCESS);
    }

    /**
     * Emits only the extracted text of the first metadata object as a UTF-8 byte stream.
     * An empty stream is emitted if the list is null or empty, or if the first metadata
     * object has no {@link TikaCoreProperties#TIKA_CONTENT} value. The stream is emitted
     * with a fresh, empty {@link Metadata} object.
     *
     * @throws IOException if the emitter fails
     */
    private void emitContentOnly(StreamEmitter emitter, EmitKey emitKey,
                                  MetadataListAndEmbeddedBytes parseData,
                                  ParseContext parseContext) throws IOException {
        List<Metadata> metadataList = parseData.getMetadataList();
        String content = "";
        if (metadataList != null && !metadataList.isEmpty()) {
            String val = metadataList.get(0).get(TikaCoreProperties.TIKA_CONTENT);
            if (val != null) {
                content = val;
            }
        }
        byte[] bytes = content.getBytes(StandardCharsets.UTF_8);
        try (InputStream is = new ByteArrayInputStream(bytes)) {
            emitter.emit(emitKey.getEmitKey(), is, new Metadata(), parseContext);
        }
    }

    /**
     * Placeholder for emitting metadata together with embedded bytes.
     *
     * @throws IllegalArgumentException      if {@code emitter} is not a {@link StreamEmitter}
     * @throws UnsupportedOperationException always, for a {@link StreamEmitter}; not yet implemented
     */
    private void emitContentsAndBytes(Emitter emitter, EmitKey emitKey,
                                      MetadataListAndEmbeddedBytes parseData) {
        if (!(emitter instanceof StreamEmitter)) {
            throw new IllegalArgumentException("The emitter for embedded document byte store must" +
                    " be a StreamEmitter. I see: " + emitter.getClass());
        }
        //TODO: implement this
        throw new UnsupportedOperationException("this is not yet implemented");
    }


    /**
     * Determines whether the parse output should be emitted (true) or passed back (false).
     * The handler's strategy and threshold are used unless the parse context carries an
     * {@link EmitStrategyConfig}, whose type always overrides the strategy and whose
     * threshold overrides the default only when it is non-null.
     */
    private boolean shouldEmit(ParseMode parseMode, MetadataListAndEmbeddedBytes parseData,
                               EmitDataImpl emitDataTuple, ParseContext parseContext) {
        EmitStrategy strategy = emitStrategy;
        long thresholdBytes = directEmitThresholdBytes;

        EmitStrategyConfig overrideConfig = parseContext.get(EmitStrategyConfig.class);
        if (overrideConfig != null) {
            strategy = overrideConfig.getType();
            if (overrideConfig.getThresholdBytes() != null) {
                thresholdBytes = overrideConfig.getThresholdBytes();
            }
        }

        // UNPACK mode: bytes are already emitted during parsing
        // For PASSBACK_ALL, don't emit metadata - pass it back to client instead
        // For other strategies, also emit metadata
        if (parseMode == ParseMode.UNPACK) {
            return strategy != EmitStrategy.PASSBACK_ALL;
        }

        if (strategy == EmitStrategy.EMIT_ALL) {
            return true;
        }
        if (strategy == EmitStrategy.DYNAMIC) {
            // emit directly only when the estimated size reaches the threshold
            return emitDataTuple.getEstimatedSizeBytes() >= thresholdBytes;
        }
        // PASSBACK_ALL and anything unrecognized: pass back
        return false;
    }

    /**
     * Returns the {@link TikaCoreProperties#CONTAINER_EXCEPTION} value of the first metadata
     * object, or the empty string if the list is empty or no such value is present.
     */
    private static String getContainerStacktrace(FetchEmitTuple t, List<Metadata> metadataList) {
        if (metadataIsEmpty(metadataList)) {
            return StringUtils.EMPTY;
        }
        String stack = metadataList.get(0).get(TikaCoreProperties.CONTAINER_EXCEPTION);
        return (stack != null) ? stack : StringUtils.EMPTY;
    }

    /**
     * Copies every field of {@code userMetadata} onto the first metadata object of the list,
     * replacing any values already stored under the same name. Does nothing if there is no
     * user metadata or the list has no metadata object to receive it.
     */
    private void injectUserMetadata(Metadata userMetadata, List<Metadata> metadataList) {
        if (userMetadata == null || metadataIsEmpty(metadataList)) {
            //nothing to inject, or nothing to inject into
            return;
        }
        Metadata container = metadataList.get(0);
        for (String n : userMetadata.names()) {
            //overwrite whatever was there
            container.set(n, null);
            for (String val : userMetadata.getValues(n)) {
                container.add(n, val);
            }
        }
    }

    /**
     * Applies a metadata filter to the parse output. The filter from the parse context wins;
     * otherwise CONTENT_ONLY mode keeps only the content and container-exception fields, and
     * every other mode uses the default filter. A {@link NoOpFilter} is skipped entirely.
     * Filtering failures are logged and otherwise ignored.
     */
    private void filterMetadata(MetadataListAndEmbeddedBytes parseData, ParseContext parseContext) {
        MetadataFilter filter = parseContext.get(MetadataFilter.class);
        if (filter == null) {
            ParseMode parseMode = parseContext.get(ParseMode.class);
            if (parseMode == ParseMode.CONTENT_ONLY) {
                filter = new IncludeFieldMetadataFilter(
                        Set.of(TikaCoreProperties.TIKA_CONTENT.getName(),
                                TikaCoreProperties.CONTAINER_EXCEPTION.getName()));
            } else {
                filter = defaultMetadataFilter;
            }
        }
        if (filter instanceof NoOpFilter) {
            return;
        }
        try {
            parseData.filter(filter, parseContext);
        } catch (TikaException e) {
            LOG.warn("failed to filter metadata list", e);
        }
    }

    /**
     * Builds the message reported when no emitter is registered under {@code emitterName},
     * listing the emitters the manager does support.
     */
    private String getNoEmitterMsg(String emitterName) {
        StringBuilder sb = new StringBuilder();
        sb.append("Emitter '").append(emitterName).append("'");
        sb.append(" not found.");
        sb.append("\nThe configured emitterManager supports:");
        int i = 0;
        for (String e : emitterManager.getSupported()) {
            if (i++ > 0) {
                sb.append(", ");
            }
            sb.append(e);
        }
        return sb.toString();
    }

}