// TestPlanCheckerProvider.java

/*
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package com.facebook.presto.sidecar;

import com.facebook.airlift.json.JsonCodec;
import com.facebook.presto.common.ErrorCode;
import com.facebook.presto.sidecar.nativechecker.NativePlanChecker;
import com.facebook.presto.sidecar.nativechecker.NativePlanCheckerConfig;
import com.facebook.presto.sidecar.nativechecker.NativePlanCheckerProvider;
import com.facebook.presto.sidecar.nativechecker.PlanConversionFailureInfo;
import com.facebook.presto.sidecar.nativechecker.PlanConversionResponse;
import com.facebook.presto.spi.HostAddress;
import com.facebook.presto.spi.Node;
import com.facebook.presto.spi.NodeManager;
import com.facebook.presto.spi.NodePoolType;
import com.facebook.presto.spi.PrestoException;
import com.facebook.presto.spi.StandardErrorCode;
import com.facebook.presto.spi.connector.ConnectorPartitioningHandle;
import com.facebook.presto.spi.plan.Partitioning;
import com.facebook.presto.spi.plan.PartitioningHandle;
import com.facebook.presto.spi.plan.PartitioningScheme;
import com.facebook.presto.spi.plan.PlanFragmentId;
import com.facebook.presto.spi.plan.PlanNode;
import com.facebook.presto.spi.plan.PlanNodeId;
import com.facebook.presto.spi.plan.SimplePlanFragment;
import com.facebook.presto.spi.plan.StageExecutionDescriptor;
import com.facebook.presto.spi.relation.VariableReferenceExpression;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import okhttp3.mockwebserver.MockResponse;
import okhttp3.mockwebserver.MockWebServer;
import org.testng.annotations.Test;

import java.io.IOException;
import java.net.URI;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.Set;

import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.expectThrows;

/**
 * Tests for {@link NativePlanCheckerProvider} and {@link NativePlanChecker}.
 *
 * <p>The sidecar is replaced by a {@link MockWebServer}; the nested testing classes
 * supply the minimal {@link NodeManager}, {@link Node}, {@link PlanNode} and
 * {@link ConnectorPartitioningHandle} implementations needed to build a fragment
 * and point the checker at the mock server.
 */
public class TestPlanCheckerProvider
{
    private static final JsonCodec<SimplePlanFragment> PLAN_FRAGMENT_JSON_CODEC = JsonCodec.jsonCodec(SimplePlanFragment.class);
    private static final JsonCodec<PlanConversionResponse> PLAN_CONVERSION_RESPONSE_JSON_CODEC = JsonCodec.jsonCodec(PlanConversionResponse.class);

    /**
     * Verifies that a default {@link NativePlanCheckerConfig} has plan validation enabled
     * and that the provider built from it exposes exactly one fragment plan checker and
     * no intermediate or final plan checkers.
     */
    @Test
    public void testGetPlanChecker()
    {
        NativePlanCheckerConfig config = new NativePlanCheckerConfig();
        assertTrue(config.isPlanValidationEnabled());
        // Use an absolute URI: a bare "localhost" is parsed by java.net.URI as a relative
        // path with no host component, which would leave the testing sidecar node with a
        // null host.
        NativePlanCheckerProvider provider = new NativePlanCheckerProvider(new TestingNodeManager(URI.create("http://localhost")), PLAN_FRAGMENT_JSON_CODEC, config);
        assertTrue(provider.getIntermediatePlanCheckers().isEmpty());
        assertTrue(provider.getFinalPlanCheckers().isEmpty());
        assertEquals(provider.getFragmentPlanCheckers().size(), 1);
    }

    /**
     * Validates a minimal fragment against a mock sidecar twice.
     *
     * <p>The first enqueued response carries an empty failure list; the validation call is
     * made directly, so any exception fails the test. The second enqueued response is an
     * HTTP 500 whose body carries one {@link PlanConversionFailureInfo}; the validation call
     * must throw a {@link PrestoException} with the same error code and a message that
     * contains the failure message.
     *
     * @throws IOException if the mock web server fails to start or close
     */
    @Test
    public void testNativePlanMockValidate()
            throws IOException
    {
        // Build the smallest fragment the checker can be handed: a leaf plan node and a
        // partitioning scheme based on the testing connector handle.
        TestingPlanNode root = new TestingPlanNode();
        ConnectorPartitioningHandle connectorPartitioningHandle = new TestingConnectorPartitioningHandle();
        PartitioningHandle handle = new PartitioningHandle(Optional.empty(), Optional.empty(), connectorPartitioningHandle);
        PartitioningScheme partitioningScheme = new PartitioningScheme(new Partitioning(handle, ImmutableList.of()), ImmutableList.of());
        SimplePlanFragment fragment = new SimplePlanFragment(new PlanFragmentId(1), root, ImmutableSet.of(), handle, ImmutableList.of(), partitioningScheme, StageExecutionDescriptor.ungroupedExecution(), false);

        try (MockWebServer server = new MockWebServer()) {
            server.start();
            // The sidecar node's HTTP URI is the mock server's plan conversion endpoint.
            TestingNodeManager nodeManager = new TestingNodeManager(server.url(NativePlanChecker.PLAN_CONVERSION_ENDPOINT).uri());
            NativePlanChecker checker = new NativePlanChecker(nodeManager, PLAN_FRAGMENT_JSON_CODEC);

            // Success case: response with no failures.
            PlanConversionResponse responseOk = new PlanConversionResponse(ImmutableList.of());
            String responseOkString = PLAN_CONVERSION_RESPONSE_JSON_CODEC.toJson(responseOk);
            server.enqueue(new MockResponse().setBody(responseOkString));
            checker.validateFragment(fragment, null, null);

            // Failure case: HTTP 500 with a single failure entry in the body.
            String errorMessage = "native conversion error";
            ErrorCode errorCode = StandardErrorCode.NOT_SUPPORTED.toErrorCode();
            PlanConversionResponse responseError = new PlanConversionResponse(ImmutableList.of(new PlanConversionFailureInfo("MockError", errorMessage, null, ImmutableList.of(), ImmutableList.of(), errorCode)));
            String responseErrorString = PLAN_CONVERSION_RESPONSE_JSON_CODEC.toJson(responseError);
            server.enqueue(new MockResponse().setResponseCode(500).setBody(responseErrorString));
            PrestoException error = expectThrows(PrestoException.class,
                    () -> checker.validateFragment(fragment, null, null));
            assertEquals(error.getErrorCode(), errorCode);
            assertTrue(error.getMessage().contains(errorMessage));
        }
    }

