TestZKCuratorManager.java
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.util.curator;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import javax.security.auth.login.AppConfigurationEntry;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.RetryNTimes;
import org.apache.curator.test.TestingServer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeys;
import org.apache.hadoop.security.SecurityUtil;
import org.apache.hadoop.security.authentication.util.JaasConfiguration;
import org.apache.hadoop.util.ZKUtil;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.client.ZKClientConfig;
import org.apache.zookeeper.data.ACL;
import org.apache.zookeeper.data.Stat;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
/**
 * Test the manager for ZooKeeper Curator.
 *
 * <p>Every test gets its own Curator {@link TestingServer} and its own
 * {@link ZKCuratorManager} connected to that server; both are released again
 * in {@link #teardown()}.
 */
public class TestZKCuratorManager {

  /** ZooKeeper testing server the current test talks to. */
  private TestingServer server;

  /** Manager under test, started against {@link #server}. */
  private ZKCuratorManager curator;

  /**
   * Starts a fresh testing server and a manager connected to it.
   *
   * @throws Exception if the server or the manager cannot be started.
   */
  @BeforeEach
  public void setup() throws Exception {
    this.server = new TestingServer();
    Configuration conf = new Configuration();
    String zkHostPort = this.server.getConnectString();
    this.curator = new ZKCuratorManager(conf);
    this.curator.start(zkHostPort);
  }

  /**
   * Releases the manager and the testing server.
   *
   * <p>Both fields are null-checked because this method also runs when
   * {@link #setup()} failed part-way, and the server is closed in a
   * {@code finally} block so that a failure while closing the manager cannot
   * leave the server running.
   *
   * @throws Exception if closing the manager or the server fails.
   */
  @AfterEach
  public void teardown() throws Exception {
    try {
      if (this.curator != null) {
        this.curator.close();
        this.curator = null;
      }
    } finally {
      if (this.server != null) {
        this.server.close();
        this.server = null;
      }
    }
  }

  /**
   * Creates a znode, writes a string into it and reads the same string back.
   *
   * @throws Exception if any ZooKeeper operation fails.
   */
  @Test
  public void testReadWriteData() throws Exception {
    String testZNode = "/test";
    String expectedString = "testString";
    assertFalse(curator.exists(testZNode));
    curator.create(testZNode);
    assertTrue(curator.exists(testZNode));
    curator.setData(testZNode, expectedString, -1);
    String testString = curator.getStringData(testZNode);
    assertEquals(expectedString, testString);
  }

  /**
   * Checks that the child list of the root follows node creation and
   * deletion.
   *
   * @throws Exception if any ZooKeeper operation fails.
   */
  @Test
  public void testChildren() throws Exception {
    // The root is expected to hold exactly one child before anything is
    // created here (presumably ZooKeeper's own system node).
    List<String> children = curator.getChildren("/");
    assertEquals(1, children.size());

    assertFalse(curator.exists("/node1"));
    curator.create("/node1");
    assertTrue(curator.exists("/node1"));

    assertFalse(curator.exists("/node2"));
    curator.create("/node2");
    assertTrue(curator.exists("/node2"));

    children = curator.getChildren("/");
    assertEquals(3, children.size());

    curator.delete("/node2");
    assertFalse(curator.exists("/node2"));
    children = curator.getChildren("/");
    assertEquals(2, children.size());
  }

  /**
   * Checks both {@code getStringData} variants: a freshly created node yields
   * {@code null}, and a node with data yields that data as a string.
   *
   * @throws Exception if any ZooKeeper operation fails.
   */
  @Test
  public void testGetStringData() throws Exception {
    String node1 = "/node1";
    String node2 = "/node2";
    byte[] setData = "setData".getBytes(StandardCharsets.UTF_8);

    // Variant without Stat.
    assertFalse(curator.exists(node1));
    curator.create(node1);
    assertNull(curator.getStringData(node1));
    curator.setData(node1, setData, -1);
    assertEquals("setData", curator.getStringData(node1));

    // Variant taking a Stat.
    Stat stat = new Stat();
    assertFalse(curator.exists(node2));
    curator.create(node2);
    assertNull(curator.getStringData(node2, stat));
    curator.setData(node2, setData, -1);
    assertEquals("setData", curator.getStringData(node2, stat));
  }

