RestBasedFunctionNamespaceManager.java
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.facebook.presto.functionNamespace.rest;
import com.facebook.airlift.log.Logger;
import com.facebook.presto.common.CatalogSchemaName;
import com.facebook.presto.common.QualifiedObjectName;
import com.facebook.presto.common.type.TypeManager;
import com.facebook.presto.common.type.TypeSignature;
import com.facebook.presto.common.type.UserDefinedType;
import com.facebook.presto.functionNamespace.AbstractSqlInvokedFunctionNamespaceManager;
import com.facebook.presto.functionNamespace.InvalidFunctionHandleException;
import com.facebook.presto.functionNamespace.JsonBasedUdfFunctionMetadata;
import com.facebook.presto.functionNamespace.ServingCatalog;
import com.facebook.presto.functionNamespace.SqlInvokedFunctionNamespaceManagerConfig;
import com.facebook.presto.functionNamespace.UdfFunctionSignatureMap;
import com.facebook.presto.functionNamespace.execution.SqlFunctionExecutors;
import com.facebook.presto.spi.PrestoException;
import com.facebook.presto.spi.function.AggregationFunctionImplementation;
import com.facebook.presto.spi.function.AlterRoutineCharacteristics;
import com.facebook.presto.spi.function.FunctionHandle;
import com.facebook.presto.spi.function.FunctionMetadata;
import com.facebook.presto.spi.function.FunctionVersion;
import com.facebook.presto.spi.function.Parameter;
import com.facebook.presto.spi.function.RestFunctionHandle;
import com.facebook.presto.spi.function.ScalarFunctionImplementation;
import com.facebook.presto.spi.function.Signature;
import com.facebook.presto.spi.function.SqlFunctionHandle;
import com.facebook.presto.spi.function.SqlFunctionId;
import com.facebook.presto.spi.function.SqlInvokedFunction;
import com.google.common.collect.ImmutableList;
import javax.inject.Inject;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicReference;
import static com.facebook.presto.spi.StandardErrorCode.NOT_SUPPORTED;
import static com.google.common.collect.ImmutableList.toImmutableList;
import static java.util.Collections.emptyList;
import static java.util.Objects.requireNonNull;
public class RestBasedFunctionNamespaceManager
extends AbstractSqlInvokedFunctionNamespaceManager
{
private static final Logger log = Logger.get(RestBasedFunctionNamespaceManager.class);
private final RestBasedFunctionApis restApis;
private final List<SqlInvokedFunction> latestFunctions = new ArrayList<>();
private final AtomicReference<Optional<String>> cachedETag = new AtomicReference<>(Optional.empty());
@Inject
public RestBasedFunctionNamespaceManager(
@ServingCatalog String catalogName,
SqlFunctionExecutors sqlFunctionExecutors,
SqlInvokedFunctionNamespaceManagerConfig config,
RestBasedFunctionApis restApis)
{
super(catalogName, sqlFunctionExecutors, config);
this.restApis = requireNonNull(restApis, "restApis is null");
}
@Override
public final AggregationFunctionImplementation getAggregateFunctionImplementation(FunctionHandle functionHandle, TypeManager typeManager)
{
throw new PrestoException(NOT_SUPPORTED, "Aggregate Function is not supported in RestBasedFunctionNamespaceManager");
}
private List<SqlInvokedFunction> getLatestFunctions()
{
// Check if the function list has been modified.
String newETag = restApis.getFunctionsETag();
Optional<String> currentETag = cachedETag.get();
if (newETag != null && currentETag.isPresent() && cachedETag.get().equals(newETag)) {
return latestFunctions;
}
// Clear cached list of functions and get the latest list.
latestFunctions.clear();
UdfFunctionSignatureMap udfFunctionSignatureMap = restApis.getAllFunctions();
if (udfFunctionSignatureMap == null || udfFunctionSignatureMap.isEmpty()) {
return ImmutableList.of();
}
List<SqlInvokedFunction> newFunctions = createSqlInvokedFunctions(udfFunctionSignatureMap);
latestFunctions.addAll(newFunctions);
if (newETag != null) {
cachedETag.set(Optional.of(newETag));
}
return latestFunctions;
}
private List<SqlInvokedFunction> createSqlInvokedFunctions(UdfFunctionSignatureMap udfFunctionSignatureMap)
{
Map<String, List<JsonBasedUdfFunctionMetadata>> udfSignatureMap = udfFunctionSignatureMap.getUDFSignatureMap();
return udfSignatureMap.entrySet().stream()
.flatMap(entry -> entry.getValue().stream()
.map(metaInfo -> createSqlInvokedFunction(entry.getKey(), metaInfo)))
.collect(toImmutableList());
}
private SqlInvokedFunction createSqlInvokedFunction(String functionName, JsonBasedUdfFunctionMetadata jsonBasedUdfFunctionMetaData)
{
QualifiedObjectName qualifiedFunctionName = QualifiedObjectName.valueOf(new CatalogSchemaName(getCatalogName(), jsonBasedUdfFunctionMetaData.getSchema()), functionName);
List<String> parameterNameList = jsonBasedUdfFunctionMetaData.getParamNames();
List<TypeSignature> parameterTypeList = jsonBasedUdfFunctionMetaData.getParamTypes();
ImmutableList.Builder<Parameter> parameterBuilder = ImmutableList.builder();
for (int i = 0; i < parameterNameList.size(); i++) {
parameterBuilder.add(new Parameter(parameterNameList.get(i), parameterTypeList.get(i)));
}
FunctionVersion functionVersion = new FunctionVersion(jsonBasedUdfFunctionMetaData.getVersion());
SqlFunctionId functionId = jsonBasedUdfFunctionMetaData.getFunctionId().orElse(null);
// The function server may return a functionId with a different catalog name. We need to update the catalog name to the current catalog name.
functionId = new SqlFunctionId(
new QualifiedObjectName(
getCatalogName(),
functionId.getFunctionName().getSchemaName(),
functionId.getFunctionName().getObjectName()),
functionId.getArgumentTypes());
return new SqlInvokedFunction(
qualifiedFunctionName,
parameterBuilder.build(),
emptyList(),
emptyList(),
jsonBasedUdfFunctionMetaData.getOutputType(),
jsonBasedUdfFunctionMetaData.getDocString(),
jsonBasedUdfFunctionMetaData.getRoutineCharacteristics(),
"",
jsonBasedUdfFunctionMetaData.getVariableArity(),
functionVersion,
jsonBasedUdfFunctionMetaData.getFunctionKind(),
functionId,
jsonBasedUdfFunctionMetaData.getAggregateMetadata(),
Optional.of(new RestFunctionHandle(
functionId,
functionVersion.toString(),
new Signature(
qualifiedFunctionName,
jsonBasedUdfFunctionMetaData.getFunctionKind(),
jsonBasedUdfFunctionMetaData.getOutputType(),
jsonBasedUdfFunctionMetaData.getParamTypes()),
jsonBasedUdfFunctionMetaData.getExecutionEndpoint())));
}
@Override
protected Collection<SqlInvokedFunction> fetchFunctionsDirect(QualifiedObjectName functionName)
{
UdfFunctionSignatureMap udfFunctionSignatureMap = restApis.getFunctions(functionName.getSchemaName(), functionName.getObjectName());
if (udfFunctionSignatureMap == null || udfFunctionSignatureMap.isEmpty()) {
return ImmutableList.of();
}
List<SqlInvokedFunction> functions = createSqlInvokedFunctions(udfFunctionSignatureMap);
return functions;
}
@Override
protected UserDefinedType fetchUserDefinedTypeDirect(QualifiedObjectName typeName)
{
throw new PrestoException(NOT_SUPPORTED, "User Defined Type is not supported in RestBasedFunctionNamespaceManager");
}
protected Optional<SqlInvokedFunction> getSqlInvokedFunction(SqlFunctionHandle functionHandle)
{
Collection<SqlInvokedFunction> functions = fetchFunctionsDirect(functionHandle.getFunctionId().getFunctionName());
return functions.stream()
.filter(sqlFunction -> sqlFunction.getFunctionId().equals(functionHandle.getFunctionId()) &&
sqlFunction.getVersion().toString().equals(functionHandle.getVersion()))
.findFirst();
}
@Override
protected FunctionMetadata fetchFunctionMetadataDirect(SqlFunctionHandle functionHandle)
{
checkCatalog(functionHandle);
Optional<SqlInvokedFunction> function = getSqlInvokedFunction(functionHandle);
if (!function.isPresent()) {
throw new InvalidFunctionHandleException(functionHandle);
}
return sqlInvokedFunctionToMetadata(function.get());
}
@Override
protected ScalarFunctionImplementation fetchFunctionImplementationDirect(SqlFunctionHandle functionHandle)
{
checkCatalog(functionHandle);
Optional<SqlInvokedFunction> function = getSqlInvokedFunction(functionHandle);
if (!function.isPresent()) {
throw new InvalidFunctionHandleException(functionHandle);
}
return sqlInvokedFunctionToImplementation(function.get());
}
@Override
public void createFunction(SqlInvokedFunction function, boolean replace)
{
throw new PrestoException(NOT_SUPPORTED, "Create Function is not supported in RestBasedFunctionNamespaceManager");
}
@Override
public void alterFunction(QualifiedObjectName functionName, Optional<List<TypeSignature>> parameterTypes, AlterRoutineCharacteristics alterRoutineCharacteristics)
{
throw new PrestoException(NOT_SUPPORTED, "Alter Function is not supported in RestBasedFunctionNamespaceManager");
}
@Override
public void dropFunction(QualifiedObjectName functionName, Optional<List<TypeSignature>> parameterTypes, boolean exists)
{
throw new PrestoException(NOT_SUPPORTED, "Drop Function is not supported in RestBasedFunctionNamespaceManager");
}
@Override
public Collection<SqlInvokedFunction> listFunctions(Optional<String> likePattern, Optional<String> escape)
{
return getLatestFunctions();
}
@Override
public void addUserDefinedType(UserDefinedType userDefinedType)
{
throw new PrestoException(NOT_SUPPORTED, "Add User Defined Type is not supported in RestBasedFunctionNamespaceManager");
}
}