ZooKeeperServerMainTest.java
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.zookeeper.server;
import static org.apache.zookeeper.test.ClientBase.CONNECTION_TIMEOUT;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;
import java.io.File;
import java.io.FileWriter;
import java.io.IOException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.PortAssignment;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.Watcher.Event.KeeperState;
import org.apache.zookeeper.ZKTestCase;
import org.apache.zookeeper.ZooDefs.Ids;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.common.PathUtils;
import org.apache.zookeeper.metrics.BaseTestMetricsProvider;
import org.apache.zookeeper.metrics.BaseTestMetricsProvider.MetricsProviderCapturingLifecycle;
import org.apache.zookeeper.metrics.BaseTestMetricsProvider.MetricsProviderWithConfiguration;
import org.apache.zookeeper.metrics.BaseTestMetricsProvider.MetricsProviderWithErrorInConfigure;
import org.apache.zookeeper.metrics.BaseTestMetricsProvider.MetricsProviderWithErrorInStart;
import org.apache.zookeeper.metrics.BaseTestMetricsProvider.MetricsProviderWithErrorInStop;
import org.apache.zookeeper.server.persistence.FileTxnSnapLog;
import org.apache.zookeeper.server.quorum.QuorumPeerConfig.ConfigException;
import org.apache.zookeeper.test.ClientBase;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Test stand-alone server.
*
*/
public class ZooKeeperServerMainTest extends ZKTestCase implements Watcher {
protected static final Logger LOG = LoggerFactory.getLogger(ZooKeeperServerMainTest.class);
private CountDownLatch clientConnected = new CountDownLatch(1);
public static class MainThread extends Thread {
final File confFile;
final TestZKSMain main;
final File tmpDir;
final File dataDir;
final File logDir;
public MainThread(int clientPort, boolean preCreateDirs, String configs) throws IOException {
this(clientPort, null, preCreateDirs, ClientBase.createTmpDir(), configs);
}
public MainThread(int clientPort, Integer secureClientPort, boolean preCreateDirs, String configs)
throws IOException {
this(clientPort, secureClientPort,
preCreateDirs, ClientBase.createTmpDir(), configs);
}
public MainThread(int clientPort, Integer secureClientPort, boolean preCreateDirs, File tmpDir, String configs) throws IOException {
super("Standalone server with clientPort:" + clientPort);
this.tmpDir = tmpDir;
confFile = new File(tmpDir, "zoo.cfg");
FileWriter fwriter = new FileWriter(confFile);
fwriter.write("tickTime=2000\n");
fwriter.write("initLimit=10\n");
fwriter.write("syncLimit=5\n");
if (configs != null) {
fwriter.write(configs);
}
dataDir = new File(this.tmpDir, "data");
logDir = new File(dataDir.toString() + "_txnlog");
if (preCreateDirs) {
if (!dataDir.mkdir()) {
throw new IOException("unable to mkdir " + dataDir);
}
if (!logDir.mkdir()) {
throw new IOException("unable to mkdir " + logDir);
}
ClientBase.createInitializeFile(logDir);
}
String normalizedDataDir = PathUtils.normalizeFileSystemPath(dataDir.toString());
String normalizedLogDir = PathUtils.normalizeFileSystemPath(logDir.toString());
fwriter.write("dataDir=" + normalizedDataDir + "\n");
fwriter.write("dataLogDir=" + normalizedLogDir + "\n");
fwriter.write("clientPort=" + clientPort + "\n");
if (secureClientPort != null) {
fwriter.write("secureClientPort=" + secureClientPort + "\n");
}
fwriter.flush();
fwriter.close();
main = new TestZKSMain();
}
public void run() {
String[] args = new String[1];
args[0] = confFile.toString();
try {
main.initializeAndRun(args);
} catch (Exception e) {
// test will still fail even though we just log/ignore
LOG.error("unexpected exception in run", e);
}
}
public void shutdown() throws IOException {
main.shutdown();
}
void deleteDirs() throws IOException {
delete(tmpDir);
}
void delete(File f) throws IOException {
if (f.isDirectory()) {
for (File c : f.listFiles()) {
delete(c);
}
}
if (!f.delete()) {
// double check for the file existence
if (f.exists()) {
throw new IOException("Failed to delete file: " + f);
}
}
}
ServerCnxnFactory getCnxnFactory() {
return main.getCnxnFactory();
}
public ServerCnxnFactory getSecureCnxnFactory(){
return main.getSecureCnxnFactory();
}
}
public static class TestZKSMain extends ZooKeeperServerMain {
public void shutdown() {
super.shutdown();
}
}
/**
* Test case for https://issues.apache.org/jira/browse/ZOOKEEPER-2247.
* Test to verify that even after non recoverable error (error while
* writing transaction log), ZooKeeper is still available.
*/
@Test
@Timeout(value = 30)
public void testNonRecoverableError() throws Exception {
ClientBase.setupTestEnv();
final int CLIENT_PORT = PortAssignment.unique();
MainThread main = new MainThread(CLIENT_PORT, true, null);
main.start();
assertTrue(ClientBase.waitForServerUp("127.0.0.1:" + CLIENT_PORT, CONNECTION_TIMEOUT),
"waiting for server being up");
ZooKeeper zk = new ZooKeeper("127.0.0.1:" + CLIENT_PORT, ClientBase.CONNECTION_TIMEOUT, this);
zk.create("/foo1", "foobar".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
assertEquals(new String(zk.getData("/foo1", null, null)), "foobar");
// inject problem in server
ZooKeeperServer zooKeeperServer = main.getCnxnFactory().getZooKeeperServer();
FileTxnSnapLog snapLog = zooKeeperServer.getTxnLogFactory();
FileTxnSnapLog fileTxnSnapLogWithError = new FileTxnSnapLog(snapLog.getDataLogDir(), snapLog.getSnapDir()) {
@Override
public void commit() throws IOException {
throw new IOException("Input/output error");
}
};
ZKDatabase newDB = new ZKDatabase(fileTxnSnapLogWithError);
zooKeeperServer.setZKDatabase(newDB);
try {
// do create operation, so that injected IOException is thrown
zk.create("/foo2", "foobar".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
fail("IOException is expected as error is injected in transaction log commit functionality");
} catch (Exception e) {
// do nothing
}
zk.close();
assertTrue(ClientBase.waitForServerDown("127.0.0.1:" + CLIENT_PORT, ClientBase.CONNECTION_TIMEOUT),
"waiting for server down");
fileTxnSnapLogWithError.close();
main.shutdown();
main.deleteDirs();
}
/**
* Tests that the ZooKeeper server will fail to start if the
* snapshot directory is read only.
*
* This test will fail if it is executed as root user.
*/
@Test
@Timeout(value = 30)
public void testReadOnlySnapshotDir() throws Exception {
ClientBase.setupTestEnv();
final int CLIENT_PORT = PortAssignment.unique();
// Start up the ZK server to automatically create the necessary directories
// and capture the directory where data is stored
MainThread main = new MainThread(CLIENT_PORT, true, null);
File tmpDir = main.tmpDir;
main.start();
assertTrue(ClientBase.waitForServerUp("127.0.0.1:" + CLIENT_PORT, CONNECTION_TIMEOUT / 2),
"waiting for server being up");
main.shutdown();
// Make the snapshot directory read only
File snapDir = new File(main.dataDir, FileTxnSnapLog.version + FileTxnSnapLog.VERSION);
snapDir.setWritable(false);
// Restart ZK and observe a failure
main = new MainThread(CLIENT_PORT, null, false, tmpDir, null);
main.start();
assertFalse(ClientBase.waitForServerUp("127.0.0.1:" + CLIENT_PORT, CONNECTION_TIMEOUT / 2),
"waiting for server being up");
main.shutdown();
snapDir.setWritable(true);
main.deleteDirs();
}
/**
* Tests that the ZooKeeper server will fail to start if the
* transaction log directory is read only.
*
* This test will fail if it is executed as root user.
*/
@Test
@Timeout(value = 30)
public void testReadOnlyTxnLogDir() throws Exception {
ClientBase.setupTestEnv();
final int CLIENT_PORT = PortAssignment.unique();
// Start up the ZK server to automatically create the necessary directories
// and capture the directory where data is stored
MainThread main = new MainThread(CLIENT_PORT, true, null);
File tmpDir = main.tmpDir;
main.start();
assertTrue(ClientBase.waitForServerUp("127.0.0.1:" + CLIENT_PORT, CONNECTION_TIMEOUT / 2),
"waiting for server being up");
main.shutdown();
// Make the transaction log directory read only
File logDir = new File(main.logDir, FileTxnSnapLog.version + FileTxnSnapLog.VERSION);
logDir.setWritable(false);
// Restart ZK and observe a failure
main = new MainThread(CLIENT_PORT, null, false, tmpDir, null);
main.start();
assertFalse(ClientBase.waitForServerUp("127.0.0.1:" + CLIENT_PORT, CONNECTION_TIMEOUT / 2),
"waiting for server being up");
main.shutdown();
logDir.setWritable(true);
main.deleteDirs();
}
/**
* Verify the ability to start a standalone server instance.
*/
@Test
public void testStandalone() throws Exception {
ClientBase.setupTestEnv();
final int CLIENT_PORT = PortAssignment.unique();
MainThread main = new MainThread(CLIENT_PORT, true, null);
main.start();
assertTrue(ClientBase.waitForServerUp("127.0.0.1:" + CLIENT_PORT, CONNECTION_TIMEOUT),
"waiting for server being up");
clientConnected = new CountDownLatch(1);
ZooKeeper zk = new ZooKeeper("127.0.0.1:" + CLIENT_PORT, ClientBase.CONNECTION_TIMEOUT, this);
assertTrue(clientConnected.await(CONNECTION_TIMEOUT, TimeUnit.MILLISECONDS), "Failed to establish zkclient connection!");
zk.create("/foo", "foobar".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
assertEquals(new String(zk.getData("/foo", null, null)), "foobar");
zk.close();
main.shutdown();
main.join();
main.deleteDirs();
assertTrue(ClientBase.waitForServerDown("127.0.0.1:" + CLIENT_PORT, ClientBase.CONNECTION_TIMEOUT),
"waiting for server down");
}
/**
* Test verifies that the server shouldn't allow minsessiontimeout greater than
* maxsessiontimeout
*/
@Test
public void testWithMinSessionTimeoutGreaterThanMaxSessionTimeout() throws Exception {
ClientBase.setupTestEnv();
final int CLIENT_PORT = PortAssignment.unique();
final int tickTime = 2000;
final int minSessionTimeout = 20 * tickTime + 1000; // min is higher
final int maxSessionTimeout = tickTime * 2 - 100; // max is lower
final String configs = "maxSessionTimeout="
+ maxSessionTimeout
+ "\n"
+ "minSessionTimeout="
+ minSessionTimeout
+ "\n";
MainThread main = new MainThread(CLIENT_PORT, true, configs);
String[] args = new String[1];
args[0] = main.confFile.toString();
try {
main.main.initializeAndRun(args);
fail("Must throw exception as " + "minsessiontimeout > maxsessiontimeout");
} catch (ConfigException iae) {
// expected
}
}
/**
* Test verifies that the server shouldn't boot with an invalid metrics provider
*/
@Test
public void testInvalidMetricsProvider() throws Exception {
ClientBase.setupTestEnv();
final int CLIENT_PORT = PortAssignment.unique();
final String configs = "metricsProvider.className=BadClass\n";
MainThread main = new MainThread(CLIENT_PORT, true, configs);
String[] args = new String[1];
args[0] = main.confFile.toString();
try {
main.main.initializeAndRun(args);
fail("Must throw exception as metrics provider is not " + "well configured");
} catch (ConfigException iae) {
// expected
}
}
/**
* Test verifies that the server shouldn't boot with a faulty metrics provider
*/
@Test
public void testFaultyMetricsProviderOnStart() throws Exception {
ClientBase.setupTestEnv();
final int CLIENT_PORT = PortAssignment.unique();
final String configs = "metricsProvider.className=" + MetricsProviderWithErrorInStart.class.getName() + "\n";
MainThread main = new MainThread(CLIENT_PORT, true, configs);
String[] args = new String[1];
args[0] = main.confFile.toString();
try {
main.main.initializeAndRun(args);
fail("Must throw exception as metrics provider cannot boot");
} catch (IOException iae) {
// expected
}
}
/**
* Test verifies that the server shouldn't boot with a faulty metrics provider
*/
@Test
public void testFaultyMetricsProviderOnConfigure() throws Exception {
ClientBase.setupTestEnv();
final int CLIENT_PORT = PortAssignment.unique();
final String configs = "metricsProvider.className="
+ MetricsProviderWithErrorInConfigure.class.getName()
+ "\n";
MainThread main = new MainThread(CLIENT_PORT, true, configs);
String[] args = new String[1];
args[0] = main.confFile.toString();
try {
main.main.initializeAndRun(args);
fail("Must throw exception as metrics provider is cannot boot");
} catch (IOException iae) {
// expected
}
}
/**
* Test verifies that the server shouldn't be affected but runtime errors on stop()
*/
@Test
public void testFaultyMetricsProviderOnStop() throws Exception {
ClientBase.setupTestEnv();
final int CLIENT_PORT = PortAssignment.unique();
MetricsProviderWithErrorInStop.stopCalled.set(false);
final String configs = "metricsProvider.className=" + MetricsProviderWithErrorInStop.class.getName() + "\n";
MainThread main = new MainThread(CLIENT_PORT, true, configs);
main.start();
assertTrue(ClientBase.waitForServerUp("127.0.0.1:" + CLIENT_PORT, CONNECTION_TIMEOUT),
"waiting for server being up");
clientConnected = new CountDownLatch(1);
ZooKeeper zk = new ZooKeeper("127.0.0.1:" + CLIENT_PORT, ClientBase.CONNECTION_TIMEOUT, this);
assertTrue(clientConnected.await(CONNECTION_TIMEOUT, TimeUnit.MILLISECONDS), "Failed to establish zkclient connection!");
zk.create("/foo", "foobar".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
assertEquals(new String(zk.getData("/foo", null, null)), "foobar");
zk.close();
main.shutdown();
main.join();
main.deleteDirs();
assertTrue(ClientBase.waitForServerDown("127.0.0.1:" + CLIENT_PORT, ClientBase.CONNECTION_TIMEOUT),
"waiting for server down");
assertTrue(MetricsProviderWithErrorInStop.stopCalled.get());
}
/**
* Test verifies that configuration is passed to the MetricsProvider.
*/
@Test
public void testMetricsProviderConfiguration() throws Exception {
ClientBase.setupTestEnv();
final int CLIENT_PORT = PortAssignment.unique();
MetricsProviderWithConfiguration.httpPort.set(0);
final String configs = "metricsProvider.className="
+ MetricsProviderWithConfiguration.class.getName()
+ "\n"
+ "metricsProvider.httpPort=1234\n";
MainThread main = new MainThread(CLIENT_PORT, true, configs);
main.start();
assertTrue(ClientBase.waitForServerUp("127.0.0.1:" + CLIENT_PORT, CONNECTION_TIMEOUT),
"waiting for server being up");
clientConnected = new CountDownLatch(1);
ZooKeeper zk = new ZooKeeper("127.0.0.1:" + CLIENT_PORT, ClientBase.CONNECTION_TIMEOUT, this);
assertTrue(clientConnected.await(CONNECTION_TIMEOUT, TimeUnit.MILLISECONDS), "Failed to establish zkclient connection!");
zk.create("/foo", "foobar".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
assertEquals(new String(zk.getData("/foo", null, null)), "foobar");
zk.close();
main.shutdown();
main.join();
main.deleteDirs();
assertTrue(ClientBase.waitForServerDown("127.0.0.1:" + CLIENT_PORT, ClientBase.CONNECTION_TIMEOUT),
"waiting for server down");
assertEquals(1234, MetricsProviderWithConfiguration.httpPort.get());
}
/**
* Test verifies that all of the lifecycle methods of the MetricsProvider are called.
*/
@Test
public void testMetricsProviderLifecycle() throws Exception {
ClientBase.setupTestEnv();
MetricsProviderCapturingLifecycle.reset();
final int CLIENT_PORT = PortAssignment.unique();
final String configs = "metricsProvider.className="
+ MetricsProviderCapturingLifecycle.class.getName()
+ "\n"
+ "metricsProvider.httpPort=1234\n";
MainThread main = new MainThread(CLIENT_PORT, true, configs);
main.start();
assertTrue(ClientBase.waitForServerUp("127.0.0.1:" + CLIENT_PORT, CONNECTION_TIMEOUT),
"waiting for server being up");
clientConnected = new CountDownLatch(1);
ZooKeeper zk = new ZooKeeper("127.0.0.1:" + CLIENT_PORT, ClientBase.CONNECTION_TIMEOUT, this);
assertTrue(clientConnected.await(CONNECTION_TIMEOUT, TimeUnit.MILLISECONDS), "Failed to establish zkclient connection!");
zk.create("/foo", "foobar".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
assertEquals(new String(zk.getData("/foo", null, null)), "foobar");
zk.close();
main.shutdown();
main.join();
main.deleteDirs();
assertTrue(ClientBase.waitForServerDown("127.0.0.1:" + CLIENT_PORT, ClientBase.CONNECTION_TIMEOUT), "waiting for server down");
assertTrue(BaseTestMetricsProvider.MetricsProviderCapturingLifecycle.configureCalled.get(), "metrics provider lifecycle error");
assertTrue(BaseTestMetricsProvider.MetricsProviderCapturingLifecycle.startCalled.get(), "metrics provider lifecycle error");
assertTrue(BaseTestMetricsProvider.MetricsProviderCapturingLifecycle.getRootContextCalled.get(), "metrics provider lifecycle error");
assertTrue(BaseTestMetricsProvider.MetricsProviderCapturingLifecycle.stopCalled.get(), "metrics provider lifecycle error");
}
/**
* Test verifies that the server is able to redefine if user configured only
* minSessionTimeout limit
*/
@Test
public void testWithOnlyMinSessionTimeout() throws Exception {
ClientBase.setupTestEnv();
final int CLIENT_PORT = PortAssignment.unique();
final int tickTime = 2000;
final int minSessionTimeout = tickTime * 2 - 100;
int maxSessionTimeout = 20 * tickTime;
final String configs = "minSessionTimeout=" + minSessionTimeout + "\n";
MainThread main = new MainThread(CLIENT_PORT, true, configs);
main.start();
String HOSTPORT = "127.0.0.1:" + CLIENT_PORT;
assertTrue(ClientBase.waitForServerUp(HOSTPORT, CONNECTION_TIMEOUT), "waiting for server being up");
// create session with min value
verifySessionTimeOut(minSessionTimeout, minSessionTimeout, HOSTPORT);
verifySessionTimeOut(minSessionTimeout - 2000, minSessionTimeout, HOSTPORT);
// create session with max value
verifySessionTimeOut(maxSessionTimeout, maxSessionTimeout, HOSTPORT);
verifySessionTimeOut(maxSessionTimeout + 2000, maxSessionTimeout, HOSTPORT);
main.shutdown();
assertTrue(ClientBase.waitForServerDown(HOSTPORT, ClientBase.CONNECTION_TIMEOUT), "waiting for server down");
}
/**
* Test verifies that the server is able to redefine the min/max session
* timeouts
*/
@Test
public void testMinMaxSessionTimeOut() throws Exception {
ClientBase.setupTestEnv();
final int CLIENT_PORT = PortAssignment.unique();
final int tickTime = 2000;
final int minSessionTimeout = tickTime * 2 - 100;
final int maxSessionTimeout = 20 * tickTime + 1000;
final String configs = "maxSessionTimeout="
+ maxSessionTimeout
+ "\n"
+ "minSessionTimeout="
+ minSessionTimeout
+ "\n";
MainThread main = new MainThread(CLIENT_PORT, true, configs);
main.start();
String HOSTPORT = "127.0.0.1:" + CLIENT_PORT;
assertTrue(ClientBase.waitForServerUp(HOSTPORT, CONNECTION_TIMEOUT), "waiting for server being up");
// create session with min value
verifySessionTimeOut(minSessionTimeout, minSessionTimeout, HOSTPORT);
verifySessionTimeOut(minSessionTimeout - 2000, minSessionTimeout, HOSTPORT);
// create session with max value
verifySessionTimeOut(maxSessionTimeout, maxSessionTimeout, HOSTPORT);
verifySessionTimeOut(maxSessionTimeout + 2000, maxSessionTimeout, HOSTPORT);
main.shutdown();
assertTrue(ClientBase.waitForServerDown(HOSTPORT, ClientBase.CONNECTION_TIMEOUT), "waiting for server down");
}
private void verifySessionTimeOut(int sessionTimeout, int expectedSessionTimeout, String HOSTPORT) throws IOException, KeeperException, InterruptedException {
clientConnected = new CountDownLatch(1);
ZooKeeper zk = new ZooKeeper(HOSTPORT, sessionTimeout, this);
assertTrue(clientConnected.await(sessionTimeout, TimeUnit.MILLISECONDS), "Failed to establish zkclient connection!");
assertEquals(expectedSessionTimeout, zk.getSessionTimeout(), "Not able to configure the sessionTimeout values");
zk.close();
}
@Test
public void testJMXRegistrationWithNIO() throws Exception {
ClientBase.setupTestEnv();
File tmpDir_1 = ClientBase.createTmpDir();
ServerCnxnFactory server_1 = startServer(tmpDir_1);
File tmpDir_2 = ClientBase.createTmpDir();
ServerCnxnFactory server_2 = startServer(tmpDir_2);
server_1.shutdown();
server_2.shutdown();
deleteFile(tmpDir_1);
deleteFile(tmpDir_2);
}
@Test
public void testJMXRegistrationWithNetty() throws Exception {
String originalServerCnxnFactory = System.getProperty(ServerCnxnFactory.ZOOKEEPER_SERVER_CNXN_FACTORY);
System.setProperty(ServerCnxnFactory.ZOOKEEPER_SERVER_CNXN_FACTORY, NettyServerCnxnFactory.class.getName());
try {
ClientBase.setupTestEnv();
File tmpDir_1 = ClientBase.createTmpDir();
ServerCnxnFactory server_1 = startServer(tmpDir_1);
File tmpDir_2 = ClientBase.createTmpDir();
ServerCnxnFactory server_2 = startServer(tmpDir_2);
server_1.shutdown();
server_2.shutdown();
deleteFile(tmpDir_1);
deleteFile(tmpDir_2);
} finally {
// setting back
if (originalServerCnxnFactory == null || originalServerCnxnFactory.isEmpty()) {
System.clearProperty(ServerCnxnFactory.ZOOKEEPER_SERVER_CNXN_FACTORY);
} else {
System.setProperty(ServerCnxnFactory.ZOOKEEPER_SERVER_CNXN_FACTORY, originalServerCnxnFactory);
}
}
}
private void deleteFile(File f) throws IOException {
if (f.isDirectory()) {
for (File c : f.listFiles()) {
deleteFile(c);
}
}
if (!f.delete()) {
// double check for the file existence
if (f.exists()) {
throw new IOException("Failed to delete file: " + f);
}
}
}
private ServerCnxnFactory startServer(File tmpDir) throws IOException, InterruptedException {
final int CLIENT_PORT = PortAssignment.unique();
ZooKeeperServer zks = new ZooKeeperServer(tmpDir, tmpDir, 3000);
ServerCnxnFactory f = ServerCnxnFactory.createFactory(CLIENT_PORT, -1);
f.startup(zks);
assertNotNull(zks.jmxServerBean, "JMX initialization failed!");
assertTrue(ClientBase.waitForServerUp("127.0.0.1:" + CLIENT_PORT, CONNECTION_TIMEOUT),
"waiting for server being up");
return f;
}
public void process(WatchedEvent event) {
if (event.getState() == KeeperState.SyncConnected) {
clientConnected.countDown();
}
}
}