CnxManagerTest.java

/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.zookeeper.server.quorum;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.File;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.net.SocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SocketChannel;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import javax.net.ssl.HandshakeCompletedListener;
import javax.net.ssl.SSLSession;
import javax.net.ssl.SSLSocket;
import org.apache.zookeeper.PortAssignment;
import org.apache.zookeeper.ZKTestCase;
import org.apache.zookeeper.common.QuorumX509Util;
import org.apache.zookeeper.common.Time;
import org.apache.zookeeper.server.quorum.QuorumCnxManager.InitialMessage;
import org.apache.zookeeper.server.quorum.QuorumCnxManager.Message;
import org.apache.zookeeper.server.quorum.QuorumPeer.LearnerType;
import org.apache.zookeeper.server.quorum.QuorumPeer.QuorumServer;
import org.apache.zookeeper.server.quorum.QuorumPeer.ServerState;
import org.apache.zookeeper.test.ClientBase;
import org.apache.zookeeper.test.FLENewEpochTest;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;


public class CnxManagerTest extends ZKTestCase {

    protected static final Logger LOG = LoggerFactory.getLogger(FLENewEpochTest.class);
    protected static final int THRESHOLD = 4;

    int count;
    Map<Long, QuorumServer> peers;
    File[] peerTmpdir;
    int[] peerQuorumPort;
    int[] peerClientPort;
    @BeforeEach
    public void setUp() throws Exception {

        this.count = 3;
        this.peers = new HashMap<>(count);
        peerTmpdir = new File[count];
        peerQuorumPort = new int[count];
        peerClientPort = new int[count];

        for (int i = 0; i < count; i++) {
            peerQuorumPort[i] = PortAssignment.unique();
            peerClientPort[i] = PortAssignment.unique();
            peers.put((long) i, new QuorumServer(i, new InetSocketAddress("127.0.0.1", peerQuorumPort[i]), new InetSocketAddress("127.0.0.1", PortAssignment.unique()), new InetSocketAddress("127.0.0.1", peerClientPort[i])));
            peerTmpdir[i] = ClientBase.createTmpDir();
        }
    }

    ByteBuffer createMsg(int state, long leader, long zxid, long epoch) {
        byte[] requestBytes = new byte[28];
        ByteBuffer requestBuffer = ByteBuffer.wrap(requestBytes);

        /*
         * Building notification packet to send
         */

        requestBuffer.clear();
        requestBuffer.putInt(state);
        requestBuffer.putLong(leader);
        requestBuffer.putLong(zxid);
        requestBuffer.putLong(epoch);

        return requestBuffer;
    }

    class CnxManagerThread extends Thread {

        boolean failed;
        CnxManagerThread() {
            failed = false;
        }

        public void run() {
            try {
                QuorumPeer peer = new QuorumPeer(peers, peerTmpdir[0], peerTmpdir[0], peerClientPort[0], 3, 0, 1000, 2, 2, 2);
                QuorumCnxManager cnxManager = peer.createCnxnManager();
                QuorumCnxManager.Listener listener = cnxManager.listener;
                if (listener != null) {
                    listener.start();
                } else {
                    LOG.error("Null listener when initializing cnx manager");
                }

                long sid = 1;
                cnxManager.toSend(sid, createMsg(ServerState.LOOKING.ordinal(), 0, -1, 1));

                Message m = null;
                int numRetries = 1;
                while ((m == null) && (numRetries++ <= THRESHOLD)) {
                    m = cnxManager.pollRecvQueue(3000, TimeUnit.MILLISECONDS);
                    if (m == null) {
                        cnxManager.connectAll();
                    }
                }

                if (numRetries > THRESHOLD) {
                    failed = true;
                    return;
                }

                cnxManager.testInitiateConnection(sid);

                m = cnxManager.pollRecvQueue(3000, TimeUnit.MILLISECONDS);
                if (m == null) {
                    failed = true;
                }
            } catch (Exception e) {
                LOG.error("Exception while running mock thread", e);
                fail("Unexpected exception");
            }
        }

    }

