RedisSchema.java

/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to you under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.calcite.adapter.redis;

import org.apache.calcite.model.JsonCustomTable;
import org.apache.calcite.schema.Table;
import org.apache.calcite.schema.impl.AbstractSchema;

import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Maps;

import org.checkerframework.checker.nullness.qual.Nullable;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;

import static java.util.Objects.requireNonNull;

/**
 * Schema mapped onto a set of URLs / HTML tables. Each table in the schema
 * is an HTML table on a URL.
 */
class RedisSchema extends AbstractSchema {
  private static final String DATA_FORMAT = "dataFormat";
  private static final String FIELDS = "fields";
  private static final String KEY_DELIMITER = "keyDelimiter";
  private static final String OPERAND = "operand";

  public final String host;
  public final int port;
  public final int database;
  public final String password;
  public final List<Map<String, Object>> tables;

  RedisSchema(String host,
      int port,
      int database,
      String password,
      List<Map<String, Object>> tables) {
    this.host = host;
    this.port = port;
    this.database = database;
    this.password = password;
    this.tables = tables;
  }

  @Override protected Map<String, Table> getTableMap() {
    JsonCustomTable[] jsonCustomTables = new JsonCustomTable[tables.size()];
    Set<String> tableNames = Arrays.stream(tables.toArray(jsonCustomTables))
        .map(e -> e.name).collect(Collectors.toSet());
    return Maps.asMap(ImmutableSet.copyOf(tableNames),
        CacheBuilder.newBuilder()
            .build(CacheLoader.from(this::table)));
  }

  private Table table(String tableName) {
    RedisConfig redisConfig = new RedisConfig(host, port, database, password);
    return RedisTable.create(RedisSchema.this, tableName, redisConfig, null);
  }

  public RedisTableFieldInfo getTableFieldInfo(String tableName) {
    RedisTableFieldInfo tableFieldInfo = new RedisTableFieldInfo();
    List<LinkedHashMap<String, Object>> fields = new ArrayList<>();
    String dataFormat = "";
    String keyDelimiter = "";
    @SuppressWarnings({"unchecked", "rawtypes"})
    List<JsonCustomTable> jsonCustomTables =
        (List<JsonCustomTable>) (List) this.tables;
    for (JsonCustomTable jsonCustomTable : jsonCustomTables) {
      if (jsonCustomTable.name.equals(tableName)) {
        Map<String, Object> map =
            requireNonNull(jsonCustomTable.operand, OPERAND);
        if (isEmptyObject(map.get(DATA_FORMAT))) {
          throw new RuntimeException("dataFormat is invalid, it must be raw, csv or json");
        }
        RedisDataFormat dataFormatEnum =
            RedisDataFormat.fromTypeName(map.get(DATA_FORMAT).toString());
        if (dataFormatEnum == null) {
          throw new RuntimeException("dataFormat is invalid, it must be raw, csv or json");
        }
        if (isEmptyObject(map.get(FIELDS))) {
          throw new RuntimeException("fields is null");
        }
        dataFormat = map.get(DATA_FORMAT).toString();
        fields = (List<LinkedHashMap<String, Object>>) map.get(FIELDS);
        if (map.get(KEY_DELIMITER) != null) {
          keyDelimiter = map.get(KEY_DELIMITER).toString();
        }
        break;
      }
    }
    tableFieldInfo.setTableName(tableName);
    tableFieldInfo.setDataFormat(dataFormat);
    tableFieldInfo.setFields(fields);
    if (!keyDelimiter.isEmpty()) {
      tableFieldInfo.setKeyDelimiter(keyDelimiter);
    }
    return tableFieldInfo;
  }

  /** Returns whether an object should be considered "empty" for configuration/validation.
   *
   * <p>Semantics are aligned with
   * {@code org.apache.commons.lang3.ObjectUtils#isEmptyObject(Object)}.
   * <ul>
   *   <li>{@code null} is empty
   *   <li>{@link CharSequence}: {@code length()==0} is empty
   *   <li>{@link Collection}/{@link Map}: {@code isEmpty()} is empty
   *   <li>{@link Optional}: {@code !isPresent()} is empty
   *   <li>Arrays: {@code length==0} is empty
   * </ul>
   */
  public static boolean isEmptyObject(@Nullable Object o) {
    if (o == null) {
      return true;
    }
    if (o instanceof CharSequence) {
      return ((CharSequence) o).length() == 0;
    }
    if (o instanceof Collection<?>) {
      return ((Collection<?>) o).isEmpty();
    }
    if (o instanceof Map) {
      return ((Map<?, ?>) o).isEmpty();
    }
    if (o instanceof Optional) {
      return !((Optional<?>) o).isPresent();
    }
    final Class<?> c = o.getClass();
    if (c.isArray()) {
      return java.lang.reflect.Array.getLength(o) == 0;
    }
    return false;
  }

}