CalciteConnectionImpl.java

/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to you under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.calcite.jdbc;

import org.apache.calcite.DataContext;
import org.apache.calcite.DataContexts;
import org.apache.calcite.adapter.java.JavaTypeFactory;
import org.apache.calcite.avatica.AvaticaConnection;
import org.apache.calcite.avatica.AvaticaFactory;
import org.apache.calcite.avatica.AvaticaSite;
import org.apache.calcite.avatica.AvaticaStatement;
import org.apache.calcite.avatica.Helper;
import org.apache.calcite.avatica.InternalProperty;
import org.apache.calcite.avatica.Meta;
import org.apache.calcite.avatica.MetaImpl;
import org.apache.calcite.avatica.NoSuchStatementException;
import org.apache.calcite.avatica.UnregisteredDriver;
import org.apache.calcite.avatica.remote.TypedValue;
import org.apache.calcite.config.CalciteConnectionConfig;
import org.apache.calcite.config.CalciteConnectionConfigImpl;
import org.apache.calcite.jdbc.CalcitePrepare.Context;
import org.apache.calcite.linq4j.BaseQueryable;
import org.apache.calcite.linq4j.Enumerable;
import org.apache.calcite.linq4j.Enumerator;
import org.apache.calcite.linq4j.Ord;
import org.apache.calcite.linq4j.QueryProvider;
import org.apache.calcite.linq4j.Queryable;
import org.apache.calcite.linq4j.tree.Expression;
import org.apache.calcite.linq4j.tree.Expressions;
import org.apache.calcite.materialize.Lattice;
import org.apache.calcite.materialize.MaterializationService;
import org.apache.calcite.plan.RelOptUtil;
import org.apache.calcite.prepare.CalciteCatalogReader;
import org.apache.calcite.rel.type.DelegatingTypeSystem;
import org.apache.calcite.rel.type.RelDataTypeSystem;
import org.apache.calcite.rel.type.TimeFrameSet;
import org.apache.calcite.rel.type.TimeFrames;
import org.apache.calcite.runtime.Hook;
import org.apache.calcite.schema.SchemaPlus;
import org.apache.calcite.schema.SchemaVersion;
import org.apache.calcite.schema.Schemas;
import org.apache.calcite.schema.impl.AbstractSchema;
import org.apache.calcite.schema.impl.LongSchemaVersion;
import org.apache.calcite.schema.impl.ViewTable;
import org.apache.calcite.server.CalciteServer;
import org.apache.calcite.server.CalciteServerStatement;
import org.apache.calcite.sql.advise.SqlAdvisor;
import org.apache.calcite.sql.advise.SqlAdvisorValidator;
import org.apache.calcite.sql.fun.SqlStdOperatorTable;
import org.apache.calcite.sql.parser.SqlParser;
import org.apache.calcite.sql.validate.SqlValidator;
import org.apache.calcite.sql.validate.SqlValidatorWithHints;
import org.apache.calcite.tools.RelRunner;
import org.apache.calcite.util.BuiltInMethod;
import org.apache.calcite.util.Holder;
import org.apache.calcite.util.Util;

import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;

import org.checkerframework.checker.nullness.qual.Nullable;

import java.lang.reflect.Type;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Properties;
import java.util.TimeZone;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Supplier;

import static com.google.common.base.Preconditions.checkArgument;

import static org.apache.calcite.linq4j.Nullness.castNonNull;

import static java.util.Objects.requireNonNull;

/**
 * Implementation of JDBC connection
 * in the Calcite engine.
 *
 * <p>Abstract to allow newer versions of JDBC to add methods.
 */
