QuorumPeerMainTLSTest.java

/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.zookeeper.server.quorum;

import static org.apache.zookeeper.server.quorum.QuorumPeerTestBase.MainThread.UNSET_STATIC_CLIENTPORT;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
import java.io.File;
import java.io.IOException;
import java.security.Security;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.commons.io.FileUtils;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.PortAssignment;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.admin.ZooKeeperAdmin;
import org.apache.zookeeper.client.ZKClientConfig;
import org.apache.zookeeper.common.KeyStoreFileType;
import org.apache.zookeeper.common.X509KeyType;
import org.apache.zookeeper.common.X509TestContext;
import org.apache.zookeeper.test.ClientBase;
import org.apache.zookeeper.test.ReconfigTest;
import org.bouncycastle.jce.provider.BouncyCastleProvider;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;


public class QuorumPeerMainTLSTest extends QuorumPeerTestBase {

  /** Logger for this test class. */
  protected static final Logger LOG = LoggerFactory.getLogger(QuorumPeerMainTLSTest.class);
  /** Scratch directory created in {@link #beforeAll()} and deleted in {@link #afterAll()}. */
  private static File tempDir;
  /** X509 fixture built once in {@link #beforeAll()}; the tests read their PEM key/trust stores from it. */
  private static X509TestContext x509TestContext = null;

  /**
   * One-time fixture setup: registers the BouncyCastle security provider, creates an empty
   * scratch directory and builds the shared X509 test context with EC keys for both the key
   * store and the trust store.
   *
   * @throws Exception if the scratch directory or the X509 context cannot be created
   */
  @BeforeAll
  public static void beforeAll() throws Exception {
    BouncyCastleProvider bouncyCastle = new BouncyCastleProvider();
    Security.addProvider(bouncyCastle);

    // publish the directory before building the context so afterAll() can always clean it up
    File scratchDir = ClientBase.createEmptyTestDir();
    tempDir = scratchDir;

    X509TestContext context = X509TestContext.newBuilder()
        .setTempDir(scratchDir)
        .setKeyStoreKeyType(X509KeyType.EC)
        .setTrustStoreKeyType(X509KeyType.EC)
        .build();
    x509TestContext = context;
  }

  /**
   * One-time fixture teardown: unregisters the BouncyCastle provider and removes the scratch
   * directory. Cleanup is best-effort, so a failure to delete is logged rather than propagated.
   */
  @AfterAll
  public static void afterAll() {
    Security.removeProvider(BouncyCastleProvider.PROVIDER_NAME);
    if (tempDir == null) {
      // beforeAll() did not get as far as creating the directory; nothing to delete
      return;
    }
    try {
      FileUtils.deleteDirectory(tempDir);
    } catch (IOException e) {
      // keep teardown best-effort, but leave a trace so leftover directories can be diagnosed
      LOG.warn("Failed to delete temporary test directory {}", tempDir, e);
    }
  }


  // TODO - test reconfig: start with the NIO cnxn factory, then reconfig to listen on a TLS port; this should fail because the Netty cnxn factory is required for TLS
  // TODO - equivalent of testReconfigRemoveClientFromStatic, but for secureClientPort
  /**
   * Produces the quorum configuration line for a single ensemble member. Implementations decide
   * which of the offered ports actually appear in the returned line.
   */
  @FunctionalInterface
  interface QuorumConfigBuilder {
    /**
     * @param id index of the server within the ensemble
     * @param role role of the server, e.g. {@code "participant"} or {@code "observer"}
     * @param quorumPort port assigned for quorum traffic
     * @param leaderPort port assigned for leader election
     * @param clientPort port assigned for plain client connections
     * @param secureClientPort port assigned for secure client connections
     * @return the configuration line describing this server
     */
    String build(int id, String role, int quorumPort, int leaderPort, int clientPort, int secureClientPort);
  }

  /**
   * A {@link Servers} fixture in which each member may be reached over a plain client port or a
   * secure client port, depending on the server line produced by a {@link QuorumConfigBuilder}.
   */
  static class MaybeSecureServers extends Servers {
    /** Quorum port assigned to each server, by server index. */
    public int[] quorumPorts;
    /** Leader election port assigned to each server, by server index. */
    public int[] leaderPorts;
    /** {@code true} at index i when the client for server i is created with the TLS client config. */
    public boolean[] isSecureClient;
    /** Secure client port assigned to each server, by server index. */
    public int[] secureClientPorts;
    int numParticipants;
    int numObservers;
    /** Newline-separated server lines, one per ensemble member. */
    String quorumCfg;

