TestAbfsOutputStream.java
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.azurebfs.services;
import java.io.IOException;
import java.util.Arrays;
import java.util.Random;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import org.junit.jupiter.api.Test;
import org.mockito.ArgumentCaptor;
import org.mockito.Mockito;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.azurebfs.AbfsConfiguration;
import org.apache.hadoop.fs.azurebfs.constants.FSOperationType;
import org.apache.hadoop.fs.azurebfs.contracts.services.AppendRequestParameters;
import org.apache.hadoop.fs.azurebfs.utils.TracingContext;
import org.apache.hadoop.fs.azurebfs.utils.TracingHeaderFormat;
import org.apache.hadoop.fs.store.DataBlocks;
import org.apache.hadoop.util.BlockingThreadPoolExecutorService;
import org.apache.hadoop.util.SemaphoredDelegatingExecutor;
import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.DATA_BLOCKS_BUFFER;
import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_BLOCK_UPLOAD_BUFFER_DIR;
import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.BLOCK_UPLOAD_ACTIVE_BLOCKS_DEFAULT;
import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.DATA_BLOCKS_BUFFER_DEFAULT;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.ArgumentMatchers.isNull;
import static org.mockito.ArgumentMatchers.refEq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.anyBoolean;
import static org.mockito.Mockito.any;
import static org.mockito.Mockito.anyString;
import static org.mockito.Mockito.anyLong;
import static org.assertj.core.api.Assertions.assertThat;
import static org.apache.hadoop.fs.azurebfs.contracts.services.AppendRequestParameters.Mode.APPEND_MODE;
public final class TestAbfsOutputStream {
// Write buffer size, in bytes, handed to the stream context by every test.
private static final int BUFFER_SIZE = 4096;
// Length, in bytes, of the payload written by the short-write tests
// (verifyShortWriteRequest and verifyWriteRequest).
private static final int WRITE_SIZE = 1000;
// Path given to the stream context and expected in the verified client calls.
private static final String PATH = "~/testpath";
// Pieces of the account-scoped configuration entry (accountKey1 -> accountValue1)
// that each test sets on its Configuration before building an AbfsConfiguration.
private final String globalKey = "fs.azure.configuration";
private final String accountName1 = "account1";
private final String accountKey1 = globalKey + "." + accountName1;
private final String accountValue1 = "one";
/**
 * Builds the {@link AbfsOutputStreamContext} from which the tests construct
 * the stream under test.
 *
 * The block factory and the write concurrency/queue limits are derived from a
 * fresh default {@link AbfsConfiguration} for {@code accountName1}; everything
 * else comes from the arguments.
 *
 * @param writeBufferSize write buffer size for the stream.
 * @param isFlushEnabled value passed to {@code enableFlush}.
 * @param disableOutputStreamFlush value passed to {@code disableOutputStreamFlush}.
 * @param isAppendBlob value passed to {@code withAppendBlob}.
 * @param isExpectHeaderEnabled value passed to {@code enableExpectHeader}.
 * @param clientHandler client handler the stream should use.
 * @param path path of the file the stream writes to.
 * @param tracingContext tracing context for the stream.
 * @param executorService executor service for the stream.
 * @return the built context.
 * @throws IOException propagated from the configuration and block factory setup.
 * @throws IllegalAccessException propagated from the configuration setup.
 */
private AbfsOutputStreamContext populateAbfsOutputStreamContext(
    int writeBufferSize,
    boolean isFlushEnabled,
    boolean disableOutputStreamFlush,
    boolean isAppendBlob,
    boolean isExpectHeaderEnabled,
    AbfsClientHandler clientHandler,
    String path,
    TracingContext tracingContext,
    ExecutorService executorService) throws IOException,
    IllegalAccessException {
  final AbfsConfiguration defaultConf =
      new AbfsConfiguration(new Configuration(), accountName1);

  // Resolve the data block factory from the raw configuration, falling back
  // to the default buffer type when none is configured.
  final String bufferType = defaultConf.getRawConfiguration()
      .getTrimmed(DATA_BLOCKS_BUFFER, DATA_BLOCKS_BUFFER_DEFAULT);
  final DataBlocks.BlockFactory factory = DataBlocks.createFactory(
      FS_AZURE_BLOCK_UPLOAD_BUFFER_DIR,
      defaultConf.getRawConfiguration(),
      bufferType);

  return new AbfsOutputStreamContext(2)
      .withWriteBufferSize(writeBufferSize)
      .enableExpectHeader(isExpectHeaderEnabled)
      .enableFlush(isFlushEnabled)
      .disableOutputStreamFlush(disableOutputStreamFlush)
      .withStreamStatistics(new AbfsOutputStreamStatisticsImpl())
      .withAppendBlob(isAppendBlob)
      .withWriteMaxConcurrentRequestCount(
          defaultConf.getWriteMaxConcurrentRequestCount())
      .withMaxWriteRequestsToQueue(defaultConf.getMaxWriteRequestsToQueue())
      .withClientHandler(clientHandler)
      .withPath(path)
      .withTracingContext(tracingContext)
      .withExecutorService(executorService)
      .withBlockFactory(factory)
      .build();
}
/**
 * Verifies the short-write case: a write of WRITE_SIZE bytes followed by
 * hsync, then a write of 2 * WRITE_SIZE bytes followed by flush, hflush and
 * hsync, results in exactly two append calls on the client, one per write,
 * each matching its expected {@link AppendRequestParameters}.
 */
@Test
public void verifyShortWriteRequest() throws Exception {
  // Account-scoped configuration shared by the mocked client and the stream.
  final Configuration conf = new Configuration();
  conf.set(accountKey1, accountValue1);
  final AbfsConfiguration abfsConf = new AbfsConfiguration(conf, accountName1);
  final AbfsPerfTracker perfTracker =
      new AbfsPerfTracker("test", accountName1, abfsConf);

  // Mocked client: every append and flush returns the same mocked operation.
  final AbfsRestOperation restOp = mock(AbfsRestOperation.class);
  final AbfsDfsClient client = mock(AbfsDfsClient.class);
  when(client.getAbfsPerfTracker()).thenReturn(perfTracker);
  when(client.getAbfsConfiguration()).thenReturn(abfsConf);
  when(client.append(anyString(), any(byte[].class),
      any(AppendRequestParameters.class), any(),
      any(), any(TracingContext.class)))
      .thenReturn(restOp);
  when(client.flush(anyString(), anyLong(), anyBoolean(), anyBoolean(), any(),
      isNull(), any(), any(TracingContext.class), anyString()))
      .thenReturn(restOp);

  final AbfsClientHandler clientHandler = mock(AbfsClientHandler.class);
  when(clientHandler.getClient(any())).thenReturn(client);
  when(clientHandler.getDfsClient()).thenReturn(client);

  final TracingContext writeContext = new TracingContext(
      abfsConf.getClientCorrelationId(), "test-fs-id",
      FSOperationType.WRITE, abfsConf.getTracingHeaderFormat(),
      null);
  final AbfsOutputStreamContext streamContext = populateAbfsOutputStreamContext(
      BUFFER_SIZE,
      true,
      false,
      false,
      true,
      clientHandler,
      PATH,
      writeContext,
      createExecutorService(abfsConf));
  final AbfsOutputStream out = Mockito.spy(new AbfsOutputStream(streamContext));
  // Make the spied stream use the mocked client and report no MD5.
  when(out.getClient()).thenReturn(client);
  when(out.getMd5()).thenReturn(null);

  // First short write, pushed out with hsync.
  final byte[] firstPayload = new byte[WRITE_SIZE];
  new Random().nextBytes(firstPayload);
  out.write(firstPayload);
  out.hsync();

  // Second short write, pushed out through each of the flush variants.
  final byte[] secondPayload = new byte[2 * WRITE_SIZE];
  new Random().nextBytes(secondPayload);
  out.write(secondPayload);
  out.flush();
  out.hflush();
  out.hsync();

  final AppendRequestParameters expectedFirst = new AppendRequestParameters(
      0, 0, WRITE_SIZE, APPEND_MODE, false, null, true, null);
  final AppendRequestParameters expectedSecond = new AppendRequestParameters(
      WRITE_SIZE, 0, 2 * WRITE_SIZE, APPEND_MODE, false, null, true, null);

  verify(client, times(1)).append(
      eq(PATH), any(byte[].class), refEq(expectedFirst), any(), any(),
      any(TracingContext.class));
  verify(client, times(1)).append(
      eq(PATH), any(byte[].class), refEq(expectedSecond), any(), any(),
      any(TracingContext.class));
  // No appends beyond the two checked above.
  verify(client, times(2)).append(
      eq(PATH), any(byte[].class), any(), any(), any(),
      any(TracingContext.class));
}
/**
 * Verifies that five writes of WRITE_SIZE (1000 bytes) each, followed by a
 * close, make the expected calls on the client: exactly two appends matching
 * the expected {@link AppendRequestParameters}, and a single flush for PATH at
 * position 5 * WRITE_SIZE with the retain-uncommitted-data flag unset and the
 * close flag set.
 */
@Test
public void verifyWriteRequest() throws Exception {
  AbfsClientHandler clientHandler = mock(AbfsClientHandler.class);
  AbfsDfsClient client = mock(AbfsDfsClient.class);
  AbfsRestOperation op = mock(AbfsRestOperation.class);
  final Configuration conf = new Configuration();
  conf.set(accountKey1, accountValue1);
  final AbfsConfiguration abfsConf = new AbfsConfiguration(conf, accountName1);
  AbfsPerfTracker tracker = new AbfsPerfTracker("test", accountName1, abfsConf);
  // Each stub is registered once; the original stubbed getAbfsPerfTracker twice.
  when(client.getAbfsPerfTracker()).thenReturn(tracker);
  when(client.getAbfsConfiguration()).thenReturn(abfsConf);
  TracingContext tracingContext = new TracingContext("test-corr-id",
      "test-fs-id", FSOperationType.WRITE,
      TracingHeaderFormat.ALL_ID_FORMAT, null);
  when(client.append(anyString(), any(byte[].class),
      any(AppendRequestParameters.class), any(), any(),
      any(TracingContext.class))).thenReturn(op);
  when(client.flush(anyString(), anyLong(), anyBoolean(), anyBoolean(), any(),
      isNull(), any(), any(TracingContext.class), anyString())).thenReturn(op);
  when(clientHandler.getClient(any())).thenReturn(client);
  when(clientHandler.getDfsClient()).thenReturn(client);

  AbfsOutputStream out = Mockito.spy(new AbfsOutputStream(
      populateAbfsOutputStreamContext(
          BUFFER_SIZE,
          true,
          false,
          false,
          true,
          clientHandler,
          PATH,
          tracingContext,
          createExecutorService(abfsConf))));
  // Make the spied stream use the mocked client and report no MD5.
  when(out.getClient()).thenReturn(client);
  when(out.getMd5()).thenReturn(null);

  final byte[] b = new byte[WRITE_SIZE];
  new Random().nextBytes(b);
  for (int i = 0; i < 5; i++) {
    out.write(b);
  }
  out.close();

  AppendRequestParameters firstReqParameters = new AppendRequestParameters(
      0, 0, BUFFER_SIZE, APPEND_MODE, false, null, true, null);
  AppendRequestParameters secondReqParameters = new AppendRequestParameters(
      BUFFER_SIZE, 0, 5 * WRITE_SIZE - BUFFER_SIZE, APPEND_MODE, false, null,
      true, null);

  verify(client, times(1)).append(eq(PATH), any(byte[].class),
      refEq(firstReqParameters), any(), any(), any(TracingContext.class));
  verify(client, times(1)).append(eq(PATH), any(byte[].class),
      refEq(secondReqParameters), any(), any(), any(TracingContext.class));
  // confirm there were only 2 invocations in all
  verify(client, times(2)).append(eq(PATH), any(byte[].class), any(), any(),
      any(), any(TracingContext.class));

  ArgumentCaptor<String> acFlushPath = ArgumentCaptor.forClass(String.class);
  ArgumentCaptor<Long> acFlushPosition = ArgumentCaptor.forClass(Long.class);
  ArgumentCaptor<TracingContext> acTracingContext = ArgumentCaptor
      .forClass(TracingContext.class);
  ArgumentCaptor<Boolean> acFlushRetainUnCommittedData =
      ArgumentCaptor.forClass(Boolean.class);
  ArgumentCaptor<Boolean> acFlushClose = ArgumentCaptor.forClass(Boolean.class);
  ArgumentCaptor<String> acFlushSASToken = ArgumentCaptor.forClass(String.class);
  ArgumentCaptor<String> acMd5 = ArgumentCaptor.forClass(String.class);

  verify(client, times(1)).flush(acFlushPath.capture(),
      acFlushPosition.capture(), acFlushRetainUnCommittedData.capture(),
      acFlushClose.capture(),
      acFlushSASToken.capture(), isNull(), isNull(),
      acTracingContext.capture(), acMd5.capture());

  // The captured (actual) values go into assertThat and the expectation into
  // isEqualTo, so a failure message reports actual and expected the right way
  // round.
  assertThat(acFlushPath.getAllValues())
      .describedAs("path")
      .isEqualTo(Arrays.asList(PATH));
  assertThat(acFlushPosition.getAllValues())
      .describedAs("position")
      .isEqualTo(Arrays.asList(Long.valueOf(5 * WRITE_SIZE)));
  assertThat(acFlushRetainUnCommittedData.getAllValues())
      .describedAs("RetainUnCommittedData flag")
      .isEqualTo(Arrays.asList(false));
  assertThat(acFlushClose.getAllValues())
      .describedAs("Close flag")
      .isEqualTo(Arrays.asList(true));
}
/**
 * Verifies that two writes of BUFFER_SIZE (4KB) each, followed by a close,
 * make the expected calls on the client: exactly two appends matching the
 * expected {@link AppendRequestParameters}, and a single flush for PATH at
 * position 2 * BUFFER_SIZE with the retain-uncommitted-data flag unset and the
 * close flag set.
 */
@Test
public void verifyWriteRequestOfBufferSizeAndClose() throws Exception {
  AbfsClientHandler clientHandler = mock(AbfsClientHandler.class);
  AbfsDfsClient client = mock(AbfsDfsClient.class);
  AbfsRestOperation op = mock(AbfsRestOperation.class);
  AbfsHttpOperation httpOp = mock(AbfsHttpOperation.class);
  final Configuration conf = new Configuration();
  conf.set(accountKey1, accountValue1);
  final AbfsConfiguration abfsConf = new AbfsConfiguration(conf, accountName1);
  when(client.getAbfsConfiguration()).thenReturn(abfsConf);
  AbfsPerfTracker tracker = new AbfsPerfTracker("test", accountName1, abfsConf);
  TracingContext tracingContext = new TracingContext(
      abfsConf.getClientCorrelationId(), "test-fs-id",
      FSOperationType.WRITE, abfsConf.getTracingHeaderFormat(), null);
  when(client.getAbfsPerfTracker()).thenReturn(tracker);
  when(client.append(anyString(), any(byte[].class),
      any(AppendRequestParameters.class), any(), any(),
      any(TracingContext.class))).thenReturn(op);
  when(client.flush(anyString(), anyLong(), anyBoolean(), anyBoolean(), any(),
      isNull(), any(), any(TracingContext.class), anyString())).thenReturn(op);
  when(op.getSasToken()).thenReturn("testToken");
  when(op.getResult()).thenReturn(httpOp);
  when(clientHandler.getClient(any())).thenReturn(client);
  when(clientHandler.getDfsClient()).thenReturn(client);

  // A single spy is enough to stub the stream's accessors. The original
  // wrapped the stream in a spy of a spy, unlike every other test here.
  AbfsOutputStream out = Mockito.spy(new AbfsOutputStream(
      populateAbfsOutputStreamContext(
          BUFFER_SIZE,
          true,
          false,
          false,
          true,
          clientHandler,
          PATH,
          tracingContext,
          createExecutorService(abfsConf))));
  when(out.getClient()).thenReturn(client);
  when(out.getMd5()).thenReturn(null);

  final byte[] b = new byte[BUFFER_SIZE];
  new Random().nextBytes(b);
  for (int i = 0; i < 2; i++) {
    out.write(b);
  }
  out.close();

  AppendRequestParameters firstReqParameters = new AppendRequestParameters(
      0, 0, BUFFER_SIZE, APPEND_MODE, false, null, true, null);
  AppendRequestParameters secondReqParameters = new AppendRequestParameters(
      BUFFER_SIZE, 0, BUFFER_SIZE, APPEND_MODE, false, null, true, null);

  verify(client, times(1)).append(
      eq(PATH), any(byte[].class), refEq(firstReqParameters), any(), any(),
      any(TracingContext.class));
  verify(client, times(1)).append(
      eq(PATH), any(byte[].class), refEq(secondReqParameters), any(), any(),
      any(TracingContext.class));
  // confirm there were only 2 invocations in all
  verify(client, times(2)).append(
      eq(PATH), any(byte[].class), any(), any(), any(),
      any(TracingContext.class));

  ArgumentCaptor<String> acFlushPath = ArgumentCaptor.forClass(String.class);
  ArgumentCaptor<Long> acFlushPosition = ArgumentCaptor.forClass(Long.class);
  ArgumentCaptor<TracingContext> acTracingContext = ArgumentCaptor
      .forClass(TracingContext.class);
  ArgumentCaptor<Boolean> acFlushRetainUnCommittedData =
      ArgumentCaptor.forClass(Boolean.class);
  ArgumentCaptor<Boolean> acFlushClose = ArgumentCaptor.forClass(Boolean.class);
  ArgumentCaptor<String> acFlushSASToken = ArgumentCaptor.forClass(String.class);
  ArgumentCaptor<String> acMd5 = ArgumentCaptor.forClass(String.class);

  verify(client, times(1)).flush(acFlushPath.capture(),
      acFlushPosition.capture(), acFlushRetainUnCommittedData.capture(),
      acFlushClose.capture(),
      acFlushSASToken.capture(), isNull(), isNull(),
      acTracingContext.capture(), acMd5.capture());

  // The captured (actual) values go into assertThat and the expectation into
  // isEqualTo, so a failure message reports actual and expected the right way
  // round.
  assertThat(acFlushPath.getAllValues())
      .describedAs("path")
      .isEqualTo(Arrays.asList(PATH));
  assertThat(acFlushPosition.getAllValues())
      .describedAs("position")
      .isEqualTo(Arrays.asList(Long.valueOf(2 * BUFFER_SIZE)));
  assertThat(acFlushRetainUnCommittedData.getAllValues())
      .describedAs("RetainUnCommittedData flag")
      .isEqualTo(Arrays.asList(false));
  assertThat(acFlushClose.getAllValues())
      .describedAs("Close flag")
      .isEqualTo(Arrays.asList(true));
}
/**
 * Verifies that two writes of BUFFER_SIZE (4KB) each, with no explicit flush
 * or close, result in exactly two append calls on the client, each matching
 * its expected {@link AppendRequestParameters}.
 */
@Test
public void verifyWriteRequestOfBufferSize() throws Exception {
  // Account-scoped configuration shared by the mocked client and the stream.
  final Configuration conf = new Configuration();
  conf.set(accountKey1, accountValue1);
  final AbfsConfiguration abfsConf = new AbfsConfiguration(conf, accountName1);
  final AbfsPerfTracker perfTracker =
      new AbfsPerfTracker("test", accountName1, abfsConf);

  // Mocked operation returned by every append and flush on the mocked client.
  final AbfsHttpOperation httpResult = mock(AbfsHttpOperation.class);
  final AbfsRestOperation restOp = mock(AbfsRestOperation.class);
  when(restOp.getSasToken()).thenReturn("testToken");
  when(restOp.getResult()).thenReturn(httpResult);

  final AbfsDfsClient client = mock(AbfsDfsClient.class);
  when(client.getAbfsConfiguration()).thenReturn(abfsConf);
  when(client.getAbfsPerfTracker()).thenReturn(perfTracker);
  when(client.append(anyString(), any(byte[].class),
      any(AppendRequestParameters.class), any(), any(),
      any(TracingContext.class)))
      .thenReturn(restOp);
  when(client.flush(anyString(), anyLong(), anyBoolean(), anyBoolean(),
      any(), isNull(), any(), any(TracingContext.class), anyString()))
      .thenReturn(restOp);

  final AbfsClientHandler clientHandler = mock(AbfsClientHandler.class);
  when(clientHandler.getClient(any())).thenReturn(client);
  when(clientHandler.getDfsClient()).thenReturn(client);

  final TracingContext writeContext = new TracingContext(
      abfsConf.getClientCorrelationId(), "test-fs-id",
      FSOperationType.WRITE, abfsConf.getTracingHeaderFormat(),
      null);
  final AbfsOutputStreamContext streamContext = populateAbfsOutputStreamContext(
      BUFFER_SIZE,
      true,
      false,
      false,
      true,
      clientHandler,
      PATH,
      writeContext,
      createExecutorService(abfsConf));
  final AbfsOutputStream out = Mockito.spy(new AbfsOutputStream(streamContext));
  // Make the spied stream use the mocked client and report no MD5.
  when(out.getClient()).thenReturn(client);
  when(out.getMd5()).thenReturn(null);

  // Two full-buffer writes of the same random payload.
  final byte[] payload = new byte[BUFFER_SIZE];
  new Random().nextBytes(payload);
  out.write(payload);
  out.write(payload);

  // Pause before verifying; presumably this lets the executor finish issuing
  // the appends. NOTE(review): confirm the reason for the wait.
  Thread.sleep(1000);

  final AppendRequestParameters expectedFirst = new AppendRequestParameters(
      0, 0, BUFFER_SIZE, APPEND_MODE, false, null, true, null);
  final AppendRequestParameters expectedSecond = new AppendRequestParameters(
      BUFFER_SIZE, 0, BUFFER_SIZE, APPEND_MODE, false, null, true, null);

  verify(client, times(1)).append(
      eq(PATH), any(byte[].class), refEq(expectedFirst), any(), any(),
      any(TracingContext.class));
  verify(client, times(1)).append(
      eq(PATH), any(byte[].class), refEq(expectedSecond), any(), any(),
      any(TracingContext.class));
  // No appends beyond the two checked above.
  verify(client, times(2)).append(
      eq(PATH), any(byte[].class), any(), any(), any(),
      any(TracingContext.class));
}
/**
 * Verifies that two writes of BUFFER_SIZE (4KB) each on a stream created with
 * the append-blob flag set result in exactly two append calls on the client,
 * each matching its expected {@link AppendRequestParameters}.
 */
@Test
public void verifyWriteRequestOfBufferSizeWithAppendBlob() throws Exception {
  // Account-scoped configuration shared by the mocked client and the stream.
  final Configuration conf = new Configuration();
  conf.set(accountKey1, accountValue1);
  final AbfsConfiguration abfsConf = new AbfsConfiguration(conf, accountName1);
  final AbfsPerfTracker perfTracker =
      new AbfsPerfTracker("test", accountName1, abfsConf);

  // Mocked client: every append and flush returns the same mocked operation.
  final AbfsRestOperation restOp = mock(AbfsRestOperation.class);
  final AbfsDfsClient client = mock(AbfsDfsClient.class);
  when(client.getAbfsConfiguration()).thenReturn(abfsConf);
  when(client.getAbfsPerfTracker()).thenReturn(perfTracker);
  when(client.append(anyString(), any(byte[].class),
      any(AppendRequestParameters.class), any(), any(),
      any(TracingContext.class)))
      .thenReturn(restOp);
  when(client.flush(anyString(), anyLong(), anyBoolean(), anyBoolean(), any(),
      isNull(), any(), any(TracingContext.class), anyString()))
      .thenReturn(restOp);

  final AbfsClientHandler clientHandler = mock(AbfsClientHandler.class);
  when(clientHandler.getClient(any())).thenReturn(client);
  when(clientHandler.getDfsClient()).thenReturn(client);

  final TracingContext openContext = new TracingContext(
      abfsConf.getClientCorrelationId(), "test-fs-id",
      FSOperationType.OPEN, abfsConf.getTracingHeaderFormat(),
      null);
  final AbfsOutputStreamContext streamContext = populateAbfsOutputStreamContext(
      BUFFER_SIZE,
      true,
      false,
      true,
      true,
      clientHandler,
      PATH,
      openContext,
      createExecutorService(abfsConf));
  final AbfsOutputStream out = Mockito.spy(new AbfsOutputStream(streamContext));

  // Cross-wire the spies: the stream hands out a spied ingress handler, and
  // that handler reports the spied stream as its owner.
  final AzureIngressHandler spiedHandler = Mockito.spy(out.getIngressHandler());
  Mockito.doReturn(spiedHandler).when(out).getIngressHandler();
  Mockito.doReturn(out).when(spiedHandler).getAbfsOutputStream();
  // Make the spied stream use the mocked client and report no MD5.
  when(out.getClient()).thenReturn(client);
  when(out.getMd5()).thenReturn(null);

  // Two full-buffer writes of the same random payload.
  final byte[] payload = new byte[BUFFER_SIZE];
  new Random().nextBytes(payload);
  out.write(payload);
  out.write(payload);

  // Pause before verifying; presumably this lets the executor finish issuing
  // the appends. NOTE(review): confirm the reason for the wait.
  Thread.sleep(1000);

  final AppendRequestParameters expectedFirst = new AppendRequestParameters(
      0, 0, BUFFER_SIZE, APPEND_MODE, true, null, true, null);
  final AppendRequestParameters expectedSecond = new AppendRequestParameters(
      BUFFER_SIZE, 0, BUFFER_SIZE, APPEND_MODE, true, null, true, null);

  verify(client, times(1)).append(
      eq(PATH), any(byte[].class), refEq(expectedFirst), any(), any(),
      any(TracingContext.class));
  verify(client, times(1)).append(
      eq(PATH), any(byte[].class), refEq(expectedSecond), any(), any(),
      any(TracingContext.class));
  // No appends beyond the two checked above.
  verify(client, times(2)).append(
      eq(PATH), any(byte[].class), any(), any(), any(),
      any(TracingContext.class));
}
/**
 * Verifies that two writes of BUFFER_SIZE (4KB) each, followed by an hflush,
 * make the expected calls on the client: exactly two appends matching the
 * expected {@link AppendRequestParameters}, and a single flush for PATH at
 * position 2 * BUFFER_SIZE with both the retain-uncommitted-data flag and the
 * close flag unset.
 */
@Test
public void verifyWriteRequestOfBufferSizeAndHFlush() throws Exception {
  AbfsClientHandler clientHandler = mock(AbfsClientHandler.class);
  AbfsDfsClient client = mock(AbfsDfsClient.class);
  AbfsRestOperation op = mock(AbfsRestOperation.class);
  when(op.getSasToken()).thenReturn("");
  final Configuration conf = new Configuration();
  conf.set(accountKey1, accountValue1);
  final AbfsConfiguration abfsConf = new AbfsConfiguration(conf, accountName1);
  when(client.getAbfsConfiguration()).thenReturn(abfsConf);
  AbfsPerfTracker tracker = new AbfsPerfTracker("test", accountName1, abfsConf);
  when(client.getAbfsPerfTracker()).thenReturn(tracker);
  when(client.append(anyString(), any(byte[].class),
      any(AppendRequestParameters.class), any(), any(),
      any(TracingContext.class)))
      .thenReturn(op);
  when(client.flush(anyString(), anyLong(), anyBoolean(), anyBoolean(), any(),
      isNull(), any(), any(TracingContext.class), anyString())).thenReturn(op);
  when(clientHandler.getClient(any())).thenReturn(client);
  when(clientHandler.getDfsClient()).thenReturn(client);

  // The stream gets the only tracing context this test builds. The original
  // also created a second, WRITE-typed context that was never used.
  AbfsOutputStream out = Mockito.spy(new AbfsOutputStream(
      populateAbfsOutputStreamContext(
          BUFFER_SIZE,
          true,
          false,
          false,
          true,
          clientHandler,
          PATH,
          new TracingContext(abfsConf.getClientCorrelationId(), "test-fs-id",
              FSOperationType.OPEN, abfsConf.getTracingHeaderFormat(),
              null),
          createExecutorService(abfsConf))));
  // Make the spied stream use the mocked client and report no MD5.
  when(out.getClient()).thenReturn(client);
  when(out.getMd5()).thenReturn(null);

  final byte[] b = new byte[BUFFER_SIZE];
  new Random().nextBytes(b);
  for (int i = 0; i < 2; i++) {
    out.write(b);
  }
  out.hflush();

  AppendRequestParameters firstReqParameters = new AppendRequestParameters(
      0, 0, BUFFER_SIZE, APPEND_MODE, false, null, true, null);
  AppendRequestParameters secondReqParameters = new AppendRequestParameters(
      BUFFER_SIZE, 0, BUFFER_SIZE, APPEND_MODE, false, null, true, null);

  verify(client, times(1)).append(
      eq(PATH), any(byte[].class), refEq(firstReqParameters), any(), any(),
      any(TracingContext.class));
  verify(client, times(1)).append(
      eq(PATH), any(byte[].class), refEq(secondReqParameters), any(), any(),
      any(TracingContext.class));
  // confirm there were only 2 invocations in all
  verify(client, times(2)).append(
      eq(PATH), any(byte[].class), any(), any(), any(),
      any(TracingContext.class));

  ArgumentCaptor<String> acFlushPath = ArgumentCaptor.forClass(String.class);
  ArgumentCaptor<Long> acFlushPosition = ArgumentCaptor.forClass(Long.class);
  ArgumentCaptor<TracingContext> acTracingContext = ArgumentCaptor
      .forClass(TracingContext.class);
  ArgumentCaptor<Boolean> acFlushRetainUnCommittedData =
      ArgumentCaptor.forClass(Boolean.class);
  ArgumentCaptor<Boolean> acFlushClose = ArgumentCaptor.forClass(Boolean.class);
  ArgumentCaptor<String> acFlushSASToken = ArgumentCaptor.forClass(String.class);
  ArgumentCaptor<String> acFlushMd5 = ArgumentCaptor.forClass(String.class);

  verify(client, times(1)).flush(acFlushPath.capture(),
      acFlushPosition.capture(), acFlushRetainUnCommittedData.capture(),
      acFlushClose.capture(),
      acFlushSASToken.capture(), isNull(), isNull(),
      acTracingContext.capture(), acFlushMd5.capture());

  // The captured (actual) values go into assertThat and the expectation into
  // isEqualTo, so a failure message reports actual and expected the right way
  // round.
  assertThat(acFlushPath.getAllValues())
      .describedAs("path")
      .isEqualTo(Arrays.asList(PATH));
  assertThat(acFlushPosition.getAllValues())
      .describedAs("position")
      .isEqualTo(Arrays.asList(Long.valueOf(2 * BUFFER_SIZE)));
  assertThat(acFlushRetainUnCommittedData.getAllValues())
      .describedAs("RetainUnCommittedData flag")
      .isEqualTo(Arrays.asList(false));
  assertThat(acFlushClose.getAllValues())
      .describedAs("Close flag")
      .isEqualTo(Arrays.asList(false));
}
/**
 * Verifies that two writes of BUFFER_SIZE (4KB) each, followed by a flush
 * call, result in exactly two append calls on the client, each matching its
 * expected {@link AppendRequestParameters}.
 */
@Test
public void verifyWriteRequestOfBufferSizeAndFlush() throws Exception {
  // Account-scoped configuration shared by the mocked client and the stream.
  final Configuration conf = new Configuration();
  conf.set(accountKey1, accountValue1);
  final AbfsConfiguration abfsConf = new AbfsConfiguration(conf, accountName1);
  final AbfsPerfTracker perfTracker =
      new AbfsPerfTracker("test", accountName1, abfsConf);

  // Mocked client: every append and flush returns the same mocked operation.
  final AbfsRestOperation restOp = mock(AbfsRestOperation.class);
  final AbfsDfsClient client = mock(AbfsDfsClient.class);
  when(client.getAbfsConfiguration()).thenReturn(abfsConf);
  when(client.getAbfsPerfTracker()).thenReturn(perfTracker);
  when(client.append(anyString(), any(byte[].class),
      any(AppendRequestParameters.class), any(), any(),
      any(TracingContext.class)))
      .thenReturn(restOp);
  when(client.flush(anyString(), anyLong(), anyBoolean(), anyBoolean(), any(),
      isNull(), any(), any(TracingContext.class), anyString()))
      .thenReturn(restOp);

  final AbfsClientHandler clientHandler = mock(AbfsClientHandler.class);
  when(clientHandler.getClient(any())).thenReturn(client);
  when(clientHandler.getDfsClient()).thenReturn(client);

  final TracingContext writeContext = new TracingContext(
      abfsConf.getClientCorrelationId(), "test-fs-id",
      FSOperationType.WRITE, abfsConf.getTracingHeaderFormat(),
      null);
  final AbfsOutputStreamContext streamContext = populateAbfsOutputStreamContext(
      BUFFER_SIZE,
      true,
      false,
      false,
      true,
      clientHandler,
      PATH,
      writeContext,
      createExecutorService(abfsConf));
  final AbfsOutputStream out = Mockito.spy(new AbfsOutputStream(streamContext));
  // Make the spied stream use the mocked client and report no MD5.
  when(out.getClient()).thenReturn(client);
  when(out.getMd5()).thenReturn(null);

  // Two full-buffer writes of the same random payload.
  final byte[] payload = new byte[BUFFER_SIZE];
  new Random().nextBytes(payload);
  out.write(payload);
  out.write(payload);

  // Pause before and after the flush; presumably this lets the executor
  // finish its work. NOTE(review): confirm the reason for the waits.
  Thread.sleep(1000);
  out.flush();
  Thread.sleep(1000);

  final AppendRequestParameters expectedFirst = new AppendRequestParameters(
      0, 0, BUFFER_SIZE, APPEND_MODE, false, null, true, null);
  final AppendRequestParameters expectedSecond = new AppendRequestParameters(
      BUFFER_SIZE, 0, BUFFER_SIZE, APPEND_MODE, false, null, true, null);

  verify(client, times(1)).append(
      eq(PATH), any(byte[].class), refEq(expectedFirst), any(), any(),
      any(TracingContext.class));
  verify(client, times(1)).append(
      eq(PATH), any(byte[].class), refEq(expectedSecond), any(), any(),
      any(TracingContext.class));
  // No appends beyond the two checked above.
  verify(client, times(2)).append(
      eq(PATH), any(byte[].class), any(), any(), any(),
      any(TracingContext.class));
}
/**
 * Creates the executor service handed to the AbfsOutputStream under test: a
 * {@link SemaphoredDelegatingExecutor} wrapping a
 * {@link BlockingThreadPoolExecutorService} configured from the given
 * configuration's write-concurrency and write-queue settings.
 *
 * @param abfsConf configuration supplying the max concurrent write request
 *                 count and the max write requests to queue.
 * @return the wrapped executor service.
 */
private ExecutorService createExecutorService(
    AbfsConfiguration abfsConf) {
  final ExecutorService boundedPool =
      BlockingThreadPoolExecutorService.newInstance(
          abfsConf.getWriteMaxConcurrentRequestCount(),
          abfsConf.getMaxWriteRequestsToQueue(),
          10L, TimeUnit.SECONDS,
          "abfs-test-bounded");
  return new SemaphoredDelegatingExecutor(boundedPool,
      BLOCK_UPLOAD_ACTIVE_BLOCKS_DEFAULT, true);
}
}