DataDefinitionExecution.java
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.facebook.presto.execution;
import com.facebook.presto.Session;
import com.facebook.presto.execution.StateMachine.StateChangeListener;
import com.facebook.presto.memory.VersionedMemoryPoolId;
import com.facebook.presto.metadata.Metadata;
import com.facebook.presto.server.BasicQueryInfo;
import com.facebook.presto.spi.QueryId;
import com.facebook.presto.spi.resourceGroups.ResourceGroupQueryLimits;
import com.facebook.presto.spi.security.AccessControl;
import com.facebook.presto.sql.planner.Plan;
import com.facebook.presto.sql.tree.Expression;
import com.facebook.presto.sql.tree.Statement;
import com.facebook.presto.transaction.TransactionManager;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import io.airlift.units.Duration;
import javax.annotation.Nullable;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import static com.google.common.base.Throwables.throwIfInstanceOf;
import static com.google.common.util.concurrent.MoreExecutors.directExecutor;
import static java.util.Objects.requireNonNull;
import static java.util.concurrent.TimeUnit.NANOSECONDS;
/**
 * Base {@link QueryExecution} implementation for data definition (DDL) statements.
 *
 * <p>Most accessors simply delegate to the supplied {@link QueryStateMachine}. Resource usage
 * metrics (memory reservation, CPU time, input/output sizes and positions) are reported as zero.
 * The statement-specific work is supplied by subclasses through {@link #executeTask()}.
 *
 * @param <T> the concrete {@link Statement} type handled by this execution
 */
