TestSpatialJoinOperator.java

/*
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package com.facebook.presto.geospatial;

import com.facebook.presto.RowPagesBuilder;
import com.facebook.presto.common.Page;
import com.facebook.presto.common.block.Block;
import com.facebook.presto.operator.Driver;
import com.facebook.presto.operator.DriverContext;
import com.facebook.presto.operator.InternalJoinFilterFunction;
import com.facebook.presto.operator.Operator;
import com.facebook.presto.operator.OperatorFactory;
import com.facebook.presto.operator.PagesIndex.TestingFactory;
import com.facebook.presto.operator.PagesSpatialIndex;
import com.facebook.presto.operator.PagesSpatialIndexFactory;
import com.facebook.presto.operator.SpatialIndexBuilderOperator.SpatialIndexBuilderOperatorFactory;
import com.facebook.presto.operator.SpatialIndexBuilderOperator.SpatialPredicate;
import com.facebook.presto.operator.SpatialJoinOperator.SpatialJoinOperatorFactory;
import com.facebook.presto.operator.StandardJoinFilterFunction;
import com.facebook.presto.operator.TaskContext;
import com.facebook.presto.operator.ValuesOperator;
import com.facebook.presto.spi.PrestoException;
import com.facebook.presto.spi.plan.PlanNodeId;
import com.facebook.presto.spi.plan.SpatialJoinNode.Type;
import com.facebook.presto.sql.gen.JoinFilterFunctionCompiler;
import com.facebook.presto.testing.MaterializedResult;
import com.facebook.presto.testing.TestingTaskContext;
import com.google.common.collect.ImmutableList;
import com.google.common.primitives.Ints;
import com.google.common.util.concurrent.ListenableFuture;
import io.airlift.slice.Slice;
import io.airlift.slice.Slices;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;

import java.util.List;
import java.util.Optional;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.atomic.AtomicInteger;

import static com.facebook.airlift.concurrent.Threads.daemonThreadsNamed;
import static com.facebook.presto.RowPagesBuilder.rowPagesBuilder;
import static com.facebook.presto.SessionTestUtils.TEST_SESSION;
import static com.facebook.presto.common.type.DoubleType.DOUBLE;
import static com.facebook.presto.common.type.IntegerType.INTEGER;
import static com.facebook.presto.common.type.VarcharType.VARCHAR;
import static com.facebook.presto.geospatial.GeoFunctions.spatialPartitions;
import static com.facebook.presto.geospatial.GeoFunctions.stGeometryFromText;
import static com.facebook.presto.geospatial.GeoFunctions.stPoint;
import static com.facebook.presto.geospatial.type.GeometryType.GEOMETRY;
import static com.facebook.presto.operator.OperatorAssertion.assertOperatorEqualsIgnoreOrder;
import static com.facebook.presto.spi.plan.SpatialJoinNode.Type.INNER;
import static com.facebook.presto.spi.plan.SpatialJoinNode.Type.LEFT;
import static com.facebook.presto.testing.MaterializedResult.resultBuilder;
import static java.util.concurrent.Executors.newScheduledThreadPool;
import static java.util.concurrent.TimeUnit.SECONDS;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertNull;
import static org.testng.Assert.assertTrue;

@Test(singleThreaded = true)
public class TestSpatialJoinOperator
{
    private static final KdbTree KDB_TREE = KdbTree.buildKdbTree(
            2,
            ImmutableList.of(
                    new Rectangle(-2, -2, -2, -2),
                    new Rectangle(0, 0, 0, 0),
                    new Rectangle(-1, -2, 4, 3),
                    new Rectangle(6, 1, 6, 1),
                    new Rectangle(3, 9, 3, 9),
                    new Rectangle(15, 15, 15, 15)));
    private static final String KDB_TREE_JSON = KdbTreeUtils.toJson(KDB_TREE);

    //  2 intersecting polygons: A and B
    private static final Slice POLYGON_A = stGeometryFromText(Slices.utf8Slice("POLYGON ((0 0, -0.5 2.5, 0 5, 2.5 5.5, 5 5, 5.5 2.5, 5 0, 2.5 -0.5, 0 0))"));
    private static final Slice POLYGON_B = stGeometryFromText(Slices.utf8Slice("POLYGON ((4 4, 3.5 7, 4 10, 7 10.5, 10 10, 10.5 7, 10 4, 7 3.5, 4 4))"));
    private static final Slice POLYGON_C = stGeometryFromText(Slices.utf8Slice("POLYGON ((15 15, 15 14, 14 14, 14 15, 15 15))"));
    private static final Slice POLYGON_D = stGeometryFromText(Slices.utf8Slice("POLYGON ((18 18, 18 19, 19 19, 19 18, 18 18))"));

    // A set of points: X in A, Y in A and B, Z in B, W outside of A and B
    private static final Slice POINT_X = stPoint(1, 1);
    private static final Slice POINT_Y = stPoint(4.5, 4.5);
    private static final Slice POINT_Z = stPoint(6, 6);
    private static final Slice POINT_W = stPoint(20, 20);
    private static final Slice POINT_V = stPoint(15, 15);
    private static final Slice MULTIPOINT_U = stGeometryFromText(Slices.utf8Slice("MULTIPOINT (15 15)"));
    private static final Slice MULTIPOINT_T = stGeometryFromText(Slices.utf8Slice("MULTIPOINT (14.5 14.5, 16 16)"));
    private static final Slice POINT_S = stPoint(18, 18);
    private static final Slice MULTIPOINT_R = stGeometryFromText(Slices.utf8Slice("MULTIPOINT (15 15, 19 19)"));
    private static final Slice POINT_Q = stPoint(28, 28);

    private ExecutorService executor;
    private ScheduledExecutorService scheduledExecutor;

    @BeforeMethod
    public void setUp()
    {
        // Before/AfterMethod is chosen here because the executor needs to be shutdown
        // after every single test case to terminate outstanding threads, if any.

        // The line below is the same as newCachedThreadPool(daemonThreadsNamed(...)) except RejectionExecutionHandler.
        // RejectionExecutionHandler is set to DiscardPolicy (instead of the default AbortPolicy) here.
        // Otherwise, a large number of RejectedExecutionException will flood logging, resulting in Travis failure.
        executor = new ThreadPoolExecutor(
                0,
                Integer.MAX_VALUE,
                60L,
                SECONDS,
                new SynchronousQueue<>(),
                daemonThreadsNamed("test-executor-%s"),
                new ThreadPoolExecutor.DiscardPolicy());
        scheduledExecutor = newScheduledThreadPool(2, daemonThreadsNamed("test-scheduledExecutor-%s"));
    }

    @AfterMethod(alwaysRun = true)
    public void tearDown()
    {
        executor.shutdownNow();
        scheduledExecutor.shutdownNow();
    }

    @Test
    public void testSpatialJoin()
    {
        TaskContext taskContext = createTaskContext();
        RowPagesBuilder buildPages = rowPagesBuilder(ImmutableList.of(GEOMETRY, VARCHAR))
                .row(POLYGON_A, "A")
                .row(null, "null")
                .pageBreak()
                .row(POLYGON_B, "B")
                .row(POLYGON_C, "C")
                .row(POLYGON_D, "D");

        RowPagesBuilder probePages = rowPagesBuilder(ImmutableList.of(GEOMETRY, VARCHAR))
                .row(POINT_X, "x")
                .row(null, "null")
                .row(POINT_Y, "y")
                .pageBreak()
                .row(POINT_Z, "z")
                .pageBreak()
                .row(POINT_W, "w")
                .row(POINT_V, "v")
                .row(MULTIPOINT_U, "u")
                .pageBreak()
                .row(MULTIPOINT_T, "t")
                .row(POINT_S, "s")
                .row(MULTIPOINT_R, "r")
                .row(POINT_Q, "q");

        MaterializedResult expected = resultBuilder(taskContext.getSession(), ImmutableList.of(VARCHAR, VARCHAR))
                .row("x", "A")
                .row("y", "A")
                .row("y", "B")
                .row("z", "B")
                .row("v", "C")
                .row("u", "C")
                .row("t", "C")
                .row("s", "D")
                .row("r", "C")
                .row("r", "D")
                .build();

        assertSpatialJoin(taskContext, INNER, buildPages, probePages, expected);
    }

    @Test
    public void testSpatialLeftJoin()
    {
        TaskContext taskContext = createTaskContext();
        RowPagesBuilder buildPages = rowPagesBuilder(ImmutableList.of(GEOMETRY, VARCHAR))
                .row(POLYGON_A, "A")
                .row(null, "null")
                .pageBreak()
                .row(POLYGON_B, "B")
                .row(POLYGON_C, "C")
                .row(POLYGON_D, "D");

        RowPagesBuilder probePages = rowPagesBuilder(ImmutableList.of(GEOMETRY, VARCHAR))
                .row(POINT_X, "x")
                .row(null, "null")
                .row(POINT_Y, "y")
                .pageBreak()
                .row(POINT_Z, "z")
                .pageBreak()
                .row(POINT_W, "w")
                .row(POINT_V, "v")
                .row(MULTIPOINT_U, "u")
                .pageBreak()
                .row(MULTIPOINT_T, "t")
                .row(POINT_S, "s")
                .row(MULTIPOINT_R, "r")
                .row(POINT_Q, "q");

        MaterializedResult expected = resultBuilder(taskContext.getSession(), ImmutableList.of(VARCHAR, VARCHAR))
                .row("x", "A")
                .row("null", null)
                .row("y", "A")
                .row("y", "B")
                .row("z", "B")
                .row("w", null)
                .row("v", "C")
                .row("u", "C")
                .row("t", "C")
                .row("s", "D")
                .row("r", "C")
                .row("r", "D")
                .row("q", null)
                .build();

        assertSpatialJoin(taskContext, LEFT, buildPages, probePages, expected);
    }

    private void assertSpatialJoin(TaskContext taskContext, Type joinType, RowPagesBuilder buildPages, RowPagesBuilder probePages, MaterializedResult expected)
    {
        DriverContext driverContext = taskContext.addPipelineContext(0, true, true, false).addDriverContext();
        PagesSpatialIndexFactory pagesSpatialIndexFactory = buildIndex(driverContext, (build, probe, r) -> build.intersects(probe), Optional.empty(), Optional.empty(), buildPages);
        OperatorFactory joinOperatorFactory = new SpatialJoinOperatorFactory(2, new PlanNodeId("test"), joinType, probePages.getTypes(), Ints.asList(1), 0, Optional.empty(), pagesSpatialIndexFactory);
        assertOperatorEqualsIgnoreOrder(joinOperatorFactory, driverContext, probePages.build(), expected);
    }

    @Test
    public void testEmptyBuild()
    {
        TaskContext taskContext = createTaskContext();
        RowPagesBuilder buildPages = rowPagesBuilder(ImmutableList.of(GEOMETRY, VARCHAR));

        RowPagesBuilder probePages = rowPagesBuilder(ImmutableList.of(GEOMETRY, VARCHAR))
                .row(POINT_X, "x")
                .row(null, "null")
                .row(POINT_Y, "y")
                .pageBreak()
                .row(POINT_Z, "z")
                .pageBreak()
                .row(POINT_W, "w");

        MaterializedResult expected = resultBuilder(taskContext.getSession(), ImmutableList.of(VARCHAR, VARCHAR)).build();

        assertSpatialJoin(taskContext, INNER, buildPages, probePages, expected);
    }

    @Test
    public void testEmptyBuildLeftJoin()
    {
        TaskContext taskContext = createTaskContext();
        RowPagesBuilder buildPages = rowPagesBuilder(ImmutableList.of(GEOMETRY, VARCHAR));

        RowPagesBuilder probePages = rowPagesBuilder(ImmutableList.of(GEOMETRY, VARCHAR))
                .row(POINT_X, "x")
                .row(null, "null")
                .row(POINT_Y, "y")
                .pageBreak()
                .row(POINT_Z, "z")
                .pageBreak()
                .row(POINT_W, "w");

        MaterializedResult expected = resultBuilder(taskContext.getSession(), ImmutableList.of(VARCHAR, VARCHAR))
                .row("x", null)
                .row("null", null)
                .row("y", null)
                .row("z", null)
                .row("w", null)
                .build();

        assertSpatialJoin(taskContext, LEFT, buildPages, probePages, expected);
    }

    @Test
    public void testEmptyProbe()
    {
        TaskContext taskContext = createTaskContext();
        RowPagesBuilder buildPages = rowPagesBuilder(ImmutableList.of(GEOMETRY, VARCHAR))
                .row(POLYGON_A, "A")
                .row(null, "null")
                .pageBreak()
                .row(POLYGON_B, "B");

        RowPagesBuilder probePages = rowPagesBuilder(ImmutableList.of(GEOMETRY, VARCHAR));

        MaterializedResult expected = resultBuilder(taskContext.getSession(), ImmutableList.of(VARCHAR, VARCHAR)).build();

        assertSpatialJoin(taskContext, INNER, buildPages, probePages, expected);
    }

    @Test
    public void testYield()
    {
        // create a filter function that yields for every probe match
        // verify we will yield #match times totally

        TaskContext taskContext = createTaskContext();
        DriverContext driverContext = taskContext.addPipelineContext(0, true, true, false).addDriverContext();

        // force a yield for every match
        AtomicInteger filterFunctionCalls = new AtomicInteger();
        InternalJoinFilterFunction filterFunction = new TestInternalJoinFilterFunction((
                (leftPosition, leftPage, rightPosition, rightPage) -> {
                    filterFunctionCalls.incrementAndGet();
                    driverContext.getYieldSignal().forceYieldForTesting();
                    return true;
                }));

        RowPagesBuilder buildPages = rowPagesBuilder(ImmutableList.of(GEOMETRY, VARCHAR))
                .row(POLYGON_A, "A")
                .pageBreak()
                .row(POLYGON_B, "B");
        PagesSpatialIndexFactory pagesSpatialIndexFactory = buildIndex(driverContext, (build, probe, r) -> build.contains(probe), Optional.empty(), Optional.of(filterFunction), buildPages);

        // 10 points in polygon A (x0...x9)
        // 10 points in polygons A and B (y0...y9)
        // 10 points in polygon B (z0...z9)
        // 40 total matches
        RowPagesBuilder probePages = rowPagesBuilder(ImmutableList.of(GEOMETRY, VARCHAR));
        for (int i = 0; i < 10; i++) {
            probePages.row(stPoint(1 + 0.1 * i, 1 + 0.1 * i), "x" + i);
        }
        for (int i = 0; i < 10; i++) {
            probePages.row(stPoint(4.5 + 0.01 * i, 4.5 + 0.01 * i), "y" + i);
        }
        for (int i = 0; i < 10; i++) {
            probePages.row(stPoint(6 + 0.1 * i, 6 + 0.1 * i), "z" + i);
        }
        List<Page> probeInput = probePages.build();

        OperatorFactory joinOperatorFactory = new SpatialJoinOperatorFactory(2, new PlanNodeId("test"), INNER, probePages.getTypes(), Ints.asList(1), 0, Optional.empty(), pagesSpatialIndexFactory);

        Operator operator = joinOperatorFactory.createOperator(driverContext);
        assertTrue(operator.needsInput());
        operator.addInput(probeInput.get(0));
        operator.finish();

        // we will yield 40 times due to filterFunction
        for (int i = 0; i < 40; i++) {
            driverContext.getYieldSignal().setWithDelay(5 * SECONDS.toNanos(1), driverContext.getYieldExecutor());
            assertNull(operator.getOutput());
            assertEquals(filterFunctionCalls.get(), i + 1, "Expected join to stop processing (yield) after calling filter function once");
            driverContext.getYieldSignal().reset();
        }
        // delayed yield is not going to prevent operator from producing a page now (yield won't be forced because filter function won't be called anymore)
        driverContext.getYieldSignal().setWithDelay(5 * SECONDS.toNanos(1), driverContext.getYieldExecutor());
        Page output = operator.getOutput();
        assertNotNull(output);

        // make sure we have 40 matches
        assertEquals(output.getPositionCount(), 40);
    }

    @Test
    public void testDistanceQuery()
    {
        TaskContext taskContext = createTaskContext();
        DriverContext driverContext = taskContext.addPipelineContext(0, true, true, false).addDriverContext();

        RowPagesBuilder buildPages = rowPagesBuilder(ImmutableList.of(GEOMETRY, VARCHAR, DOUBLE))
                .row(stPoint(0, 0), "0_0", 1.5)
                .row(null, "null", 1.5)
                .row(stPoint(1, 0), "1_0", 1.5)
                .pageBreak()
                .row(stPoint(3, 0), "3_0", 1.5)
                .pageBreak()
                .row(stPoint(10, 0), "10_0", 1.5);
        PagesSpatialIndexFactory pagesSpatialIndexFactory = buildIndex(driverContext, (build, probe, r) -> build.distance(probe) <= r.getAsDouble(), Optional.of(2), Optional.empty(), buildPages);

        RowPagesBuilder probePages = rowPagesBuilder(ImmutableList.of(GEOMETRY, VARCHAR))
                .row(stPoint(0, 1), "0_1")
                .row(null, "null")
                .row(stPoint(1, 1), "1_1")
                .pageBreak()
                .row(stPoint(3, 1), "3_1")
                .pageBreak()
                .row(stPoint(10, 1), "10_1");
        OperatorFactory joinOperatorFactory = new SpatialJoinOperatorFactory(2, new PlanNodeId("test"), INNER, probePages.getTypes(), Ints.asList(1), 0, Optional.empty(), pagesSpatialIndexFactory);

        // Make sure that spatial index reference counting works with duplicate factories
        joinOperatorFactory.duplicate().noMoreOperators();

        MaterializedResult expected = resultBuilder(taskContext.getSession(), ImmutableList.of(VARCHAR, VARCHAR))
                .row("0_1", "0_0")
                .row("0_1", "1_0")
                .row("1_1", "0_0")
                .row("1_1", "1_0")
                .row("3_1", "3_0")
                .row("10_1", "10_0")
                .build();

        assertOperatorEqualsIgnoreOrder(joinOperatorFactory, driverContext, probePages.build(), expected);
    }

    @Test
    public void testDistributedSpatialJoin()
    {
        TaskContext taskContext = createTaskContext();
        DriverContext driverContext = taskContext.addPipelineContext(0, true, true, true).addDriverContext();

        RowPagesBuilder buildPages = rowPagesBuilder(ImmutableList.of(GEOMETRY, VARCHAR, INTEGER));
        addGeometryPartitionRows(buildPages, POLYGON_A, "A");
        buildPages.row(null, "null", null);
        buildPages.pageBreak();
        addGeometryPartitionRows(buildPages, POLYGON_B, "B");
        addGeometryPartitionRows(buildPages, POLYGON_C, "C");
        addGeometryPartitionRows(buildPages, POLYGON_D, "D");

        RowPagesBuilder probePages = rowPagesBuilder(ImmutableList.of(GEOMETRY, VARCHAR, INTEGER));
        addGeometryPartitionRows(probePages, POINT_X, "x");
        probePages.row(null, "null", null);
        addGeometryPartitionRows(probePages, POINT_Y, "y");
        probePages.pageBreak();
        addGeometryPartitionRows(probePages, POINT_Z, "z");
        addGeometryPartitionRows(probePages, POINT_W, "w");
        addGeometryPartitionRows(probePages, POINT_V, "v");
        addGeometryPartitionRows(probePages, MULTIPOINT_U, "u");
        probePages.pageBreak();
        addGeometryPartitionRows(probePages, MULTIPOINT_T, "t");
        addGeometryPartitionRows(probePages, POINT_S, "s");
        addGeometryPartitionRows(probePages, MULTIPOINT_R, "r");
        addGeometryPartitionRows(probePages, POINT_Q, "q");

        MaterializedResult expected = resultBuilder(taskContext.getSession(), ImmutableList.of(VARCHAR, VARCHAR))
                .row("x", "A")
                .row("y", "A")
                .row("y", "B")
                .row("z", "B")
                .row("v", "C")
                .row("u", "C")
                .row("t", "C")
                .row("s", "D")
                .row("r", "C")
                .row("r", "D")
                .build();

        PagesSpatialIndexFactory pagesSpatialIndexFactory = buildIndex(driverContext, (build, probe, r) -> build.intersects(probe), Optional.empty(), Optional.of(2), Optional.of(KDB_TREE_JSON), Optional.empty(), buildPages);
        OperatorFactory joinOperatorFactory = new SpatialJoinOperatorFactory(2, new PlanNodeId("test"), INNER, probePages.getTypes(), Ints.asList(1), 0, Optional.of(2), pagesSpatialIndexFactory);
        assertOperatorEqualsIgnoreOrder(joinOperatorFactory, driverContext, probePages.build(), expected);
    }

    @Test
    public void testDistributedSpatialSelfJoin()
    {
        TaskContext taskContext = createTaskContext();
        DriverContext driverContext = taskContext.addPipelineContext(0, true, true, true).addDriverContext();

        RowPagesBuilder pages = rowPagesBuilder(ImmutableList.of(GEOMETRY, VARCHAR, INTEGER));
        addGeometryPartitionRows(pages, POLYGON_A, "A");
        pages.row(null, "null", null);
        pages.pageBreak();
        addGeometryPartitionRows(pages, POLYGON_B, "B");

        MaterializedResult expected = resultBuilder(taskContext.getSession(), ImmutableList.of(VARCHAR, VARCHAR))
                .row("A", "A")
                .row("A", "B")
                .row("B", "A")
                .row("B", "B")
                .build();

        PagesSpatialIndexFactory pagesSpatialIndexFactory = buildIndex(driverContext, (build, probe, r) -> build.intersects(probe), Optional.empty(), Optional.of(2), Optional.of(KDB_TREE_JSON), Optional.empty(), pages);
        OperatorFactory joinOperatorFactory = new SpatialJoinOperatorFactory(2, new PlanNodeId("test"), INNER, pages.getTypes(), Ints.asList(1), 0, Optional.of(2), pagesSpatialIndexFactory);
        assertOperatorEqualsIgnoreOrder(joinOperatorFactory, driverContext, pages.build(), expected);
    }

    private void addGeometryPartitionRows(RowPagesBuilder pageBuilder, Slice geometry, String geometryName)
    {
        Block partitionIndices = spatialPartitions(KDB_TREE, geometry);
        for (int position = 0; position < partitionIndices.getPositionCount(); position++) {
            int partitionIndex = partitionIndices.getInt(position);
            pageBuilder.row(geometry, geometryName, partitionIndex);
        }
    }

    private PagesSpatialIndexFactory buildIndex(DriverContext driverContext, SpatialPredicate spatialRelationshipTest, Optional<Integer> radiusChannel, Optional<InternalJoinFilterFunction> filterFunction, RowPagesBuilder buildPages)
    {
        return buildIndex(driverContext, spatialRelationshipTest, radiusChannel, Optional.empty(), Optional.empty(), filterFunction, buildPages);
    }

    private PagesSpatialIndexFactory buildIndex(DriverContext driverContext, SpatialPredicate spatialRelationshipTest, Optional<Integer> radiusChannel, Optional<Integer> partitionChannel, Optional<String> kdbTreeJson, Optional<InternalJoinFilterFunction> filterFunction, RowPagesBuilder buildPages)
    {
        Optional<JoinFilterFunctionCompiler.JoinFilterFunctionFactory> filterFunctionFactory = filterFunction
                .map(function -> (session, addresses, pages) -> new StandardJoinFilterFunction(function, addresses, pages));

        ValuesOperator.ValuesOperatorFactory valuesOperatorFactory = new ValuesOperator.ValuesOperatorFactory(0, new PlanNodeId("test"), buildPages.build());
        SpatialIndexBuilderOperatorFactory buildOperatorFactory = new SpatialIndexBuilderOperatorFactory(
                1,
                new PlanNodeId("test"),
                buildPages.getTypes(),
                Ints.asList(1),
                0,
                radiusChannel,
                partitionChannel,
                spatialRelationshipTest,
                kdbTreeJson,
                filterFunctionFactory,
                10_000,
                new TestingFactory(false));

        Driver driver = Driver.createDriver(
                driverContext,
                valuesOperatorFactory.createOperator(driverContext),
                buildOperatorFactory.createOperator(driverContext));

        PagesSpatialIndexFactory pagesSpatialIndexFactory = buildOperatorFactory.getPagesSpatialIndexFactory();
        ListenableFuture<PagesSpatialIndex> pagesSpatialIndex = pagesSpatialIndexFactory.createPagesSpatialIndex();

        while (!pagesSpatialIndex.isDone()) {
            driver.process();
        }

        // Release the spatial index reference
        pagesSpatialIndexFactory.probeOperatorFinished();

        runDriverInThread(executor, driver);
        return pagesSpatialIndexFactory;
    }

    /**
     * Runs Driver in another thread until it is finished
     */
    private static void runDriverInThread(ExecutorService executor, Driver driver)
    {
        executor.execute(() -> {
            if (!driver.isFinished()) {
                try {
                    driver.process();
                }
                catch (PrestoException e) {
                    driver.getDriverContext().failed(e);
                    throw e;
                }
                runDriverInThread(executor, driver);
            }
        });
    }

    private TaskContext createTaskContext()
    {
        return TestingTaskContext.createTaskContext(executor, scheduledExecutor, TEST_SESSION);
    }

    private static class TestInternalJoinFilterFunction
            implements InternalJoinFilterFunction
    {
        public interface Lambda
        {
            boolean filter(int leftPosition, Page leftPage, int rightPosition, Page rightPage);
        }

        private final TestInternalJoinFilterFunction.Lambda lambda;

        private TestInternalJoinFilterFunction(TestInternalJoinFilterFunction.Lambda lambda)
        {
            this.lambda = lambda;
        }

        @Override
        public boolean filter(int leftPosition, Page leftPage, int rightPosition, Page rightPage)
        {
            return lambda.filter(leftPosition, leftPage, rightPosition, rightPage);
        }
    }
}