    /** Additional configuration text passed to every server's {@code MainThread}. */
    String otherCfg;

    /**
     * Assigns unique ports to every server, asks {@code quorumConfigBuilder} for each server
     * line and creates (but does not start) one {@code MainThread} per server.
     *
     * @param numParticipants number of servers with the {@code "participant"} role
     * @param numObservers number of servers with the {@code "observer"} role; observers take the
     *        indices after the participants
     * @param otherCfg additional configuration text handed to every server
     * @param quorumConfigBuilder produces the server line for each member
     * @throws IOException propagated from {@code MainThread} construction
     */
    public MaybeSecureServers(int numParticipants, int numObservers, String otherCfg, QuorumConfigBuilder quorumConfigBuilder) throws IOException {
      this.numParticipants = numParticipants;
      this.numObservers = numObservers;
      this.otherCfg = otherCfg;
      int size = numParticipants + numObservers;

      this.mt = new MainThread[size];
      this.zk = new ZooKeeper[size];
      this.quorumPorts = new int[size];
      this.leaderPorts = new int[size];
      this.clientPorts = new int[size];
      this.adminPorts = new int[size];
      this.secureClientPorts = new int[size];
      this.isSecureClient = new boolean[size];

      StringBuilder serverLines = new StringBuilder();

      for (int i = 0; i < size; i++) {
        this.quorumPorts[i] = PortAssignment.unique();
        this.leaderPorts[i] = PortAssignment.unique();
        this.clientPorts[i] = PortAssignment.unique();
        this.adminPorts[i] = PortAssignment.unique();
        this.secureClientPorts[i] = PortAssignment.unique();
        String role = i < numParticipants ? "participant" : "observer";
        String serverEntry = quorumConfigBuilder.build(i, role, this.quorumPorts[i], this.leaderPorts[i], this.clientPorts[i], this.secureClientPorts[i]);
        serverLines.append(serverEntry).append("\n");
        // a server line ending in the secure port means this member is reached via the secure client port
        this.isSecureClient[i] = serverEntry.endsWith(Integer.toString(this.secureClientPorts[i]));
      }

      this.quorumCfg = serverLines.toString();
      // every server receives the complete set of server lines, so build them all first
      for (int i = 0; i < size; i++) {
        this.mt[i] = new MainThread(i, UNSET_STATIC_CLIENTPORT, this.adminPorts[i], null, this.quorumCfg, this.otherCfg, null, true, null);
      }
    }

    /**
     * Closes the current client for the given server, if any, and replaces it with a client
     * connected to that server's secure client port using the TLS client configuration. The
     * server is marked as secure from then on.
     *
     * @param clientIndex index of the server/client pair
     * @param watcher watcher passed to the new client
     */
    public void restartSecureClient(int clientIndex, Watcher watcher) throws IOException, InterruptedException {
      if (zk[clientIndex] != null) {
        zk[clientIndex].close();
      }

      isSecureClient[clientIndex] = true;
      zk[clientIndex] = new ZooKeeper(
          "127.0.0.1:" + secureClientPorts[clientIndex],
          ClientBase.CONNECTION_TIMEOUT,
          watcher, getClientTLSConfigs(x509TestContext));
    }

    /**
     * Starts every server thread that is not alive, then restarts the client of every server,
     * choosing the secure or the plain variant according to {@link #isSecureClient}.
     *
     * @param watcher watcher passed to each new client
     */
    public void restartAllServersAndClients(Watcher watcher) throws IOException, InterruptedException {
      for (MainThread t : mt) {
        if (!t.isAlive()) {
          t.start();
        }
      }
      for (int i = 0; i < zk.length; i++) {
        if (isSecureClient[i]) {
          restartSecureClient(i, watcher);
        } else {
          restartClient(i, watcher);
        }
      }
    }
  }

