RedisPlanStatisticsProvider.java
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.facebook.presto.statistic;
import com.facebook.airlift.log.Logger;
import com.facebook.presto.spi.plan.PlanNodeWithHash;
import com.facebook.presto.spi.statistics.HistoricalPlanStatistics;
import com.facebook.presto.spi.statistics.HistoryBasedPlanStatisticsProvider;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableMap;
import io.lettuce.core.LettuceFutures;
import io.lettuce.core.RedisFuture;
import io.lettuce.core.cluster.api.async.RedisClusterAsyncCommands;
import javax.inject.Inject;
import java.time.Duration;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
public class RedisPlanStatisticsProvider
implements HistoryBasedPlanStatisticsProvider
{
private static final Logger log = Logger.get(RedisPlanStatisticsProvider.class);
private final RedisProviderApiStats redisProviderApiStats;
private RedisClusterAsyncCommands<String, HistoricalPlanStatistics> commands;
private final long totalFetchTimeoutMillis;
private final long totalSetTimeMillis;
private final long defaultTTLSeconds;
@Inject
public RedisPlanStatisticsProvider(
RedisClusterAsyncCommands<String, HistoricalPlanStatistics> commands,
RedisProviderApiStats redisProviderApiStats,
RedisProviderConfig config)
{
this.commands = commands;
this.totalSetTimeMillis = config.getTotalSetTimeoutMs();
this.totalFetchTimeoutMillis = config.getTotalFetchTimeoutMs();
this.defaultTTLSeconds = config.getDefaultTtlSeconds();
this.redisProviderApiStats = redisProviderApiStats;
}
public RedisPlanStatisticsProvider(RedisProviderApiStats redisProviderApiStats, RedisClusterAsyncCommands<String, HistoricalPlanStatistics> commands, long totalFetchTimeoutMillis, long totalSetTimeoutMillis, long defaultTTLSeconds)
{
this.commands = commands;
this.totalSetTimeMillis = totalSetTimeoutMillis;
this.totalFetchTimeoutMillis = totalFetchTimeoutMillis;
this.defaultTTLSeconds = defaultTTLSeconds;
this.redisProviderApiStats = redisProviderApiStats;
}
@Override
public String getName()
{
return "redis";
}
@Override
public Map<PlanNodeWithHash, HistoricalPlanStatistics> getStats(List<PlanNodeWithHash> planNodesWithHash, long timeoutMillis)
{
Map<PlanNodeWithHash, HistoricalPlanStatistics> result = redisProviderApiStats.execute(() -> {
Map<PlanNodeWithHash, RedisFuture<HistoricalPlanStatistics>> redisFutureMap = new HashMap<>();
for (PlanNodeWithHash planNodeWithHash : planNodesWithHash) {
if (!planNodeWithHash.getHash().isPresent()) {
continue;
}
RedisFuture<HistoricalPlanStatistics> future = commands.get(planNodeWithHash.getHash().get());
redisFutureMap.put(planNodeWithHash, future);
}
LettuceFutures.awaitAll(Duration.ofMillis(totalFetchTimeoutMillis), redisFutureMap.values().toArray(new RedisFuture[redisFutureMap.values().size()]));
Map<PlanNodeWithHash, HistoricalPlanStatistics> output = new HashMap<>();
for (Map.Entry<PlanNodeWithHash, RedisFuture<HistoricalPlanStatistics>> redisFutureEntry : redisFutureMap.entrySet()) {
if (redisFutureEntry.getValue().isDone()) {
try {
output.put(redisFutureEntry.getKey(), redisFutureEntry.getValue().get());
}
catch (Exception e) {
// This exception will be handled by the RedisProviderApiStats which will increment metrics and not fail the query if stats not found
log.error(String.format("Error reading done Redis future for key %s", redisFutureEntry.getKey().toString()));
throw e;
}
}
else {
// This exception will be handled by the RedisProviderApiStats which will increment metrics and not fail the query if stats not found
// cancel
log.debug(String.format("Redis Timeout: Couldn't receive stats for key %s from redis", redisFutureEntry.getKey().toString()));
redisFutureEntry.getValue().cancel(true);
}
}
return output;
}, RedisProviderApiStats.Operation.FetchStats);
return (result == null) ? ImmutableMap.of() : result;
}
@Override
public void putStats(Map<PlanNodeWithHash, HistoricalPlanStatistics> hashesAndStatistics)
{
redisProviderApiStats.execute(() -> {
List<RedisFuture> redisFuturesList = new ArrayList<>();
hashesAndStatistics.forEach((k, v) -> {
if (k.getHash().isPresent()) {
redisFuturesList.add(commands.setex(k.getHash().get(), defaultTTLSeconds, v));
}
});
LettuceFutures.awaitAll(Duration.ofMillis(totalSetTimeMillis), redisFuturesList.toArray(new RedisFuture[redisFuturesList.size()]));
for (RedisFuture redisFuture : redisFuturesList) {
if (!redisFuture.isDone()) {
log.error("Redis Timeout: Couldn't put stats within timeout");
redisFuture.cancel(true);
}
}
return null;
}, RedisProviderApiStats.Operation.PutStats);
}
// Only Visible for integration testing
@VisibleForTesting
public void resetConnection(RedisClusterAsyncCommands<String, HistoricalPlanStatistics> cmds)
{
commands = cmds;
}
}