TestByteRangeInputStream.java
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.web;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;
import static org.mockito.ArgumentMatchers.anyBoolean;
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.Mockito.CALLS_REAL_METHODS;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.net.HttpURLConnection;
import java.net.URL;
import org.apache.hadoop.thirdparty.com.google.common.net.HttpHeaders;
import org.apache.hadoop.hdfs.web.ByteRangeInputStream.InputStreamAndFileLength;
import org.apache.hadoop.test.Whitebox;
import org.junit.jupiter.api.Test;
import org.mockito.Mockito;
public class TestByteRangeInputStream {
private class ByteRangeInputStreamImpl extends ByteRangeInputStream {
public ByteRangeInputStreamImpl(URLOpener o, URLOpener r)
throws IOException {
super(o, r);
}
@Override
protected URL getResolvedUrl(HttpURLConnection connection)
throws IOException {
return new URL("http://resolvedurl/");
}
}
private ByteRangeInputStream.URLOpener getMockURLOpener(URL url)
throws IOException {
ByteRangeInputStream.URLOpener opener =
mock(ByteRangeInputStream.URLOpener.class, CALLS_REAL_METHODS);
opener.setURL(url);
doReturn(getMockConnection("65535"))
.when(opener).connect(anyLong(), anyBoolean());
return opener;
}
private HttpURLConnection getMockConnection(String length)
throws IOException {
HttpURLConnection mockConnection = mock(HttpURLConnection.class);
doReturn(new ByteArrayInputStream("asdf".getBytes()))
.when(mockConnection).getInputStream();
doReturn(length).when(mockConnection)
.getHeaderField(HttpHeaders.CONTENT_LENGTH);
return mockConnection;
}
@Test
public void testByteRange() throws IOException {
ByteRangeInputStream.URLOpener oMock = getMockURLOpener(
new URL("http://test"));
ByteRangeInputStream.URLOpener rMock = getMockURLOpener(null);
ByteRangeInputStream bris = new ByteRangeInputStreamImpl(oMock, rMock);
bris.seek(0);
assertEquals(0, bris.getPos(), "getPos wrong");
bris.read();
assertEquals(0, bris.startPos, "Initial call made incorrectly (offset check)");
assertEquals(1, bris.getPos(), "getPos should return 1 after reading one byte");
verify(oMock, times(1)).connect(0, false);
bris.read();
assertEquals(2, bris.getPos(), "getPos should return 2 after reading two bytes");
// No additional connections should have been made (no seek)
verify(oMock, times(1)).connect(0, false);
rMock.setURL(new URL("http://resolvedurl/"));
bris.seek(100);
bris.read();
assertEquals(100, bris.startPos,
"Seek to 100 bytes made incorrectly (offset Check)");
assertEquals(101, bris.getPos(),
"getPos should return 101 after reading one byte");
verify(rMock, times(1)).connect(100, true);
bris.seek(101);
bris.read();
// Seek to 101 should not result in another request
verify(rMock, times(1)).connect(100, true);
verify(rMock, times(0)).connect(101, true);
bris.seek(2500);
bris.read();
assertEquals(2500, bris.startPos, "Seek to 2500 bytes made incorrectly (offset Check)");
doReturn(getMockConnection(null))
.when(rMock).connect(anyLong(), anyBoolean());
bris.seek(500);
try {
bris.read();
fail("Exception should be thrown when content-length is not given");
} catch (IOException e) {
assertTrue(e.getMessage().startsWith(HttpHeaders.CONTENT_LENGTH + " is missing: "),
"Incorrect response message: " + e.getMessage());
}
bris.close();
}
@Test
public void testPropagatedClose() throws IOException {
ByteRangeInputStream bris =
mock(ByteRangeInputStream.class, CALLS_REAL_METHODS);
InputStreamAndFileLength mockStream = new InputStreamAndFileLength(1L,
mock(InputStream.class));
doReturn(mockStream).when(bris).openInputStream(Mockito.anyLong());
Whitebox.setInternalState(bris, "status",
ByteRangeInputStream.StreamStatus.SEEK);
int brisOpens = 0;
int brisCloses = 0;
int isCloses = 0;
// first open, shouldn't close underlying stream
bris.getInputStream();
verify(bris, times(++brisOpens)).openInputStream(Mockito.anyLong());
verify(bris, times(brisCloses)).close();
verify(mockStream.in, times(isCloses)).close();
// stream is open, shouldn't close underlying stream
bris.getInputStream();
verify(bris, times(brisOpens)).openInputStream(Mockito.anyLong());
verify(bris, times(brisCloses)).close();
verify(mockStream.in, times(isCloses)).close();
// seek forces a reopen, should close underlying stream
bris.seek(1);
bris.getInputStream();
verify(bris, times(++brisOpens)).openInputStream(Mockito.anyLong());
verify(bris, times(brisCloses)).close();
verify(mockStream.in, times(++isCloses)).close();
// verify that the underlying stream isn't closed after a seek
// ie. the state was correctly updated
bris.getInputStream();
verify(bris, times(brisOpens)).openInputStream(Mockito.anyLong());
verify(bris, times(brisCloses)).close();
verify(mockStream.in, times(isCloses)).close();
// seeking to same location should be a no-op
bris.seek(1);
bris.getInputStream();
verify(bris, times(brisOpens)).openInputStream(Mockito.anyLong());
verify(bris, times(brisCloses)).close();
verify(mockStream.in, times(isCloses)).close();
// close should of course close
bris.close();
verify(bris, times(++brisCloses)).close();
verify(mockStream.in, times(++isCloses)).close();
// it's already closed, underlying stream should not close
bris.close();
verify(bris, times(++brisCloses)).close();
verify(mockStream.in, times(isCloses)).close();
// it's closed, don't reopen it
boolean errored = false;
try {
bris.getInputStream();
} catch (IOException e) {
errored = true;
assertEquals("Stream closed", e.getMessage());
} finally {
assertTrue(errored, "Read a closed steam");
}
verify(bris, times(brisOpens)).openInputStream(Mockito.anyLong());
verify(bris, times(brisCloses)).close();
verify(mockStream.in, times(isCloses)).close();
}
@Test
public void testAvailable() throws IOException {
ByteRangeInputStream bris =
mock(ByteRangeInputStream.class, CALLS_REAL_METHODS);
InputStreamAndFileLength mockStream = new InputStreamAndFileLength(65535L,
mock(InputStream.class));
doReturn(mockStream).when(bris).openInputStream(Mockito.anyLong());
Whitebox.setInternalState(bris, "status",
ByteRangeInputStream.StreamStatus.SEEK);
assertEquals(65535, bris.available(),
"Before read or seek, available should be same as filelength");
verify(bris, times(1)).openInputStream(Mockito.anyLong());
bris.seek(10);
assertEquals(65525, bris.available(), "Seek 10 bytes, available should return filelength - 10");
//no more bytes available
bris.seek(65535);
assertEquals(0, bris.available(), "Seek till end of file, available should return 0 bytes");
//test reads, seek back to 0 and start reading
bris.seek(0);
bris.read();
assertEquals(65534, bris.available(), "Read 1 byte, available must return filelength - 1");
bris.read();
assertEquals(65533, bris.available(),
"Read another 1 byte, available must return filelength - 2");
//seek and read
bris.seek(100);
bris.read();
assertEquals(65434, bris.available(),
"Seek to offset 100 and read 1 byte, available should return filelength - 101");
bris.close();
}
@Test
public void testAvailableLengthNotKnown() throws IOException {
ByteRangeInputStream bris =
mock(ByteRangeInputStream.class, CALLS_REAL_METHODS);
//Length is null for chunked transfer-encoding
InputStreamAndFileLength mockStream = new InputStreamAndFileLength(null,
mock(InputStream.class));
doReturn(mockStream).when(bris).openInputStream(Mockito.anyLong());
Whitebox.setInternalState(bris, "status",
ByteRangeInputStream.StreamStatus.SEEK);
assertEquals(Integer.MAX_VALUE, bris.available());
}
@Test
public void testAvailableStreamClosed() throws IOException {
ByteRangeInputStream bris =
mock(ByteRangeInputStream.class, CALLS_REAL_METHODS);
InputStreamAndFileLength mockStream = new InputStreamAndFileLength(null,
mock(InputStream.class));
doReturn(mockStream).when(bris).openInputStream(Mockito.anyLong());
Whitebox.setInternalState(bris, "status",
ByteRangeInputStream.StreamStatus.SEEK);
bris.close();
try{
bris.available();
fail("Exception should be thrown when stream is closed");
}catch(IOException e){
assertTrue(e.getMessage().equals("Stream closed"), "Exception when stream is closed");
}
}
}