    @Test
    public void testCnxManager() throws Exception {
        CnxManagerThread thread = new CnxManagerThread();

        thread.start();

        QuorumPeer peer = new QuorumPeer(peers, peerTmpdir[1], peerTmpdir[1], peerClientPort[1], 3, 1, 1000, 2, 2, 2);
        QuorumCnxManager cnxManager = peer.createCnxnManager();
        QuorumCnxManager.Listener listener = cnxManager.listener;
        if (listener != null) {
            listener.start();
        } else {
            LOG.error("Null listener when initializing cnx manager");
        }

        cnxManager.toSend(0L, createMsg(ServerState.LOOKING.ordinal(), 1, -1, 1));

        Message m = null;
        int numRetries = 1;
        while ((m == null) && (numRetries++ <= THRESHOLD)) {
            m = cnxManager.pollRecvQueue(3000, TimeUnit.MILLISECONDS);
            if (m == null) {
                cnxManager.connectAll();
            }
        }

        assertTrue(numRetries <= THRESHOLD, "Exceeded number of retries");

        thread.join(5000);
        if (thread.isAlive()) {
            fail("Thread didn't join");
        } else {
            if (thread.failed) {
                fail("Did not receive expected message");
            }
        }
        cnxManager.halt();
        assertFalse(cnxManager.listener.isAlive());
    }

    @Test
    public void testCnxManagerTimeout() throws Exception {
        int address = ThreadLocalRandom.current().nextInt(1, 255);
        int deadPort = PortAssignment.unique();
        String deadAddress = "10.1.1." + address;

        LOG.info("This is the dead address I'm trying: {}", deadAddress);

        peers.put(2L,
                  new QuorumServer(2,
                                   new InetSocketAddress(deadAddress, deadPort),
                                   new InetSocketAddress(deadAddress, PortAssignment.unique()),
                                   new InetSocketAddress(deadAddress, PortAssignment.unique())));
        peerTmpdir[2] = ClientBase.createTmpDir();

        QuorumPeer peer = new QuorumPeer(peers, peerTmpdir[1], peerTmpdir[1], peerClientPort[1], 3, 1, 1000, 2, 2, 2);
        QuorumCnxManager cnxManager = peer.createCnxnManager();
        QuorumCnxManager.Listener listener = cnxManager.listener;
        if (listener != null) {
            listener.start();
        } else {
            LOG.error("Null listener when initializing cnx manager");
        }

        long begin = Time.currentElapsedTime();
        cnxManager.toSend(2L, createMsg(ServerState.LOOKING.ordinal(), 1, -1, 1));
        long end = Time.currentElapsedTime();

        if ((end - begin) > 10_000) {
            fail("Waited more than necessary");
        }
        cnxManager.halt();
        assertFalse(cnxManager.listener.isAlive());
    }

    /**
     * Tests a bug in QuorumCnxManager that causes a spin lock
     * when a negative value is sent. This test checks if the
     * connection is being closed upon a message with negative
     * length.
     *
     * @throws Exception
     */
    @Test
    public void testCnxManagerSpinLock() throws Exception {
        QuorumPeer peer = new QuorumPeer(peers, peerTmpdir[1], peerTmpdir[1], peerClientPort[1], 3, 1, 1000, 2, 2, 2);
        QuorumCnxManager cnxManager = peer.createCnxnManager();
        QuorumCnxManager.Listener listener = cnxManager.listener;
        if (listener != null) {
            listener.start();
        } else {
            LOG.error("Null listener when initializing cnx manager");
        }

        InetSocketAddress address = peers.get(peer.getMyId()).electionAddr.getReachableOrOne();
        LOG.info("Election port: {}", address.getPort());

        Thread.sleep(1000);

        SocketChannel sc = SocketChannel.open();
        sc.socket().connect(address, 5000);

        InetSocketAddress otherAddr = peers.get(2L).electionAddr.getReachableOrOne();
        DataOutputStream dout = new DataOutputStream(sc.socket().getOutputStream());
        dout.writeLong(QuorumCnxManager.PROTOCOL_VERSION_V1);
        dout.writeLong(2);
        String addr = otherAddr.getHostString() + ":" + otherAddr.getPort();
        byte[] addr_bytes = addr.getBytes();
        dout.writeInt(addr_bytes.length);
        dout.write(addr_bytes);
        dout.flush();

        ByteBuffer msgBuffer = ByteBuffer.wrap(new byte[4]);
        msgBuffer.putInt(-20);
        msgBuffer.position(0);
        sc.write(msgBuffer);

        Thread.sleep(1000);

        try {
            /*
             * Write a number of times until it
             * detects that the socket is broken.
             */
            for (int i = 0; i < 100; i++) {
                msgBuffer.position(0);
                sc.write(msgBuffer);
            }
            fail("Socket has not been closed");
        } catch (Exception e) {
            LOG.info("Socket has been closed as expected");
        }
        peer.shutdown();
        cnxManager.halt();
        assertFalse(cnxManager.listener.isAlive());
    }

