TestReadRowsHelper.java

/*
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package com.facebook.presto.plugin.bigquery;

import com.google.cloud.bigquery.storage.v1beta1.BigQueryStorageClient;
import com.google.cloud.bigquery.storage.v1beta1.Storage;
import com.google.common.collect.ImmutableList;
import io.grpc.Status;
import io.grpc.StatusRuntimeException;
import org.testng.annotations.Test;

import java.util.Iterator;

import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.Mockito.mock;

@Test
public class TestReadRowsHelper
{
    // it is not used, we just need the reference
    BigQueryStorageClient client = mock(BigQueryStorageClient.class);
    private Storage.ReadRowsRequest.Builder request = Storage.ReadRowsRequest.newBuilder().setReadPosition(
            Storage.StreamPosition.newBuilder().setStream(
                    Storage.Stream.newBuilder().setName("test")));

    @Test
    void testNoFailures()
    {
        MockResponsesBatch batch1 = new MockResponsesBatch();
        batch1.addResponse(Storage.ReadRowsResponse.newBuilder().setRowCount(10).build());
        batch1.addResponse(Storage.ReadRowsResponse.newBuilder().setRowCount(11).build());

        // so we can run multiple tests
        ImmutableList<Storage.ReadRowsResponse> responses = ImmutableList.copyOf(
                new MockReadRowsHelper(client, request, 3, ImmutableList.of(batch1))
                        .readRows());

        assertThat(responses.size()).isEqualTo(2);
        assertThat(responses.stream().mapToLong(Storage.ReadRowsResponse::getRowCount).sum()).isEqualTo(21);
    }

    @Test
    void testRetryOfSingleFailure()
    {
        MockResponsesBatch batch1 = new MockResponsesBatch();
        batch1.addResponse(Storage.ReadRowsResponse.newBuilder().setRowCount(10).build());
        batch1.addException(new StatusRuntimeException(Status.INTERNAL.withDescription(
                "Received unexpected EOS on DATA frame from server.")));
        MockResponsesBatch batch2 = new MockResponsesBatch();
        batch2.addResponse(Storage.ReadRowsResponse.newBuilder().setRowCount(11).build());

        ImmutableList<Storage.ReadRowsResponse> responses = ImmutableList.copyOf(
                new MockReadRowsHelper(client, request, 3, ImmutableList.of(batch1, batch2))
                        .readRows());

        assertThat(responses.size()).isEqualTo(2);
        assertThat(responses.stream().mapToLong(Storage.ReadRowsResponse::getRowCount).sum()).isEqualTo(21);
    }

    private static final class MockReadRowsHelper
            extends ReadRowsHelper
    {
        Iterator<MockResponsesBatch> responses;

        MockReadRowsHelper(BigQueryStorageClient client, Storage.ReadRowsRequest.Builder request, int maxReadRowsRetries, Iterable<MockResponsesBatch> responses)
        {
            super(client, request, maxReadRowsRetries);
            this.responses = responses.iterator();
        }

        @Override
        protected Iterator<Storage.ReadRowsResponse> fetchResponses(Storage.ReadRowsRequest.Builder readRowsRequest)
        {
            return responses.next();
        }
    }
}