TestOperatorInfoUnionSerde.java

/*
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package com.facebook.presto.operator;

import com.facebook.drift.codec.ThriftCodec;
import com.facebook.drift.codec.ThriftCodecManager;
import com.facebook.drift.codec.internal.compiler.CompilerThriftCodecFactory;
import com.facebook.drift.codec.internal.reflection.ReflectionThriftCodecFactory;
import com.facebook.drift.codec.metadata.ThriftCatalog;
import com.facebook.drift.codec.utils.DurationToMillisThriftCodec;
import com.facebook.drift.codec.utils.JodaDateTimeToEpochMillisThriftCodec;
import com.facebook.drift.protocol.TBinaryProtocol;
import com.facebook.drift.protocol.TCompactProtocol;
import com.facebook.drift.protocol.TFacebookCompactProtocol;
import com.facebook.drift.protocol.TMemoryBuffer;
import com.facebook.drift.protocol.TProtocol;
import com.facebook.drift.protocol.TTransport;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import org.joda.time.DateTime;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;

import java.net.URI;
import java.util.List;
import java.util.OptionalInt;
import java.util.OptionalLong;
import java.util.function.Function;

import static org.assertj.core.api.Assertions.assertThat;
import static org.testng.Assert.assertNotNull;

/**
 * Round-trip Thrift serialization tests for {@link OperatorInfoUnion}.
 *
 * <p>An {@link OperatorInfoUnion} wrapping an {@link ExchangeClientStatus} is written with one codec
 * and read back with another, for every pairing of compiler-generated and reflection-based codecs
 * and for each of the binary, compact and Facebook-compact protocols. The decoded value is then
 * compared field by field against the values used to build the fixture.
 *
 * <p>The class stays single-threaded because the fixture is held in an instance field that is
 * re-created before each test method.
 */
@Test(singleThreaded = true)
public class TestOperatorInfoUnionSerde
{
    // Size argument handed to each TMemoryBuffer created for a round trip.
    private static final int TRANSPORT_BUFFER_SIZE = 100 * 1024;

    private static final ThriftCatalog COMMON_CATALOG = new ThriftCatalog();
    private static final DurationToMillisThriftCodec DURATION_CODEC = new DurationToMillisThriftCodec(COMMON_CATALOG);
    private static final JodaDateTimeToEpochMillisThriftCodec DATE_TIME_CODEC = new JodaDateTimeToEpochMillisThriftCodec(COMMON_CATALOG);

    // Separate managers are kept for the read side and the write side of each codec flavour.
    private static final ThriftCodecManager COMPILER_READ_CODEC_MANAGER = new ThriftCodecManager(new CompilerThriftCodecFactory(false), COMMON_CATALOG, ImmutableSet.of(DURATION_CODEC, DATE_TIME_CODEC));
    private static final ThriftCodecManager COMPILER_WRITE_CODEC_MANAGER = new ThriftCodecManager(new CompilerThriftCodecFactory(false), COMMON_CATALOG, ImmutableSet.of(DURATION_CODEC, DATE_TIME_CODEC));
    private static final ThriftCodec<OperatorInfoUnion> COMPILER_READ_CODEC = COMPILER_READ_CODEC_MANAGER.getCodec(OperatorInfoUnion.class);
    private static final ThriftCodec<OperatorInfoUnion> COMPILER_WRITE_CODEC = COMPILER_WRITE_CODEC_MANAGER.getCodec(OperatorInfoUnion.class);
    private static final ThriftCodecManager REFLECTION_READ_CODEC_MANAGER = new ThriftCodecManager(new ReflectionThriftCodecFactory(), COMMON_CATALOG, ImmutableSet.of(DURATION_CODEC, DATE_TIME_CODEC));
    private static final ThriftCodecManager REFLECTION_WRITE_CODEC_MANAGER = new ThriftCodecManager(new ReflectionThriftCodecFactory(), COMMON_CATALOG, ImmutableSet.of(DURATION_CODEC, DATE_TIME_CODEC));
    private static final ThriftCodec<OperatorInfoUnion> REFLECTION_READ_CODEC = REFLECTION_READ_CODEC_MANAGER.getCodec(OperatorInfoUnion.class);
    private static final ThriftCodec<OperatorInfoUnion> REFLECTION_WRITE_CODEC = REFLECTION_WRITE_CODEC_MANAGER.getCodec(OperatorInfoUnion.class);

    // Fixture that every test serializes; rebuilt before each test method.
    private OperatorInfoUnion operatorInfoUnion;

    /**
     * Rebuilds the fixture so each test method starts from a freshly constructed value.
     */
    @BeforeMethod
    public void setUp()
    {
        operatorInfoUnion = new OperatorInfoUnion(getExchangeClientStatus());
    }

    /**
     * Supplies every {read codec, write codec} pairing of the compiler and reflection codecs.
     *
     * @return rows of two elements: the codec used for reading, then the codec used for writing
     */
    @DataProvider
    public Object[][] codecCombinations()
    {
        return new Object[][] {
                {COMPILER_READ_CODEC, COMPILER_WRITE_CODEC},
                {COMPILER_READ_CODEC, REFLECTION_WRITE_CODEC},
                {REFLECTION_READ_CODEC, COMPILER_WRITE_CODEC},
                {REFLECTION_READ_CODEC, REFLECTION_WRITE_CODEC}
        };
    }

