TestThriftServerInfoIntegration.java

/*
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package com.facebook.presto.server;

import com.facebook.airlift.bootstrap.Bootstrap;
import com.facebook.airlift.bootstrap.LifeCycleManager;
import com.facebook.drift.client.DriftClientFactory;
import com.facebook.drift.client.address.AddressSelector;
import com.facebook.drift.client.address.SimpleAddressSelector;
import com.facebook.drift.codec.ThriftCodecManager;
import com.facebook.drift.server.DriftServer;
import com.facebook.drift.transport.netty.client.DriftNettyClientConfig;
import com.facebook.drift.transport.netty.client.DriftNettyMethodInvokerFactory;
import com.facebook.drift.transport.netty.server.DriftNettyServerModule;
import com.facebook.drift.transport.netty.server.DriftNettyServerTransport;
import com.facebook.presto.Session;
import com.facebook.presto.dispatcher.NoOpQueryManager;
import com.facebook.presto.execution.QueryManager;
import com.facebook.presto.execution.StateMachine;
import com.facebook.presto.execution.TaskId;
import com.facebook.presto.execution.TaskInfo;
import com.facebook.presto.execution.TaskManager;
import com.facebook.presto.execution.TaskSource;
import com.facebook.presto.execution.TaskState;
import com.facebook.presto.execution.TaskStatus;
import com.facebook.presto.execution.buffer.BufferResult;
import com.facebook.presto.execution.buffer.OutputBufferInfo;
import com.facebook.presto.execution.buffer.OutputBuffers;
import com.facebook.presto.execution.buffer.OutputBuffers.OutputBufferId;
import com.facebook.presto.execution.scheduler.TableWriteInfo;
import com.facebook.presto.memory.MemoryPoolAssignmentsRequest;
import com.facebook.presto.metadata.MetadataUpdates;
import com.facebook.presto.server.testing.TestingPrestoServer;
import com.facebook.presto.server.thrift.ThriftServerInfoClient;
import com.facebook.presto.server.thrift.ThriftServerInfoService;
import com.facebook.presto.spi.NodeState;
import com.facebook.presto.sql.planner.PlanFragment;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.common.net.HostAndPort;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.inject.Binder;
import com.google.inject.Injector;
import com.google.inject.Module;
import com.google.inject.Provides;
import com.google.inject.Scopes;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;

import javax.inject.Singleton;

import java.util.List;
import java.util.Optional;

import static com.facebook.airlift.configuration.ConfigBinder.configBinder;
import static com.facebook.drift.client.ExceptionClassifier.NORMAL_RESULT;
import static com.facebook.drift.server.guice.DriftServerBinder.driftServerBinder;
import static com.facebook.drift.transport.netty.client.DriftNettyMethodInvokerFactory.createStaticDriftNettyMethodInvokerFactory;
import static com.facebook.presto.spi.NodeState.ACTIVE;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.fail;

public class TestThriftServerInfoIntegration
{
    private LifeCycleManager lifeCycleManager;
    private int thriftServerPort;

    @BeforeClass
    public void setup()
            throws Exception
    {
        Bootstrap app = new Bootstrap(
                new DriftNettyServerModule(),
                new TestingThriftServerInfoModule());

        app.setRequiredConfigurationProperties(ImmutableMap.of("presto.version", "test.0", "coordinator", "false"));

        Injector injector = app
                .doNotInitializeLogging()
                .initialize();

        lifeCycleManager = injector.getInstance(LifeCycleManager.class);
        thriftServerPort = driftServerPort(injector.getInstance(DriftServer.class));
    }

    @AfterClass
    public void teardown()
    {
        if (lifeCycleManager != null) {
            lifeCycleManager.stop();
        }
    }

    @Test
    public void testServer()
    {
        AddressSelector<SimpleAddressSelector.SimpleAddress> addressSelector = new SimpleAddressSelector(
                ImmutableSet.of(HostAndPort.fromParts("localhost", thriftServerPort)),
                true);
        try (DriftNettyMethodInvokerFactory<?> invokerFactory = createStaticDriftNettyMethodInvokerFactory(new DriftNettyClientConfig())) {
            DriftClientFactory clientFactory = new DriftClientFactory(new ThriftCodecManager(), invokerFactory, addressSelector, NORMAL_RESULT);
            ThriftServerInfoClient client = clientFactory.createDriftClient(ThriftServerInfoClient.class).get();

            // get buffer result
            NodeState state = NodeState.valueOf(client.getServerState().get());
            assertEquals(state, ACTIVE);
        }
        catch (Exception e) {
            fail();
        }
    }

    private static int driftServerPort(DriftServer server)
    {
        return ((DriftNettyServerTransport) server.getServerTransport()).getPort();
    }

    public static class TestingThriftServerInfoModule
            implements Module
    {
        @Override
        public void configure(Binder binder)
        {
            configBinder(binder).bindConfig(ServerConfig.class);
            //Bind noop QueryManager similar to the binding done for TaskManager here
            binder.bind(QueryManager.class).to(NoOpQueryManager.class).in(Scopes.SINGLETON);
            binder.bind(NodeStatusNotificationManager.class).in(Scopes.SINGLETON);
            binder.bind(GracefulShutdownHandler.class).in(Scopes.SINGLETON);
            binder.bind(ShutdownAction.class).to(TestingPrestoServer.TestShutdownAction.class).in(Scopes.SINGLETON);

            binder.bind(ThriftServerInfoService.class).in(Scopes.SINGLETON);
            driftServerBinder(binder).bindService(ThriftServerInfoService.class);
        }

        @Provides
        @Singleton
        public static TaskManager createTaskManager()
        {
            return new TaskManager()
            {
                @Override
                public List<TaskInfo> getAllTaskInfo()
                {
                    throw new UnsupportedOperationException();
                }

                @Override
                public TaskInfo getTaskInfo(TaskId taskId)
                {
                    throw new UnsupportedOperationException();
                }

                @Override
                public TaskStatus getTaskStatus(TaskId taskId)
                {
                    throw new UnsupportedOperationException();
                }

                @Override
                public ListenableFuture<TaskInfo> getTaskInfo(TaskId taskId, TaskState currentState)
                {
                    throw new UnsupportedOperationException();
                }

                @Override
                public String getTaskInstanceId(TaskId taskId)
                {
                    throw new UnsupportedOperationException();
                }

                @Override
                public ListenableFuture<TaskStatus> getTaskStatus(TaskId taskId, TaskState currentState)
                {
                    throw new UnsupportedOperationException();
                }

                @Override
                public void updateMemoryPoolAssignments(MemoryPoolAssignmentsRequest assignments)
                {
                    throw new UnsupportedOperationException();
                }

                @Override
                public TaskInfo updateTask(Session session, TaskId taskId, Optional<PlanFragment> fragment, List<TaskSource> sources, OutputBuffers outputBuffers, Optional<TableWriteInfo> tableWriteInfo)
                {
                    throw new UnsupportedOperationException();
                }

                @Override
                public TaskInfo cancelTask(TaskId taskId)
                {
                    throw new UnsupportedOperationException();
                }

                @Override
                public TaskInfo abortTask(TaskId taskId)
                {
                    throw new UnsupportedOperationException();
                }

                @Override
                public ListenableFuture<BufferResult> getTaskResults(TaskId taskId, OutputBufferId bufferId, long startingSequenceId, long maxSizeInBytes)
                {
                    throw new UnsupportedOperationException();
                }

                @Override
                public OutputBufferInfo getOutputBufferInfo(TaskId taskId)
                {
                    throw new UnsupportedOperationException();
                }

                @Override
                public void acknowledgeTaskResults(TaskId taskId, OutputBufferId bufferId, long sequenceId)
                {
                    throw new UnsupportedOperationException();
                }

                @Override
                public TaskInfo abortTaskResults(TaskId taskId, OutputBufferId bufferId)
                {
                    throw new UnsupportedOperationException();
                }

                @Override
                public void addStateChangeListener(TaskId taskId, StateMachine.StateChangeListener<TaskState> stateChangeListener)
                {
                    throw new UnsupportedOperationException();
                }

                @Override
                public void removeRemoteSource(TaskId taskId, TaskId remoteSourceTaskId)
                {
                    throw new UnsupportedOperationException();
                }

                @Override
                public void updateMetadataResults(TaskId taskId, MetadataUpdates metadataUpdates)
                {
                    throw new UnsupportedOperationException();
                }
            };
        }
    }
}