    /**
     * Test for bug described in https://issues.apache.org/jira/browse/ZOOKEEPER-3320.
     * Test create peer with address which contains unresolvable DNS name,
     * leader election listener thread should stop after N errors.
     *
     * @throws Exception
     */
    @Test
    public void testCnxManagerListenerThreadConfigurableRetry() throws Exception {
        final Map<Long, QuorumServer> unresolvablePeers = new HashMap<>();
        final long myid = 1L;
        unresolvablePeers.put(myid, new QuorumServer(myid, "unresolvable-domain.org:2182:2183;2181"));
        final QuorumPeer peer = new QuorumPeer(unresolvablePeers, ClientBase.createTmpDir(), ClientBase.createTmpDir(), 2181, 3, myid, 1000, 2, 2, 2);
        final QuorumCnxManager cnxManager = peer.createCnxnManager();
        final QuorumCnxManager.Listener listener = cnxManager.listener;
        final AtomicBoolean errorHappened = new AtomicBoolean(false);
        listener.setSocketBindErrorHandler(() -> errorHappened.set(true));
        listener.start();
        // listener thread should stop and throws error which notify QuorumPeer about error.
        // QuorumPeer should start shutdown process
        listener.join(15000); // set wait time, if listener contains bug and thread not stops.
        assertFalse(listener.isAlive());
        assertTrue(errorHappened.get());
        assertFalse(listener.isAlive(), QuorumPeer.class.getSimpleName() + " not stopped after " + "listener thread death");
    }

    /**
     * Tests a bug in QuorumCnxManager that causes a NPE when a 3.4.6
     * observer connects to a 3.5.0 server.
     * see https://issues.apache.org/jira/browse/ZOOKEEPER-1789
     *
     * @throws Exception
     */
    @Test
    public void testCnxManagerNPE() throws Exception {
        // the connecting peer (id = 2) is a 3.4.6 observer
        peers.get(2L).type = LearnerType.OBSERVER;
        QuorumPeer peer = new QuorumPeer(peers, peerTmpdir[1], peerTmpdir[1], peerClientPort[1], 3, 1, 1000, 2, 2, 2);
        QuorumCnxManager cnxManager = peer.createCnxnManager();
        QuorumCnxManager.Listener listener = cnxManager.listener;
        if (listener != null) {
            listener.start();
        } else {
            LOG.error("Null listener when initializing cnx manager");
        }
        InetSocketAddress address = peers.get(peer.getMyId()).electionAddr.getReachableOrOne();
        LOG.info("Election port: {}", address.getPort());

        Thread.sleep(1000);

        SocketChannel sc = SocketChannel.open();
        sc.socket().connect(address, 5000);

        /*
         * Write id (3.4.6 protocol). This previously caused a NPE in
         * QuorumCnxManager.
         */
        byte[] msgBytes = new byte[8];
        ByteBuffer msgBuffer = ByteBuffer.wrap(msgBytes);
        msgBuffer.putLong(2L);
        msgBuffer.position(0);
        sc.write(msgBuffer);

        msgBuffer = ByteBuffer.wrap(new byte[8]);
        // write length of message
        msgBuffer.putInt(4);
        // write message
        msgBuffer.putInt(5);
        msgBuffer.position(0);
        sc.write(msgBuffer);

        Message m = cnxManager.pollRecvQueue(1000, TimeUnit.MILLISECONDS);
        assertNotNull(m);

        peer.shutdown();
        cnxManager.halt();
        assertFalse(cnxManager.listener.isAlive());
    }

