ResourceManagerClusterStateProvider.java
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.facebook.presto.resourcemanager;
import com.facebook.presto.execution.resourceGroups.ResourceGroupRuntimeInfo;
import com.facebook.presto.memory.ClusterMemoryPool;
import com.facebook.presto.memory.MemoryInfo;
import com.facebook.presto.memory.NodeMemoryConfig;
import com.facebook.presto.metadata.InternalNode;
import com.facebook.presto.metadata.InternalNodeManager;
import com.facebook.presto.metadata.SessionPropertyManager;
import com.facebook.presto.server.BasicQueryInfo;
import com.facebook.presto.server.BasicQueryStats;
import com.facebook.presto.server.NodeStatus;
import com.facebook.presto.spi.QueryId;
import com.facebook.presto.spi.memory.ClusterMemoryPoolInfo;
import com.facebook.presto.spi.memory.MemoryPoolId;
import com.facebook.presto.spi.resourceGroups.ResourceGroupId;
import com.google.common.base.Suppliers;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Sets;
import io.airlift.units.Duration;
import javax.annotation.concurrent.GuardedBy;
import javax.inject.Inject;
import java.net.URI;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import static com.facebook.presto.SystemSessionProperties.resourceOvercommit;
import static com.facebook.presto.execution.QueryState.QUEUED;
import static com.facebook.presto.execution.QueryState.RUNNING;
import static com.facebook.presto.execution.QueryState.WAITING_FOR_PREREQUISITES;
import static com.facebook.presto.memory.LocalMemoryManager.GENERAL_POOL;
import static com.facebook.presto.memory.LocalMemoryManager.RESERVED_POOL;
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkState;
import static com.google.common.collect.ImmutableList.toImmutableList;
import static com.google.common.collect.ImmutableMap.toImmutableMap;
import static com.google.common.collect.ImmutableSet.toImmutableSet;
import static java.lang.String.format;
import static java.util.Objects.requireNonNull;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static java.util.stream.Stream.concat;
public class ResourceManagerClusterStateProvider
{
private final Map<String, CoordinatorQueriesState> nodeQueryStates = new ConcurrentHashMap<>();
private final Map<String, InternalNodeState> nodeStatuses = new ConcurrentHashMap<>();
private final Map<String, CoordinatorResourceGroupState> resourceGroupStates = new ConcurrentHashMap<>();
private final AtomicReference<Integer> adjustedQueueSize = new AtomicReference<>(0);
private final InternalNodeManager internalNodeManager;
private final SessionPropertyManager sessionPropertyManager;
private final int maxCompletedQueries;
private final Duration queryExpirationTimeout;
private final Duration completedQueryExpirationTimeout;
private final boolean isReservedPoolEnabled;
private final Supplier<Map<MemoryPoolId, ClusterMemoryPoolInfo>> clusterMemoryPoolInfosSupplier;
@Inject
public ResourceManagerClusterStateProvider(
InternalNodeManager internalNodeManager,
SessionPropertyManager sessionPropertyManager,
ResourceManagerConfig resourceManagerConfig,
NodeMemoryConfig nodeMemoryConfig,
@ForResourceManager ScheduledExecutorService scheduledExecutorService)
{
this(
requireNonNull(internalNodeManager, "internalNodeManager is null"),
requireNonNull(sessionPropertyManager, "sessionPropertyManager is null"),
requireNonNull(resourceManagerConfig, "resourceManagerConfig is null").getMaxCompletedQueries(),
resourceManagerConfig.getQueryExpirationTimeout(),
resourceManagerConfig.getCompletedQueryExpirationTimeout(),
resourceManagerConfig.getNodeStatusTimeout(),
resourceManagerConfig.getMemoryPoolInfoRefreshDuration(),
resourceManagerConfig.getResourceGroupRuntimeInfoTimeout(),
requireNonNull(nodeMemoryConfig, "nodeMemoryConfig is null").isReservedPoolEnabled(),
requireNonNull(scheduledExecutorService, "scheduledExecutorService is null"));
}
public ResourceManagerClusterStateProvider(
InternalNodeManager internalNodeManager,
SessionPropertyManager sessionPropertyManager,
int maxCompletedQueries,
Duration queryExpirationTimeout,
Duration completedQueryExpirationTimeout,
Duration nodeStatusTimeout,
Duration memoryPoolInfoRefreshDuration,
Duration resourceGroupRuntimeInfoTimeout,
boolean isReservedPoolEnabled,
ScheduledExecutorService scheduledExecutorService)
{
this.internalNodeManager = requireNonNull(internalNodeManager, "internalNodeManager is null");
checkArgument(maxCompletedQueries > 0, "maxCompletedQueries must be > 0, was %s", maxCompletedQueries);
this.sessionPropertyManager = requireNonNull(sessionPropertyManager, "sessionPropertyManager is null");
this.maxCompletedQueries = maxCompletedQueries;
this.queryExpirationTimeout = requireNonNull(queryExpirationTimeout, "queryExpirationTimeout is null");
this.completedQueryExpirationTimeout = requireNonNull(completedQueryExpirationTimeout, "completedQueryExpirationTimeout is null");
// Memoized suppliers take in a time unit > 0
requireNonNull(memoryPoolInfoRefreshDuration, "memoryPoolInfoRefreshDuration is null");
if (memoryPoolInfoRefreshDuration.toMillis() > 0) {
this.clusterMemoryPoolInfosSupplier = Suppliers.memoizeWithExpiration(this::getClusterMemoryPoolInfoInternal, memoryPoolInfoRefreshDuration.toMillis(), MILLISECONDS);
}
else {
this.clusterMemoryPoolInfosSupplier = this::getClusterMemoryPoolInfoInternal;
}
this.isReservedPoolEnabled = isReservedPoolEnabled;
requireNonNull(scheduledExecutorService, "scheduledExecutorService is null");
scheduledExecutorService.scheduleAtFixedRate(() -> {
for (CoordinatorQueriesState coordinatorQueriesState : ImmutableList.copyOf(nodeQueryStates.values())) {
coordinatorQueriesState.purgeExpiredQueries();
}
}, 100, 100, MILLISECONDS);
scheduledExecutorService.scheduleAtFixedRate(() -> {
for (Map.Entry<String, InternalNodeState> nodeEntry : ImmutableList.copyOf(nodeStatuses.entrySet())) {
if ((System.currentTimeMillis() - nodeEntry.getValue().getLastHeartbeatInMillis()) > nodeStatusTimeout.toMillis()) {
nodeStatuses.remove(nodeEntry.getKey());
}
}
}, 100, 100, MILLISECONDS);
scheduledExecutorService.scheduleAtFixedRate(() -> {
for (Map.Entry<String, CoordinatorResourceGroupState> resourceGroupState : ImmutableList.copyOf(resourceGroupStates.entrySet())) {
if ((System.currentTimeMillis() - resourceGroupState.getValue().getLastHeartbeatInMillis()) > resourceGroupRuntimeInfoTimeout.toMillis()) {
resourceGroupStates.remove(resourceGroupState.getKey());
}
}
}, 100, 100, MILLISECONDS);
scheduledExecutorService.scheduleAtFixedRate(() -> {
adjustedQueueSize.set(computeAdjustedQueueSize());
}, 100, 1000, MILLISECONDS);
}
public void registerQueryHeartbeat(String nodeId, BasicQueryInfo basicQueryInfo, long sequenceId)
{
requireNonNull(nodeId, "nodeId is null");
requireNonNull(basicQueryInfo, "basicQueryInfo is null");
Stream<InternalNode> activeOrShuttingDownCoordinators = concat(internalNodeManager.getCoordinators().stream(),
internalNodeManager.getShuttingDownCoordinator().stream());
checkArgument(
activeOrShuttingDownCoordinators.anyMatch(i -> nodeId.equals(i.getNodeIdentifier())),
"%s is not a coordinator (coordinators: %s)",
nodeId,
internalNodeManager.getCoordinators().stream().collect(toImmutableSet()));
CoordinatorQueriesState state = nodeQueryStates.computeIfAbsent(nodeId, identifier -> new CoordinatorQueriesState(
nodeId,
maxCompletedQueries,
queryExpirationTimeout.toMillis(),
completedQueryExpirationTimeout.toMillis()));
state.addOrUpdateQuery(basicQueryInfo, sequenceId);
}
public void registerNodeHeartbeat(NodeStatus nodeStatus)
{
requireNonNull(nodeStatus, "nodeStatus is null");
InternalNodeState nodeState = nodeStatuses.get(nodeStatus.getNodeId());
if (nodeState == null) {
nodeStatuses.put(nodeStatus.getNodeId(), new InternalNodeState(nodeStatus));
}
else {
nodeState.updateNodeStatus(nodeStatus);
}
}
public void registerResourceGroupRuntimeHeartbeat(String node, List<ResourceGroupRuntimeInfo> resourceGroupRuntimeInfos)
{
resourceGroupStates.put(node, new CoordinatorResourceGroupState(node, System.currentTimeMillis(), resourceGroupRuntimeInfos));
}
public int getAdjustedQueueSize()
{
return adjustedQueueSize.get();
}
private int computeAdjustedQueueSize()
{
Map<ResourceGroupId, ResourceGroupRuntimeInfo.Builder> resourceGroupBuilders = new HashMap<>();
resourceGroupStates.values().stream()
.map(CoordinatorResourceGroupState::getResourceGroups)
.flatMap(Collection::stream)
.forEach(resourceGroupRuntimeInfo -> {
ResourceGroupId resourceGroupId = resourceGroupRuntimeInfo.getResourceGroupId();
ResourceGroupRuntimeInfo.Builder runtimeInfoBuilder = resourceGroupBuilders.computeIfAbsent(resourceGroupId, ResourceGroupRuntimeInfo::builder);
runtimeInfoBuilder.addQueuedQueries(resourceGroupRuntimeInfo.getQueuedQueries());
runtimeInfoBuilder.addRunningQueries(resourceGroupRuntimeInfo.getRunningQueries());
runtimeInfoBuilder.addDescendantQueuedQueries(resourceGroupRuntimeInfo.getDescendantQueuedQueries());
runtimeInfoBuilder.addDescendantRunningQueries(resourceGroupRuntimeInfo.getDescendantRunningQueries());
if (resourceGroupRuntimeInfo.getResourceGroupConfigSpec().isPresent()) {
runtimeInfoBuilder.setResourceGroupSpecInfo(resourceGroupRuntimeInfo.getResourceGroupConfigSpec().get());
}
});
List<ResourceGroupRuntimeInfo> resourceGroupRuntimeInfos = resourceGroupBuilders.values().stream().map(ResourceGroupRuntimeInfo.Builder::build).collect(toImmutableList());
int adjustedQueueSize = 0;
for (ResourceGroupRuntimeInfo runtimeInfo : resourceGroupRuntimeInfos) {
checkState(runtimeInfo.getResourceGroupConfigSpec().isPresent());
adjustedQueueSize += Math.max(Math.min(runtimeInfo.getQueuedQueries(), runtimeInfo.getResourceGroupConfigSpec().get().getSoftConcurrencyLimit() - runtimeInfo.getRunningQueries()), 0);
}
return adjustedQueueSize;
}
public List<ResourceGroupRuntimeInfo> getClusterResourceGroups(String excludingNode)
throws ResourceManagerInconsistentException
{
requireNonNull(excludingNode, "excludingNode is null");
validateCoordinatorConsistency();
Map<ResourceGroupId, ResourceGroupRuntimeInfo.Builder> resourceGroupBuilders = new HashMap<>();
nodeQueryStates.values().stream()
.filter(state -> !state.getNodeId().equals(excludingNode))
.map(CoordinatorQueriesState::getActiveQueries)
.flatMap(Collection::stream)
.map(Query::getBasicQueryInfo)
.filter(info -> info.getResourceGroupId().isPresent())
.forEach(info -> {
ResourceGroupId resourceGroupId = info.getResourceGroupId().get();
ResourceGroupRuntimeInfo.Builder builder = resourceGroupBuilders.computeIfAbsent(resourceGroupId, ResourceGroupRuntimeInfo::builder);
if (info.getState() == QUEUED) {
builder.addQueuedQueries(1);
}
else if (!info.getState().isDone() && info.getState() != WAITING_FOR_PREREQUISITES) {
builder.addRunningQueries(1);
}
builder.addUserMemoryReservationBytes(info.getQueryStats().getUserMemoryReservation().toBytes());
while (resourceGroupId.getParent().isPresent()) {
resourceGroupId = resourceGroupId.getParent().get();
ResourceGroupRuntimeInfo.Builder parentBuilder = resourceGroupBuilders.computeIfAbsent(resourceGroupId, ResourceGroupRuntimeInfo::builder);
if (info.getState() == QUEUED) {
parentBuilder.addDescendantQueuedQueries(1);
}
else if (!info.getState().isDone() && info.getState() != WAITING_FOR_PREREQUISITES) {
parentBuilder.addDescendantRunningQueries(1);
}
}
});
return resourceGroupBuilders.values().stream().map(ResourceGroupRuntimeInfo.Builder::build).collect(toImmutableList());
}
public List<BasicQueryInfo> getClusterQueries()
{
return ImmutableList.copyOf(nodeQueryStates.values()).stream()
.map(CoordinatorQueriesState::getAllQueries)
.flatMap(Collection::stream)
.map(Query::getBasicQueryInfo)
.collect(toImmutableList());
}
public int getRunningTaskCount()
{
int runningTaskCount = nodeQueryStates.values().stream()
.map(CoordinatorQueriesState::getActiveQueries)
.flatMap(Collection::stream)
.map(Query::getBasicQueryInfo)
.filter(q -> q.getState() == RUNNING)
.map(BasicQueryInfo::getQueryStats)
.collect(Collectors.summingInt(BasicQueryStats::getRunningTasks));
return runningTaskCount;
}
public Map<MemoryPoolId, ClusterMemoryPoolInfo> getClusterMemoryPoolInfo()
{
return clusterMemoryPoolInfosSupplier.get();
}
private Map<MemoryPoolId, ClusterMemoryPoolInfo> getClusterMemoryPoolInfoInternal()
{
List<MemoryInfo> memoryInfos = nodeStatuses.values().stream()
.map(nodeStatus -> nodeStatus.getNodeStatus().getMemoryInfo())
.collect(toImmutableList());
int queriesAssignedToGeneralPool = 0;
int queriesAssignedToReservedPool = 0;
Query largestGeneralPoolQuery = null;
for (CoordinatorQueriesState nodeQueryState : nodeQueryStates.values()) {
for (Query query : nodeQueryState.getActiveQueries()) {
MemoryPoolId memoryPool = query.getBasicQueryInfo().getMemoryPool();
if (GENERAL_POOL.equals(memoryPool)) {
queriesAssignedToGeneralPool = Math.incrementExact(queriesAssignedToGeneralPool);
if (!resourceOvercommit(query.getBasicQueryInfo().getSession().toSession(sessionPropertyManager))) {
largestGeneralPoolQuery = getLargestMemoryQuery(Optional.ofNullable(largestGeneralPoolQuery), query);
}
}
else if (RESERVED_POOL.equals(memoryPool)) {
queriesAssignedToReservedPool = Math.incrementExact(queriesAssignedToReservedPool);
}
else {
throw new IllegalArgumentException("Unrecognized memory pool: " + memoryPool);
}
}
}
List<QueryId> runningQueries = nodeQueryStates.values().stream()
.map(CoordinatorQueriesState::getActiveQueries)
.flatMap(Collection::stream)
.filter(query -> query.getBasicQueryInfo().getState() == RUNNING)
.map(Query::getQueryId)
.collect(toImmutableList());
ImmutableMap.Builder<MemoryPoolId, ClusterMemoryPoolInfo> memoryPoolInfos = ImmutableMap.builder();
ClusterMemoryPool pool = new ClusterMemoryPool(GENERAL_POOL);
pool.update(memoryInfos, queriesAssignedToGeneralPool);
ClusterMemoryPoolInfo clusterInfo = pool.getClusterInfo(Optional.ofNullable(largestGeneralPoolQuery).map(Query::getQueryId), Optional.ofNullable(runningQueries));
memoryPoolInfos.put(GENERAL_POOL, clusterInfo);
if (isReservedPoolEnabled) {
pool = new ClusterMemoryPool(RESERVED_POOL);
pool.update(memoryInfos, queriesAssignedToReservedPool);
memoryPoolInfos.put(RESERVED_POOL, pool.getClusterInfo());
}
return memoryPoolInfos.build();
}
private Query getLargestMemoryQuery(Optional<Query> existingLargeQuery, Query newQuery)
{
requireNonNull(newQuery, "newQuery must not be null");
return existingLargeQuery
.map(largeQuery -> {
long largestGeneralBytes = largeQuery.getBasicQueryInfo().getQueryStats().getTotalMemoryReservation().toBytes();
long currentGeneralBytes = newQuery.getBasicQueryInfo().getQueryStats().getTotalMemoryReservation().toBytes();
if (currentGeneralBytes > largestGeneralBytes) {
return newQuery;
}
return largeQuery;
})
.orElse(newQuery);
}
public Map<String, MemoryInfo> getWorkerMemoryInfo()
{
return nodeStatuses.entrySet().stream().collect(toImmutableMap(e -> {
String nodeIdentifier = e.getValue().getNodeStatus().getNodeId();
String nodeHost = URI.create(e.getValue().getNodeStatus().getExternalAddress()).getHost();
return nodeIdentifier + " [" + nodeHost + "]";
}, e -> e.getValue().getNodeStatus().getMemoryInfo()));
}
private void validateCoordinatorConsistency()
{
Set<String> coordinators = internalNodeManager.getCoordinators().stream().map(InternalNode::getNodeIdentifier).collect(toImmutableSet());
Set<String> heartbeatedCoordinatorNodes = nodeStatuses.values().stream()
.map(InternalNodeState::getNodeStatus)
.filter(NodeStatus::isCoordinator)
.map(NodeStatus::getNodeId)
.collect(toImmutableSet());
if (!(Sets.difference(coordinators, heartbeatedCoordinatorNodes).isEmpty() && !coordinators.isEmpty())) {
throw new ResourceManagerInconsistentException(format("%s nodes found in discovery vs. %s nodes found in heartbeats", coordinators.size(), heartbeatedCoordinatorNodes.size()));
}
}
private static class CoordinatorResourceGroupState
{
private final String nodeId;
private final long lastHeartbeatInMillis;
private final List<ResourceGroupRuntimeInfo> resourceGroups;
public CoordinatorResourceGroupState(
String nodeId,
long lastHeartbeatInMillis,
List<ResourceGroupRuntimeInfo> resourceGroups)
{
this.nodeId = requireNonNull(nodeId, "nodeId is null");
this.lastHeartbeatInMillis = lastHeartbeatInMillis;
this.resourceGroups = requireNonNull(resourceGroups, "resourceGroups is null");
}
public List<ResourceGroupRuntimeInfo> getResourceGroups()
{
return resourceGroups;
}
public String getNodeId()
{
return nodeId;
}
public long getLastHeartbeatInMillis()
{
return lastHeartbeatInMillis;
}
}
private static class CoordinatorQueriesState
{
private final String nodeId;
private final int maxCompletedQueries;
private final long queryExpirationTimeoutMillis;
private final long completedQueryExpirationTimeoutMillis;
@GuardedBy("this")
private final Map<QueryId, Query> activeQueries = new HashMap<>();
@GuardedBy("this")
private final Map<QueryId, Query> completedQueries = new LinkedHashMap<>();
public CoordinatorQueriesState(
String nodeId,
int maxCompletedQueries,
long queryExpirationTimeoutMillis,
long completedQueryExpirationTimeoutMillis)
{
this.nodeId = requireNonNull(nodeId, "nodeId is null");
checkArgument(maxCompletedQueries > 0);
checkArgument(queryExpirationTimeoutMillis > 0);
checkArgument(completedQueryExpirationTimeoutMillis > 0);
this.maxCompletedQueries = maxCompletedQueries;
this.queryExpirationTimeoutMillis = queryExpirationTimeoutMillis;
this.completedQueryExpirationTimeoutMillis = completedQueryExpirationTimeoutMillis;
}
public synchronized void addOrUpdateQuery(BasicQueryInfo basicQueryInfo, long sequenceId)
{
requireNonNull(basicQueryInfo, "basicQueryInfo is null");
QueryId queryId = basicQueryInfo.getQueryId();
Query query = activeQueries.get(queryId);
if (query == null) {
query = completedQueries.get(queryId);
}
if (query == null) {
query = new Query(basicQueryInfo, sequenceId);
activeQueries.put(queryId, query);
}
else {
query = query.updateQueryInfo(basicQueryInfo, sequenceId);
}
if (isQueryCompleted(query)) {
completedQueries.put(query.getQueryId(), query);
activeQueries.remove(query.getQueryId());
}
}
public synchronized void purgeExpiredQueries()
{
long currentTimeMillis = System.currentTimeMillis();
Iterator<Query> queryIterator = activeQueries.values().iterator();
while (queryIterator.hasNext()) {
Query query = queryIterator.next();
if (isQueryExpired(query, currentTimeMillis, queryExpirationTimeoutMillis)) {
completedQueries.put(query.getQueryId(), query);
queryIterator.remove();
}
}
Iterator<Query> completedQueriesIterator = completedQueries.values().iterator();
while (completedQueriesIterator.hasNext()) {
Query query = completedQueriesIterator.next();
if (completedQueries.size() <= maxCompletedQueries && !isQueryExpired(query, currentTimeMillis, completedQueryExpirationTimeoutMillis)) {
break;
}
completedQueriesIterator.remove();
}
}
public String getNodeId()
{
return nodeId;
}
public synchronized List<Query> getActiveQueries()
{
return ImmutableList.copyOf(activeQueries.values());
}
public synchronized List<Query> getAllQueries()
{
purgeExpiredQueries();
return ImmutableList.<Query>builder().addAll(activeQueries.values()).addAll(completedQueries.values()).build();
}
}
private static final class InternalNodeState
{
private volatile NodeStatus nodeStatus;
private final AtomicLong lastHeartbeatInMillis = new AtomicLong();
private InternalNodeState(NodeStatus nodeStatus)
{
this.nodeStatus = nodeStatus;
recordHeartbeat();
}
private void recordHeartbeat()
{
this.lastHeartbeatInMillis.set(System.currentTimeMillis());
}
public long getLastHeartbeatInMillis()
{
return lastHeartbeatInMillis.get();
}
public InternalNodeState updateNodeStatus(NodeStatus nodeStatus)
{
requireNonNull(nodeStatus, "nodeStatus is null");
this.nodeStatus = nodeStatus;
recordHeartbeat();
return this;
}
public NodeStatus getNodeStatus()
{
return nodeStatus;
}
}
public static class Query
{
private final QueryId queryId;
private volatile BasicQueryInfo basicQueryInfo;
private final AtomicLong lastHeartbeatInMillis = new AtomicLong();
private final AtomicLong sequenceId;
public Query(BasicQueryInfo basicQueryInfo, long sequenceId)
{
this.queryId = basicQueryInfo.getQueryId();
this.basicQueryInfo = basicQueryInfo;
this.sequenceId = new AtomicLong(sequenceId);
recordHeartbeat();
}
private void recordHeartbeat()
{
this.lastHeartbeatInMillis.set(System.currentTimeMillis());
}
public long getLastHeartbeatInMillis()
{
return lastHeartbeatInMillis.get();
}
public Query updateQueryInfo(BasicQueryInfo basicQueryInfo, long sequenceId)
{
requireNonNull(basicQueryInfo, "basicQueryInfo is null");
long newSequenceId = this.sequenceId.updateAndGet(operand -> Math.max(operand, sequenceId));
if (newSequenceId == sequenceId) {
this.basicQueryInfo = basicQueryInfo;
}
recordHeartbeat();
return this;
}
public QueryId getQueryId()
{
return queryId;
}
public BasicQueryInfo getBasicQueryInfo()
{
return basicQueryInfo;
}
}
private static boolean isQueryExpired(Query query, long timeoutInMillis, long timeout)
{
return (timeoutInMillis - query.getLastHeartbeatInMillis()) > timeout;
}
private static boolean isQueryCompleted(Query query)
{
return query.getBasicQueryInfo().getState().isDone();
}
}