ZabUtils.java

/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.zookeeper.server.quorum;

import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
import org.apache.zookeeper.PortAssignment;
import org.apache.zookeeper.common.X509Exception;
import org.apache.zookeeper.server.ServerCnxn;
import org.apache.zookeeper.server.ServerCnxnFactory;
import org.apache.zookeeper.server.ZKDatabase;
import org.apache.zookeeper.server.ZooKeeperServer;
import org.apache.zookeeper.server.persistence.FileTxnSnapLog;
import org.apache.zookeeper.server.quorum.flexible.QuorumMaj;
import org.apache.zookeeper.test.ClientBase;

public class ZabUtils {

    private ZabUtils() {
    }

    public static final int SYNC_LIMIT = 2;

    public static QuorumPeer createQuorumPeer(File tmpDir) throws IOException {
        HashMap<Long, QuorumPeer.QuorumServer> peers = new HashMap<>();
        QuorumPeer peer = QuorumPeer.testingQuorumPeer();
        peer.syncLimit = SYNC_LIMIT;
        peer.initLimit = 2;
        peer.tickTime = 2000;

        peers.put(0L, new QuorumPeer.QuorumServer(0, new InetSocketAddress("127.0.0.1", PortAssignment.unique()), new InetSocketAddress("127.0.0.1", PortAssignment.unique()), new InetSocketAddress("127.0.0.1", PortAssignment.unique())));
        peers.put(1L, new QuorumPeer.QuorumServer(1, new InetSocketAddress("127.0.0.1", PortAssignment.unique()), new InetSocketAddress("127.0.0.1", PortAssignment.unique()), new InetSocketAddress("127.0.0.1", PortAssignment.unique())));
        peers.put(2L, new QuorumPeer.QuorumServer(2, new InetSocketAddress("127.0.0.1", PortAssignment.unique()), new InetSocketAddress("127.0.0.1", PortAssignment.unique()), new InetSocketAddress("127.0.0.1", PortAssignment.unique())));

        peer.setQuorumVerifier(new QuorumMaj(peers), false);
        peer.setCnxnFactory(new NullServerCnxnFactory());
        File version2 = new File(tmpDir, "version-2");
        version2.mkdir();
        ClientBase.createInitializeFile(tmpDir);
        FileOutputStream fos = new FileOutputStream(new File(version2, "currentEpoch"));
        fos.write("0\n".getBytes());
        fos.close();
        fos = new FileOutputStream(new File(version2, "acceptedEpoch"));
        fos.write("0\n".getBytes());
        fos.close();
        return peer;
    }

    public static Leader createLeader(File tmpDir, QuorumPeer peer) throws IOException, NoSuchFieldException, IllegalAccessException, X509Exception {
        LeaderZooKeeperServer zk = prepareLeader(tmpDir, peer);
        return new Leader(peer, zk);
    }

    public static Leader createMockLeader(File tmpDir, QuorumPeer peer) throws IOException, NoSuchFieldException, IllegalAccessException, X509Exception {
        LeaderZooKeeperServer zk = prepareLeader(tmpDir, peer);
        return new MockLeader(peer, zk);
    }

    private static LeaderZooKeeperServer prepareLeader(File tmpDir, QuorumPeer peer) throws IOException, NoSuchFieldException, IllegalAccessException {
        FileTxnSnapLog logFactory = new FileTxnSnapLog(tmpDir, tmpDir);
        peer.setTxnFactory(logFactory);
        ZKDatabase zkDb = new ZKDatabase(logFactory);
        LeaderZooKeeperServer zk = new LeaderZooKeeperServer(logFactory, peer, zkDb);
        return zk;
    }

    private static final class NullServerCnxnFactory extends ServerCnxnFactory {

        public void startup(ZooKeeperServer zkServer, boolean startServer) throws IOException, InterruptedException {
        }
        public void start() {
        }
        public void shutdown() {
        }
        public void setMaxClientCnxnsPerHost(int max) {
        }
        public void join() throws InterruptedException {
        }
        public int getMaxClientCnxnsPerHost() {
            return 0;
        }
        public int getSocketListenBacklog() {
            return -1;
        }
        public int getLocalPort() {
            return 0;
        }
        public InetSocketAddress getLocalAddress() {
            return null;
        }
        public Iterable<ServerCnxn> getConnections() {
            return null;
        }
        public void configure(InetSocketAddress addr, int maxcc, int listenBacklog, boolean secure) throws IOException {
        }

        @Override
        public boolean closeSession(long sessionId, ServerCnxn.DisconnectReason reason) {
            return false;
        }
        @Override
        public void closeAll(ServerCnxn.DisconnectReason reason) {
        }
        @Override
        public int getNumAliveConnections() {
            return 0;
        }
        @Override
        public void reconfigure(InetSocketAddress addr) {
        }
        @Override
        public void resetAllConnectionStats() {
        }
        @Override
        public Iterable<Map<String, Object>> getAllConnectionInfo(boolean brief) {
            return null;
        }

    }

    public static final class MockLeader extends Leader {

        MockLeader(QuorumPeer qp, LeaderZooKeeperServer zk) throws IOException, X509Exception {
            super(qp, zk);
        }

        /**
         * This method returns the value of the variable that holds the epoch
         * to be proposed and that has been proposed, depending on the point
         * of the execution in which it is called.
         *
         * @return epoch
         */
        public long getCurrentEpochToPropose() {
            return epoch;
        }

    }

}