FileSystemWatcherTestCase.java
/*
* JBoss, Home of Professional Open Source.
*
* Copyright 2013 Red Hat, Inc. and/or its affiliates, and individual
* contributors as indicated by the @author tags.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.xnio.nio.test;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.xnio.FileChangeCallback;
import org.xnio.FileChangeEvent;
import org.xnio.FileSystemWatcher;
import org.xnio.IoUtils;
import org.xnio.OptionMap;
import org.xnio.Options;
import org.xnio.Xnio;
import org.xnio.channels.AcceptingChannel;
import org.xnio.channels.ConnectedStreamChannel;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Collection;
import java.util.HashSet;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.BlockingDeque;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.TimeUnit;
import static org.xnio.FileChangeEvent.Type.MODIFIED;
import static org.xnio.FileChangeEvent.Type.REMOVED;
/**
 * Test file system watcher, non poll based.
 * <p>
 * Every test method works inside a scratch directory located under
 * {@code java.io.tmpdir}. The directory is wiped and recreated by
 * {@link #setup()} and removed again by {@link #after()}.
 *
 * @author Stuart Douglas
 */
public class FileSystemWatcherTestCase {

    public static final String DIR_NAME = "/fileSystemWatcherTest";
    public static final String EXISTING_FILE_NAME = "a.txt";
    public static final String EXISTING_DIR = "existingDir";

    /** Upper bound, in seconds, for every single wait on an event batch or a worker thread. */
    private static final int WAIT_SECONDS = 20;
    /** Number of concurrent {@link FileAdder} threads started by {@link #testMultiThread()}. */
    private static final int NUM_THREADS = 5;
    /** Number of files each {@link FileAdder} creates. */
    private static final int NUM_FILES = 6;

    /** Event batches delivered to the first registered callback. */
    private final BlockingDeque<Collection<FileChangeEvent>> results = new LinkedBlockingDeque<>();
    /** Event batches delivered to the second registered callback. */
    private final BlockingDeque<Collection<FileChangeEvent>> secondResults = new LinkedBlockingDeque<>();

    File rootDir;
    File existingSubDir;

    // Not referenced anywhere in this class; kept because it is protected and
    // subclasses outside this view may rely on it.
    protected AcceptingChannel<? extends ConnectedStreamChannel> server;

    private Xnio createXnio() {
        return Xnio.getInstance("nio", FileSystemWatcherTestCase.class.getClassLoader());
    }

    /**
     * Recreates the scratch directory with one pre-existing file and one
     * pre-existing sub directory that itself contains one file.
     *
     * @throws Exception if a fixture file cannot be written
     */
    @Before
    public void setup() throws Exception {
        rootDir = new File(System.getProperty("java.io.tmpdir") + DIR_NAME);
        deleteRecursive(rootDir);
        // Fail right here rather than with a 20 second event timeout later on.
        Assert.assertTrue("Could not create directory " + rootDir,
                rootDir.mkdirs() || rootDir.isDirectory());
        touchFile(new File(rootDir, EXISTING_FILE_NAME));

        existingSubDir = new File(rootDir, EXISTING_DIR);
        Assert.assertTrue("Could not create directory " + existingSubDir,
                existingSubDir.mkdir() || existingSubDir.isDirectory());
        touchFile(new File(existingSubDir, EXISTING_FILE_NAME));
    }

    /**
     * Creates the file, or truncates it if it already exists, and writes a
     * short time-stamped payload into it.
     *
     * @param existing the file to write
     * @throws IOException if the file cannot be opened, written or closed
     */
    private static void touchFile(File existing) throws IOException {
        try (FileOutputStream out = new FileOutputStream(existing)) {
            out.write(("data" + System.currentTimeMillis()).getBytes(StandardCharsets.UTF_8));
            out.flush();
        }
    }

    /** Removes the scratch directory and everything below it. */
    @After
    public void after() {
        deleteRecursive(rootDir);
    }

