JedisPubSubBaseTest.java

package redis.clients.jedis;

import org.junit.jupiter.api.Test;
import redis.clients.jedis.util.SafeEncoder;

import java.util.Arrays;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.fail;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import static redis.clients.jedis.Protocol.ResponseKeyword.MESSAGE;
import static redis.clients.jedis.Protocol.ResponseKeyword.SUBSCRIBE;

public class JedisPubSubBaseTest  {
    @Test
    public void testProceed_givenThreadInterrupt_exitLoop() throws InterruptedException {
        // setup
        final JedisPubSubBase<String> pubSub = new JedisPubSubBase<String>() {

            @Override
            public void onMessage(String channel, String message) {
                fail("this should not happen when thread is interrupted");
            }

            @Override
            protected String encode(byte[] raw) {
                return SafeEncoder.encode(raw);
            }
        };

        final Connection mockConnection = mock(Connection.class);
        final List<Object> mockSubscribe = Arrays.asList(
                SUBSCRIBE.getRaw(), "channel".getBytes(), 1L
        );
        final List<Object> mockResponse = Arrays.asList(
                MESSAGE.getRaw(), "channel".getBytes(), "message".getBytes()
        );

        when(mockConnection.getUnflushedObject()).
                thenReturn(mockSubscribe, mockResponse);


        // action
        final AtomicReference<Throwable> workerError = new AtomicReference<>();
        final Thread thread = new Thread(() -> {
            Thread.currentThread().interrupt();
            try {
                pubSub.proceed(mockConnection, "channel");
            } catch (Throwable t) {
                workerError.set(t);
            }
        }, "pubsub-interrupt-test");
        thread.start();

        thread.join(TimeUnit.SECONDS.toMillis(1));
        assertFalse(thread.isAlive(),
            "proceed() should return promptly when the calling thread is interrupted");
        if (workerError.get() != null) {
            throw new AssertionError("proceed() threw unexpectedly", workerError.get());
        }
    }
}