InnodbSchema.java
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to you under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.calcite.adapter.innodb;
import org.apache.calcite.rel.type.RelDataTypeFactory;
import org.apache.calcite.rel.type.RelDataTypeImpl;
import org.apache.calcite.rel.type.RelDataTypeSystem;
import org.apache.calcite.rel.type.RelProtoDataType;
import org.apache.calcite.schema.Table;
import org.apache.calcite.schema.impl.AbstractSchema;
import org.apache.calcite.sql.type.SqlTypeFactoryImpl;
import org.apache.calcite.sql.type.SqlTypeName;
import org.apache.commons.lang3.StringUtils;
import com.alibaba.innodb.java.reader.TableReaderFactory;
import com.alibaba.innodb.java.reader.column.ColumnType;
import com.alibaba.innodb.java.reader.schema.Column;
import com.alibaba.innodb.java.reader.schema.TableDef;
import com.alibaba.innodb.java.reader.schema.provider.TableDefProvider;
import com.alibaba.innodb.java.reader.schema.provider.impl.SqlFileTableDefProvider;
import com.google.common.collect.ImmutableMap;
import java.util.List;
import java.util.Map;
import static com.google.common.base.Preconditions.checkArgument;
import static java.util.stream.Collectors.toList;
/**
* Schema for an InnoDB data source.
*/
public class InnodbSchema extends AbstractSchema {
final List<String> sqlFilePathList;
final String ibdDataFileBasePath;
final TableReaderFactory tableReaderFactory;
static final ColumnTypeToSqlTypeConversionRules COLUMN_TYPE_TO_SQL_TYPE =
ColumnTypeToSqlTypeConversionRules.instance();
public InnodbSchema(List<String> sqlFilePathList,
String ibdDataFileBasePath) {
checkArgument(sqlFilePathList != null && !sqlFilePathList.isEmpty(),
"SQL file path list cannot be empty");
checkArgument(StringUtils.isNotEmpty(ibdDataFileBasePath),
"InnoDB data file with ibd suffix cannot be empty");
this.sqlFilePathList = sqlFilePathList;
this.ibdDataFileBasePath = ibdDataFileBasePath;
List<TableDefProvider> tableDefProviderList = sqlFilePathList.stream()
.map(SqlFileTableDefProvider::new).collect(toList());
this.tableReaderFactory = TableReaderFactory.builder()
.withProviders(tableDefProviderList)
.withDataFileBasePath(ibdDataFileBasePath)
.build();
}
RelProtoDataType getRelDataType(String tableName) {
// Temporary type factory, just for the duration of this method. Allowable
// because we're creating a proto-type, not a type; before being used, the
// proto-type will be copied into a real type factory.
final RelDataTypeFactory typeFactory =
new SqlTypeFactoryImpl(RelDataTypeSystem.DEFAULT);
final RelDataTypeFactory.Builder fieldInfo = typeFactory.builder();
if (!tableReaderFactory.existTableDef(tableName)) {
throw new RuntimeException("Table definition " + tableName
+ " not found");
}
TableDef tableDef = tableReaderFactory.getTableDef(tableName);
for (Column column : tableDef.getColumnList()) {
final SqlTypeName sqlTypeName =
COLUMN_TYPE_TO_SQL_TYPE.lookup(column.getType());
final int precision;
final int scale;
switch (column.getType()) {
case ColumnType.TIMESTAMP:
case ColumnType.TIME:
case ColumnType.DATETIME:
precision = column.getPrecision();
scale = 0;
break;
default:
precision = column.getPrecision();
scale = column.getScale();
break;
}
if (sqlTypeName.allowsPrecScale(true, true)
&& column.getPrecision() >= 0
&& column.getScale() >= 0) {
fieldInfo.add(column.getName(), sqlTypeName, precision, scale);
} else if (sqlTypeName.allowsPrecNoScale() && precision >= 0) {
fieldInfo.add(column.getName(), sqlTypeName, precision);
} else {
assert sqlTypeName.allowsNoPrecNoScale();
fieldInfo.add(column.getName(), sqlTypeName);
}
fieldInfo.nullable(column.isNullable());
}
return RelDataTypeImpl.proto(fieldInfo.build());
}
/**
* Return table definition.
*/
public TableDef getTableDef(String tableName) {
if (!tableReaderFactory.existTableDef(tableName)) {
throw new RuntimeException("cannot find table definition for " + tableName);
}
return tableReaderFactory.getTableDef(tableName);
}
@Override protected Map<String, Table> getTableMap() {
final ImmutableMap.Builder<String, Table> builder = ImmutableMap.builder();
Map<String, TableDef> map = tableReaderFactory.getTableNameToDefMap();
for (Map.Entry<String, TableDef> entry : map.entrySet()) {
String tableName = entry.getKey();
builder.put(tableName, new InnodbTable(this, tableName));
}
return builder.build();
}
}