  /**
   * Assembles the server-side TLS settings: PEM key store and trust store locations taken from
   * the given context, the matching store types, and the Netty connection factory.
   *
   * @param x509TestContext source of the PEM key store and trust store files
   * @return a new map from configuration key to value
   * @throws IOException presumably when the context cannot provide a store file
   */
  static Map<String, String> getServerTLSConfigs(X509TestContext x509TestContext) throws IOException {
    final String storeType = "PEM";
    String keyStorePath = x509TestContext.getKeyStoreFile(KeyStoreFileType.PEM).getAbsolutePath();
    String trustStorePath = x509TestContext.getTrustStoreFile(KeyStoreFileType.PEM).getAbsolutePath();

    Map<String, String> tlsSettings = new HashMap<>();
    tlsSettings.put("ssl.keyStore.location", keyStorePath);
    tlsSettings.put("ssl.trustStore.location", trustStorePath);
    tlsSettings.put("ssl.keyStore.type", storeType);
    tlsSettings.put("ssl.trustStore.type", storeType);
    // TLS needs the Netty connection factory
    tlsSettings.put("serverCnxnFactory", org.apache.zookeeper.server.NettyServerCnxnFactory.class.getName());
    return tlsSettings;
  }

  /**
   * Assembles a client configuration that connects over TLS, using the PEM key store and trust
   * store of the given context and the Netty client socket.
   *
   * @param x509TestContext source of the PEM key store and trust store files; must not be null
   * @return a new client configuration with the TLS properties set
   * @throws RuntimeException if {@code x509TestContext} is null
   * @throws IOException presumably when the context cannot provide a store file
   */
  static ZKClientConfig getClientTLSConfigs(X509TestContext x509TestContext) throws IOException {
    if (x509TestContext == null) {
      throw new RuntimeException("x509TestContext cannot be null");
    }
    String keyStorePath = x509TestContext.getKeyStoreFile(KeyStoreFileType.PEM).getAbsolutePath();
    String trustStorePath = x509TestContext.getTrustStoreFile(KeyStoreFileType.PEM).getAbsolutePath();

    String[][] tlsProperties = {
        {"zookeeper.client.secure", "true"},
        {"zookeeper.ssl.keyStore.location", keyStorePath},
        {"zookeeper.ssl.trustStore.location", trustStorePath},
        {"zookeeper.ssl.keyStore.type", "PEM"},
        {"zookeeper.ssl.trustStore.type", "PEM"},
        // TLS is only available with the Netty client socket
        {"zookeeper.clientCnxnSocket", org.apache.zookeeper.ClientCnxnSocketNetty.class.getName()},
    };

    ZKClientConfig clientConfig = new ZKClientConfig();
    for (String[] property : tlsProperties) {
      clientConfig.setProperty(property[0], property[1]);
    }
    return clientConfig;
  }

  /**
   * Starts an ensemble of three participants and two observers whose server lines carry only a
   * secure client port, waits for all clients to connect, and then checks on server 0 that no
   * plain client listener exists while the secure listener uses the assigned secure port.
   */
  @Test
  public void testTLSQuorumPeers() throws IOException, InterruptedException {
    Map<String, String> serverSettings = new HashMap<>();
    serverSettings.put("standaloneEnabled", "false");
    serverSettings.put("authProvider.x509", "org.apache.zookeeper.server.auth.X509AuthenticationProvider");
    serverSettings.putAll(getServerTLSConfigs(x509TestContext));

    // render the settings as key=value lines
    StringBuilder otherCfg = new StringBuilder();
    serverSettings.forEach((key, value) -> otherCfg.append(key).append("=").append(value).append("\n"));

    // server lines with an empty plain client section and only a secure client address
    QuorumConfigBuilder secureOnly = (id, role, quorumPort, leaderPort, clientPort, secureClientPort) ->
        String.format("server.%d=127.0.0.1:%d:%d:%s;;127.0.0.1:%d", id, quorumPort, leaderPort, role, secureClientPort);
    MaybeSecureServers ensemble = new MaybeSecureServers(3, 2, otherCfg.toString(), secureOnly);

    // hand the fixture to QuorumPeerTestBase via "servers" so that its tearDown() can destroy it
    servers = ensemble;

    // bring up servers and clients, then block until every client is connected
    ensemble.restartAllServersAndClients(this);
    waitForAll(ensemble, ZooKeeper.States.CONNECTED);

    // locate (and log) the leader
    ensemble.findLeader();

    QuorumPeer firstPeer = ensemble.mt[0].getQuorumPeer();
    assertNotNull(firstPeer);

    // the plain client port must have no listener
    assertNull(firstPeer.cnxnFactory);
    assertNull(firstPeer.getClientAddress());
    assertEquals(-1, firstPeer.getClientPort());

    // the secure client port must have a listener on the assigned port
    int expectedSecurePort = ensemble.secureClientPorts[0];
    assertNotNull(firstPeer.secureCnxnFactory);
    assertNotNull(firstPeer.getSecureClientAddress());
    assertEquals(expectedSecurePort, firstPeer.getSecureClientPort());
    assertEquals(expectedSecurePort, firstPeer.getSecureClientAddress().getPort());
  }

