TestFSCheckpointService.java
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.mapreduce.checkpoint;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.Random;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DataOutputBuffer;
import org.apache.hadoop.mapreduce.checkpoint.CheckpointService.CheckpointWriteChannel;
import org.junit.Test;
import static org.junit.Assert.*;
import static org.mockito.Mockito.*;
import org.mockito.*;
public class TestFSCheckpointService {
private final int BUFSIZE = 1024;
@Test
public void testCheckpointCreate() throws Exception {
checkpointCreate(ByteBuffer.allocate(BUFSIZE));
}
@Test
public void testCheckpointCreateDirect() throws Exception {
checkpointCreate(ByteBuffer.allocateDirect(BUFSIZE));
}
public void checkpointCreate(ByteBuffer b) throws Exception {
int WRITES = 128;
FileSystem fs = mock(FileSystem.class);
DataOutputBuffer dob = new DataOutputBuffer();
FSDataOutputStream hdfs = spy(new FSDataOutputStream(dob, null));
@SuppressWarnings("resource") // backed by array
DataOutputBuffer verif = new DataOutputBuffer();
when(fs.create(isA(Path.class), eq((short)1))).thenReturn(hdfs);
when(fs.rename(isA(Path.class), isA(Path.class))).thenReturn(true);
Path base = new Path("/chk");
Path finalLoc = new Path("/chk/checkpoint_chk0");
Path tmp = FSCheckpointService.tmpfile(finalLoc);
FSCheckpointService chk = new FSCheckpointService(fs, base,
new SimpleNamingService("chk0"), (short) 1);
CheckpointWriteChannel out = chk.create();
Random r = new Random();
final byte[] randBytes = new byte[BUFSIZE];
for (int i = 0; i < WRITES; ++i) {
r.nextBytes(randBytes);
int s = r.nextInt(BUFSIZE - 1);
int e = r.nextInt(BUFSIZE - s) + 1;
verif.write(randBytes, s, e);
b.clear();
b.put(randBytes).flip();
b.position(s).limit(b.position() + e);
out.write(b);
}
verify(fs, never()).rename(any(Path.class), eq(finalLoc));
CheckpointID cid = chk.commit(out);
verify(hdfs).close();
verify(fs).rename(eq(tmp), eq(finalLoc));
assertArrayEquals(Arrays.copyOfRange(verif.getData(), 0, verif.getLength()),
Arrays.copyOfRange(dob.getData(), 0, dob.getLength()));
}
@Test
public void testDelete() throws Exception {
FileSystem fs = mock(FileSystem.class);
Path chkloc = new Path("/chk/chk0");
when(fs.delete(eq(chkloc), eq(false))).thenReturn(true);
Path base = new Path("/otherchk");
FSCheckpointID id = new FSCheckpointID(chkloc);
FSCheckpointService chk = new FSCheckpointService(fs, base,
new SimpleNamingService("chk0"), (short) 1);
assertTrue(chk.delete(id));
verify(fs).delete(eq(chkloc), eq(false));
}
}