    /*
     * Test if a receiveConnection is able to timeout on socket errors
     */
    @Test
    public void testSocketTimeout() throws Exception {
        QuorumPeer peer = new QuorumPeer(peers, peerTmpdir[1], peerTmpdir[1], peerClientPort[1], 3, 1, 2000, 2, 2, 2);
        QuorumCnxManager cnxManager = peer.createCnxnManager();
        QuorumCnxManager.Listener listener = cnxManager.listener;
        if (listener != null) {
            listener.start();
        } else {
            LOG.error("Null listener when initializing cnx manager");
        }
        InetSocketAddress address = peers.get(peer.getMyId()).electionAddr.getReachableOrOne();
        LOG.info("Election port: {}", address.getPort());
        Thread.sleep(1000);

        Socket sock = new Socket();
        sock.connect(address, 5000);
        long begin = Time.currentElapsedTime();
        // Read without sending data. Verify timeout.
        cnxManager.receiveConnection(sock);
        long end = Time.currentElapsedTime();
        if ((end - begin) > ((peer.getSyncLimit() * peer.getTickTime()) + 500)) {
            fail("Waited more than necessary");
        }
        cnxManager.halt();
        assertFalse(cnxManager.listener.isAlive());
    }

    /**
     * Test the SSLSocket is explicitly closed when there is IOException
     * happened during connect.
     */
    @Test
    public void testSSLSocketClosedWhenHandshakeTimeout() throws Exception {
        final CountDownLatch closeLatch = new CountDownLatch(1);
        QuorumX509Util mockedX509Util = new QuorumX509Util() {
            @Override
            public SSLSocket createSSLSocket() {
                return new SSLSocket() {

                    @Override
                    public void connect(SocketAddress endpoint, int timeout) {
                    }

                    @Override
                    public void startHandshake() throws IOException {
                        throw new IOException();
                    }

                    @Override
                    public void close() {
                        closeLatch.countDown();
                    }

                    public String[] getSupportedCipherSuites() {
                        throw new UnsupportedOperationException();
                    }

                    public String[] getEnabledCipherSuites() {
                        throw new UnsupportedOperationException();
                    }

                    public String[] getSupportedProtocols() {
                        throw new UnsupportedOperationException();
                    }

                    public String[] getEnabledProtocols() {
                        throw new UnsupportedOperationException();
                    }

                    public SSLSession getSession() {
                        throw new UnsupportedOperationException();
                    }

                    public void setEnabledCipherSuites(String[] suites) {
                    }
                    public void setEnabledProtocols(String[] protocols) {
                    }
                    public void addHandshakeCompletedListener(HandshakeCompletedListener listener) {
                    }
                    public void removeHandshakeCompletedListener(HandshakeCompletedListener listener) {
                    }
                    public void setUseClientMode(boolean mode) {
                    }
                    public boolean getUseClientMode() {
                        return true;
                    }
                    public void setNeedClientAuth(boolean need) {
                    }
                    public boolean getNeedClientAuth() {
                        return true;
                    }
                    public void setWantClientAuth(boolean want) {
                    }
                    public boolean getWantClientAuth() {
                        return true;
                    }
                    public void setEnableSessionCreation(boolean flag) {
                    }
                    public boolean getEnableSessionCreation() {
                        return true;
                    }
                };
            }
        };

        QuorumPeer peer = new QuorumPeer(peers, peerTmpdir[0], peerTmpdir[0], peerClientPort[0], 3, 0, 2000, 2, 2, 2) {
            @Override
            public QuorumX509Util createX509Util() {
                return mockedX509Util;
            }
        };

        peer.setSslQuorum(true);
        QuorumCnxManager cnxManager = peer.createCnxnManager();
        cnxManager.connectOne(1, peers.get(1L).electionAddr);
        assertTrue(closeLatch.await(1, TimeUnit.SECONDS));
    }

