/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs;
import java.io.ByteArrayInputStream;
import java.io.DataOutputStream;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.lang.reflect.Field;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.EnumSet;
import java.util.LinkedList;
import java.util.Map;
import java.util.Random;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CreateFlag;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FsTracer;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.StreamCapabilities.StreamCapability;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hdfs.DataStreamer.LastExceptionInStreamer;
import org.apache.hadoop.hdfs.client.impl.DfsClientConf;
import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
import org.apache.hadoop.hdfs.protocol.datatransfer.BlockConstructionStage;
import org.apache.hadoop.hdfs.protocol.datatransfer.PacketHeader;
import org.apache.hadoop.hdfs.protocol.datatransfer.PacketReceiver;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeManager;
import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.test.PathUtils;
import org.apache.hadoop.test.Whitebox;
import org.apache.hadoop.util.DataChecksum;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.Write.RECOVER_LEASE_ON_CLOSE_EXCEPTION_KEY;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;
import org.mockito.Mockito;
import static org.mockito.ArgumentMatchers.anyBoolean;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_WRITE_PACKET_SIZE_DEFAULT;
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_WRITE_PACKET_SIZE_KEY;
public class TestDFSOutputStream {
static MiniDFSCluster cluster;
@BeforeAll
public static void setup() throws IOException {
Configuration conf = new Configuration();
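    // Three DataNodes so placement-sensitive tests (e.g. testNoLocalWriteFlag)
    // have more than one replica target.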
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(3).build();
}
/**
* The close() method of DFSOutputStream should never throw the same exception
* twice. See HDFS-5335 for details.
*/
@Test
public void testCloseTwice() throws IOException {
DistributedFileSystem fs = cluster.getFileSystem();
FSDataOutputStream os = fs.create(new Path("/test"));
DFSOutputStream dos = (DFSOutputStream) Whitebox.getInternalState(os,
"wrappedStream");
DataStreamer streamer = (DataStreamer) Whitebox
.getInternalState(dos, "streamer");
@SuppressWarnings("unchecked")
LastExceptionInStreamer ex = (LastExceptionInStreamer) Whitebox
.getInternalState(streamer, "lastException");
Throwable thrown = (Throwable) Whitebox.getInternalState(ex, "thrown");
assertNull(thrown);
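    // The first close() succeeds; any exception recorded afterwards must be
    // thrown exactly once by a subsequent close().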
dos.close();
IOException dummy = new IOException("dummy");
ex.set(dummy);
    try {
      dos.close();
      fail("The second close() should have rethrown the injected exception.");
    } catch (IOException e) {
      assertEquals(dummy, e);
    }
thrown = (Throwable) Whitebox.getInternalState(ex, "thrown");
assertNull(thrown);
dos.close();
}
  /**
   * The computePacketChunkSize() method of DFSOutputStream should keep the
   * actual packet size below 64KB. See HDFS-7308 for details.
   */
@Test
public void testComputePacketChunkSize() throws Exception {
DistributedFileSystem fs = cluster.getFileSystem();
FSDataOutputStream os = fs.create(new Path("/test"));
DFSOutputStream dos = (DFSOutputStream) Whitebox.getInternalState(os,
"wrappedStream");
final int packetSize = 64*1024;
final int bytesPerChecksum = 512;
Method method = dos.getClass().getDeclaredMethod("computePacketChunkSize",
int.class, int.class);
method.setAccessible(true);
method.invoke(dos, packetSize, bytesPerChecksum);
Field field = dos.getClass().getDeclaredField("packetSize");
field.setAccessible(true);
assertTrue((Integer) field.get(dos) + 33 < packetSize);
    // If PKT_MAX_HEADER_LEN were 257, the actual packet size would exceed
    // 64KB without the HDFS-7308 fix.
assertTrue((Integer) field.get(dos) + 257 < packetSize);
}
/**
   * This tests preventing overflows of packet size and bodySize.
* <p>
* See also https://issues.apache.org/jira/browse/HDFS-11608.
* </p>
* @throws IOException
* @throws SecurityException
* @throws NoSuchFieldException
* @throws InvocationTargetException
* @throws IllegalArgumentException
* @throws IllegalAccessException
* @throws NoSuchMethodException
*/
@Test
@Timeout(value = 60)
public void testPreventOverflow() throws IOException, NoSuchFieldException,
SecurityException, IllegalAccessException, IllegalArgumentException,
InvocationTargetException, NoSuchMethodException {
final int defaultWritePacketSize = DFS_CLIENT_WRITE_PACKET_SIZE_DEFAULT;
int configuredWritePacketSize = defaultWritePacketSize;
int finalWritePacketSize = defaultWritePacketSize;
/* test default WritePacketSize, e.g. 64*1024 */
runAdjustChunkBoundary(configuredWritePacketSize, finalWritePacketSize);
/* test large WritePacketSize, e.g. 1G */
configuredWritePacketSize = 1000 * 1024 * 1024;
finalWritePacketSize = PacketReceiver.MAX_PACKET_SIZE;
runAdjustChunkBoundary(configuredWritePacketSize, finalWritePacketSize);
}
  /**
   * @param configuredWritePacketSize the configured WritePacketSize.
   * @param finalWritePacketSize the final WritePacketSize picked by
   *        {@link DFSOutputStream#adjustChunkBoundary}.
   */
private void runAdjustChunkBoundary(
final int configuredWritePacketSize,
final int finalWritePacketSize) throws IOException, NoSuchFieldException,
SecurityException, IllegalAccessException, IllegalArgumentException,
InvocationTargetException, NoSuchMethodException {
final boolean appendChunk = false;
final long blockSize = 3221225500L;
final long bytesCurBlock = 1073741824L;
final int bytesPerChecksum = 512;
final int checksumSize = 4;
final int chunkSize = bytesPerChecksum + checksumSize;
    final int packetMaxHeaderLength = 33;
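    /*
     * blockSize - bytesCurBlock exceeds Integer.MAX_VALUE here, so without
     * the HDFS-11608 fix the narrowing cast in adjustChunkBoundary() would
     * overflow into a negative packet size. The 33-byte header length is
     * assumed to mirror PacketHeader.PKT_MAX_HEADER_LEN.
     */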
MiniDFSCluster dfsCluster = null;
final File baseDir = new File(PathUtils.getTestDir(getClass()),
GenericTestUtils.getMethodName());
try {
final Configuration dfsConf = new Configuration();
dfsConf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR,
baseDir.getAbsolutePath());
dfsConf.setInt(DFS_CLIENT_WRITE_PACKET_SIZE_KEY,
configuredWritePacketSize);
dfsCluster = new MiniDFSCluster.Builder(dfsConf).numDataNodes(1).build();
dfsCluster.waitActive();
final FSDataOutputStream os = dfsCluster.getFileSystem()
.create(new Path(baseDir.getPath(), "testPreventOverflow"));
final DFSOutputStream dos = (DFSOutputStream) Whitebox
.getInternalState(os, "wrappedStream");
/* set appendChunk */
final Method setAppendChunkMethod = dos.getClass()
.getDeclaredMethod("setAppendChunk", boolean.class);
setAppendChunkMethod.setAccessible(true);
setAppendChunkMethod.invoke(dos, appendChunk);
/* set bytesCurBlock */
final Method setBytesCurBlockMethod = dos.getClass()
.getDeclaredMethod("setBytesCurBlock", long.class);
setBytesCurBlockMethod.setAccessible(true);
setBytesCurBlockMethod.invoke(dos, bytesCurBlock);
/* set blockSize */
final Field blockSizeField = dos.getClass().getDeclaredField("blockSize");
blockSizeField.setAccessible(true);
blockSizeField.setLong(dos, blockSize);
/* call adjustChunkBoundary */
final Method method = dos.getClass()
.getDeclaredMethod("adjustChunkBoundary");
method.setAccessible(true);
method.invoke(dos);
/* get and verify writePacketSize */
final Field writePacketSizeField = dos.getClass()
.getDeclaredField("writePacketSize");
writePacketSizeField.setAccessible(true);
      assertEquals(finalWritePacketSize, writePacketSizeField.getInt(dos));
      /* get and verify chunksPerPacket */
      final Field chunksPerPacketField = dos.getClass()
          .getDeclaredField("chunksPerPacket");
      chunksPerPacketField.setAccessible(true);
      assertEquals((finalWritePacketSize - packetMaxHeaderLength) / chunkSize,
          chunksPerPacketField.getInt(dos));
      /* get and verify packetSize */
      final Field packetSizeField = dos.getClass()
          .getDeclaredField("packetSize");
      packetSizeField.setAccessible(true);
      assertEquals(chunksPerPacketField.getInt(dos) * chunkSize,
          packetSizeField.getInt(dos));
} finally {
if (dfsCluster != null) {
dfsCluster.shutdown();
}
}
}
@Test
public void testCongestionBackoff() throws IOException {
DfsClientConf dfsClientConf = mock(DfsClientConf.class);
DFSClient client = mock(DFSClient.class);
Configuration conf = mock(Configuration.class);
when(client.getConfiguration()).thenReturn(conf);
when(client.getConf()).thenReturn(dfsClientConf);
when(client.getTracer()).thenReturn(FsTracer.get(new Configuration()));
client.clientRunning = true;
DataStreamer stream = new DataStreamer(
mock(HdfsFileStatus.class),
mock(ExtendedBlock.class),
client,
"foo", null, null, null, null, null, null);
DataOutputStream blockStream = mock(DataOutputStream.class);
doThrow(new IOException()).when(blockStream).flush();
Whitebox.setInternalState(stream, "blockStream", blockStream);
Whitebox.setInternalState(stream, "stage",
BlockConstructionStage.PIPELINE_CLOSE);
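    // With the pipeline in the close stage and flush() stubbed to fail,
    // run() exits quickly after reacting to the congestion signal set below.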
@SuppressWarnings("unchecked")
LinkedList<DFSPacket> dataQueue = (LinkedList<DFSPacket>)
Whitebox.getInternalState(stream, "dataQueue");
@SuppressWarnings("unchecked")
ArrayList<DatanodeInfo> congestedNodes = (ArrayList<DatanodeInfo>)
Whitebox.getInternalState(stream, "congestedNodes");
congestedNodes.add(mock(DatanodeInfo.class));
DFSPacket packet = mock(DFSPacket.class);
dataQueue.add(packet);
stream.run();
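    // Backing off must have cleared the congested-node list before run()
    // returned.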
assertTrue(congestedNodes.isEmpty());
}
@Test
@Timeout(value = 60)
public void testCongestionAckDelay() {
DfsClientConf dfsClientConf = mock(DfsClientConf.class);
DFSClient client = mock(DFSClient.class);
Configuration conf = mock(Configuration.class);
when(client.getConfiguration()).thenReturn(conf);
when(client.getConf()).thenReturn(dfsClientConf);
when(client.getTracer()).thenReturn(FsTracer.get(new Configuration()));
client.clientRunning = true;
DataStreamer stream = new DataStreamer(
mock(HdfsFileStatus.class),
mock(ExtendedBlock.class),
client,
"foo", null, null, null, null, null, null);
DataOutputStream blockStream = mock(DataOutputStream.class);
Whitebox.setInternalState(stream, "blockStream", blockStream);
Whitebox.setInternalState(stream, "stage",
BlockConstructionStage.PIPELINE_CLOSE);
@SuppressWarnings("unchecked")
LinkedList<DFSPacket> dataQueue = (LinkedList<DFSPacket>)
Whitebox.getInternalState(stream, "dataQueue");
@SuppressWarnings("unchecked")
ArrayList<DatanodeInfo> congestedNodes = (ArrayList<DatanodeInfo>)
Whitebox.getInternalState(stream, "congestedNodes");
int backOffMaxTime = (int)
Whitebox.getInternalState(stream, "congestionBackOffMaxTimeInMs");
DFSPacket[] packet = new DFSPacket[100];
AtomicBoolean isDelay = new AtomicBoolean(true);
    // This thread stands in for the ResponseProcessor, which synchronizes on
    // the dataQueue for each step.
new Thread(() -> {
for (int i = 0; i < 10; i++) {
        // Sleep briefly so the other threads get time to run and do not skew
        // the result.
try {
Thread.sleep(backOffMaxTime / 50);
} catch (InterruptedException e) {
e.printStackTrace();
}
synchronized (dataQueue) {
congestedNodes.add(mock(DatanodeInfo.class));
          // The DataStreamer releases the dataQueue lock before sleeping, so
          // this thread (standing in for the ResponseProcessor) can keep
          // acquiring it to accept ACKs and add entries to congestedNodes.
          // As a result, congestedNodes.size() grows beyond 1.
          if (congestedNodes.size() > 1) {
isDelay.set(false);
try {
doThrow(new IOException()).when(blockStream).flush();
} catch (Exception e) {
e.printStackTrace();
}
}
}
}
try {
doThrow(new IOException()).when(blockStream).flush();
} catch (Exception e) {
e.printStackTrace();
}
      // Add a final packet so the DataStreamer does not block forever on an
      // empty dataQueue, which would keep the test from finishing.
DFSPacket endPacket = mock(DFSPacket.class);
dataQueue.add(endPacket);
}).start();
    // Keep feeding packets into the dataQueue so the DataStreamer runs
    // normally and decides, based on the congestion state, whether to back
    // off.
new Thread(() -> {
for (int i = 0; i < 100; i++) {
packet[i] = mock(DFSPacket.class);
dataQueue.add(packet[i]);
try {
Thread.sleep(backOffMaxTime / 100);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}).start();
stream.run();
assertFalse(isDelay.get());
}
@Test
public void testNoLocalWriteFlag() throws IOException {
DistributedFileSystem fs = cluster.getFileSystem();
EnumSet<CreateFlag> flags = EnumSet.of(CreateFlag.NO_LOCAL_WRITE,
CreateFlag.CREATE);
BlockManager bm = cluster.getNameNode().getNamesystem().getBlockManager();
DatanodeManager dm = bm.getDatanodeManager();
    try (FSDataOutputStream os = fs.create(new Path("/test-no-local"),
FsPermission.getDefault(),
flags, 512, (short)2, 512, null)) {
// Inject a DatanodeManager that returns one DataNode as local node for
// the client.
DatanodeManager spyDm = spy(dm);
DatanodeDescriptor dn1 = dm.getDatanodeListForReport
(HdfsConstants.DatanodeReportType.LIVE).get(0);
doReturn(dn1).when(spyDm).getDatanodeByHost("127.0.0.1");
Whitebox.setInternalState(bm, "datanodeManager", spyDm);
byte[] buf = new byte[512 * 16];
new Random().nextBytes(buf);
os.write(buf);
} finally {
Whitebox.setInternalState(bm, "datanodeManager", dm);
}
cluster.triggerBlockReports();
final String bpid = cluster.getNamesystem().getBlockPoolId();
// Total number of DataNodes is 3.
assertEquals(3, cluster.getAllBlockReports(bpid).size());
int numDataNodesWithData = 0;
for (Map<DatanodeStorage, BlockListAsLongs> dnBlocks :
cluster.getAllBlockReports(bpid)) {
for (BlockListAsLongs blocks : dnBlocks.values()) {
if (blocks.getNumberOfBlocks() > 0) {
numDataNodesWithData++;
break;
}
}
}
    // Verify that exactly one DataNode (the local node excluded by
    // NO_LOCAL_WRITE) has no data.
assertEquals(1, 3 - numDataNodesWithData);
}
@Test
public void testEndLeaseCall() throws Exception {
Configuration conf = new Configuration();
DFSClient client = new DFSClient(cluster.getNameNode(0)
.getNameNodeAddress(), conf);
DFSClient spyClient = Mockito.spy(client);
DFSOutputStream dfsOutputStream = spyClient.create("/file2",
FsPermission.getFileDefault(),
EnumSet.of(CreateFlag.CREATE), (short) 3, 1024, null , 1024, null);
DFSOutputStream spyDFSOutputStream = Mockito.spy(dfsOutputStream);
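    // closeThreads() must release the file lease exactly once via
    // endFileLease(), which the verification below checks.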
    // Pass a plain false rather than a Mockito matcher: matchers are only
    // meaningful inside stubbing or verification calls.
    spyDFSOutputStream.closeThreads(false);
verify(spyClient, times(1)).endFileLease(anyString());
}
@Test
public void testStreamFlush() throws Exception {
FileSystem fs = cluster.getFileSystem();
FSDataOutputStream os = fs.create(new Path("/normal-file"));
// Verify output stream supports hsync() and hflush().
assertTrue(os.hasCapability(StreamCapability.HFLUSH.getValue()),
"DFSOutputStream should support hflush()!");
assertTrue(os.hasCapability(StreamCapability.HSYNC.getValue()),
"DFSOutputStream should support hsync()!");
byte[] bytes = new byte[1024];
InputStream is = new ByteArrayInputStream(bytes);
IOUtils.copyBytes(is, os, bytes.length);
os.hflush();
IOUtils.copyBytes(is, os, bytes.length);
os.hsync();
os.close();
}
@Test
public void testExceptionInCloseWithRecoverLease() throws Exception {
Configuration conf = new Configuration();
conf.setBoolean(RECOVER_LEASE_ON_CLOSE_EXCEPTION_KEY, true);
DFSClient client =
new DFSClient(cluster.getNameNode(0).getNameNodeAddress(), conf);
DFSClient spyClient = Mockito.spy(client);
DFSOutputStream dfsOutputStream = spyClient.create(
"/testExceptionInCloseWithRecoverLease", FsPermission.getFileDefault(),
EnumSet.of(CreateFlag.CREATE), (short) 3, 1024, null, 1024, null);
DFSOutputStream spyDFSOutputStream = Mockito.spy(dfsOutputStream);
doThrow(new IOException("Emulated IOException in close"))
.when(spyDFSOutputStream).completeFile();
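    // completeFile() is stubbed to fail, so close() should surface the
    // exception and, with lease recovery on close enabled, recover the lease
    // so the NameNode can still close the file.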
try {
spyDFSOutputStream.close();
      fail("close() should have thrown the emulated IOException.");
} catch (IOException ioe) {
assertTrue(spyDFSOutputStream.isLeaseRecovered());
waitForFileClosed("/testExceptionInCloseWithRecoverLease");
assertTrue(isFileClosed("/testExceptionInCloseWithRecoverLease"));
}
}
@Test
public void testExceptionInCloseWithoutRecoverLease() throws Exception {
Configuration conf = new Configuration();
DFSClient client =
new DFSClient(cluster.getNameNode(0).getNameNodeAddress(), conf);
DFSClient spyClient = Mockito.spy(client);
DFSOutputStream dfsOutputStream =
spyClient.create("/testExceptionInCloseWithoutRecoverLease",
FsPermission.getFileDefault(), EnumSet.of(CreateFlag.CREATE),
(short) 3, 1024, null, 1024, null);
DFSOutputStream spyDFSOutputStream = Mockito.spy(dfsOutputStream);
doThrow(new IOException("Emulated IOException in close"))
.when(spyDFSOutputStream).completeFile();
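    // Without RECOVER_LEASE_ON_CLOSE_EXCEPTION_KEY set, close() surfaces the
    // exception but must not recover the lease, so the file stays open.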
try {
spyDFSOutputStream.close();
      fail("close() should have thrown the emulated IOException.");
} catch (IOException ioe) {
assertFalse(spyDFSOutputStream.isLeaseRecovered());
try {
waitForFileClosed("/testExceptionInCloseWithoutRecoverLease");
} catch (TimeoutException e) {
assertFalse(isFileClosed("/testExceptionInCloseWithoutRecoverLease"));
}
}
}
@Test
@Timeout(value = 60)
public void testFirstPacketSizeInNewBlocks() throws IOException {
final long blockSize = (long) 1024 * 1024;
MiniDFSCluster dfsCluster = cluster;
DistributedFileSystem fs = dfsCluster.getFileSystem();
Configuration dfsConf = fs.getConf();
EnumSet<CreateFlag> flags = EnumSet.of(CreateFlag.CREATE);
    try (FSDataOutputStream fos = fs.create(new Path("/testfile.dat"),
FsPermission.getDefault(),
flags, 512, (short)3, blockSize, null)) {
DataChecksum crc32c = DataChecksum.newDataChecksum(
DataChecksum.Type.CRC32C, 512);
long loop = 0;
Random r = new Random();
byte[] buf = new byte[(int) blockSize];
r.nextBytes(buf);
fos.write(buf);
fos.hflush();
int chunkSize = crc32c.getBytesPerChecksum() + crc32c.getChecksumSize();
int packetContentSize = (dfsConf.getInt(DFS_CLIENT_WRITE_PACKET_SIZE_KEY,
DFS_CLIENT_WRITE_PACKET_SIZE_DEFAULT) -
PacketHeader.PKT_MAX_HEADER_LEN) / chunkSize * chunkSize;
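      // packetContentSize is the largest whole number of chunks that fits in
      // a packet after subtracting the maximum header length.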
while (loop < 20) {
r.nextBytes(buf);
fos.write(buf);
fos.hflush();
loop++;
        assertEquals(packetContentSize,
            ((DFSOutputStream) fos.getWrappedStream()).packetSize);
}
}
fs.delete(new Path("/testfile.dat"), true);
}
@AfterAll
public static void tearDown() {
if (cluster != null) {
cluster.shutdown();
}
}
private boolean isFileClosed(String path) throws IOException {
return cluster.getFileSystem().isFileClosed(new Path(path));
}
private void waitForFileClosed(String path) throws Exception {
GenericTestUtils.waitFor(() -> {
boolean closed;
try {
closed = isFileClosed(path);
} catch (IOException e) {
return false;
}
return closed;
}, 1000, 5000);
}
}