  /**
   * Checks that operations queued on a {@link ZKCuratorManager.SafeTransaction}
   * are not visible before {@code commit()}, are all visible after it, and
   * that the fencing node does not exist once the transaction is committed.
   *
   * @throws Exception if any ZooKeeper operation fails.
   */
  @Test
  public void testTransaction() throws Exception {
    List<ACL> zkAcl = ZKUtil.parseACLs(CommonConfigurationKeys.ZK_ACL_DEFAULT);
    String fencingNodePath = "/fencing";
    String node1 = "/node1";
    String node2 = "/node2";
    byte[] testData = "testData".getBytes(StandardCharsets.UTF_8);
    assertFalse(curator.exists(fencingNodePath));
    assertFalse(curator.exists(node1));
    assertFalse(curator.exists(node2));

    // First transaction: create both nodes.
    ZKCuratorManager.SafeTransaction txn = curator.createTransaction(
        zkAcl, fencingNodePath);
    txn.create(node1, testData, zkAcl, CreateMode.PERSISTENT);
    txn.create(node2, testData, zkAcl, CreateMode.PERSISTENT);
    // Nothing is visible before the commit.
    assertFalse(curator.exists(fencingNodePath));
    assertFalse(curator.exists(node1));
    assertFalse(curator.exists(node2));
    txn.commit();
    assertFalse(curator.exists(fencingNodePath));
    assertTrue(curator.exists(node1));
    assertTrue(curator.exists(node2));
    assertTrue(Arrays.equals(testData, curator.getData(node1)));
    assertTrue(Arrays.equals(testData, curator.getData(node2)));

    // Second transaction: overwrite node1 and delete node2.
    byte[] setData = "setData".getBytes(StandardCharsets.UTF_8);
    txn = curator.createTransaction(zkAcl, fencingNodePath);
    txn.setData(node1, setData, -1);
    txn.delete(node2);
    // Still the old state before the commit.
    assertTrue(curator.exists(node2));
    assertTrue(Arrays.equals(testData, curator.getData(node1)));
    txn.commit();
    assertFalse(curator.exists(node2));
    assertTrue(Arrays.equals(setData, curator.getData(node1)));
  }

  /**
   * Checks which JAAS settings a client created by
   * {@link ZKCuratorManager.HadoopZookeeperFactory} ends up with: the
   * factory's own principal and keytab by default, and the globally installed
   * ones when a global JAAS configuration and login context name are set.
   *
   * <p>Only the client configuration is inspected. Each client is closed as
   * soon as it has been validated so that it does not leak beyond the test.
   *
   * @throws Exception if a client cannot be created or closed.
   */
  @Test
  public void testJaasConfiguration() throws Exception {
    // Validate that HadoopZooKeeperFactory will set ZKConfig with given principals
    ZKCuratorManager.HadoopZookeeperFactory factory1 =
        new ZKCuratorManager.HadoopZookeeperFactory("foo1", "bar1", "bar1.keytab");
    try (ZooKeeper zk1 = factory1.newZooKeeper("connString", 1000, null, false)) {
      validateJaasConfiguration(ZKCuratorManager.HadoopZookeeperFactory.JAAS_CLIENT_ENTRY,
          "bar1", "bar1.keytab", zk1);
    }

    // Validate that a new HadoopZooKeeperFactory will use the new principals
    ZKCuratorManager.HadoopZookeeperFactory factory2 =
        new ZKCuratorManager.HadoopZookeeperFactory("foo2", "bar2", "bar2.keytab");
    try (ZooKeeper zk2 = factory2.newZooKeeper("connString", 1000, null, false)) {
      validateJaasConfiguration(ZKCuratorManager.HadoopZookeeperFactory.JAAS_CLIENT_ENTRY,
          "bar2", "bar2.keytab", zk2);
    }

    try {
      // Setting global configuration
      String testClientConfig = "TestClientConfig";
      JaasConfiguration jconf = new JaasConfiguration(testClientConfig, "test", "test.keytab");
      javax.security.auth.login.Configuration.setConfiguration(jconf);
      System.setProperty(ZKClientConfig.LOGIN_CONTEXT_NAME_KEY, testClientConfig);

      // Validate that a new HadoopZooKeeperFactory will use the global principals
      ZKCuratorManager.HadoopZookeeperFactory factory3 =
          new ZKCuratorManager.HadoopZookeeperFactory("foo3", "bar3", "bar3.keytab");
      try (ZooKeeper zk3 = factory3.newZooKeeper("connString", 1000, null, false)) {
        validateJaasConfiguration(testClientConfig, "test", "test.keytab", zk3);
      }
    } finally {
      // Remove global configuration
      System.clearProperty(ZKClientConfig.LOGIN_CONTEXT_NAME_KEY);
    }
  }

