AbstractConcurrentProcessor.java
/*******************************************************************************
* Copyright 2015 Univocity Software Pty Ltd
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
******************************************************************************/
package com.univocity.parsers.common.processor.core;
import com.univocity.parsers.common.*;
import java.util.concurrent.*;
/**
* A {@link Processor} implementation to perform row processing tasks in parallel. The {@code ConcurrentRowProcessor}
* wraps another {@link Processor}, and collects rows read from the input.
* The actual row processing is performed in by wrapped {@link Processor} in a separate thread.
*
* <i>Note: </i> by default the {@link Context} object passed on to the wrapped {@link Processor} will <b>not</b> reflect the
* state of the parser at the time the row as generated, but the current state of the parser instead. You can enable the
* {@link #contextCopyingEnabled} flag to generate copies of the {@link Context} at the time each row was generated.
*
* @author Univocity Software Pty Ltd - <a href="mailto:parsers@univocity.com">parsers@univocity.com</a>
* @see AbstractParser
* @see Processor
*/
public abstract class AbstractConcurrentProcessor<T extends Context> implements Processor<T> {
private final Processor processor;
private boolean ended = false;
private static class Node<T> {
public Node(String[] row, T context) {
this.row = row;
this.context = context;
}
public final T context;
public final String[] row;
public Node next;
}
private final ExecutorService executor = Executors.newSingleThreadExecutor();
private volatile long rowCount;
private Future<Void> process;
private T currentContext;
private Node<T> inputQueue;
private volatile Node<T> outputQueue;
private final int limit;
private volatile long input;
private volatile long output;
private final Object lock;
private boolean contextCopyingEnabled = false;
/**
* Creates a non-blocking {@code AbstractConcurrentProcessor}, to perform processing of rows parsed from the input in a separate thread.
*
* @param processor a regular {@link Processor} implementation which will be executed in a separate thread.
*/
public AbstractConcurrentProcessor(Processor<T> processor) {
this(processor, -1);
}
/**
* Creates a blocking {@code ConcurrentProcessor}, to perform processing of rows parsed from the input in a separate thread.
*
* @param processor a regular {@link Processor} implementation which will be executed in a separate thread.
* @param limit the limit of rows to be kept in memory before blocking the input parsing process.
*/
public AbstractConcurrentProcessor(Processor<T> processor, int limit) {
if (processor == null) {
throw new IllegalArgumentException("Row processor cannot be null");
}
this.processor = processor;
input = 0;
output = 0;
lock = new Object();
this.limit = limit;
}
/**
* Indicates whether this processor should persist the {@link Context} object that is sent to the wrapped {@link Processor}
* given in the constructor of this class, so all methods of {@link Context} reflect the parser state at the time
* each row was parsed.
*
* Defaults to {@code false}
*
* @return a flag indicating whether the parsing context must be persisted along with the parsed row
* so its methods reflect the state of the parser at the time the record was produced.
*/
public boolean isContextCopyingEnabled() {
return contextCopyingEnabled;
}
/**
* Configures this processor to persist the {@link Context} object that is sent to the wrapped {@link Processor}
* given in the constructor of this class, so all methods of {@link Context} reflect the parser state at the time
* each row was parsed.
*
* Defaults to {@code false}
*
* @param contextCopyingEnabled a flag indicating whether the parsing context must be persisted along with the parsed row
* so its methods reflect the state of the parser at the time the record was produced.
*/
public void setContextCopyingEnabled(boolean contextCopyingEnabled) {
this.contextCopyingEnabled = contextCopyingEnabled;
}
@Override
public final void processStarted(T context) {
currentContext = wrapContext(context);
processor.processStarted(currentContext);
startProcess();
}
private void startProcess() {
ended = false;
rowCount = 0;
process = executor.submit(new Callable<Void>() {
@Override
public Void call() {
while (outputQueue == null && !ended) {
Thread.yield();
}
while (!ended) {
rowCount++;
processor.rowProcessed(outputQueue.row, outputQueue.context);
while (outputQueue.next == null) {
if (ended && outputQueue.next == null) {
return null;
}
Thread.yield();
}
outputQueue = outputQueue.next;
output++;
if (limit > 1) {
synchronized (lock) {
lock.notify();
}
}
}
while (outputQueue != null) {
rowCount++;
processor.rowProcessed(outputQueue.row, outputQueue.context);
outputQueue = outputQueue.next;
}
return null;
}
});
}
@Override
public final void rowProcessed(String[] row, T context) {
if (inputQueue == null) {
inputQueue = new Node(row, grabContext(context));
outputQueue = inputQueue;
} else {
if (limit > 1) {
synchronized (lock) {
try {
if (input - output >= limit) {
lock.wait();
}
} catch (InterruptedException e) {
ended = true;
Thread.currentThread().interrupt();
return;
}
}
}
inputQueue.next = new Node(row, grabContext(context));
inputQueue = inputQueue.next;
}
input++;
}
@Override
public final void processEnded(T context) {
ended = true;
if (limit > 1) {
synchronized (lock) {
lock.notify();
}
}
try {
process.get();
} catch (ExecutionException e) {
throw new DataProcessingException("Error executing process", e);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
} finally {
try {
processor.processEnded(grabContext(context));
} finally{
executor.shutdown();
}
}
}
private T grabContext(T context) {
if (contextCopyingEnabled) {
return copyContext(context);
}
return currentContext;
}
protected final long getRowCount(){
return rowCount;
}
protected abstract T copyContext(T context);
protected abstract T wrapContext(T context);
}