JedisPubSubBaseTest.java

package redis.clients.jedis;

import org.junit.jupiter.api.Test;
import redis.clients.jedis.util.SafeEncoder;

import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import static redis.clients.jedis.Protocol.ResponseKeyword.MESSAGE;
import static redis.clients.jedis.Protocol.ResponseKeyword.SUBSCRIBE;

public class JedisPubSubBaseTest  {
    @Test
    public void testProceed_givenThreadInterrupt_exitLoop() throws InterruptedException {
        // setup
        final JedisPubSubBase<String> pubSub = new JedisPubSubBase<String>() {

            @Override
            public void onMessage(String channel, String message) {
                fail("this should not happen when thread is interrupted");
            }

            @Override
            protected String encode(byte[] raw) {
                return SafeEncoder.encode(raw);
            }
        };

        final Connection mockConnection = mock(Connection.class);
        final List<Object> mockSubscribe = Arrays.asList(
                SUBSCRIBE.getRaw(), "channel".getBytes(), 1L
        );
        final List<Object> mockResponse = Arrays.asList(
                MESSAGE.getRaw(), "channel".getBytes(), "message".getBytes()
        );

        when(mockConnection.getUnflushedObject()).

                thenReturn(mockSubscribe, mockResponse);


        final CountDownLatch countDownLatch = new CountDownLatch(1);
        // action
        final Thread thread = new Thread(() -> {
            Thread.currentThread().interrupt();
            pubSub.proceed(mockConnection, "channel");

            countDownLatch.countDown();
        });
        thread.start();

        assertTrue(countDownLatch.await(10, TimeUnit.MILLISECONDS));
    }
}