abstract class CalciteConnectionImpl
    extends AvaticaConnection
    implements CalciteConnection, QueryProvider {
  public final JavaTypeFactory typeFactory;

  final CalciteSchema rootSchema;
  final Supplier<CalcitePrepare> prepareFactory;
  final CalciteServer server = new CalciteServerImpl();

  // must be package-protected
  static final Trojan TROJAN = createTrojan();

  /**
   * Creates a CalciteConnectionImpl.
   *
   * <p>Not public; method is called only from the driver.
   *
   * @param driver Driver
   * @param factory Factory for JDBC objects
   * @param url Server URL
   * @param info Other connection properties
   * @param rootSchema Root schema, or null
   * @param typeFactory Type factory, or null
   */
  protected CalciteConnectionImpl(Driver driver, AvaticaFactory factory,
      String url, Properties info, @Nullable CalciteSchema rootSchema,
      @Nullable JavaTypeFactory typeFactory) {
    super(driver, factory, url, info);
    CalciteConnectionConfig cfg = new CalciteConnectionConfigImpl(info);
    this.prepareFactory = driver::createPrepare;
    if (typeFactory != null) {
      this.typeFactory = typeFactory;
    } else {
      RelDataTypeSystem typeSystem =
          cfg.typeSystem(RelDataTypeSystem.class, RelDataTypeSystem.DEFAULT);
      if (cfg.conformance().shouldConvertRaggedUnionTypesToVarying()) {
        typeSystem =
            new DelegatingTypeSystem(typeSystem) {
              @Override public boolean
              shouldConvertRaggedUnionTypesToVarying() {
                return true;
              }
            };
      }
      this.typeFactory = new JavaTypeFactoryImpl(typeSystem);
    }
    this.rootSchema =
        requireNonNull(rootSchema != null
            ? rootSchema
            : CalciteSchema.createRootSchema(true));
    // Add dual table metadata when isSupportedDualTable return true
    if (cfg.conformance().isSupportedDualTable()) {
      SchemaPlus schemaPlus = this.rootSchema.plus();
      // Dual table contains one row with a value X
      schemaPlus.add(
          "DUAL", ViewTable.viewMacro(schemaPlus, "VALUES ('X')",
          ImmutableList.of(), null, false));
    }
    checkArgument(this.rootSchema.isRoot(), "must be root schema");
    this.properties.put(InternalProperty.CASE_SENSITIVE, cfg.caseSensitive());
    this.properties.put(InternalProperty.UNQUOTED_CASING, cfg.unquotedCasing());
    this.properties.put(InternalProperty.QUOTED_CASING, cfg.quotedCasing());
    this.properties.put(InternalProperty.QUOTING, cfg.quoting());
  }

  CalciteMetaImpl meta() {
    return (CalciteMetaImpl) meta;
  }

  @Override public CalciteConnectionConfig config() {
    return new CalciteConnectionConfigImpl(info);
  }

  @Override public Context createPrepareContext() {
    return new ContextImpl(this);
  }

  /** Called after the constructor has completed and the model has been
   * loaded. */
  void init() {
    final MaterializationService service = MaterializationService.instance();
    for (CalciteSchema.LatticeEntry e : Schemas.getLatticeEntries(rootSchema)) {
      final Lattice lattice = e.getLattice();
      for (Lattice.Tile tile : lattice.computeTiles()) {
        service.defineTile(lattice, tile.bitSet(), tile.measures, e.schema,
            true, true);
      }
    }
  }

  @Override public <T> T unwrap(Class<T> iface) throws SQLException {
    if (iface == RelRunner.class) {
      return iface.cast((RelRunner) rel ->
          prepareStatement_(CalcitePrepare.Query.of(rel),
              ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY,
              getHoldability()));
    }
    return super.unwrap(iface);
  }

  @Override public CalciteStatement createStatement(int resultSetType,
      int resultSetConcurrency, int resultSetHoldability) throws SQLException {
    return (CalciteStatement) super.createStatement(resultSetType,
        resultSetConcurrency, resultSetHoldability);
  }

  @Override public CalcitePreparedStatement prepareStatement(
      String sql,
      int resultSetType,
      int resultSetConcurrency,
      int resultSetHoldability) throws SQLException {
    final CalcitePrepare.Query<Object> query = CalcitePrepare.Query.of(sql);
    return prepareStatement_(query, resultSetType, resultSetConcurrency,
        resultSetHoldability);
  }

  private CalcitePreparedStatement prepareStatement_(
      CalcitePrepare.Query<?> query,
      int resultSetType,
      int resultSetConcurrency,
      int resultSetHoldability) throws SQLException {
    try {
      final Meta.Signature signature =
          parseQuery(query, createPrepareContext(), -1);
      final CalcitePreparedStatement calcitePreparedStatement =
          (CalcitePreparedStatement) factory.newPreparedStatement(this, null,
              signature, resultSetType, resultSetConcurrency, resultSetHoldability);
      server.getStatement(calcitePreparedStatement.handle).setSignature(signature);
      return calcitePreparedStatement;
    } catch (Exception e) {
      String message = query.rel == null
          ? "Error while preparing statement [" + query.sql + "]"
          : "Error while preparing plan [" + RelOptUtil.toString(query.rel) + "]";
      throw Helper.INSTANCE.createException(message, e);
    }
  }

  <T> CalcitePrepare.CalciteSignature<T> parseQuery(
      CalcitePrepare.Query<T> query,
      CalcitePrepare.Context prepareContext, long maxRowCount) {
    CalcitePrepare.Dummy.push(prepareContext);
    try {
      final CalcitePrepare prepare = prepareFactory.get();
      return prepare.prepareSql(prepareContext, query, Object[].class,
          maxRowCount);
    } finally {
      CalcitePrepare.Dummy.pop(prepareContext);
    }
  }

  @Override public AtomicBoolean getCancelFlag(Meta.StatementHandle handle)
      throws NoSuchStatementException {
    final CalciteServerStatement serverStatement = server.getStatement(handle);
    return ((CalciteServerStatementImpl) serverStatement).cancelFlag;
  }

  // CalciteConnection methods

  @Override public SchemaPlus getRootSchema() {
    return rootSchema.plus();
  }

  @Override public JavaTypeFactory getTypeFactory() {
    return typeFactory;
  }

  @Override public Properties getProperties() {
    return info;
  }

  // QueryProvider methods

  @Override public <T> Queryable<T> createQuery(
      Expression expression, Class<T> rowType) {
    return new CalciteQueryable<>(this, rowType, expression);
  }

  @Override public <T> Queryable<T> createQuery(Expression expression, Type rowType) {
    return new CalciteQueryable<>(this, rowType, expression);
  }

  @Override public <T> T execute(Expression expression, Type type) {
    return castNonNull(null); // TODO:
  }

  @Override public <T> T execute(Expression expression, Class<T> type) {
    return castNonNull(null); // TODO:
  }

  @Override public <T> Enumerator<T> executeQuery(Queryable<T> queryable) {
    try {
      CalciteStatement statement = (CalciteStatement) createStatement();
      CalcitePrepare.CalciteSignature<T> signature =
          statement.prepare(queryable);
      return enumerable(statement.handle, signature, null).enumerator();
    } catch (SQLException e) {
      throw new RuntimeException(e);
    }
  }

  public <T> Enumerable<T> enumerable(Meta.StatementHandle handle,
      CalcitePrepare.CalciteSignature<T> signature,
      @Nullable List<TypedValue> parameterValues0) throws SQLException {
    Map<String, Object> map = new LinkedHashMap<>();
    AvaticaStatement statement = lookupStatement(handle);
    final List<TypedValue> parameterValues;
    if (parameterValues0 == null || parameterValues0.isEmpty()) {
      parameterValues = TROJAN.getParameterValues(statement);
    } else {
      parameterValues = parameterValues0;
    }

    if (MetaImpl.checkParameterValueHasNull(parameterValues)) {
      throw new SQLException("exception while executing query: unbound parameter");
    }

    Ord.forEach(parameterValues,
        (e, i) -> map.put("?" + i, e.toLocal()));
    map.putAll(signature.internalParameters);
    final AtomicBoolean cancelFlag;
    try {
      cancelFlag = getCancelFlag(handle);
    } catch (NoSuchStatementException e) {
      throw new RuntimeException(e);
    }
    map.put(DataContext.Variable.CANCEL_FLAG.camelName, cancelFlag);
    int queryTimeout = statement.getQueryTimeout();
    // Avoid overflow
    if (queryTimeout > 0 && queryTimeout < Integer.MAX_VALUE / 1000) {
      map.put(DataContext.Variable.TIMEOUT.camelName, queryTimeout * 1000L);
    }
    final DataContext dataContext = createDataContext(map, signature.rootSchema);
    return signature.enumerable(dataContext);
  }

  public DataContext createDataContext(Map<String, Object> parameterValues,
      @Nullable CalciteSchema rootSchema) {
    if (config().spark()) {
      return DataContexts.EMPTY;
    }
    return new DataContextImpl(this, parameterValues, rootSchema);
  }

  // do not make public
  UnregisteredDriver getDriver() {
    return driver;
  }

  // do not make public
  AvaticaFactory getFactory() {
    return factory;
  }

  /** Implementation of Queryable.
   *
   * @param <T> element type */
  static class CalciteQueryable<T> extends BaseQueryable<T> {
    CalciteQueryable(CalciteConnection connection, Type elementType,
        Expression expression) {
      super(connection, elementType, expression);
    }

    public CalciteConnection getConnection() {
      return (CalciteConnection) provider;
    }
  }

  /** Implementation of Server. */
  private static class CalciteServerImpl implements CalciteServer {
    final Map<Integer, CalciteServerStatement> statementMap = new HashMap<>();

    @Override public void removeStatement(Meta.StatementHandle h) {
      statementMap.remove(h.id);
    }

    @Override public void addStatement(CalciteConnection connection,
        Meta.StatementHandle h) {
      final CalciteConnectionImpl c = (CalciteConnectionImpl) connection;
      final CalciteServerStatement previous =
          statementMap.put(h.id, new CalciteServerStatementImpl(c));
      if (previous != null) {
        throw new AssertionError();
      }
    }

    @Override public CalciteServerStatement getStatement(Meta.StatementHandle h)
        throws NoSuchStatementException {
      CalciteServerStatement statement = statementMap.get(h.id);
      if (statement == null) {
        throw new NoSuchStatementException(h);
      }
      return statement;
    }
  }

  /** Schema that has no parents. */
  static class RootSchema extends AbstractSchema {
    RootSchema() {
      super();
    }

    @Override public Expression getExpression(@Nullable SchemaPlus parentSchema,
        String name) {
      return Expressions.call(
          DataContext.ROOT,
          BuiltInMethod.DATA_CONTEXT_GET_ROOT_SCHEMA.method);
    }
  }

  /** Implementation of DataContext. */
  static class DataContextImpl implements DataContext {
    private final ImmutableMap<Object, Object> map;
    private final @Nullable CalciteSchema rootSchema;
    private final QueryProvider queryProvider;
    private final JavaTypeFactory typeFactory;

    DataContextImpl(CalciteConnectionImpl connection,
        Map<String, Object> parameters, @Nullable CalciteSchema rootSchema) {
      this.queryProvider = connection;
      this.typeFactory = connection.getTypeFactory();
      this.rootSchema = rootSchema;

      // Store the time at which the query started executing. The SQL
      // standard says that functions such as CURRENT_TIMESTAMP return the
      // same value throughout the query.
      final Holder<Long> timeHolder = Holder.of(System.currentTimeMillis());

      // Give a hook chance to alter the clock.
      Hook.CURRENT_TIME.run(timeHolder);
      final long time = timeHolder.get();
      final TimeZone timeZone = connection.getTimeZone();
      final TimeFrameSet timeFrameSet =
          connection.typeFactory.getTypeSystem()
              .deriveTimeFrameSet(TimeFrames.CORE);
      final long localOffset = timeZone.getOffset(time);
      final long currentOffset = localOffset;
      final long sysOffset = TimeZone.getDefault().getOffset(time);
      final String user = "sa";
      final String systemUser = System.getProperty("user.name");
      final String localeName = connection.config().locale();
      final Locale locale = localeName != null
          ? Util.parseLocale(localeName) : Locale.ROOT;

      // Give a hook chance to alter standard input, output, error streams.
      final Holder<Object[]> streamHolder =
          Holder.of(new Object[] {System.in, System.out, System.err});
      Hook.STANDARD_STREAMS.run(streamHolder);

      ImmutableMap.Builder<Object, Object> builder = ImmutableMap.builder();
      builder.put(Variable.UTC_TIMESTAMP.camelName, time)
          .put(Variable.CURRENT_TIMESTAMP.camelName, time + currentOffset)
          .put(Variable.LOCAL_TIMESTAMP.camelName, time + localOffset)
          .put(Variable.SYS_TIMESTAMP.camelName, time + sysOffset)
          .put(Variable.TIME_ZONE.camelName, timeZone)
          .put(Variable.TIME_FRAME_SET.camelName, timeFrameSet)
          .put(Variable.USER.camelName, user)
          .put(Variable.SYSTEM_USER.camelName, systemUser)
          .put(Variable.LOCALE.camelName, locale)
          .put(Variable.STDIN.camelName, streamHolder.get()[0])
          .put(Variable.STDOUT.camelName, streamHolder.get()[1])
          .put(Variable.STDERR.camelName, streamHolder.get()[2]);
      for (Map.Entry<String, Object> entry : parameters.entrySet()) {
        Object e = entry.getValue();
        if (e == null) {
          e = AvaticaSite.DUMMY_VALUE;
        }
        builder.put(entry.getKey(), e);
      }
      map = builder.build();
    }

    @Override public synchronized @Nullable Object get(String name) {
      Object o = map.get(name);
      if (o == AvaticaSite.DUMMY_VALUE) {
        return null;
      }
      if (o == null && Variable.SQL_ADVISOR.camelName.equals(name)) {
        return getSqlAdvisor();
      }
      return o;
    }

    private SqlAdvisor getSqlAdvisor() {
      final CalciteConnectionImpl con = (CalciteConnectionImpl) queryProvider;
      final String schemaName;
      try {
        schemaName = con.getSchema();
      } catch (SQLException e) {
        throw new RuntimeException(e);
      }
      final List<String> schemaPath =
          schemaName == null
              ? ImmutableList.of()
              : ImmutableList.of(schemaName);
      final SqlValidatorWithHints validator =
          new SqlAdvisorValidator(SqlStdOperatorTable.instance(),
              new CalciteCatalogReader(requireNonNull(rootSchema, "rootSchema"),
                  schemaPath, typeFactory, con.config()),
              typeFactory, SqlValidator.Config.DEFAULT);
      final CalciteConnectionConfig config = con.config();
      // This duplicates org.apache.calcite.prepare.CalcitePrepareImpl.prepare2_
      final SqlParser.Config parserConfig = SqlParser.config()
          .withQuotedCasing(config.quotedCasing())
          .withUnquotedCasing(config.unquotedCasing())
          .withQuoting(config.quoting())
          .withConformance(config.conformance())
          .withCaseSensitive(config.caseSensitive());
      return new SqlAdvisor(validator, parserConfig);
    }

    @Override public @Nullable SchemaPlus getRootSchema() {
      return rootSchema == null ? null : rootSchema.plus();
    }

    @Override public JavaTypeFactory getTypeFactory() {
      return typeFactory;
    }

    @Override public QueryProvider getQueryProvider() {
      return queryProvider;
    }
  }

  /** Implementation of Context. */
  static class ContextImpl implements CalcitePrepare.Context {
    private final CalciteConnectionImpl connection;
    private final CalciteSchema mutableRootSchema;
    private final CalciteSchema rootSchema;

    ContextImpl(CalciteConnectionImpl connection) {
      this.connection = requireNonNull(connection, "connection");
      long now = System.currentTimeMillis();
      SchemaVersion schemaVersion = new LongSchemaVersion(now);
      this.mutableRootSchema = connection.rootSchema;
      this.rootSchema = mutableRootSchema.createSnapshot(schemaVersion);
    }

    @Override public JavaTypeFactory getTypeFactory() {
      return connection.typeFactory;
    }

    @Override public CalciteSchema getRootSchema() {
      return rootSchema;
    }

    @Override public CalciteSchema getMutableRootSchema() {
      return mutableRootSchema;
    }

    @Override public List<String> getDefaultSchemaPath() {
      final String schemaName;
      try {
        schemaName = connection.getSchema();
      } catch (SQLException e) {
        throw new RuntimeException(e);
      }
      return schemaName == null
          ? ImmutableList.of()
          : ImmutableList.of(schemaName);
    }

    @Override public @Nullable List<String> getObjectPath() {
      return null;
    }

    @Override public CalciteConnectionConfig config() {
      return connection.config();
    }

    @Override public DataContext getDataContext() {
      return connection.createDataContext(ImmutableMap.of(),
          rootSchema);
    }

    @Override public RelRunner getRelRunner() {
      final RelRunner runner;
      try {
        runner = connection.unwrap(RelRunner.class);
      } catch (SQLException e) {
        throw new RuntimeException(e);
      }
      if (runner == null) {
        throw new UnsupportedOperationException();
      }
      return runner;
    }

    @Override public CalcitePrepare.SparkHandler spark() {
      final boolean enable = config().spark();
      return CalcitePrepare.Dummy.getSparkHandler(enable);
    }
  }

  /** Implementation of {@link CalciteServerStatement}. */
  static class CalciteServerStatementImpl
      implements CalciteServerStatement {
    private final CalciteConnectionImpl connection;
    private @Nullable Iterator<Object> iterator;
    private Meta.@Nullable Signature signature;
    private final AtomicBoolean cancelFlag = new AtomicBoolean();

    CalciteServerStatementImpl(CalciteConnectionImpl connection) {
      this.connection = requireNonNull(connection, "connection");
    }

    @Override public Context createPrepareContext() {
      return connection.createPrepareContext();
    }

    @Override public CalciteConnection getConnection() {
      return connection;
    }

    @Override public void setSignature(Meta.Signature signature) {
      this.signature = signature;
    }

    @Override public Meta.@Nullable Signature getSignature() {
      return signature;
    }

    @Override public @Nullable Iterator<Object> getResultSet() {
      return iterator;
    }

    @Override public void setResultSet(Iterator<Object> iterator) {
      this.iterator = iterator;
    }
  }
}