// MultiThreadedFakeApp.java

package redis.clients.jedis.scenario;

import io.github.resilience4j.ratelimiter.RateLimiter;
import io.github.resilience4j.ratelimiter.RateLimiterConfig;
import io.github.resilience4j.ratelimiter.RateLimiterRegistry;
import redis.clients.jedis.UnifiedJedis;
import redis.clients.jedis.exceptions.JedisConnectionException;

import java.time.Duration;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/**
 * A {@link FakeApp} that submits the configured action to a fixed-size thread pool instead of
 * invoking it directly, optionally throttling submissions with a {@link RateLimiter}.
 */
public class MultiThreadedFakeApp extends FakeApp {

  /** Capacity of the bounded work queue backing the executor. */
  static final int QUEUE_CAPACITY = 100000;

  /** Pool that executes the submitted actions; created in the constructor, shut down by {@link #run()}. */
  private final ExecutorService executorService;

  /** Limits how fast tasks are submitted; {@code null} when no limiter configuration was given. */
  private final RateLimiter rateLimiter;

  /**
   * Creates an app without rate limiting.
   *
   * @param client     client passed to the action on every invocation
   * @param action     action to execute repeatedly
   * @param numThreads number of worker threads (used as both core and maximum pool size)
   */
  public MultiThreadedFakeApp(UnifiedJedis client, FakeApp.ExecutedAction action, int numThreads) {
    this(client, action, numThreads, null);
  }

  /**
   * Creates an app with an optional rate limiter.
   *
   * @param client     client passed to the action on every invocation
   * @param action     action to execute repeatedly
   * @param numThreads number of worker threads (used as both core and maximum pool size)
   * @param config     rate limiter configuration, or {@code null} to submit without throttling
   */
  public MultiThreadedFakeApp(UnifiedJedis client, FakeApp.ExecutedAction action, int numThreads, RateLimiterConfig config) {
    super(client, action);
    // Fixed-size pool with a bounded queue. With CallerRunsPolicy a task rejected because the
    // queue is full is executed by the submitting thread, which slows submission down instead
    // of dropping work.
    this.executorService = new ThreadPoolExecutor(
        numThreads,
        numThreads,
        0L,
        TimeUnit.MILLISECONDS,
        new LinkedBlockingQueue<Runnable>(QUEUE_CAPACITY),
        new ThreadPoolExecutor.CallerRunsPolicy()
    );
    if (config != null) {
      this.rateLimiter = RateLimiterRegistry.of(config).rateLimiter("fakeAppLimiter");
    } else {
      this.rateLimiter = null;
    }
  }

  /**
   * Keeps submitting the action to the thread pool until {@code actionResponse} reports
   * completion, then shuts the pool down and waits up to {@code keepExecutingForSeconds}
   * seconds for outstanding tasks to finish. Tasks still running after that wait are cancelled
   * via {@link ExecutorService#shutdownNow()}.
   *
   * <p>If the calling thread is interrupted while waiting for termination, the remaining tasks
   * are cancelled and the thread's interrupt status is restored before returning.
   */
  @Override
  public void run() {
    log.info("Starting FakeApp");

    int checkEachSeconds = 5;
    int timeoutSeconds = 120;

    while (actionResponse == null || !actionResponse.isCompleted(
        Duration.ofSeconds(checkEachSeconds), Duration.ofSeconds(keepExecutingForSeconds),
        Duration.ofSeconds(timeoutSeconds))) {
      try {
        if (rateLimiter != null) {
          RateLimiter.waitForPermission(rateLimiter);
        }
        executorService.submit(() -> action.run(client));
      } catch (JedisConnectionException e) {
        // NOTE(review): submit() stores any exception thrown by the task in the returned Future,
        // which is discarded here, so this handler appears to only see exceptions raised on
        // the submitting thread itself. Confirm whether task failures should be recorded too.
        log.error("Error executing action", e);
        exceptions.add(e);
      }
    }

    executorService.shutdown();

    try {
      if (!executorService.awaitTermination(keepExecutingForSeconds, TimeUnit.SECONDS)) {
        executorService.shutdownNow();
      }
    } catch (InterruptedException e) {
      log.error("Error waiting for executor service to terminate", e);
      // The wait was abandoned: cancel whatever is still queued or running so the pool does
      // not outlive this method, and restore the interrupt flag for callers up the stack.
      executorService.shutdownNow();
      Thread.currentThread().interrupt();
    }
  }
}