    /**
     * Registers two callbacks on the scratch directory and verifies that both
     * of them are notified about additions, modifications and removals in the
     * root directory, in a pre-existing sub directory and in a directory that
     * is created while the watcher is running.
     */
    @Test
    public void testFileSystemWatcher() throws Exception {
        try (FileSystemWatcher watcher = createXnio().createFileSystemWatcher("testWatcher", OptionMap.create(Options.WATCHER_POLL_INTERVAL, 10))) {
            watcher.watchPath(rootDir, collectInto(results));
            watcher.watchPath(rootDir, collectInto(secondResults));

            // a new file directly in the watched root
            checkAddModifyRemove(new File(rootDir, "newlyAddedFile.txt").getAbsoluteFile());

            // a new file in a sub directory that existed before the watcher started
            checkAddModifyRemove(new File(existingSubDir, "newSubDirFile.txt"));

            // a file that existed before the watcher started
            checkRemove(new File(rootDir, EXISTING_FILE_NAME));

            // a directory created while watching, then a file inside of it
            File newDir = new File(rootDir, "newlyCreatedDirectory");
            Assert.assertTrue("Could not create directory " + newDir, newDir.mkdir());
            checkResult(newDir, FileChangeEvent.Type.ADDED);
            checkAddModifyRemove(new File(newDir, "newlyAddedFileInNewlyAddedDirectory.txt").getAbsoluteFile());
        }
        results.clear();
        secondResults.clear();
    }

    /**
     * Starts {@link #NUM_THREADS} threads that each create {@link #NUM_FILES}
     * files in the scratch directory and verifies that an ADDED event is
     * received for every one of those files.
     */
    @Test
    public void testMultiThread() throws Exception {
        try (FileSystemWatcher watcher = createXnio().createFileSystemWatcher(
                "testWatcher", OptionMap.create(Options.WATCHER_POLL_INTERVAL, 10))) {
            watcher.watchPath(rootDir, collectInto(results));

            final FileAdder[] adders = new FileAdder[NUM_THREADS];
            final Thread[] threads = new Thread[NUM_THREADS];
            for (int i = 0; i < threads.length; i++) {
                adders[i] = new FileAdder(i);
                threads[i] = new Thread(adders[i], "file-adder-" + i);
                threads[i].start();
            }

            final int expected = NUM_THREADS * NUM_FILES;
            // mark each file received in a set
            final Set<String> files = new HashSet<>(expected);
            try {
                // get changes until all the adds are in the set or a poll times out
                while (files.size() < expected) {
                    Collection<FileChangeEvent> events = results.poll(WAIT_SECONDS, TimeUnit.SECONDS);
                    if (events == null) {
                        break;
                    }
                    for (FileChangeEvent e : events) {
                        if (e.getType() == FileChangeEvent.Type.ADDED) {
                            files.add(e.getFile().getName());
                        }
                    }
                }
            } finally {
                // Never leave a writer running past the test method: it would
                // race with the directory clean-up done in after().
                for (Thread thread : threads) {
                    thread.join(TimeUnit.SECONDS.toMillis(WAIT_SECONDS));
                }
            }

            // A failure inside a writer is the root cause of any missing event,
            // so report it before complaining about the events themselves.
            for (FileAdder adder : adders) {
                adder.assertSucceeded();
            }

            // check the files created are all received
            for (int i = 0; i < NUM_THREADS; i++) {
                for (int j = 0; j < NUM_FILES; j++) {
                    Assert.assertTrue("Add for file [" + i + "," + j + "] was not received",
                            files.contains("thread-" + i + "-" + j));
                }
            }
        }
        results.clear();
    }

    /**
     * Builds a callback that appends every batch of changes it receives to the
     * given deque.
     *
     * @param sink the deque that collects the event batches
     * @return the callback to register with the watcher
     */
    private static FileChangeCallback collectInto(final BlockingDeque<Collection<FileChangeEvent>> sink) {
        return new FileChangeCallback() {
            @Override
            public void handleChanges(Collection<FileChangeEvent> changes) {
                sink.add(changes);
            }
        };
    }

    /**
     * Creates the file, changes its modification time and deletes it again,
     * checking after each step that both callbacks saw the matching event.
     *
     * @param file the file to run through the add / modify / remove cycle
     */
    private void checkAddModifyRemove(File file) throws IOException, InterruptedException {
        touchFile(file);
        checkResult(file, FileChangeEvent.Type.ADDED);
        // The return value is deliberately not asserted: whether every file
        // system accepts this timestamp is not something this test is about.
        file.setLastModified(500);
        checkResult(file, MODIFIED);
        checkRemove(file);
    }

