WorkProcessor.java
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.facebook.presto.operator;
import com.google.common.collect.Iterators;
import com.google.common.util.concurrent.ListenableFuture;
import javax.annotation.concurrent.Immutable;
import java.util.Comparator;
import java.util.Iterator;
import java.util.Optional;
import java.util.function.BooleanSupplier;
import java.util.function.Function;
import static java.util.Objects.requireNonNull;
public interface WorkProcessor<T>
{
/**
* Call the method to progress the work.
* When this method returns true then the processor is either finished
* or has a result available via {@link WorkProcessor#getResult()}.
* When this method returns false then the processor is either
* blocked or has yielded.
*/
boolean process();
boolean isBlocked();
/**
* @return a blocked future when {@link WorkProcessor#isBlocked()} returned true.
*/
ListenableFuture<?> getBlockedFuture();
/**
* @return true if the processor is finished. No more results are expected.
*/
boolean isFinished();
/**
* Get the result once the unit of work is done and the processor hasn't finished.
*/
T getResult();
/**
* Makes {@link WorkProcessor} yield when given {@code yieldSignal} is set. The processor is
* guaranteed to progress computations on subsequent {@link WorkProcessor#process()} calls
* even if {@code yieldSignal} is permanently on.
*/
default WorkProcessor<T> yielding(BooleanSupplier yieldSignal)
{
return WorkProcessorUtils.yielding(this, yieldSignal);
}
default <R> WorkProcessor<R> flatMap(Function<T, WorkProcessor<R>> mapper)
{
return WorkProcessorUtils.flatMap(this, mapper);
}
default <R> WorkProcessor<R> map(Function<T, R> mapper)
{
return WorkProcessorUtils.map(this, mapper);
}
/**
* Flattens {@link WorkProcessor}s returned by transformation. Each {@link WorkProcessor} produced
* by transformation will be fully consumed before transformation is called again to produce more processors.
*/
default <R> WorkProcessor<R> flatTransform(Transformation<T, WorkProcessor<R>> transformation)
{
return WorkProcessorUtils.flatTransform(this, transformation);
}
/**
* Transforms {@link WorkProcessor} using {@link Transformation}. {@link Transformation} instance will be dereferenced immediately after
* {@link WorkProcessor} is exhausted.
*/
default <R> WorkProcessor<R> transform(Transformation<T, R> transformation)
{
return WorkProcessorUtils.transform(this, transformation);
}
default <R> WorkProcessor<R> transformProcessor(Function<WorkProcessor<T>, WorkProcessor<R>> transformation)
{
return transformation.apply(this);
}
/**
* Converts {@link WorkProcessor} into an {@link Iterator}. The iterator will throw {@link IllegalStateException} when underlying {@link WorkProcessor}
* yields or becomes blocked. {@link WorkProcessor} instance will be dereferenced immediately after iterator is finished.
*/
default Iterator<T> iterator()
{
return WorkProcessorUtils.iteratorFrom(this);
}
/**
* Converts {@link WorkProcessor} into an yielding {@link Iterator}. The iterator will throw {@link IllegalStateException} when underlying {@link WorkProcessor}
* becomes blocked. {@link WorkProcessor} instance will be dereferenced immediately after iterator is exhausted.
*/
default Iterator<Optional<T>> yieldingIterator()
{
return WorkProcessorUtils.yieldingIteratorFrom(this);
}
static <T> WorkProcessor<T> flatten(WorkProcessor<WorkProcessor<T>> processor)
{
return WorkProcessorUtils.flatten(processor);
}
@SafeVarargs
static <T> WorkProcessor<T> of(T... elements)
{
return fromIterator(Iterators.forArray(elements));
}
static <T> WorkProcessor<T> fromIterable(Iterable<T> iterable)
{
return WorkProcessorUtils.fromIterator(iterable.iterator());
}
static <T> WorkProcessor<T> fromIterator(Iterator<T> iterator)
{
return WorkProcessorUtils.fromIterator(iterator);
}
/**
* Creates {@link WorkProcessor} from {@link Process}. {@link Process} instance will be dereferenced immediately after {@link WorkProcessor} is finished.
*/
static <T> WorkProcessor<T> create(Process<T> process)
{
return WorkProcessorUtils.create(process);
}
static <T> WorkProcessor<T> mergeSorted(Iterable<WorkProcessor<T>> processorIterable, Comparator<T> comparator)
{
return WorkProcessorUtils.mergeSorted(processorIterable, comparator);
}
interface Transformation<T, R>
{
/**
* Processes input elements and returns current transformation state.
*
* @param elementOptional an element to be transformed. Will be empty
* when there are no more elements. In such case transformation should
* finish processing and flush any remaining data.
* @return the current transformation state, optionally bearing a result
* @see TransformationState#needsMoreData()
* @see TransformationState#blocked(ListenableFuture)
* @see TransformationState#yield()
* @see TransformationState#ofResult(Object)
* @see TransformationState#ofResult(Object, boolean)
* @see TransformationState#finished()
*/
TransformationState<R> process(Optional<T> elementOptional);
}
interface Process<T>
{
/**
* Does some work and returns current state.
*
* @return the current state, optionally bearing a result
* @see ProcessState#blocked(ListenableFuture)
* @see ProcessState#yield()
* @see ProcessState#ofResult(Object)
* @see ProcessState#finished()
*/
ProcessState<T> process();
}
@Immutable
final class TransformationState<T>
{
private static final TransformationState<?> NEEDS_MORE_DATE_STATE = new TransformationState<>(Type.NEEDS_MORE_DATA, true, Optional.empty(), Optional.empty());
private static final TransformationState<?> YIELD_STATE = new TransformationState<>(Type.YIELD, false, Optional.empty(), Optional.empty());
private static final TransformationState<?> FINISHED_STATE = new TransformationState<>(Type.FINISHED, false, Optional.empty(), Optional.empty());
enum Type
{
NEEDS_MORE_DATA,
BLOCKED,
YIELD,
RESULT,
FINISHED
}
private final Type type;
private final boolean needsMoreData;
private final Optional<T> result;
private final Optional<ListenableFuture<?>> blocked;
private TransformationState(Type type, boolean needsMoreData, Optional<T> result, Optional<ListenableFuture<?>> blocked)
{
this.type = requireNonNull(type, "type is null");
this.needsMoreData = needsMoreData;
this.result = requireNonNull(result, "result is null");
this.blocked = requireNonNull(blocked, "blocked is null");
}
/**
* Signals that transformation requires more data in order to continue and no result has been produced.
* {@link #process()} will be called with a new input element or with {@link Optional#empty()} if there
* are no more elements.
*/
@SuppressWarnings("unchecked")
public static <T> TransformationState<T> needsMoreData()
{
return (TransformationState<T>) NEEDS_MORE_DATE_STATE;
}
/**
* Signals that transformation is blocked. {@link #process()} will be called again with the same input
* element after {@code blocked} future is done.
*/
public static <T> TransformationState<T> blocked(ListenableFuture<?> blocked)
{
return new TransformationState<>(Type.BLOCKED, false, Optional.empty(), Optional.of(blocked));
}
/**
* Signals that transformation has yielded. {@link #process()} will be called again with the same input element.
*/
@SuppressWarnings("unchecked")
public static <T> TransformationState<T> yield()
{
return (TransformationState<T>) YIELD_STATE;
}
/**
* Signals that transformation has produced a result from its input. {@link #process()} will be called again with
* a new element or with {@link Optional#empty()} if there are no more elements.
*/
public static <T> TransformationState<T> ofResult(T result)
{
return ofResult(result, true);
}
/**
* Signals that transformation has produced a result. If {@code needsMoreData}, {@link #process()} will be called again
* with a new element (or with {@link Optional#empty()} if there are no more elements). If not @{code needsMoreData},
* {@link #process()} will be called again with the same element.
*/
public static <T> TransformationState<T> ofResult(T result, boolean needsMoreData)
{
return new TransformationState<>(Type.RESULT, needsMoreData, Optional.of(result), Optional.empty());
}
/**
* Signals that transformation has finished. {@link #process()} method will not be called again.
*/
@SuppressWarnings("unchecked")
public static <T> TransformationState<T> finished()
{
return (TransformationState<T>) FINISHED_STATE;
}
Type getType()
{
return type;
}
boolean isNeedsMoreData()
{
return needsMoreData;
}
Optional<T> getResult()
{
return result;
}
Optional<ListenableFuture<?>> getBlocked()
{
return blocked;
}
}
@Immutable
final class ProcessState<T>
{
private static final ProcessState<?> YIELD_STATE = new ProcessState<>(Type.YIELD, Optional.empty(), Optional.empty());
private static final ProcessState<?> FINISHED_STATE = new ProcessState<>(Type.FINISHED, Optional.empty(), Optional.empty());
enum Type
{
BLOCKED,
YIELD,
RESULT,
FINISHED
}
private final Type type;
private final Optional<T> result;
private final Optional<ListenableFuture<?>> blocked;
private ProcessState(Type type, Optional<T> result, Optional<ListenableFuture<?>> blocked)
{
this.type = requireNonNull(type, "type is null");
this.result = requireNonNull(result, "result is null");
this.blocked = requireNonNull(blocked, "blocked is null");
}
/**
* Signals that process is blocked. {@link #process()} will be called again after {@code blocked} future is done.
*/
public static <T> ProcessState<T> blocked(ListenableFuture<?> blocked)
{
return new ProcessState<>(Type.BLOCKED, Optional.empty(), Optional.of(blocked));
}
/**
* Signals that process has yielded. {@link #process()} will be called again later.
*/
@SuppressWarnings("unchecked")
public static <T> ProcessState<T> yield()
{
return (ProcessState<T>) YIELD_STATE;
}
/**
* Signals that process has produced a result. {@link #process()} will be called again.
*/
public static <T> ProcessState<T> ofResult(T result)
{
return new ProcessState<>(Type.RESULT, Optional.of(result), Optional.empty());
}
/**
* Signals that process has finished. {@link #process()} method will not be called again.
*/
@SuppressWarnings("unchecked")
public static <T> ProcessState<T> finished()
{
return (ProcessState<T>) FINISHED_STATE;
}
Type getType()
{
return type;
}
Optional<T> getResult()
{
return result;
}
Optional<ListenableFuture<?>> getBlocked()
{
return blocked;
}
}
}