    /**
     * Minimal {@link ConnectorPartitioningHandle} whose only property,
     * {@code coordinatorOnly}, is always {@code false}.
     *
     * <p>NOTE(review): presumably public and annotated with {@link JsonProperty} so that
     * the fragment JSON codec can serialize it - confirm.
     */
    public static class TestingConnectorPartitioningHandle
            implements ConnectorPartitioningHandle
    {
        @JsonProperty
        @Override
        public boolean isCoordinatorOnly()
        {
            return false;
        }
    }

    /**
     * Leaf {@link PlanNode} with id {@code "1"}, no sources and no output variables.
     */
    private static class TestingPlanNode
            extends PlanNode
    {
        protected TestingPlanNode()
        {
            super(Optional.empty(), new PlanNodeId("1"), Optional.empty());
        }

        @Override
        public List<PlanNode> getSources()
        {
            return ImmutableList.of();
        }

        @Override
        public List<VariableReferenceExpression> getOutputVariables()
        {
            return ImmutableList.of();
        }

        /**
         * Returns this node unchanged; {@code newChildren} is ignored.
         */
        @Override
        public PlanNode replaceChildren(List<PlanNode> newChildren)
        {
            return this;
        }

        /**
         * Returns this node unchanged; {@code statsEquivalentPlanNode} is ignored.
         */
        @Override
        public PlanNode assignStatsEquivalentPlanNode(Optional<PlanNode> statsEquivalentPlanNode)
        {
            return this;
        }
    }

    /**
     * {@link NodeManager} that knows a single sidecar node built from the given URI.
     *
     * <p>{@link #getAllNodes()} contains only that sidecar, {@link #getWorkerNodes()} is
     * empty, and {@link #getCurrentNode()} and {@link #getEnvironment()} return
     * {@code null}.
     */
    private static class TestingNodeManager
            implements NodeManager
    {
        private final TestSidecarNode sidecarNode;

        /**
         * @param sidecarUri URI reported by the sidecar node as its HTTP URI
         */
        public TestingNodeManager(URI sidecarUri)
        {
            this.sidecarNode = new TestSidecarNode(sidecarUri);
        }

        @Override
        public Set<Node> getAllNodes()
        {
            return ImmutableSet.of(sidecarNode);
        }

        @Override
        public Set<Node> getWorkerNodes()
        {
            return Collections.emptySet();
        }

        @Override
        public Node getCurrentNode()
        {
            return null;
        }

        @Override
        public Node getSidecarNode()
        {
            return sidecarNode;
        }

        @Override
        public String getEnvironment()
        {
            return null;
        }
    }

    /**
     * {@link Node} backed by a single URI. Host, host-and-port and HTTP URI are all derived
     * from that URI; the identifier is {@code "ABC"}, the version is {@code "1"}, and the
     * only role reported as {@code true} is coordinator sidecar.
     */
    private static class TestSidecarNode
            implements Node
    {
        private final URI sidecarUri;

        /**
         * @param sidecarUri URI returned from {@link #getHttpUri()} and used to derive the host
         */
        public TestSidecarNode(URI sidecarUri)
        {
            this.sidecarUri = sidecarUri;
        }

        @Override
        public String getHost()
        {
            return sidecarUri.getHost();
        }

        @Override
        public HostAddress getHostAndPort()
        {
            return HostAddress.fromUri(sidecarUri);
        }

        @Override
        public URI getHttpUri()
        {
            return sidecarUri;
        }

        @Override
        public String getNodeIdentifier()
        {
            return "ABC";
        }

        @Override
        public String getVersion()
        {
            return "1";
        }

        @Override
        public boolean isCoordinator()
        {
            return false;
        }

        @Override
        public boolean isResourceManager()
        {
            return false;
        }

        @Override
        public boolean isCatalogServer()
        {
            return false;
        }

        @Override
        public boolean isCoordinatorSidecar()
        {
            return true;
        }

        @Override
        public NodePoolType getPoolType()
        {
            return NodePoolType.DEFAULT;
        }
    }
}