DoExchangeEchoScenario.java
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.arrow.flight.integration.tests;
import java.nio.charset.StandardCharsets;
import java.util.Collections;
import org.apache.arrow.flight.FlightClient;
import org.apache.arrow.flight.FlightDescriptor;
import org.apache.arrow.flight.FlightProducer;
import org.apache.arrow.flight.FlightServer;
import org.apache.arrow.flight.FlightStream;
import org.apache.arrow.flight.Location;
import org.apache.arrow.memory.ArrowBuf;
import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.vector.IntVector;
import org.apache.arrow.vector.VectorSchemaRoot;
import org.apache.arrow.vector.types.pojo.ArrowType;
import org.apache.arrow.vector.types.pojo.Field;
import org.apache.arrow.vector.types.pojo.Schema;
import org.apache.arrow.vector.util.Validator;
/** Test DoExchange by echoing data back to the client. */
final class DoExchangeEchoScenario implements Scenario {
public static final byte[] COMMAND = "echo".getBytes(StandardCharsets.UTF_8);
@Override
public FlightProducer producer(BufferAllocator allocator, Location location) throws Exception {
return new DoExchangeProducer(allocator);
}
@Override
public void buildServer(FlightServer.Builder builder) {}
@Override
public void client(BufferAllocator allocator, Location location, FlightClient client)
throws Exception {
final Schema schema =
new Schema(Collections.singletonList(Field.notNullable("x", new ArrowType.Int(32, true))));
try (final FlightClient.ExchangeReaderWriter stream =
client.doExchange(FlightDescriptor.command(COMMAND));
final VectorSchemaRoot root = VectorSchemaRoot.create(schema, allocator)) {
final FlightStream reader = stream.getReader();
// Write data and check that it gets echoed back.
IntVector iv = (IntVector) root.getVector("x");
iv.allocateNew();
stream.getWriter().start(root);
int rowCount = 10;
for (int batchIdx = 0; batchIdx < 4; batchIdx++) {
for (int rowIdx = 0; rowIdx < rowCount; rowIdx++) {
iv.setSafe(rowIdx, batchIdx + rowIdx);
}
root.setRowCount(rowCount);
boolean writeMetadata = batchIdx % 2 == 0;
final byte[] rawMetadata = Integer.toString(batchIdx).getBytes(StandardCharsets.UTF_8);
if (writeMetadata) {
final ArrowBuf metadata = allocator.buffer(rawMetadata.length);
metadata.writeBytes(rawMetadata);
stream.getWriter().putNext(metadata);
} else {
stream.getWriter().putNext();
}
IntegrationAssertions.assertTrue("Unexpected end of reader", reader.next());
if (writeMetadata) {
IntegrationAssertions.assertNotNull(reader.getLatestMetadata());
final byte[] readMetadata = new byte[rawMetadata.length];
reader.getLatestMetadata().readBytes(readMetadata);
IntegrationAssertions.assertEquals(rawMetadata, readMetadata);
} else {
IntegrationAssertions.assertNull(reader.getLatestMetadata());
}
IntegrationAssertions.assertEquals(root.getSchema(), reader.getSchema());
Validator.compareVectorSchemaRoot(reader.getRoot(), root);
}
stream.getWriter().completed();
IntegrationAssertions.assertFalse("Expected to reach end of reader", reader.next());
}
}
}