RemoteProjectOperator.java
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.facebook.presto.operator;
import com.facebook.presto.common.Page;
import com.facebook.presto.common.block.Block;
import com.facebook.presto.common.function.SqlFunctionResult;
import com.facebook.presto.metadata.FunctionAndTypeManager;
import com.facebook.presto.spi.PrestoException;
import com.facebook.presto.spi.plan.PlanNodeId;
import com.facebook.presto.spi.relation.CallExpression;
import com.facebook.presto.spi.relation.ConstantExpression;
import com.facebook.presto.spi.relation.InputReferenceExpression;
import com.facebook.presto.spi.relation.RowExpression;
import com.google.common.collect.ImmutableList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import static com.facebook.presto.spi.StandardErrorCode.GENERIC_INTERNAL_ERROR;
import static com.google.common.base.Preconditions.checkState;
import static com.google.common.base.Throwables.throwIfUnchecked;
import static com.google.common.collect.ImmutableList.toImmutableList;
import static java.lang.String.format;
import static java.lang.Thread.currentThread;
import static java.util.Objects.requireNonNull;
import static java.util.concurrent.CompletableFuture.completedFuture;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
public class RemoteProjectOperator
implements Operator
{
private final OperatorContext operatorContext;
private final FunctionAndTypeManager functionAndTypeManager;
private final List<RowExpression> projections;
private final CompletableFuture<SqlFunctionResult>[] result;
private boolean finishing;
private RemoteProjectOperator(OperatorContext operatorContext, FunctionAndTypeManager functionAndTypeManager, List<RowExpression> projections)
{
this.operatorContext = requireNonNull(operatorContext, "operatorContext is null");
this.functionAndTypeManager = requireNonNull(functionAndTypeManager, "functionManager is null");
this.projections = ImmutableList.copyOf(requireNonNull(projections, "projections is null"));
this.result = new CompletableFuture[projections.size()];
}
@Override
public OperatorContext getOperatorContext()
{
return operatorContext;
}
@Override
public boolean needsInput()
{
return !finishing && !processingPage();
}
@Override
public void addInput(Page page)
{
checkState(!finishing, "Operator is already finishing");
checkState(!processingPage(), "Still processing previous input");
requireNonNull(page, "page is null");
for (int channel = 0; channel < projections.size(); channel++) {
RowExpression projection = projections.get(channel);
if (projection instanceof InputReferenceExpression) {
result[channel] = completedFuture(new SqlFunctionResult(page.getBlock(((InputReferenceExpression) projection).getField()), 0));
}
else if (projection instanceof CallExpression) {
CallExpression remoteCall = (CallExpression) projection;
result[channel] = functionAndTypeManager.executeFunction(
operatorContext.getDriverContext().getTaskId().toString(),
remoteCall.getFunctionHandle(),
page,
remoteCall.getArguments().stream()
.map(InputReferenceExpression.class::cast)
.map(InputReferenceExpression::getField)
.collect(toImmutableList()));
}
else {
checkState(projection instanceof ConstantExpression, format("Does not expect expression type %s", projection.getClass()));
}
}
}
@Override
public Page getOutput()
{
if (resultReady()) {
Block[] blocks = new Block[result.length];
Page output;
try {
for (int i = 0; i < blocks.length; i++) {
blocks[i] = result[i].get().getResult();
operatorContext.recordAdditionalCpu(MILLISECONDS.toNanos(result[i].get().getCpuTimeMs()));
}
output = new Page(blocks);
Arrays.fill(result, null);
return output;
}
catch (InterruptedException ie) {
currentThread().interrupt();
throw new RuntimeException(ie);
}
catch (ExecutionException e) {
Throwable cause = e.getCause();
if (cause != null) {
throwIfUnchecked(cause);
throw new PrestoException(GENERIC_INTERNAL_ERROR, cause);
}
throw new PrestoException(GENERIC_INTERNAL_ERROR, e);
}
}
return null;
}
@Override
public void finish()
{
finishing = true;
}
@Override
public boolean isFinished()
{
return finishing && !processingPage();
}
private boolean processingPage()
{
// Array result will be filled with nulls when getOutput() produce output.
// If result has non-null values that means input page is in processing.
for (int i = 0; i < result.length; i++) {
if (result[i] != null) {
return true;
}
}
return false;
}
private boolean resultReady()
{
for (int i = 0; i < result.length; i++) {
if (result[i] == null || !result[i].isDone()) {
return false;
}
}
return true;
}
public static class RemoteProjectOperatorFactory
implements OperatorFactory
{
private final int operatorId;
private final PlanNodeId planNodeId;
private final FunctionAndTypeManager functionAndTypeManager;
private final List<RowExpression> projections;
private boolean closed;
public RemoteProjectOperatorFactory(int operatorId, PlanNodeId planNodeId, FunctionAndTypeManager functionAndTypeManager, List<RowExpression> projections)
{
this.operatorId = operatorId;
this.planNodeId = requireNonNull(planNodeId, "planNodeId is null");
this.functionAndTypeManager = requireNonNull(functionAndTypeManager, "functionManager is null");
this.projections = ImmutableList.copyOf(requireNonNull(projections, "projections is null"));
}
@Override
public Operator createOperator(DriverContext driverContext)
{
checkState(!closed, "Factory is already closed");
OperatorContext operatorContext = driverContext.addOperatorContext(operatorId, planNodeId, RemoteProjectOperator.class.getSimpleName());
return new RemoteProjectOperator(operatorContext, functionAndTypeManager, projections);
}
@Override
public void noMoreOperators()
{
closed = true;
}
@Override
public OperatorFactory duplicate()
{
return new RemoteProjectOperatorFactory(operatorId, planNodeId, functionAndTypeManager, projections);
}
}
}