  /**
   * Checks that a {@link CuratorFramework} built with a
   * {@link ZKCuratorManager.HadoopZookeeperFactory} yields a ZooKeeper client
   * carrying the factory's JAAS settings, without {@code newZooKeeper} being
   * called explicitly by the test.
   *
   * @throws Exception if the client cannot be built, started or queried.
   */
  @Test
  public void testCuratorFrameworkFactory() throws Exception {
    // By not explicitly calling the NewZooKeeper method validate that the Curator override works.
    ZKClientConfig zkClientConfig = new ZKClientConfig();
    Configuration conf = new Configuration();
    conf.set(CommonConfigurationKeys.ZK_ADDRESS, this.server.getConnectString());
    int numRetries = conf.getInt(CommonConfigurationKeys.ZK_NUM_RETRIES,
        CommonConfigurationKeys.ZK_NUM_RETRIES_DEFAULT);
    int zkSessionTimeout = conf.getInt(CommonConfigurationKeys.ZK_TIMEOUT_MS,
        CommonConfigurationKeys.ZK_TIMEOUT_MS_DEFAULT);
    int zkRetryInterval = conf.getInt(
        CommonConfigurationKeys.ZK_RETRY_INTERVAL_MS,
        CommonConfigurationKeys.ZK_RETRY_INTERVAL_MS_DEFAULT);
    RetryNTimes retryPolicy = new RetryNTimes(numRetries, zkRetryInterval);

    // try-with-resources: the started client must be closed again, otherwise
    // it stays connected after the test has finished.
    try (CuratorFramework client = CuratorFrameworkFactory.builder()
        .connectString(conf.get(CommonConfigurationKeys.ZK_ADDRESS))
        .zkClientConfig(zkClientConfig)
        .sessionTimeoutMs(zkSessionTimeout).retryPolicy(retryPolicy)
        .authorization(new ArrayList<>())
        .zookeeperFactory(new ZKCuratorManager.HadoopZookeeperFactory(
            "foo1", "bar1", "bar1.keytab", false,
            new SecurityUtil.TruststoreKeystore(conf)))
        .build()) {
      client.start();
      validateJaasConfiguration(ZKCuratorManager.HadoopZookeeperFactory.JAAS_CLIENT_ENTRY,
          "bar1", "bar1.keytab", client.getZookeeperClient().getZooKeeper());
    }
  }

  /**
   * Asserts that the given client uses the expected login context name and
   * that the JAAS entry registered under that name carries the expected
   * principal and keytab.
   *
   * @param clientConfig expected login context name, also used to look up the
   *                     JAAS entry in the currently installed configuration.
   * @param principal expected value of the entry's {@code principal} option.
   * @param keytab expected value of the entry's {@code keyTab} option.
   * @param zk client whose configuration is inspected.
   */
  private void validateJaasConfiguration(String clientConfig, String principal, String keytab,
      ZooKeeper zk) {
    assertEquals(clientConfig,
        zk.getClientConfig().getProperty(ZKClientConfig.LOGIN_CONTEXT_NAME_KEY),
        "Validate that expected clientConfig is set in ZK config");

    AppConfigurationEntry[] entries = javax.security.auth.login.Configuration.getConfiguration()
        .getAppConfigurationEntry(clientConfig);
    // Fail with a readable message instead of an NPE / index error when the
    // entry is missing.
    assertTrue(entries != null && entries.length > 0,
        "Validate that a Jaas config entry exists for " + clientConfig);
    assertEquals(principal, entries[0].getOptions().get("principal"),
        "Validate that expected principal is set in Jaas config");
    assertEquals(keytab, entries[0].getOptions().get("keyTab"),
        "Validate that expected keytab is set in Jaas config");
  }
}