  /**
   * Starts three participants that are configured with only a plain client port, then uses
   * reconfig to remove server 1 and add it back with only a secure client port. Afterwards the
   * plain client port of server 1 must be down and a TLS client connected to its secure port
   * must work normally alongside the plain client of server 0.
   */
  @Test
  public void reconfigFromClientPortToSecureClientPort() throws IOException, InterruptedException, KeeperException {
    Map<String, String> configMap = new HashMap<>();
    configMap.put("reconfigEnabled", "true");
    configMap.put("authProvider.x509", "org.apache.zookeeper.server.auth.X509AuthenticationProvider");
    configMap.put("DigestAuthenticationProvider.superDigest", "super:D/InIHSb7yEEbrWz8b9l71RjZJU="/* password is 'test'*/);
    configMap.putAll(getServerTLSConfigs(x509TestContext));

    StringBuilder configBuilder = new StringBuilder();
    for (Map.Entry<String, String> entry : configMap.entrySet()) {
      configBuilder.append(entry.getKey()).append("=").append(entry.getValue()).append("\n");
    }

    // initial server lines carry only the plain client address
    MaybeSecureServers maybeSecureServers = new MaybeSecureServers(3, 0, configBuilder.toString(),
        (id, role, quorumPort, leaderPort, clientPort, secureClientPort) -> String.format("server.%d=127.0.0.1:%d:%d:%s;127.0.0.1:%d", id, quorumPort, leaderPort, role, clientPort));

    // assign to "servers" of QuorumPeerTestBase before starting anything
    servers = maybeSecureServers;

    maybeSecureServers.restartAllServersAndClients(this);

    waitForAll(maybeSecureServers, ZooKeeper.States.CONNECTED);

    // try-with-resources: the admin session is closed even when a step below fails
    try (ZooKeeperAdmin zkAdmin = new ZooKeeperAdmin("127.0.0.1:" + maybeSecureServers.clientPorts[0], ClientBase.CONNECTION_TIMEOUT, this)) {
      // authenticate as the super user configured through superDigest above
      zkAdmin.addAuthInfo("digest", "super:test".getBytes(java.nio.charset.StandardCharsets.UTF_8));

      List<String> joiningServers = new ArrayList<>();
      List<String> leavingServers = new ArrayList<>();

      int reconfigIndex = 1;
      leavingServers.add(Integer.toString(reconfigIndex));
      // the re-added server line leaves the plain client section empty and names only the secure address
      joiningServers.add(String.format("server.%d=127.0.0.1:%d:%d:%s;;127.0.0.1:%d", reconfigIndex,
          maybeSecureServers.quorumPorts[reconfigIndex], maybeSecureServers.leaderPorts[reconfigIndex],
          "participant", maybeSecureServers.secureClientPorts[reconfigIndex]));

      ReconfigTest.reconfig(zkAdmin, null, leavingServers, null, -1);
      LOG.info("Reconfig REMOVE done with leavingServers={}!", leavingServers);
      ReconfigTest.testServerHasConfig(maybeSecureServers.zk[0], null, leavingServers);

      ReconfigTest.reconfig(zkAdmin, joiningServers, null, null, -1);
      LOG.info("Reconfig ADD done with joiningServers={}!", joiningServers);

      ReconfigTest.testServerHasConfig(maybeSecureServers.zk[0], joiningServers, null);

      // the old plain client port of the reconfigured server must no longer accept connections
      assertTrue(ClientBase.waitForServerDown("127.0.0.1:" + maybeSecureServers.clientPorts[reconfigIndex], 5000, false));

      // reconnect the client of the reconfigured server through its secure port
      maybeSecureServers.restartSecureClient(reconfigIndex, this);
      waitForOne(maybeSecureServers.zk[reconfigIndex], ZooKeeper.States.CONNECTED);
      ReconfigTest.testNormalOperation(maybeSecureServers.zk[0], maybeSecureServers.zk[reconfigIndex]);
    }
  }

}