ThriftTaskService.java
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.facebook.presto.server.thrift;
import com.facebook.drift.annotations.ThriftMethod;
import com.facebook.drift.annotations.ThriftService;
import com.facebook.presto.execution.TaskId;
import com.facebook.presto.execution.TaskManager;
import com.facebook.presto.execution.buffer.BufferInfo;
import com.facebook.presto.execution.buffer.BufferResult;
import com.facebook.presto.execution.buffer.OutputBuffers.OutputBufferId;
import com.facebook.presto.execution.buffer.PageBufferInfo;
import com.facebook.presto.execution.buffer.ThriftBufferResult;
import com.facebook.presto.server.ForAsyncRpc;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import io.airlift.units.Duration;
import javax.inject.Inject;
import java.util.concurrent.ScheduledExecutorService;
import static com.facebook.airlift.concurrent.MoreFutures.addTimeout;
import static com.facebook.presto.util.TaskUtils.DEFAULT_MAX_WAIT_TIME;
import static com.facebook.presto.util.TaskUtils.randomizeWaitTime;
import static com.google.common.util.concurrent.MoreExecutors.directExecutor;
import static java.util.Objects.requireNonNull;
// TODO: the server currently only supports exchange; more endpoints (for /v1/task) should be supported
/**
 * Thrift service exposing three task output-buffer operations: fetching results,
 * acknowledging results and aborting results. Each operation validates its
 * arguments and delegates to the injected {@link TaskManager}.
 */
@ThriftService(value = "presto-task", idlName = "ThriftTaskService")
public class ThriftTaskService
{
    private final TaskManager taskManager;
    private final ScheduledExecutorService timeoutExecutor;

    /**
     * @param taskManager manager every request is delegated to; must not be null
     * @param timeoutExecutor executor handed to {@code addTimeout} in {@link #getResults}; must not be null
     */
    @Inject
    public ThriftTaskService(TaskManager taskManager, @ForAsyncRpc ScheduledExecutorService timeoutExecutor)
    {
        this.taskManager = requireNonNull(taskManager, "taskManager is null");
        this.timeoutExecutor = requireNonNull(timeoutExecutor, "timeoutExecutor is null");
    }

    /**
     * Requests results for one output buffer of a task.
     * <p>
     * The future returned by the task manager is passed through {@code addTimeout} with a
     * wait time obtained from {@code randomizeWaitTime(DEFAULT_MAX_WAIT_TIME)}; the fallback
     * supplied to {@code addTimeout} builds an empty {@link BufferResult} for the same token.
     *
     * @param taskId task to read from; must not be null
     * @param bufferId output buffer to read from; must not be null
     * @param token forwarded to the task manager and reused in the empty fallback result
     * @param maxSizeInBytes forwarded to the task manager
     * @return future of the buffer result converted with {@code ThriftBufferResult.fromBufferResult}
     * @throws NullPointerException if {@code taskId} or {@code bufferId} is null
     */
    @ThriftMethod
    public ListenableFuture<ThriftBufferResult> getResults(TaskId taskId, OutputBufferId bufferId, long token, long maxSizeInBytes)
    {
        requireNonNull(taskId, "taskId is null");
        requireNonNull(bufferId, "bufferId is null");

        ListenableFuture<BufferResult> pendingResults = taskManager.getTaskResults(taskId, bufferId, token, maxSizeInBytes);
        Duration timeout = randomizeWaitTime(DEFAULT_MAX_WAIT_TIME);
        // The fallback is evaluated lazily, only if addTimeout invokes it.
        ListenableFuture<BufferResult> boundedResults = addTimeout(
                pendingResults,
                () -> emptyResultsFor(taskId, bufferId, token),
                timeout,
                timeoutExecutor);

        return Futures.transform(boundedResults, ThriftBufferResult::fromBufferResult, directExecutor());
    }

    /**
     * Acknowledges results of an output buffer by delegating to the task manager.
     *
     * @param taskId task owning the buffer; must not be null
     * @param bufferId output buffer being acknowledged; must not be null
     * @param token forwarded to the task manager
     * @return an already-completed future
     * @throws NullPointerException if {@code taskId} or {@code bufferId} is null
     */
    @ThriftMethod
    public ListenableFuture<Void> acknowledgeResults(TaskId taskId, OutputBufferId bufferId, long token)
    {
        requireNonNull(taskId, "taskId is null");
        requireNonNull(bufferId, "bufferId is null");

        taskManager.acknowledgeTaskResults(taskId, bufferId, token);
        return completed();
    }

    /**
     * Aborts the results of an output buffer by delegating to the task manager.
     *
     * @param taskId task owning the buffer; must not be null
     * @param bufferId output buffer to abort; must not be null
     * @return an already-completed future
     * @throws NullPointerException if {@code taskId} or {@code bufferId} is null
     */
    @ThriftMethod
    public ListenableFuture<Void> abortResults(TaskId taskId, OutputBufferId bufferId)
    {
        requireNonNull(taskId, "taskId is null");
        requireNonNull(bufferId, "bufferId is null");

        // The abort is executed on the thrift server pool; using a fixed thread pool for aborts is dangerous.
        // During a surge of abort requests a fixed pool may not keep up, causing queries to hang.
        // TaskManager does not support async calls with a thread pool; even getTaskResults is a fake
        // async call: an immediate future wrapping ClientBuffer::processRead.
        // It might be worth exploring true async RPC for the /v1/task endpoint.
        taskManager.abortTaskResults(taskId, bufferId);
        return completed();
    }

    // Builds the fallback used by getResults: an empty BufferResult for the task instance and token,
    // carrying the buffered byte count looked up for the requested buffer.
    private BufferResult emptyResultsFor(TaskId taskId, OutputBufferId bufferId, long token)
    {
        return BufferResult.emptyResults(
                taskManager.getTaskInstanceId(taskId),
                token,
                bufferedBytesOf(taskId, bufferId),
                false);
    }

    // Buffered bytes reported for the first buffer whose id equals bufferId, or 0 when no buffer matches.
    private long bufferedBytesOf(TaskId taskId, OutputBufferId bufferId)
    {
        return taskManager.getOutputBufferInfo(taskId).getBuffers().stream()
                .filter(candidate -> candidate.getBufferId().equals(bufferId))
                .map(BufferInfo::getPageBufferInfo)
                .map(PageBufferInfo::getBufferedBytes)
                .findFirst()
                .orElse(0L);
    }

    // Fresh, already-completed Void future returned by the synchronous endpoints.
    private static ListenableFuture<Void> completed()
    {
        return Futures.immediateFuture(null);
    }
}