ClusterCommandExecutorTest.java

package redis.clients.jedis;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.fail;
import static org.mockito.Mockito.inOrder;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.when;

import java.time.Duration;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.LongConsumer;
import org.hamcrest.MatcherAssert;
import org.hamcrest.Matchers;
import org.junit.jupiter.api.Test;
import org.mockito.ArgumentMatchers;
import org.mockito.InOrder;
import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;

import redis.clients.jedis.exceptions.JedisAskDataException;
import redis.clients.jedis.exceptions.JedisClusterOperationException;
import redis.clients.jedis.exceptions.JedisConnectionException;
import redis.clients.jedis.exceptions.JedisMovedDataException;
import redis.clients.jedis.executors.ClusterCommandExecutor;
import redis.clients.jedis.providers.ClusterConnectionProvider;

public class ClusterCommandExecutorTest {

  private static final Duration ONE_SECOND = Duration.ofSeconds(1);

  private static final CommandObject<String> STR_COM_OBJECT
      = new CommandObject<>(new ClusterCommandArguments(null).key(""), null);

  @Test
  public void runSuccessfulExecute() {
    ClusterConnectionProvider connectionHandler = mock(ClusterConnectionProvider.class);
    ClusterCommandExecutor testMe = new ClusterCommandExecutor(connectionHandler, 10, Duration.ZERO) {
      @Override
      public <T> T execute(Connection connection, CommandObject<T> commandObject) {
        return (T) "foo";
      }
      @Override
      protected void sleep(long ignored) {
        throw new RuntimeException("This test should never sleep");
      }
    };
    assertEquals("foo", testMe.executeCommand(STR_COM_OBJECT));
  }

  @Test
  public void runFailOnFirstExecSuccessOnSecondExec() {
    ClusterConnectionProvider connectionHandler = mock(ClusterConnectionProvider.class);
    ClusterCommandExecutor testMe = new ClusterCommandExecutor(connectionHandler, 10, ONE_SECOND) {
      boolean isFirstCall = true;

      @Override
      public <T> T execute(Connection connection, CommandObject<T> commandObject) {
        if (isFirstCall) {
          isFirstCall = false;
          throw new JedisConnectionException("Borkenz");
        }

        return (T) "foo";
      }

      @Override
      protected void sleep(long ignored) {
        throw new RuntimeException("This test should never sleep");
      }
    };

    assertEquals("foo", testMe.executeCommand(STR_COM_OBJECT));
  }

  @Test
  public void runAlwaysFailing() {
    ClusterConnectionProvider connectionHandler = mock(ClusterConnectionProvider.class);
    final LongConsumer sleep = mock(LongConsumer.class);
    ClusterCommandExecutor testMe = new ClusterCommandExecutor(connectionHandler, 3, ONE_SECOND) {
      @Override
      public <T> T execute(Connection connection, CommandObject<T> commandObject) {
        throw new JedisConnectionException("Connection failed");
      }

      @Override
      protected void sleep(long sleepMillis) {
        sleep.accept(sleepMillis);
      }
    };

    try {
      testMe.executeCommand(STR_COM_OBJECT);
      fail("cluster command did not fail");
    } catch (JedisClusterOperationException e) {
      // expected
    }
    InOrder inOrder = inOrder(connectionHandler, sleep);
    inOrder.verify(connectionHandler, times(2)).getConnection(STR_COM_OBJECT.getArguments());
    inOrder.verify(sleep).accept(ArgumentMatchers.anyLong());
    inOrder.verify(connectionHandler).renewSlotCache();
    inOrder.verify(connectionHandler).getConnection(STR_COM_OBJECT.getArguments());
    inOrder.verifyNoMoreInteractions();
  }

  @Test
  public void runMovedSuccess() {
    ClusterConnectionProvider connectionHandler = mock(ClusterConnectionProvider.class);
    final HostAndPort movedTarget = new HostAndPort(null, 0);
    ClusterCommandExecutor testMe = new ClusterCommandExecutor(connectionHandler, 10, ONE_SECOND) {
      boolean isFirstCall = true;

      @Override
      public <T> T execute(Connection connection, CommandObject<T> commandObject) {
        if (isFirstCall) {
          isFirstCall = false;

          // Slot 0 moved
          throw new JedisMovedDataException("", movedTarget, 0);
        }

        return (T) "foo";
      }

      @Override
      protected void sleep(long ignored) {
        throw new RuntimeException("This test should never sleep");
      }
    };

    assertEquals("foo", testMe.executeCommand(STR_COM_OBJECT));

    InOrder inOrder = inOrder(connectionHandler);
    inOrder.verify(connectionHandler).getConnection(STR_COM_OBJECT.getArguments());
    inOrder.verify(connectionHandler).renewSlotCache(ArgumentMatchers.any());
    inOrder.verify(connectionHandler).getConnection(movedTarget);
    inOrder.verifyNoMoreInteractions();
  }