    /**
     * Round-trips the fixture through {@link TBinaryProtocol}.
     */
    @Test(dataProvider = "codecCombinations")
    public void testRoundTripSerializeBinaryProtocol(ThriftCodec<OperatorInfoUnion> readCodec, ThriftCodec<OperatorInfoUnion> writeCodec)
            throws Exception
    {
        OperatorInfoUnion roundTripped = getRoundTripSerialize(readCodec, writeCodec, TBinaryProtocol::new);
        assertSerde(roundTripped);
    }

    /**
     * Round-trips the fixture through {@link TCompactProtocol}.
     */
    @Test(dataProvider = "codecCombinations")
    public void testRoundTripSerializeTCompactProtocol(ThriftCodec<OperatorInfoUnion> readCodec, ThriftCodec<OperatorInfoUnion> writeCodec)
            throws Exception
    {
        OperatorInfoUnion roundTripped = getRoundTripSerialize(readCodec, writeCodec, TCompactProtocol::new);
        assertSerde(roundTripped);
    }

    /**
     * Round-trips the fixture through {@link TFacebookCompactProtocol}.
     */
    @Test(dataProvider = "codecCombinations")
    public void testRoundTripSerializeTFacebookCompactProtocol(ThriftCodec<OperatorInfoUnion> readCodec, ThriftCodec<OperatorInfoUnion> writeCodec)
            throws Exception
    {
        OperatorInfoUnion roundTripped = getRoundTripSerialize(readCodec, writeCodec, TFacebookCompactProtocol::new);
        assertSerde(roundTripped);
    }

    /**
     * Writes the fixture with {@code writeCodec} and reads it back with {@code readCodec}.
     *
     * @param readCodec codec used to decode the serialized bytes
     * @param writeCodec codec used to encode the fixture
     * @param protocolFactory builds the protocol under test on top of the supplied transport
     * @return the value decoded by {@code readCodec}
     * @throws Exception if either the write or the read fails
     */
    private OperatorInfoUnion getRoundTripSerialize(ThriftCodec<OperatorInfoUnion> readCodec, ThriftCodec<OperatorInfoUnion> writeCodec, Function<TTransport, TProtocol> protocolFactory)
            throws Exception
    {
        // A fresh buffer per round trip keeps invocations independent: bytes left behind by a
        // failed or partial read can never be picked up by a later test.
        TTransport transport = new TMemoryBuffer(TRANSPORT_BUFFER_SIZE);
        TProtocol protocol = protocolFactory.apply(transport);
        writeCodec.write(operatorInfoUnion, protocol);
        return readCodec.read(protocol);
    }

    /**
     * Verifies that a decoded union carries the same values that {@link #getExchangeClientStatus()}
     * put into the fixture.
     *
     * @param actual the value obtained from a round trip
     */
    private void assertSerde(OperatorInfoUnion actual)
    {
        ExchangeClientStatus exchangeClientStatus = actual.getExchangeClientStatus();
        assertNotNull(exchangeClientStatus);
        assertThat(exchangeClientStatus.getBufferedBytes()).isEqualTo(246L);
        assertThat(exchangeClientStatus.getMaxBufferedBytes()).isEqualTo(762L);
        assertThat(exchangeClientStatus.getAverageBytesPerRequest()).isEqualTo(4155);
        assertThat(exchangeClientStatus.getSuccessfulRequestsCount()).isEqualTo(5708);
        assertThat(exchangeClientStatus.getBufferedPages()).isEqualTo(316);
        assertThat(exchangeClientStatus.isNoMoreLocations()).isTrue();

        List<PageBufferClientStatus> pageBufferClientStatuses = exchangeClientStatus.getPageBufferClientStatuses();
        assertNotNull(pageBufferClientStatuses);
        assertThat(pageBufferClientStatuses).hasSize(1);

        PageBufferClientStatus pageBufferClientStatus = pageBufferClientStatuses.get(0);
        assertThat(pageBufferClientStatus.getUri()).isEqualTo(URI.create("http://fake"));
        assertThat(pageBufferClientStatus.getState()).isEqualTo("running");
        assertThat(pageBufferClientStatus.getLastUpdate()).isEqualTo(new DateTime(2022, 10, 28, 16, 7, 15, 0));
        assertThat(pageBufferClientStatus.getRowsReceived()).isEqualTo(7174L);
        assertThat(pageBufferClientStatus.getPagesReceived()).isEqualTo(612);
        assertThat(pageBufferClientStatus.getRowsRejected()).isEqualTo(OptionalLong.of(93L));
        assertThat(pageBufferClientStatus.getPagesRejected()).isEqualTo(OptionalInt.of(12));
        assertThat(pageBufferClientStatus.getRequestsScheduled()).isEqualTo(2);
        assertThat(pageBufferClientStatus.getRequestsCompleted()).isEqualTo(71);
        assertThat(pageBufferClientStatus.getRequestsFailed()).isEqualTo(3);
        assertThat(pageBufferClientStatus.getHttpRequestState()).isEqualTo("OK");
    }

    /**
     * Builds the {@link ExchangeClientStatus} fixture, containing a single
     * {@link PageBufferClientStatus}, whose values {@link #assertSerde} checks after a round trip.
     *
     * @return a newly constructed status with fixed, distinct field values
     */
    private ExchangeClientStatus getExchangeClientStatus()
    {
        return new ExchangeClientStatus(
                246L,
                762L,
                4155,
                5708,
                316,
                true,
                ImmutableList.of(new PageBufferClientStatus(
                        URI.create("http://fake"),
                        "running",
                        new DateTime(2022, 10, 28, 16, 7, 15, 0).getMillis(),
                        7174L,
                        612,
                        OptionalLong.of(93L),
                        OptionalInt.of(12),
                        2,
                        71,
                        3,
                        "OK")));
    }
}