AsyncEmitter.java
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.tika.pipes.core.async;
import java.io.IOException;
import java.time.Instant;
import java.time.temporal.ChronoUnit;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.tika.exception.TikaException;
import org.apache.tika.pipes.api.emitter.EmitData;
import org.apache.tika.pipes.api.emitter.Emitter;
import org.apache.tika.pipes.core.PipesConfig;
import org.apache.tika.pipes.core.emitter.EmitterManager;
import org.apache.tika.utils.ExceptionUtils;
/**
 * Worker thread that takes {@link EmitData} off the queue, batches it
 * per emitter id, and tries to emit each batch.
 * <p>
 * The cached batches are flushed when any of the following happens:
 * <ul>
 *     <li>adding the next item would push the estimated cache size over
 *     {@link PipesConfig#getEmitMaxEstimatedBytes()},</li>
 *     <li>more than {@link PipesConfig#getEmitWithinMillis()} milliseconds have
 *     elapsed since the last flush, or</li>
 *     <li>{@link #EMIT_DATA_STOP_SEMAPHORE} is taken off the queue, after which
 *     {@link #call()} returns.</li>
 * </ul>
 * Emitting is best effort: a batch whose emitter cannot be obtained or whose
 * emit fails with an {@link IOException} is logged and dropped.
 */
public class AsyncEmitter implements Callable<Integer> {

    /** Sentinel (compared by identity) that tells the worker to flush and stop. */
    static final EmitDataPair EMIT_DATA_STOP_SEMAPHORE = new EmitDataPair(null, null);

    /** Value returned by {@link #call()} once the stop sentinel has been processed. */
    static final int EMITTER_FUTURE_CODE = 2;

    private static final Logger LOG = LoggerFactory.getLogger(AsyncEmitter.class);

    /** How long a single queue poll waits before the elapsed-time check is re-run. */
    private static final long POLL_TIMEOUT_MILLIS = 500;

    private final PipesConfig asyncConfig;
    private final EmitterManager emitterManager;
    private final ArrayBlockingQueue<EmitDataPair> emitDataQueue;

    /** Time of the most recent flush; updated at the end of every {@code emitAll()}. */
    Instant lastEmitted = Instant.now();

    /**
     * @param asyncConfig    supplies the max estimated bytes per batch and the max
     *                       time between flushes
     * @param emitData       queue from which the items to emit are taken
     * @param emitterManager used to look up an {@link Emitter} by emitter id
     */
    public AsyncEmitter(PipesConfig asyncConfig, ArrayBlockingQueue<EmitDataPair> emitData,
                        EmitterManager emitterManager) {
        this.asyncConfig = asyncConfig;
        this.emitDataQueue = emitData;
        this.emitterManager = emitterManager;
    }

    /**
     * Polls the queue until {@link #EMIT_DATA_STOP_SEMAPHORE} is received, caching
     * items and flushing them on the size and time thresholds.
     *
     * @return {@link #EMITTER_FUTURE_CODE} after the final flush
     * @throws Exception if the thread is interrupted while polling the queue, or
     *                   on any unchecked failure while caching/emitting
     */
    @Override
    public Integer call() throws Exception {
        EmitDataCache cache = new EmitDataCache(asyncConfig.getEmitMaxEstimatedBytes());

        while (true) {
            EmitDataPair emitDataPair =
                    emitDataQueue.poll(POLL_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS);
            // identity comparison is intentional: the sentinel is a specific instance
            if (emitDataPair == EMIT_DATA_STOP_SEMAPHORE) {
                cache.emitAll();
                return EMITTER_FUTURE_CODE;
            }
            if (emitDataPair != null) {
                //this can block on emitAll
                cache.add(emitDataPair);
            } else {
                LOG.trace("Nothing on the async queue");
            }
            LOG.debug("cache size: ({}) bytes and extract count: {}", cache.estimatedSize,
                    cache.size);
            long elapsed = ChronoUnit.MILLIS.between(lastEmitted, Instant.now());
            if (elapsed > asyncConfig.getEmitWithinMillis()) {
                LOG.debug("{} elapsed > {}, going to emitAll", elapsed,
                        asyncConfig.getEmitWithinMillis());
                //this can block
                cache.emitAll();
            }
        }
    }

    /**
     * Accumulates {@link EmitData} grouped by emitter id and tracks the estimated
     * total size so that batches can be flushed before exceeding {@code maxBytes}.
     */
    private class EmitDataCache {
        private final long maxBytes;

        /** Sum of the estimated sizes of the items currently cached. */
        long estimatedSize = 0;
        /** Number of items currently cached across all emitter ids. */
        int size = 0;
        /** Cached items keyed by emitter id. */
        Map<String, List<EmitData>> map = new HashMap<>();

        public EmitDataCache(long maxBytes) {
            this.maxBytes = maxBytes;
        }

        void updateEstimatedSize(long newBytes) {
            estimatedSize += newBytes;
        }

        /**
         * Caches the item, first flushing the existing cache if adding the item
         * would push the estimated size over {@code maxBytes}.
         *
         * @param emitDataPair emitter id plus the data to emit
         */
        void add(EmitDataPair emitDataPair) {
            long sz = emitDataPair.emitData().getEstimatedSizeBytes();
            if (estimatedSize + sz > maxBytes) {
                LOG.debug("estimated size ({}) > maxBytes({}), going to emitAll",
                        (estimatedSize + sz), maxBytes);
                emitAll();
            }
            List<EmitData> cached =
                    map.computeIfAbsent(emitDataPair.emitterId(), k -> new ArrayList<>());
            cached.add(emitDataPair.emitData());
            updateEstimatedSize(sz);
            // count the item only after any flush above: emitAll() resets size to 0,
            // and this item is not part of that flush
            size++;
        }

        /**
         * Emits every cached batch and then resets the cache and {@code lastEmitted}.
         * A batch whose emitter cannot be obtained is logged and skipped; the
         * remaining batches are still emitted and the cache is always cleared, so
         * nothing that was already emitted is sent a second time on the next flush.
         */
        private void emitAll() {
            int emitted = 0;
            LOG.debug("about to emit {} files, {} estimated bytes", size, estimatedSize);
            for (Map.Entry<String, List<EmitData>> e : map.entrySet()) {
                Emitter emitter;
                try {
                    emitter = emitterManager.getEmitter(e.getKey());
                } catch (IOException | TikaException ex) {
                    LOG.warn("emitter id={} failed on instantiation", e.getKey(), ex);
                    // best effort: drop this batch but keep going so that the other
                    // batches are emitted and the cache is reset below
                    continue;
                }
                if (tryToEmit(emitter, e.getValue())) {
                    emitted += e.getValue().size();
                }
            }

            LOG.debug("emitted: {} files", emitted);
            estimatedSize = 0;
            size = 0;
            map.clear();
            lastEmitted = Instant.now();
        }

        /**
         * Emits one batch, logging (not propagating) an {@link IOException}.
         *
         * @return {@code true} if the emitter returned normally, {@code false} if it
         * threw an {@link IOException}
         */
        private boolean tryToEmit(Emitter emitter, List<? extends EmitData> emitData) {
            try {
                emitter.emit(emitData);
                return true;
            } catch (IOException e) {
                LOG.warn("emitter class ({}): {}", emitter.getClass(),
                        ExceptionUtils.getStackTrace(e));
                return false;
            }
        }
    }
}