ByteBufferInputStreamTest.java
/*
* Copyright (c) 2013, 2022 Oracle and/or its affiliates. All rights reserved.
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License v. 2.0, which is available at
* http://www.eclipse.org/legal/epl-2.0.
*
* This Source Code may also be made available under the following Secondary
* Licenses when the conditions for such availability set forth in the
* Eclipse Public License v. 2.0 are satisfied: GNU General Public License,
* version 2 with the GNU Classpath Exception, which is available at
* https://www.gnu.org/software/classpath/license.html.
*
* SPDX-License-Identifier: EPL-2.0 OR GPL-2.0 WITH Classpath-exception-2.0
*/
package org.glassfish.jersey.internal.util.collection;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Random;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.glassfish.jersey.internal.LocalizationMessages;
import org.junit.jupiter.api.Test;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;
/**
* {@link ByteBufferInputStream} unit tests.
*
* @author Marek Potociar
*/
public class ByteBufferInputStreamTest {
@Test
public void testBlockingReadAByteEmptyStream() throws Exception {
final ByteBufferInputStream bbis = new ByteBufferInputStream();
bbis.closeQueue();
assertEquals(-1, bbis.read());
}
@Test
public void testNonBlockingReadAByteEmptyStream() throws Exception {
final ByteBufferInputStream bbis = new ByteBufferInputStream();
bbis.closeQueue();
assertEquals(-1, bbis.tryRead());
}
@Test
public void testBlockingReadByteArrayEmptyStream() throws Exception {
final ByteBufferInputStream bbis = new ByteBufferInputStream();
bbis.closeQueue();
byte[] buf = new byte[1024];
assertEquals(-1, bbis.read(buf));
}
@Test
public void testNonBlockingReadByteArrayEmptyStream() throws Exception {
final ByteBufferInputStream bbis = new ByteBufferInputStream();
bbis.closeQueue();
byte[] buf = new byte[1024];
assertEquals(-1, bbis.tryRead(buf));
}
@Test
public void testBlockingReadByteArrayFromFinishedExactLengthStream() throws Exception {
final ByteBufferInputStream bbis = new ByteBufferInputStream();
byte[] sourceData = new byte[1024];
new Random().nextBytes(sourceData);
ByteBuffer byteBuf = ByteBuffer.wrap(sourceData);
bbis.put(byteBuf);
bbis.closeQueue();
byte[] buf = new byte[1024];
assertEquals(1024, bbis.read(buf));
// no more data to read; so it should return -1
assertEquals(-1, bbis.read(buf));
}
@Test
public void testNonBlockingReadByteArrayFromFinishedExactLengthStream() throws Exception {
final ByteBufferInputStream bbis = new ByteBufferInputStream();
byte[] sourceData = new byte[1024];
new Random().nextBytes(sourceData);
ByteBuffer byteBuf = ByteBuffer.wrap(sourceData);
bbis.put(byteBuf);
byte[] buf = new byte[1024];
assertEquals(1024, bbis.tryRead(buf));
// the queue has not been close; so it should return 0
assertEquals(0, bbis.tryRead(buf));
bbis.closeQueue();
}
@Test
public void testBlockingReadByteArrayFromUnfinishedExactLengthStream() throws Exception {
final ByteBufferInputStream bbis = new ByteBufferInputStream();
byte[] sourceData = new byte[1024];
new Random().nextBytes(sourceData);
ByteBuffer byteBuf = ByteBuffer.wrap(sourceData);
bbis.put(byteBuf);
final byte[] buf = new byte[1024];
assertEquals(1024, bbis.read(buf));
final AtomicBoolean closed = new AtomicBoolean(false);
final Semaphore s = new Semaphore(1);
s.acquire();
Thread t = new Thread(new Runnable() {
@Override
public void run() {
try {
// it should return -1 since there is no more data
assertEquals(-1, bbis.read(buf));
// it should only reach here if the stream has been closed
assertTrue(closed.get());
} catch (IOException e) {
e.printStackTrace();
} finally {
s.release();
}
}
});
t.start();
Thread.sleep(500);
closed.set(true);
bbis.closeQueue();
// wait until the job is done
s.acquire();
}
@Test
public void testNonBlockingReadByteArrayFromUnfinishedExactLengthStream() throws Exception {
final ByteBufferInputStream bbis = new ByteBufferInputStream();
byte[] sourceData = new byte[1024];
new Random().nextBytes(sourceData);
ByteBuffer byteBuf = ByteBuffer.wrap(sourceData);
bbis.put(byteBuf);
bbis.closeQueue();
byte[] buf = new byte[1024];
assertEquals(1024, bbis.tryRead(buf));
assertEquals(-1, bbis.tryRead(buf));
}
/**
* Test for non blocking single-byte read of the stream.
*
* @throws Exception in case of error.
*/
@Test
public void testNonBlockingReadSingleByte() throws Exception {
final int ROUNDS = 1000;
final int BUFFER_SIZE = 769;
final ByteBufferInputStream bbis = new ByteBufferInputStream();
final ExecutorService executor = Executors.newSingleThreadExecutor();
executor.submit(new Runnable() {
@Override
public void run() {
try {
for (int i = 0; i < ROUNDS; i++) {
final ByteBuffer data = ByteBuffer.allocate(BUFFER_SIZE);
if (Thread.currentThread().isInterrupted()) {
System.out.println("Got interrupted.");
return;
}
data.clear();
for (int j = 0; j < data.capacity(); j++) {
data.put((byte) (i & 0xFF));
}
data.flip();
if (!bbis.put(data)) {
System.out.println("Pipe sink closed before writing all the data.");
return;
}
Thread.sleep(1); // Give the other thread a chance to run.
}
} catch (InterruptedException e) {
e.printStackTrace();
Thread.currentThread().interrupt();
} finally {
bbis.closeQueue();
}
}
});
try {
int i = 0;
int j = 0;
int c;
while ((c = bbis.tryRead()) != -1) {
if (c == Integer.MIN_VALUE) {
// nothing to read
Thread.yield(); // Give the other thread a chance to run.
continue;
}
assertEquals((byte) (i & 0xFF), (byte) (c & 0xFF), "At position: " + j);
if (++j % BUFFER_SIZE == 0) {
i++;
Thread.yield(); // Give the other thread a chance to run.
}
}
assertEquals(ROUNDS * BUFFER_SIZE, j, "Number of bytes produced and bytes read does not match.");
} finally {
executor.shutdownNow();
bbis.close();
}
if (!executor.awaitTermination(5, TimeUnit.SECONDS)) {
System.out.println("Waiting for the task to finish has timed out.");
}
}
/**
* Test for non blocking byte buffer based read of the stream.
*
* @throws Exception in case of error.
*/
@Test
public void testNonBlockingReadByteArray() throws Exception {
final int ROUNDS = 1000;
final int BUFFER_SIZE = 769;
final ByteBufferInputStream bbis = new ByteBufferInputStream();
final ExecutorService executor = Executors.newSingleThreadExecutor();
executor.submit(new Runnable() {
@Override
public void run() {
try {
for (int i = 0; i < ROUNDS; i++) {
final ByteBuffer data = ByteBuffer.allocate(BUFFER_SIZE);
if (Thread.currentThread().isInterrupted()) {
System.out.println("Got interrupted.");
return;
}
data.clear();
for (int j = 0; j < data.capacity(); j++) {
data.put((byte) (i & 0xFF));
}
data.flip();
if (!bbis.put(data)) {
System.out.println("Pipe sink closed before writing all the data.");
return;
}
Thread.sleep(1); // Give the other thread a chance to run.
}
} catch (InterruptedException e) {
e.printStackTrace();
Thread.currentThread().interrupt();
} finally {
bbis.closeQueue();
}
}
});
try {
int i = 0;
int j = 0;
int c;
byte[] buffer = new byte[443];
while ((c = bbis.tryRead(buffer)) != -1) {
if (c == 0) {
// nothing to read
Thread.yield(); // Give the other thread a chance to run.
continue;
}
for (int p = 0; p < c; p++) {
assertEquals((byte) (i & 0xFF), (byte) buffer[p], "At position: " + j);
if (++j % BUFFER_SIZE == 0) {
i++;
Thread.yield(); // Give the other thread a chance to run.
}
}
}
assertEquals(ROUNDS * BUFFER_SIZE, j, "Number of bytes produced and bytes read does not match.");
} finally {
executor.shutdownNow();
bbis.close();
}
if (!executor.awaitTermination(5, TimeUnit.SECONDS)) {
System.out.println("Waiting for the task to finish has timed out.");
}
}
/**
* Test for blocking single-byte read of the stream.
*
* @throws Exception in case of error.
*/
@Test
public void testBlockingReadSingleByte() throws Exception {
final int ROUNDS = 1000;
final int BUFFER_SIZE = 769;
final ByteBufferInputStream bbis = new ByteBufferInputStream();
final ExecutorService executor = Executors.newSingleThreadExecutor();
executor.submit(new Runnable() {
@Override
public void run() {
try {
for (int i = 0; i < ROUNDS; i++) {
final ByteBuffer data = ByteBuffer.allocate(BUFFER_SIZE);
if (Thread.currentThread().isInterrupted()) {
System.out.println("Got interrupted.");
return;
}
data.clear();
for (int j = 0; j < data.capacity(); j++) {
data.put((byte) (i & 0xFF));
}
data.flip();
if (!bbis.put(data)) {
System.out.println("Pipe sink closed before writing all the data.");
return;
}
Thread.sleep(1); // Give the other thread a chance to run.
}
} catch (InterruptedException e) {
e.printStackTrace();
Thread.currentThread().interrupt();
} finally {
bbis.closeQueue();
}
}
});
try {
int i = 0;
int j = 0;
int c;
while ((c = bbis.read()) != -1) {
assertNotEquals(Integer.MIN_VALUE, c, "Should not read 'nothing' in blocking mode.");
assertEquals((byte) (i & 0xFF), (byte) c, "At position: " + j);
if (++j % BUFFER_SIZE == 0) {
i++;
Thread.yield(); // Give the other thread a chance to run.
}
}
assertEquals(ROUNDS * BUFFER_SIZE, j, "Number of bytes produced and bytes read does not match.");
} finally {
executor.shutdownNow();
bbis.close();
}
if (!executor.awaitTermination(5, TimeUnit.SECONDS)) {
System.out.println("Waiting for the task to finish has timed out.");
}
}
/**
* Test for blocking byte buffer based read of the stream.
*
* @throws Exception in case of error.
*/
@Test
public void testBlockingReadByteArray() throws Exception {
final int ROUNDS = 1000;
final int BUFFER_SIZE = 769;
final ByteBufferInputStream bbis = new ByteBufferInputStream();
final ExecutorService executor = Executors.newSingleThreadExecutor();
executor.submit(new Runnable() {
@Override
public void run() {
try {
for (int i = 0; i < ROUNDS; i++) {
final ByteBuffer data = ByteBuffer.allocate(BUFFER_SIZE);
if (Thread.currentThread().isInterrupted()) {
System.out.println("Got interrupted.");
return;
}
data.clear();
for (int j = 0; j < data.capacity(); j++) {
data.put((byte) (i & 0xFF));
}
data.flip();
if (!bbis.put(data)) {
System.out.println("Pipe sink closed before writing all the data.");
return;
}
Thread.sleep(1); // Give the other thread a chance to run.
}
} catch (InterruptedException e) {
e.printStackTrace();
Thread.currentThread().interrupt();
} finally {
bbis.closeQueue();
}
}
});
try {
int i = 0;
int j = 0;
int c;
byte[] buffer = new byte[443];
while ((c = bbis.read(buffer)) != -1) {
assertNotEquals(0, c, "Should not read 0 bytes in blocking mode.");
for (int p = 0; p < c; p++) {
assertEquals((byte) (i & 0xFF), buffer[p], "At position: " + j);
if (++j % BUFFER_SIZE == 0) {
i++;
Thread.yield(); // Give the other thread a chance to run.
}
}
}
assertEquals(ROUNDS * BUFFER_SIZE, j, "Number of bytes produced and bytes read does not match.");
} finally {
executor.shutdownNow();
bbis.close();
}
if (!executor.awaitTermination(5, TimeUnit.SECONDS)) {
System.out.println("Waiting for the task to finish has timed out.");
}
}
/**
* Test for available() method.
*
* @throws Exception in case of error.
*/
@Test
public void testAvailable() throws Exception {
final int BUFFER_SIZE = 769;
final ByteBufferInputStream bbis = new ByteBufferInputStream();
ByteBuffer data = ByteBuffer.allocate(BUFFER_SIZE);
data.clear();
for (int j = 0; j < data.capacity(); j++) {
data.put((byte) 'A');
}
data.flip();
bbis.put(data);
assertEquals(BUFFER_SIZE, bbis.available(), "Available bytes");
data = ByteBuffer.allocate(BUFFER_SIZE);
data.clear();
for (int j = 0; j < data.capacity(); j++) {
data.put((byte) 'B');
}
data.flip();
bbis.put(data);
assertEquals(2 * BUFFER_SIZE, bbis.available(), "Available bytes");
int c = bbis.read();
assertEquals('A', c, "Byte read");
assertEquals(2 * BUFFER_SIZE - 1, bbis.available(), "Available bytes");
byte[] buff = new byte[199];
int l = bbis.read(buff);
assertEquals(buff.length, l, "Number of bytes read");
assertEquals(2 * BUFFER_SIZE - 200, bbis.available(), "Available bytes");
buff = new byte[1000];
l = bbis.read(buff);
assertEquals(buff.length, l, "Number of bytes read");
assertEquals(2 * BUFFER_SIZE - 1200, bbis.available(), "Available bytes");
bbis.closeQueue();
l = bbis.read(buff);
assertEquals(2 * BUFFER_SIZE - 1200, l, "Number of bytes read");
assertEquals(0, bbis.available(), "Available bytes");
bbis.close();
}
/**
* Test for available() method.
*
* @throws Exception in case of error.
*/
@Test
public void testCloseWithThrowable() throws Exception {
final int BUFFER_SIZE = 769;
ByteBuffer data = ByteBuffer.allocate(BUFFER_SIZE);
data.clear();
for (int j = 0; j < data.capacity(); j++) {
data.put((byte) 'A');
}
data.flip();
// first invocation of available should fail with exception, but not subsequently due to closed stream
testAction(new Task(createClosedExceptionStream(data, "FAILED")) {
@Override
public void run() throws IOException {
bbis().available();
}
}, "FAILED", false);
// first invocation of read should fail with exception, and subsequently due to closed stream
testAction(new Task(createClosedExceptionStream(data, "FAILED")) {
@Override
public void run() throws IOException {
bbis().read();
}
}, "FAILED", true);
// first invocation of read should fail with exception, and subsequently due to closed stream
testAction(new Task(createClosedExceptionStream(data, "FAILED")) {
@Override
public void run() throws IOException {
bbis().read(new byte[10]);
}
}, "FAILED", true);
// first invocation of tryRead should fail with exception, and subsequently due to closed stream
testAction(new Task(createClosedExceptionStream(data, "FAILED")) {
@Override
public void run() throws IOException {
bbis().tryRead();
}
}, "FAILED", true);
// first invocation of tryRead should fail with exception, and subsequently due to closed stream
testAction(new Task(createClosedExceptionStream(data, "FAILED")) {
@Override
public void run() throws IOException {
bbis().tryRead(new byte[10]);
}
}, "FAILED", true);
// first invocation of close should fail with exception, but not subsequently due closed stream
testAction(new Task(createClosedExceptionStream(data, "FAILED")) {
@Override
public void run() throws IOException {
bbis().close();
}
}, "FAILED", false);
}
private ByteBufferInputStream createClosedExceptionStream(ByteBuffer data, String exMsg) throws InterruptedException {
final ByteBufferInputStream bbis = new ByteBufferInputStream();
bbis.put(data);
bbis.closeQueue(new Exception(exMsg));
return bbis;
}
private void testAction(Task task, String exMsg, boolean retryFailOnClosed) throws IOException {
try {
task.run();
fail("IOException expected.");
} catch (IOException ex) {
assertNotNull(ex.getCause(), "Custom exception cause");
assertEquals(exMsg, ex.getCause().getMessage(), "Custom exception cause message");
}
if (retryFailOnClosed) {
try {
task.run();
fail("IOException expected.");
} catch (IOException ex) {
assertEquals(LocalizationMessages.INPUT_STREAM_CLOSED(), ex.getMessage(), "Closed IOException message");
assertNull(ex.getCause(), "Closed IOException cause");
}
} else {
task.run();
}
}
private abstract static class Task {
private final ByteBufferInputStream bbis;
protected Task(ByteBufferInputStream bbis) {
this.bbis = bbis;
}
protected final ByteBufferInputStream bbis() {
return bbis;
}
public abstract void run() throws IOException;
}
}