  @Test
  public void runAskSuccess() {
    ClusterConnectionProvider connectionHandler = mock(ClusterConnectionProvider.class);
    Connection connection = mock(Connection.class);
    final HostAndPort askTarget = new HostAndPort(null, 0);
    when(connectionHandler.getConnection(askTarget)).thenReturn(connection);

    ClusterCommandExecutor testMe = new ClusterCommandExecutor(connectionHandler, 10, ONE_SECOND) {
      boolean isFirstCall = true;

      @Override
      public <T> T execute(Connection connection, CommandObject<T> commandObject) {
        if (isFirstCall) {
          isFirstCall = false;

          // Slot 0 moved
          throw new JedisAskDataException("", askTarget, 0);
        }

        return (T) "foo";
      }

      @Override
      protected void sleep(long ignored) {
        throw new RuntimeException("This test should never sleep");
      }
    };

    assertEquals("foo", testMe.executeCommand(STR_COM_OBJECT));

    InOrder inOrder = inOrder(connectionHandler, connection);
    inOrder.verify(connectionHandler).getConnection(STR_COM_OBJECT.getArguments());
    inOrder.verify(connectionHandler).getConnection(askTarget);
    // inOrder.verify(connection).asking();
    inOrder.verify(connection).close(); // From the finally clause in runWithRetries()
    inOrder.verifyNoMoreInteractions();
  }

  // requires 'execute(Connection connection, CommandObject<T> commandObject)' separately
  @Test
  public void runMovedThenAllNodesFailing() {
    // Test:
    // First attempt is a JedisMovedDataException() move, because we asked the wrong node.
    // All subsequent attempts are JedisConnectionExceptions, because all nodes are now down.
    // In response to the JedisConnectionExceptions, run() retries random nodes until maxAttempts is
    // reached.
    ClusterConnectionProvider connectionHandler = mock(ClusterConnectionProvider.class);

    final Connection redirecter = mock(Connection.class);
    when(connectionHandler.getConnection(STR_COM_OBJECT.getArguments())).thenReturn(redirecter);

    final Connection failer = mock(Connection.class);
    when(connectionHandler.getConnection(ArgumentMatchers.any(HostAndPort.class))).thenReturn(failer);
    Mockito.doAnswer((InvocationOnMock invocation) -> {
      when(connectionHandler.getConnection(STR_COM_OBJECT.getArguments())).thenReturn(failer);
      return null;
    }).when(connectionHandler).renewSlotCache();

    final LongConsumer sleep = mock(LongConsumer.class);
    final HostAndPort movedTarget = new HostAndPort(null, 0);
    ClusterCommandExecutor testMe = new ClusterCommandExecutor(connectionHandler, 5, ONE_SECOND) {
      @Override
      public <T> T execute(Connection connection, CommandObject<T> commandObject) {
        if (redirecter == connection) {
          // First attempt, report moved
          throw new JedisMovedDataException("Moved", movedTarget, 0);
        }

        if (failer == connection) {
          // Second attempt in response to the move, report failure
          throw new JedisConnectionException("Connection failed");
        }

        throw new IllegalStateException("Should have thrown jedis exception");
      }

      @Override
      protected void sleep(long sleepMillis) {
        sleep.accept(sleepMillis);
      }
    };

    try {
      testMe.executeCommand(STR_COM_OBJECT);
      fail("cluster command did not fail");
    } catch (JedisClusterOperationException e) {
      // expected
    }
    InOrder inOrder = inOrder(connectionHandler, sleep);
    inOrder.verify(connectionHandler).getConnection(STR_COM_OBJECT.getArguments());
    inOrder.verify(connectionHandler).renewSlotCache(redirecter);
    inOrder.verify(connectionHandler, times(2)).getConnection(movedTarget);
    inOrder.verify(sleep).accept(ArgumentMatchers.anyLong());
    inOrder.verify(connectionHandler).renewSlotCache();
    inOrder.verify(connectionHandler, times(2)).getConnection(STR_COM_OBJECT.getArguments());
    inOrder.verify(sleep).accept(ArgumentMatchers.anyLong());
    inOrder.verify(connectionHandler).renewSlotCache();
    inOrder.verifyNoMoreInteractions();
  }