    /*
     * Test if Worker threads are getting killed after connection loss
     */
    @Test
    public void testWorkerThreads() throws Exception {
        ArrayList<QuorumPeer> peerList = new ArrayList<>();
        try {
            for (int sid = 0; sid < 3; sid++) {
                QuorumPeer peer = new QuorumPeer(peers, peerTmpdir[sid], peerTmpdir[sid], peerClientPort[sid], 3, sid, 1000, 2, 2, 2);
                LOG.info("Starting peer {}", peer.getMyId());
                peer.start();
                peerList.add(sid, peer);
            }
            String failure = verifyThreadCount(peerList, 4);
            assertNull(failure, failure);
            for (int myid = 0; myid < 3; myid++) {
                for (int i = 0; i < 5; i++) {
                    // halt one of the listeners and verify count
                    QuorumPeer peer = peerList.get(myid);
                    LOG.info("Round {}, halting peer {}", i, peer.getMyId());
                    peer.shutdown();
                    peerList.remove(myid);
                    failure = verifyThreadCount(peerList, 2);
                    assertNull(failure, failure);
                    // Restart halted node and verify count
                    peer = new QuorumPeer(peers, peerTmpdir[myid], peerTmpdir[myid], peerClientPort[myid], 3, myid, 1000, 2, 2, 2);
                    LOG.info("Round {}, restarting peer {}", i, peer.getMyId());
                    peer.start();
                    peerList.add(myid, peer);
                    failure = verifyThreadCount(peerList, 4);
                    assertNull(failure, failure);
                }
            }
        } finally {
            for (QuorumPeer quorumPeer : peerList) {
                quorumPeer.shutdown();
            }
        }
    }

    /**
     * Returns null on success, otw the message assoc with the failure
     * @throws InterruptedException
     */
    public String verifyThreadCount(ArrayList<QuorumPeer> peerList, long ecnt) throws InterruptedException {
        String failure = null;
        for (int i = 0; i < 480; i++) {
            Thread.sleep(500);

            failure = _verifyThreadCount(peerList, ecnt);
            if (failure == null) {
                return null;
            }
        }
        return failure;
    }
    public String _verifyThreadCount(ArrayList<QuorumPeer> peerList, long ecnt) {
        for (int myid = 0; myid < peerList.size(); myid++) {
            QuorumPeer peer = peerList.get(myid);
            QuorumCnxManager cnxManager = peer.getQuorumCnxManager();
            long cnt = cnxManager.getThreadCount();
            if (cnt != ecnt) {
                return new Date()
                       + " Incorrect number of Worker threads for sid=" + myid
                       + " expected " + ecnt
                       + " found " + cnt;
            }
        }
        return null;
    }

