// TestWorkProcessor.java
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.facebook.presto.operator;
import com.facebook.presto.operator.WorkProcessor.ProcessState;
import com.facebook.presto.operator.WorkProcessor.TransformationState;
import com.google.common.collect.ImmutableList;
import com.google.common.util.concurrent.SettableFuture;
import org.testng.annotations.Test;
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicBoolean;
import static com.facebook.presto.operator.WorkProcessorAssertion.assertBlocks;
import static com.facebook.presto.operator.WorkProcessorAssertion.assertFinishes;
import static com.facebook.presto.operator.WorkProcessorAssertion.assertResult;
import static com.facebook.presto.operator.WorkProcessorAssertion.assertUnblocks;
import static com.facebook.presto.operator.WorkProcessorAssertion.assertYields;
import static java.util.Objects.requireNonNull;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertTrue;
public class TestWorkProcessor
{
@Test
public void testIterator()
{
WorkProcessor<Integer> processor = processorFrom(ImmutableList.of(
ProcessState.ofResult(1),
ProcessState.ofResult(2),
ProcessState.finished()));
Iterator<Integer> iterator = processor.iterator();
assertTrue(iterator.hasNext());
assertEquals(iterator.next(), (Integer) 1);
assertTrue(iterator.hasNext());
assertEquals(iterator.next(), (Integer) 2);
assertFalse(iterator.hasNext());
}
@Test(expectedExceptions = IllegalStateException.class, expectedExceptionsMessageRegExp = "Cannot iterate over yielding WorkProcessor")
public void testIteratorFailsWhenWorkProcessorHasYielded()
{
// iterator should fail if underlying work has yielded
WorkProcessor<Integer> processor = processorFrom(ImmutableList.of(ProcessState.yield()));
Iterator<Integer> iterator = processor.iterator();
iterator.hasNext();
}
@Test(expectedExceptions = IllegalStateException.class, expectedExceptionsMessageRegExp = "Cannot iterate over blocking WorkProcessor")
public void testIteratorFailsWhenWorkProcessorIsBlocked()
{
// iterator should fail if underlying work is blocked
WorkProcessor<Integer> processor = processorFrom(ImmutableList.of(ProcessState.blocked(SettableFuture.create())));
Iterator<Integer> iterator = processor.iterator();
iterator.hasNext();
}
@Test(timeOut = 5000)
public void testMergeSorted()
{
List<ProcessState<Integer>> firstStream = ImmutableList.of(
ProcessState.ofResult(1),
ProcessState.ofResult(3),
ProcessState.yield(),
ProcessState.ofResult(5),
ProcessState.finished());
SettableFuture<?> secondFuture = SettableFuture.create();
List<ProcessState<Integer>> secondStream = ImmutableList.of(
ProcessState.ofResult(2),
ProcessState.ofResult(4),
ProcessState.blocked(secondFuture),
ProcessState.finished());
WorkProcessor<Integer> mergedStream = WorkProcessorUtils.mergeSorted(
ImmutableList.of(processorFrom(firstStream), processorFrom(secondStream)),
Comparator.comparingInt(firstInteger -> firstInteger));
// first stream result 1
assertResult(mergedStream, 1);
// second stream result 2
assertResult(mergedStream, 2);
// first stream result 3
assertResult(mergedStream, 3);
// first stream yield
assertYields(mergedStream);
// second stream result 4
assertResult(mergedStream, 4);
// second stream blocked
assertBlocks(mergedStream);
// second stream unblock
assertUnblocks(mergedStream, secondFuture);
// first stream result 5
assertResult(mergedStream, 5);
// both streams finished
assertFinishes(mergedStream);
}
@Test(timeOut = 5000)
public void testMergeSortedEmptyStreams()
{
SettableFuture<?> firstFuture = SettableFuture.create();
List<ProcessState<Integer>> firstStream = ImmutableList.of(
ProcessState.blocked(firstFuture),
ProcessState.yield(),
ProcessState.finished());
SettableFuture<?> secondFuture = SettableFuture.create();
List<ProcessState<Integer>> secondStream = ImmutableList.of(
ProcessState.blocked(secondFuture),
ProcessState.finished());
WorkProcessor<Integer> mergedStream = WorkProcessorUtils.mergeSorted(
ImmutableList.of(processorFrom(firstStream), processorFrom(secondStream)),
Comparator.comparingInt(firstInteger -> firstInteger));
assertFalse(mergedStream.isBlocked());
assertFalse(mergedStream.isFinished());
// first stream blocked
assertBlocks(mergedStream);
// first stream unblock
assertUnblocks(mergedStream, firstFuture);
// first stream yield
assertYields(mergedStream);
// second stream blocked
assertBlocks(mergedStream);
// first stream unblock
assertUnblocks(mergedStream, secondFuture);
// both streams finished
assertFinishes(mergedStream);
}
@Test(timeOut = 5000)
public void testMergeSortedEmptyStreamsWithFinishedOnly()
{
List<ProcessState<Integer>> firstStream = ImmutableList.of(
ProcessState.finished());
List<ProcessState<Integer>> secondStream = ImmutableList.of(
ProcessState.finished());
WorkProcessor<Integer> mergedStream = WorkProcessorUtils.mergeSorted(
ImmutableList.of(processorFrom(firstStream), processorFrom(secondStream)),
Comparator.comparingInt(firstInteger -> firstInteger));
// before
assertFalse(mergedStream.isBlocked());
assertFalse(mergedStream.isFinished());
assertFinishes(mergedStream);
}
@Test(timeOut = 5000)
public void testYield()
{
SettableFuture<?> future = SettableFuture.create();
List<ProcessState<Integer>> baseScenario = ImmutableList.of(
ProcessState.ofResult(1),
ProcessState.ofResult(2),
ProcessState.blocked(future),
ProcessState.ofResult(3),
ProcessState.ofResult(4),
ProcessState.finished());
AtomicBoolean yieldSignal = new AtomicBoolean();
WorkProcessor<Integer> processor = processorFrom(baseScenario)
.yielding(yieldSignal::get);
// no yield, process normally
assertResult(processor, 1);
yieldSignal.set(true);
assertYields(processor);
// processor should progress since it yielded last time
assertResult(processor, 2);
// base scenario future blocks
assertBlocks(processor);
assertUnblocks(processor, future);
// yield signal is still set
assertYields(processor);
// continue to process normally
yieldSignal.set(false);
assertResult(processor, 3);
assertResult(processor, 4);
assertFinishes(processor);
}
@Test(timeOut = 5000)
public void testFlatMap()
{
List<ProcessState<Integer>> baseScenario = ImmutableList.of(
ProcessState.ofResult(1),
ProcessState.ofResult(2),
ProcessState.finished());
WorkProcessor<Double> processor = processorFrom(baseScenario)
.flatMap(element -> WorkProcessor.fromIterable(ImmutableList.of((Double) 2. * element, (Double) 3. * element)));
assertResult(processor, 2.);
assertResult(processor, 3.);
assertResult(processor, 4.);
assertResult(processor, 6.);
assertFinishes(processor);
}
@Test(timeOut = 5000)
public void testMap()
{
List<ProcessState<Integer>> baseScenario = ImmutableList.of(
ProcessState.ofResult(1),
ProcessState.ofResult(2),
ProcessState.finished());
WorkProcessor<Double> processor = processorFrom(baseScenario)
.map(element -> 2. * element);
assertResult(processor, 2.);
assertResult(processor, 4.);
assertFinishes(processor);
}
@Test(timeOut = 5000)
public void testFlatTransform()
{
SettableFuture<?> baseFuture = SettableFuture.create();
List<ProcessState<Double>> baseScenario = ImmutableList.of(
ProcessState.ofResult(1.0),
ProcessState.blocked(baseFuture),
ProcessState.ofResult(2.0),
ProcessState.yield(),
ProcessState.ofResult(3.0),
ProcessState.ofResult(4.0),
ProcessState.finished());
SettableFuture<?> mappedFuture1 = SettableFuture.create();
List<ProcessState<Integer>> mappedScenario1 = ImmutableList.of(
ProcessState.ofResult(1),
ProcessState.yield(),
ProcessState.blocked(mappedFuture1),
ProcessState.ofResult(2),
ProcessState.finished());
List<ProcessState<Integer>> mappedScenario2 = ImmutableList.of(ProcessState.finished());
SettableFuture<?> mappedFuture3 = SettableFuture.create();
List<ProcessState<Integer>> mappedScenario3 = ImmutableList.of(
ProcessState.blocked(mappedFuture3),
ProcessState.finished());
List<ProcessState<Integer>> mappedScenario4 = ImmutableList.of(
ProcessState.ofResult(3),
ProcessState.finished());
SettableFuture<?> transformationFuture = SettableFuture.create();
List<Transform<Double, WorkProcessor<Integer>>> transformationScenario = ImmutableList.of(
Transform.of(Optional.of(1.0), TransformationState.ofResult(processorFrom(mappedScenario1), false)),
Transform.of(Optional.of(1.0), TransformationState.ofResult(processorFrom(mappedScenario2), false)),
Transform.of(Optional.of(1.0), TransformationState.ofResult(processorFrom(mappedScenario3))),
Transform.of(Optional.of(2.0), TransformationState.blocked(transformationFuture)),
Transform.of(Optional.of(2.0), TransformationState.ofResult(processorFrom(mappedScenario4))),
Transform.of(Optional.of(3.0), TransformationState.finished()));
WorkProcessor<Integer> processor = processorFrom(baseScenario)
.flatTransform(transformationFrom(transformationScenario));
// mappedScenario1.result 1
assertResult(processor, 1);
// mappedScenario1.yield
assertYields(processor);
// mappedScenario1.blocked
assertBlocks(processor);
// mappedScenario1 unblocks
assertUnblocks(processor, mappedFuture1);
// mappedScenario1 result 2
assertResult(processor, 2);
// mappedScenario3.blocked
assertBlocks(processor);
// mappedScenario3 unblocks
assertUnblocks(processor, mappedFuture3);
// base.blocked
assertBlocks(processor);
// base unblocks
assertUnblocks(processor, baseFuture);
// transformation.blocked
assertBlocks(processor);
// transformation unblocks
assertUnblocks(processor, transformationFuture);
// mappedScenario4 result 3
assertResult(processor, 3);
// base.yield
assertYields(processor);
// transformation finishes
assertFinishes(processor);
}
@Test(timeOut = 5000)
public void testTransform()
{
SettableFuture<?> baseFuture = SettableFuture.create();
List<ProcessState<Integer>> baseScenario = ImmutableList.of(
ProcessState.ofResult(1),
ProcessState.yield(),
ProcessState.blocked(baseFuture),
ProcessState.ofResult(2),
ProcessState.ofResult(3),
ProcessState.finished());
SettableFuture<?> transformationFuture = SettableFuture.create();
List<Transform<Integer, String>> transformationScenario = ImmutableList.of(
Transform.of(Optional.of(1), TransformationState.needsMoreData()),
Transform.of(Optional.of(2), TransformationState.ofResult("foo")),
Transform.of(Optional.of(3), TransformationState.blocked(transformationFuture)),
Transform.of(Optional.of(3), TransformationState.yield()),
Transform.of(Optional.of(3), TransformationState.ofResult("bar", false)),
Transform.of(Optional.of(3), TransformationState.ofResult("zoo", true)),
Transform.of(Optional.empty(), TransformationState.ofResult("car", false)),
Transform.of(Optional.empty(), TransformationState.finished()));
WorkProcessor<String> processor = processorFrom(baseScenario)
.transform(transformationFrom(transformationScenario));
// before
assertFalse(processor.isBlocked());
assertFalse(processor.isFinished());
// base.yield
assertYields(processor);
// base.blocked
assertBlocks(processor);
// base unblock
assertUnblocks(processor, baseFuture);
// transformation.result foo
assertResult(processor, "foo");
// transformation.blocked
assertBlocks(processor);
// transformation.unblock
assertUnblocks(processor, transformationFuture);
// transformation.yield
assertYields(processor);
// transformation.result bar
assertResult(processor, "bar");
// transformation.result zoo
assertResult(processor, "zoo");
// transformation.result car
assertResult(processor, "car");
// transformation.finished
assertFinishes(processor);
}
@Test(timeOut = 5000)
public void testCreateFrom()
{
SettableFuture<?> future = SettableFuture.create();
List<ProcessState<Integer>> scenario = ImmutableList.of(
ProcessState.yield(),
ProcessState.ofResult(1),
ProcessState.blocked(future),
ProcessState.yield(),
ProcessState.ofResult(2),
ProcessState.finished());
WorkProcessor<Integer> processor = processorFrom(scenario);
// before
assertFalse(processor.isBlocked());
assertFalse(processor.isFinished());
assertYields(processor);
assertResult(processor, 1);
assertBlocks(processor);
assertUnblocks(processor, future);
assertYields(processor);
assertResult(processor, 2);
assertFinishes(processor);
}
private static <T, R> WorkProcessor.Transformation<T, R> transformationFrom(List<Transform<T, R>> transformations)
{
Iterator<Transform<T, R>> iterator = transformations.iterator();
return elementOptional -> {
assertTrue(iterator.hasNext());
return iterator.next().transform(elementOptional);
};
}
private static <T> WorkProcessor<T> processorFrom(List<ProcessState<T>> states)
{
return WorkProcessorUtils.create(processFrom(states));
}
private static <T> WorkProcessor.Process<T> processFrom(List<ProcessState<T>> states)
{
Iterator<ProcessState<T>> iterator = states.iterator();
return () -> {
assertTrue(iterator.hasNext());
return iterator.next();
};
}
private static class Transform<T, R>
{
final Optional<T> from;
final TransformationState<R> to;
Transform(Optional<T> from, TransformationState<R> to)
{
this.from = requireNonNull(from);
this.to = requireNonNull(to);
}
static <T, R> Transform<T, R> of(Optional<T> from, TransformationState<R> to)
{
return new Transform<>(from, to);
}
TransformationState<R> transform(Optional<T> from)
{
assertEquals(from, this.from);
return to;
}
}
}