public abstract class DataDefinitionExecution<T extends Statement>
        implements QueryExecution
{
    protected final T statement;
    protected final TransactionManager transactionManager;
    protected final Metadata metadata;
    protected final AccessControl accessControl;
    protected final QueryStateMachine stateMachine;
    protected final List<Expression> parameters;
    private final String slug;
    private final int retryCount;
    // Starts out empty and may be populated at most once via setResourceGroupQueryLimits
    private final AtomicReference<Optional<ResourceGroupQueryLimits>> resourceGroupQueryLimits = new AtomicReference<>(Optional.empty());

    /**
     * Creates the execution.
     *
     * @param statement the statement to execute; must not be null
     * @param slug value returned by {@link #getSlug()}; must not be null
     * @param retryCount value returned by {@link #getRetryCount()}
     * @param transactionManager must not be null
     * @param metadata must not be null
     * @param accessControl must not be null
     * @param stateMachine state machine that most methods of this class delegate to; must not be null
     * @param parameters statement parameters; stored as-is (no null check, no defensive copy)
     * @throws NullPointerException if any argument documented as "must not be null" is null
     */
    protected DataDefinitionExecution(
            T statement,
            String slug,
            int retryCount,
            TransactionManager transactionManager,
            Metadata metadata,
            AccessControl accessControl,
            QueryStateMachine stateMachine,
            List<Expression> parameters)
    {
        this.statement = requireNonNull(statement, "statement is null");
        this.transactionManager = requireNonNull(transactionManager, "transactionManager is null");
        this.metadata = requireNonNull(metadata, "metadata is null");
        this.accessControl = requireNonNull(accessControl, "accessControl is null");
        this.stateMachine = requireNonNull(stateMachine, "stateMachine is null");
        this.parameters = parameters;
        this.slug = requireNonNull(slug, "slug is null");
        this.retryCount = retryCount;
    }

    /** Returns the slug supplied at construction. */
    @Override
    public String getSlug()
    {
        return slug;
    }

    /** Returns the retry count supplied at construction. */
    @Override
    public int getRetryCount()
    {
        return retryCount;
    }

    /** Returns the memory pool recorded in the state machine. */
    @Override
    public VersionedMemoryPoolId getMemoryPool()
    {
        return stateMachine.getMemoryPool();
    }

    /** Records the given memory pool in the state machine. */
    @Override
    public void setMemoryPool(VersionedMemoryPoolId poolId)
    {
        stateMachine.setMemoryPool(poolId);
    }

    /** Returns the session held by the state machine. */
    @Override
    public Session getSession()
    {
        return stateMachine.getSession();
    }

    /** Always returns zero. */
    @Override
    public long getUserMemoryReservationInBytes()
    {
        return 0L;
    }

    /** Always returns zero. */
    @Override
    public long getTotalMemoryReservationInBytes()
    {
        return 0L;
    }

    /** Returns the creation time reported by the state machine. */
    @Override
    public long getCreateTimeInMillis()
    {
        return stateMachine.getCreateTimeInMillis();
    }

    /** Returns the execution start time reported by the state machine. */
    @Override
    public long getExecutionStartTimeInMillis()
    {
        return stateMachine.getExecutionStartTimeInMillis();
    }

    /** Returns the last heartbeat time reported by the state machine. */
    @Override
    public long getLastHeartbeatInMillis()
    {
        return stateMachine.getLastHeartbeatInMillis();
    }

    /** Returns the end time reported by the state machine. */
    @Override
    public long getEndTimeInMillis()
    {
        return stateMachine.getEndTimeInMillis();
    }

    /** Always returns a zero-length duration. */
    @Override
    public Duration getTotalCpuTime()
    {
        return new Duration(0, NANOSECONDS);
    }

    /** Always returns zero. */
    @Override
    public long getRawInputDataSizeInBytes()
    {
        return 0L;
    }

    /** Always returns zero. */
    @Override
    public long getWrittenIntermediateDataSizeInBytes()
    {
        return 0L;
    }

    /** Always returns zero. */
    @Override
    public long getOutputPositions()
    {
        return 0;
    }

    /** Always returns zero. */
    @Override
    public long getOutputDataSizeInBytes()
    {
        return 0L;
    }

    /** Returns the resource group query limits, or empty if they have not been set yet. */
    @Override
    public Optional<ResourceGroupQueryLimits> getResourceGroupQueryLimits()
    {
        return resourceGroupQueryLimits.get();
    }

    /**
     * Sets the resource group query limits. May be called successfully at most once.
     *
     * @param resourceGroupQueryLimits the limits to record; must not be null
     * @throws NullPointerException if {@code resourceGroupQueryLimits} is null
     * @throws IllegalStateException if the limits have already been set
     */
    @Override
    public void setResourceGroupQueryLimits(ResourceGroupQueryLimits resourceGroupQueryLimits)
    {
        Optional<ResourceGroupQueryLimits> newLimits = Optional.of(requireNonNull(resourceGroupQueryLimits, "resourceGroupQueryLimits is null"));

        // AtomicReference.compareAndSet compares the expected value by reference (==), so the
        // expected value must be the very instance currently held by the field. Passing a fresh
        // Optional.empty() would depend on it being a singleton, which the Optional API
        // explicitly does not guarantee.
        Optional<ResourceGroupQueryLimits> current = this.resourceGroupQueryLimits.get();
        if (current.isPresent() || !this.resourceGroupQueryLimits.compareAndSet(current, newLimits)) {
            throw new IllegalStateException("Cannot set resourceGroupQueryLimits more than once");
        }
    }

    /**
     * Returns basic query info built from the final query info when the state machine has one,
     * otherwise the basic query info produced by the state machine.
     */
    @Override
    public BasicQueryInfo getBasicQueryInfo()
    {
        return stateMachine.getFinalQueryInfo()
                .map(BasicQueryInfo::new)
                .orElseGet(() -> stateMachine.getBasicQueryInfo(Optional.empty()));
    }

    /** Returns the running task count reported by the state machine. */
    @Override
    public int getRunningTaskCount()
    {
        return stateMachine.getCurrentRunningTaskCount();
    }

    /**
     * Transitions the query to running and invokes {@link #executeTask()}.
     *
     * <p>Does nothing if the transition to running is rejected. When the future returned by
     * {@link #executeTask()} succeeds, the state machine is transitioned to finishing; when it
     * fails, {@link #fail(Throwable)} is called. Any throwable raised while starting is also
     * passed to {@link #fail(Throwable)}; only {@link Error}s are then rethrown to the caller.
     */
    @Override
    public void start()
    {
        try {
            // transition to running
            if (!stateMachine.transitionToRunning()) {
                // query already running or finished
                return;
            }

            ListenableFuture<?> future = executeTask();
            Futures.addCallback(future, new FutureCallback<Object>()
            {
                @Override
                public void onSuccess(@Nullable Object result)
                {
                    stateMachine.transitionToFinishing();
                }

                @Override
                public void onFailure(Throwable throwable)
                {
                    fail(throwable);
                }
            }, directExecutor());
        }
        catch (Throwable e) {
            // record the failure first, then let Errors propagate; other throwables are absorbed
            fail(e);
            throwIfInstanceOf(e, Error.class);
        }
    }

    /** Ignores the listener. */
    @Override
    public void addOutputInfoListener(Consumer<QueryOutputInfo> listener)
    {
        // DDL does not have an output
    }

    /** Delegates to the state machine's {@code getStateChange}. */
    @Override
    public ListenableFuture<QueryState> getStateChange(QueryState currentState)
    {
        return stateMachine.getStateChange(currentState);
    }

    /** Registers the listener with the state machine. */
    @Override
    public void addStateChangeListener(StateChangeListener<QueryState> stateChangeListener)
    {
        stateMachine.addStateChangeListener(stateChangeListener);
    }

    /** Registers the listener with the state machine as a query info state change listener. */
    @Override
    public void addFinalQueryInfoListener(StateChangeListener<QueryInfo> stateChangeListener)
    {
        stateMachine.addQueryInfoStateChangeListener(stateChangeListener);
    }

    /**
     * Transitions the state machine to failed with the given cause, then asks it to update the
     * query info (with no stage info).
     */
    @Override
    public void fail(Throwable cause)
    {
        stateMachine.transitionToFailed(cause);
        stateMachine.updateQueryInfo(Optional.empty());
    }

    /** Returns whether the current query state is a done state. */
    @Override
    public boolean isDone()
    {
        return getState().isDone();
    }

    /** Transitions the state machine to canceled. */
    @Override
    public void cancelQuery()
    {
        stateMachine.transitionToCanceled();
    }

    /** Does nothing. */
    @Override
    public void cancelStage(StageId stageId)
    {
        // no-op
    }

    /** Records a heartbeat in the state machine. */
    @Override
    public void recordHeartbeat()
    {
        stateMachine.recordHeartbeat();
    }

    /** Does nothing. */
    @Override
    public void pruneExpiredQueryInfo()
    {
        // no-op
    }

    /** Does nothing. */
    @Override
    public void pruneFinishedQueryInfo()
    {
        // no-op
    }

    /** Returns the query id held by the state machine. */
    @Override
    public QueryId getQueryId()
    {
        return stateMachine.getQueryId();
    }

    /**
     * Returns the final query info when the state machine has one, otherwise the result of
     * asking the state machine to update the query info (with no stage info).
     */
    @Override
    public QueryInfo getQueryInfo()
    {
        return stateMachine.getFinalQueryInfo().orElseGet(() -> stateMachine.updateQueryInfo(Optional.empty()));
    }

    /**
     * Not supported by this class.
     *
     * @throws UnsupportedOperationException always
     */
    @Override
    public Plan getQueryPlan()
    {
        throw new UnsupportedOperationException();
    }

    /** Returns the current query state from the state machine. */
    @Override
    public QueryState getState()
    {
        return stateMachine.getQueryState();
    }

    /** Returns the parameter list exactly as supplied at construction. */
    public List<Expression> getParameters()
    {
        return parameters;
    }

    /**
     * Performs the statement-specific work. Called by {@link #start()} after the query has been
     * transitioned to running.
     *
     * @return a future whose success or failure {@link #start()} uses to finish or fail the query
     */
    protected abstract ListenableFuture<?> executeTask();
}