    /**
     * Deletes the file and checks that both callbacks saw the REMOVED event.
     *
     * @param file the file to delete
     */
    private void checkRemove(File file) throws InterruptedException {
        Assert.assertTrue("Could not delete " + file, file.delete());
        Thread.sleep(1);
        checkResult(file, REMOVED);
    }

    /**
     * Asserts that an event of the given type for the given file reaches both
     * registered callbacks.
     *
     * @param file the file the event must refer to
     * @param type the expected event type
     */
    private void checkResult(File file, FileChangeEvent.Type type) throws InterruptedException {
        Assert.assertTrue("File " + file + " operation " + type + " not received in results", checkResult(file, type, results));
        Assert.assertTrue("File " + file + " operation " + type + " not received in secondResults", checkResult(file, type, secondResults));
    }

    /**
     * Drains event batches from the deque until one contains the expected
     * event or a poll times out after {@link #WAIT_SECONDS} seconds.
     *
     * @param file  the file the event must refer to
     * @param type  the expected event type
     * @param deque the deque to drain
     * @return {@code true} if the expected event was found, {@code false} if a
     *         poll timed out first
     */
    private static boolean checkResult(File file, FileChangeEvent.Type type, BlockingDeque<Collection<FileChangeEvent>> deque) throws InterruptedException {
        // sometime OS will give a MODIFIED event on its parent folder when a file is ADDED
        // consume all extra events until the expected one is received
        Collection<FileChangeEvent> events = deque.poll(WAIT_SECONDS, TimeUnit.SECONDS);
        while (events != null) {
            for (FileChangeEvent e : events) {
                if (file.equals(e.getFile()) && type == e.getType()) {
                    return true;
                }
            }
            events = deque.poll(WAIT_SECONDS, TimeUnit.SECONDS);
        }
        return false;
    }

    /**
     * Deletes the given file or directory including everything below it.
     * Deletion is best effort: entries that cannot be removed are skipped
     * silently. Symbolic links are removed without touching their target.
     *
     * @param file the file or directory to delete, {@code null} is ignored
     */
    public static void deleteRecursive(final File file) {
        if (file == null) {
            return;
        }
        // Descending through a symbolic link would wipe the content of the
        // link target, which may live outside of the scratch directory.
        if (!Files.isSymbolicLink(file.toPath())) {
            File[] files = file.listFiles();
            if (files != null) {
                for (File f : files) {
                    deleteRecursive(f);
                }
            }
        }
        file.delete();
    }

    /**
     * Helper runnable to create NUM_FILES files in the working directory with
     * the name: "thread-" + i + "-" + j. Where i is thread number and j the
     * iteration [0-NUM_FILES). After each file creation the thread waits a
     * random time [0-100ms).
     * <p>
     * An assertion failure raised on a worker thread never reaches JUnit, so
     * the first error is stored instead and has to be collected on the test
     * thread through {@link #assertSucceeded()}.
     */
    class FileAdder implements Runnable {
        private final int number;
        private final Random random;
        /** First error hit by {@link #run()}, {@code null} while everything went fine. */
        private volatile Throwable failure;

        FileAdder(int number) {
            this.number = number;
            this.random = new Random();
        }

        @Override
        public void run() {
            try {
                for (int j = 0; j < NUM_FILES; j++) {
                    Path added = rootDir.toPath().resolve("thread-" + number + "-" + j);
                    Files.write(added, added.getFileName().toString().getBytes(StandardCharsets.UTF_8));
                    final int timeout = random.nextInt(100);
                    if (timeout > 0) {
                        TimeUnit.MILLISECONDS.sleep(timeout);
                    }
                }
            } catch (IOException e) {
                failure = e;
            } catch (InterruptedException e) {
                // keep the interrupt visible to whoever owns this thread
                Thread.currentThread().interrupt();
                failure = e;
            }
        }

        /**
         * Rethrows, on the calling thread, the error recorded by {@link #run()}.
         *
         * @throws AssertionError if this worker failed, with the original
         *                        exception attached as the cause
         */
        void assertSucceeded() {
            final Throwable cause = failure;
            if (cause != null) {
                throw new AssertionError("Thread " + number + " failed " + cause.getMessage(), cause);
            }
        }
    }
}