  // requires 'execute(Connection connection, CommandObject<T> commandObject)' separately
  @Test
  public void runMasterFailingReplicaRecovering() {
    // We have two nodes, master and replica, and master has just gone down permanently.
    //
    // Test:
    // 1. We try to contact master => JedisConnectionException
    // 2. We try to contact master => JedisConnectionException
    // 3. sleep and renew
    // 4. We try to contact replica => Success, because it has now failed over

    final Connection master = mock(Connection.class);
    when(master.toString()).thenReturn("master");

    final Connection replica = mock(Connection.class);
    when(replica.toString()).thenReturn("replica");

    ClusterConnectionProvider connectionHandler = mock(ClusterConnectionProvider.class);

    when(connectionHandler.getConnection(STR_COM_OBJECT.getArguments())).thenReturn(master);

    Mockito.doAnswer((InvocationOnMock invocation) -> {
      when(connectionHandler.getConnection(STR_COM_OBJECT.getArguments())).thenReturn(replica);
      return null;
    }).when(connectionHandler).renewSlotCache();

    final AtomicLong totalSleepMs = new AtomicLong();
    ClusterCommandExecutor testMe = new ClusterCommandExecutor(connectionHandler, 5, ONE_SECOND) {

      @Override
      public <T> T execute(Connection connection, CommandObject<T> commandObject) {
        assertNotNull(connection);

        if (connection.toString().equals("master")) {
          throw new JedisConnectionException("Master is down");
        }

        assert connection.toString().equals("replica");

        return (T) "Success!";
      }

      @Override
      protected void sleep(long sleepMillis) {
        // assert sleepMillis > 0;
        totalSleepMs.addAndGet(sleepMillis);
      }
    };

    assertEquals("Success!", testMe.executeCommand(STR_COM_OBJECT));
    InOrder inOrder = inOrder(connectionHandler);
    inOrder.verify(connectionHandler, times(2)).getConnection(STR_COM_OBJECT.getArguments());
    inOrder.verify(connectionHandler).renewSlotCache();
    inOrder.verify(connectionHandler).getConnection(STR_COM_OBJECT.getArguments());
    inOrder.verifyNoMoreInteractions();
    MatcherAssert.assertThat(totalSleepMs.get(), Matchers.greaterThan(0L));
  }

  @Test
  public void runRethrowsJedisNoReachableClusterNodeException() {
    ClusterConnectionProvider connectionHandler = mock(ClusterConnectionProvider.class);
    when(connectionHandler.getConnection(STR_COM_OBJECT.getArguments())).thenThrow(
        JedisClusterOperationException.class);

    ClusterCommandExecutor testMe = new ClusterCommandExecutor(connectionHandler, 10,
        Duration.ZERO) {
      @Override
      public <T> T execute(Connection connection, CommandObject<T> commandObject) {
        return null;
      }

      @Override
      protected void sleep(long ignored) {
        throw new RuntimeException("This test should never sleep");
      }
    };

    assertThrows(JedisClusterOperationException.class, () -> testMe.executeCommand(STR_COM_OBJECT));
  }

  @Test
  public void runStopsRetryingAfterTimeout() {
    ClusterConnectionProvider connectionHandler = mock(ClusterConnectionProvider.class);

    //final LongConsumer sleep = mock(LongConsumer.class);
    final AtomicLong totalSleepMs = new AtomicLong();
    ClusterCommandExecutor testMe = new ClusterCommandExecutor(connectionHandler, 3, Duration.ZERO) {
      @Override
      public <T> T execute(Connection connection, CommandObject<T> commandObject) {
        try {
          // exceed deadline
          Thread.sleep(2L);
        } catch (InterruptedException e) {
          throw new RuntimeException(e);
        }
        throw new JedisConnectionException("Connection failed");
      }

      @Override
      protected void sleep(long sleepMillis) {
        //sleep.accept(sleepMillis);
        totalSleepMs.addAndGet(sleepMillis);
      }
    };

    try {
      testMe.executeCommand(STR_COM_OBJECT);
      fail("cluster command did not fail");
    } catch (JedisClusterOperationException e) {
      // expected
    }
    //InOrder inOrder = inOrder(connectionHandler, sleep);
    InOrder inOrder = inOrder(connectionHandler);
    inOrder.verify(connectionHandler).getConnection(STR_COM_OBJECT.getArguments());
    inOrder.verifyNoMoreInteractions();
    assertEquals(0L, totalSleepMs.get());
  }
}