// AsyncEmitter.java

/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.tika.pipes.core.async;

import java.io.IOException;
import java.time.Instant;
import java.time.temporal.ChronoUnit;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.TimeUnit;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.tika.exception.TikaException;
import org.apache.tika.pipes.api.emitter.EmitData;
import org.apache.tika.pipes.api.emitter.Emitter;
import org.apache.tika.pipes.core.PipesConfig;
import org.apache.tika.pipes.core.emitter.EmitterManager;
import org.apache.tika.utils.ExceptionUtils;

/**
 * Worker thread that takes EmitData off the queue, batches it
 * and tries to emit it as a batch.
 * <p>
 * A batch is flushed when adding the next item would push the estimated size
 * past {@link PipesConfig#getEmitMaxEstimatedBytes()}, when more than
 * {@link PipesConfig#getEmitWithinMillis()} milliseconds have passed since the
 * last flush, or when {@link #EMIT_DATA_STOP_SEMAPHORE} is taken off the queue.
 */
public class AsyncEmitter implements Callable<Integer> {

    /** Sentinel placed on the queue to tell this worker to flush and stop. */
    static final EmitDataPair EMIT_DATA_STOP_SEMAPHORE = new EmitDataPair(null, null);
    /** Value returned from {@link #call()} when the worker stops via the sentinel. */
    static final int EMITTER_FUTURE_CODE = 2;

    private static final Logger LOG = LoggerFactory.getLogger(AsyncEmitter.class);

    private final PipesConfig asyncConfig;
    private final EmitterManager emitterManager;
    private final ArrayBlockingQueue<EmitDataPair> emitDataQueue;

    /** Time of the most recent flush; reset at the end of every {@code emitAll()}. */
    Instant lastEmitted = Instant.now();

    /**
     * @param asyncConfig    supplies the batch byte limit and the maximum time between flushes
     * @param emitData       queue this worker drains
     * @param emitterManager used to look up an emitter by the id carried in each queued pair
     */
    public AsyncEmitter(PipesConfig asyncConfig, ArrayBlockingQueue<EmitDataPair> emitData,
                        EmitterManager emitterManager) {
        this.asyncConfig = asyncConfig;
        this.emitDataQueue = emitData;
        this.emitterManager = emitterManager;
    }

    /**
     * Drains the queue until the stop sentinel is seen, flushing the cache on
     * size or elapsed time.
     *
     * @return {@link #EMITTER_FUTURE_CODE} once the stop sentinel has been
     *         received and the remaining cache has been flushed
     * @throws Exception e.g. {@link InterruptedException} if interrupted while polling
     */
    @Override
    public Integer call() throws Exception {
        EmitDataCache cache = new EmitDataCache(asyncConfig.getEmitMaxEstimatedBytes());

        while (true) {
            // a bounded wait so that the elapsed-time check below runs even when idle
            EmitDataPair emitDataPair = emitDataQueue.poll(500, TimeUnit.MILLISECONDS);
            // identity comparison is intentional: only the sentinel instance stops the worker
            if (emitDataPair == EMIT_DATA_STOP_SEMAPHORE) {
                cache.emitAll();
                return EMITTER_FUTURE_CODE;
            }
            if (emitDataPair != null) {
                //this can block on emitAll
                cache.add(emitDataPair);
            } else {
                LOG.trace("Nothing on the async queue");
            }
            LOG.debug("cache size: ({}) bytes and extract count: {}", cache.estimatedSize,
                    cache.size);
            long elapsed = ChronoUnit.MILLIS.between(lastEmitted, Instant.now());
            if (elapsed > asyncConfig.getEmitWithinMillis()) {
                LOG.debug("{} elapsed > {}, going to emitAll", elapsed, asyncConfig.getEmitWithinMillis());
                //this can block
                cache.emitAll();
            }
        }
    }

    /**
     * Accumulates EmitData per emitter id and tracks the running estimated
     * byte size and item count of what is currently cached.
     */
    private class EmitDataCache {
        private final long maxBytes;

        /** Sum of the estimated sizes of the items currently cached. */
        long estimatedSize = 0;
        /** Number of items currently cached. */
        int size = 0;
        /** Cached items keyed by emitter id. */
        Map<String, List<EmitData>> map = new HashMap<>();

        public EmitDataCache(long maxBytes) {
            this.maxBytes = maxBytes;
        }

        void updateEstimatedSize(long newBytes) {
            estimatedSize += newBytes;
        }

        /**
         * Caches one item. If the item would push the estimated size past
         * {@code maxBytes}, the current contents are flushed first and the
         * item starts the next batch.
         */
        void add(EmitDataPair emitDataPair) {
            long sz = emitDataPair.emitData().getEstimatedSizeBytes();
            if (estimatedSize + sz > maxBytes) {
                LOG.debug("estimated size ({}) > maxBytes({}), going to emitAll",
                        (estimatedSize + sz), maxBytes);
                emitAll();
            }
            List<EmitData> cached = map.computeIfAbsent(emitDataPair.emitterId(), k -> new ArrayList<>());
            cached.add(emitDataPair.emitData());
            updateEstimatedSize(sz);
            // count the item only once it is cached; incrementing before the flush
            // above would be wiped by the flush's reset and leave the count one short
            size++;
        }

        /**
         * Emits every cached batch, then clears the cache and resets
         * {@code lastEmitted}. Emission is best effort: a batch whose emitter
         * cannot be obtained is logged and dropped, and the remaining batches
         * are still emitted.
         */
        private void emitAll() {
            int emitted = 0;
            LOG.debug("about to emit {} files, {} estimated bytes", size, estimatedSize);
            for (Map.Entry<String, List<EmitData>> e : map.entrySet()) {
                Emitter emitter;
                try {
                    emitter = emitterManager.getEmitter(e.getKey());
                } catch (IOException | TikaException ex) {
                    // Skip only this emitter's batch. Returning here would leave the
                    // cache uncleared, so batches already emitted in this pass would
                    // be emitted again on the next flush and the cache would never shrink.
                    LOG.warn("emitter id={} failed on instantiation; skipping {} extracts",
                            e.getKey(), e.getValue().size(), ex);
                    continue;
                }
                tryToEmit(emitter, e.getValue());
                emitted += e.getValue().size();
            }

            LOG.debug("emitted: {} files", emitted);
            estimatedSize = 0;
            size = 0;
            map.clear();
            lastEmitted = Instant.now();
        }

        /**
         * Emits one batch, logging (not propagating) an IOException so that
         * one failing batch does not stop the worker.
         */
        private void tryToEmit(Emitter emitter, List<? extends EmitData> emitData) {

            try {
                emitter.emit(emitData);
            } catch (IOException e) {
                LOG.warn("emitter class ({}): {}", emitter.getClass(),
                        ExceptionUtils.getStackTrace(e));
            }
        }
    }
}