    @Test
    public void testInitialMessage() throws Exception {
        InitialMessage msg;
        ByteArrayOutputStream bos;
        DataInputStream din;
        DataOutputStream dout;
        String hostport;

        // message with bad protocol version
        try {

            // the initial message (without the protocol version)
            hostport = "10.0.0.2:3888";
            bos = new ByteArrayOutputStream();
            dout = new DataOutputStream(bos);
            dout.writeLong(5L); // sid
            dout.writeInt(hostport.getBytes().length);
            dout.writeBytes(hostport);

            // now parse it
            din = new DataInputStream(new ByteArrayInputStream(bos.toByteArray()));
            msg = InitialMessage.parse(-65530L, din);
            fail("bad protocol version accepted");
        } catch (InitialMessage.InitialMessageException ex) {
        }

        // message too long
        try {

            hostport = createLongString(1048576);
            bos = new ByteArrayOutputStream();
            dout = new DataOutputStream(bos);
            dout.writeLong(5L); // sid
            dout.writeInt(hostport.getBytes().length);
            dout.writeBytes(hostport);

            din = new DataInputStream(new ByteArrayInputStream(bos.toByteArray()));
            msg = InitialMessage.parse(QuorumCnxManager.PROTOCOL_VERSION_V1, din);
            fail("long message accepted");
        } catch (InitialMessage.InitialMessageException ex) {
        }

        // bad hostport string
        try {

            hostport = "what's going on here?";
            bos = new ByteArrayOutputStream();
            dout = new DataOutputStream(bos);
            dout.writeLong(5L); // sid
            dout.writeInt(hostport.getBytes().length);
            dout.writeBytes(hostport);

            din = new DataInputStream(new ByteArrayInputStream(bos.toByteArray()));
            msg = InitialMessage.parse(QuorumCnxManager.PROTOCOL_VERSION_V1, din);
            fail("bad hostport accepted");
        } catch (InitialMessage.InitialMessageException ex) {
        }

        // good message, single election address
        try {

            hostport = "10.0.0.2:3888";
            bos = new ByteArrayOutputStream();
            dout = new DataOutputStream(bos);
            dout.writeLong(5L); // sid
            dout.writeInt(hostport.getBytes().length);
            dout.writeBytes(hostport);

            // now parse it
            din = new DataInputStream(new ByteArrayInputStream(bos.toByteArray()));
            msg = InitialMessage.parse(QuorumCnxManager.PROTOCOL_VERSION_V1, din);
            assertEquals(Long.valueOf(5), msg.sid);
            assertEquals(Arrays.asList(new InetSocketAddress("10.0.0.2", 3888)), msg.electionAddr);
        } catch (InitialMessage.InitialMessageException ex) {
            fail(ex.toString());
        }

        // good message, multiple election addresses (ZOOKEEPER-3188)
        try {

            hostport = "1.1.1.1:9999|2.2.2.2:8888|3.3.3.3:7777";
            bos = new ByteArrayOutputStream();
            dout = new DataOutputStream(bos);
            dout.writeLong(5L); // sid
            dout.writeInt(hostport.getBytes().length);
            dout.writeBytes(hostport);

            // now parse it
            din = new DataInputStream(new ByteArrayInputStream(bos.toByteArray()));
            msg = InitialMessage.parse(QuorumCnxManager.PROTOCOL_VERSION_V2, din);
            assertEquals(Long.valueOf(5), msg.sid);
            assertEquals(Arrays.asList(new InetSocketAddress("1.1.1.1", 9999),
                                       new InetSocketAddress("2.2.2.2", 8888),
                                       new InetSocketAddress("3.3.3.3", 7777)),
                         msg.electionAddr);
        } catch (InitialMessage.InitialMessageException ex) {
            fail(ex.toString());
        }
    }

    @Test
    public void testWildcardAddressRecognition() {
        assertTrue(QuorumCnxManager.InitialMessage.isWildcardAddress("0.0.0.0"));
        assertTrue(QuorumCnxManager.InitialMessage.isWildcardAddress("::"));
        assertFalse(QuorumCnxManager.InitialMessage.isWildcardAddress("some.unresolvable.host.com"));
        assertFalse(QuorumCnxManager.InitialMessage.isWildcardAddress("127.0.0.1"));
        assertFalse(QuorumCnxManager.InitialMessage.isWildcardAddress("255.255.255.255"));
        assertFalse(QuorumCnxManager.InitialMessage.isWildcardAddress("1.2.3.4"));
        assertFalse(QuorumCnxManager.InitialMessage.isWildcardAddress("www.google.com"));
    }

    private String createLongString(int size) {
        StringBuilder sb = new StringBuilder(size);
        for (int i = 0; i < size; i++) {
            sb.append('x');
        }
        return sb.toString();
    }

}