CouchbaseDatabase.java

/*-
 * ========================LICENSE_START=================================
 * flyway-database-nc-couchbase
 * ========================================================================
 * Copyright (C) 2010 - 2025 Red Gate Software Ltd
 * ========================================================================
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 * 
 *      http://www.apache.org/licenses/LICENSE-2.0
 * 
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 * =========================LICENSE_END==================================
 */
package org.flywaydb.database.nc.couchbase;

import static com.couchbase.client.java.query.QueryScanConsistency.REQUEST_PLUS;
import static org.flywaydb.core.internal.logging.PreviewFeatureWarning.logPreviewFeature;

import com.couchbase.client.core.env.SecurityConfig;
import com.couchbase.client.java.Bucket;
import com.couchbase.client.java.Cluster;
import com.couchbase.client.java.ClusterOptions;
import com.couchbase.client.java.Scope;
import com.couchbase.client.java.env.ClusterEnvironment;
import com.couchbase.client.java.json.JsonObject;
import com.couchbase.client.java.query.QueryOptions;
import com.couchbase.client.java.query.QueryResult;
import com.couchbase.client.java.transactions.TransactionQueryOptions;
import java.sql.Timestamp;
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.List;
import java.util.Locale;
import java.util.function.BiFunction;
import org.flywaydb.core.api.FlywayException;
import org.flywaydb.core.api.configuration.Configuration;
import org.flywaydb.core.internal.configuration.ConfigUtils;
import org.flywaydb.core.internal.configuration.models.ResolvedEnvironment;
import org.flywaydb.core.internal.nc.ConnectionType;
import org.flywaydb.core.internal.nc.DatabaseSupport;
import org.flywaydb.core.internal.nc.DatabaseVersionImpl;
import org.flywaydb.core.internal.nc.MetaData;
import org.flywaydb.core.internal.nc.schemahistory.SchemaHistoryItem;
import org.flywaydb.core.internal.nc.schemahistory.SchemaHistoryModel;
import org.flywaydb.core.internal.parser.Parser;
import org.flywaydb.core.internal.parser.ParsingContext;
import org.flywaydb.nc.NativeConnectorsNonJdbc;
import org.flywaydb.nc.executors.NonJdbcExecutorExecutionUnit;

public class CouchbaseDatabase extends NativeConnectorsNonJdbc {
    private static final String URL_PREFIX = "couchbases:";
    private static final String DEFAULT_SCOPE = "_default";
    
    private Cluster cluster;
    private Bucket bucket;
    private Scope scope;
    private String currentFQCNPrefix;

    @Override
    public boolean isOnByDefault(final Configuration configuration) {
        return true;
    }

    @Override
    protected String getDefaultSchema(final Configuration configuration) {
        final String defaultSchema = ConfigUtils.getCalculatedDefaultSchema(configuration);

        if (defaultSchema == null) {
            throw new FlywayException("Couchbase connection failed: schema configuration is missing");
        }

        return defaultSchema;
    }

    @Override
    public DatabaseSupport supportsUrl(final String url) {
        if (url.startsWith(URL_PREFIX)) {
            return new DatabaseSupport(true, 1);
        }
        return new DatabaseSupport(false, 0);
    }

    @Override
    public List<String> supportedVerbs() {
        return List.of("info", "migrate", "clean", "undo", "baseline", "validate", "repair");
    }

    @Override
    public boolean supportsTransactions() {
        return true;
    }

    @Override
    public void initialize(final ResolvedEnvironment environment, final Configuration configuration) {
        logPreviewFeature(getDatabaseType() + " Support");

        initializeConnectionType(configuration);

        ClusterEnvironment env = ClusterEnvironment.builder()
            .securityConfig(SecurityConfig.enableTls(true))
            .build();

        cluster = Cluster.connect(environment.getUrl(),
            ClusterOptions.clusterOptions(environment.getUser(), environment.getPassword()).environment(env));

        initializeBucketAndScope(getDefaultSchema(configuration));
    }

    private void initializeBucketAndScope(String defaultSchema) {
        currentSchema = defaultSchema;

        bucket = cluster.bucket(getBucketFromSchema(defaultSchema));
        scope = bucket.scope(getScopeFromSchema(defaultSchema));
        currentFQCNPrefix = "`" + bucket.name() + "`.`" + scope.name() + "`";
    }

    private void initializeConnectionType(final Configuration configuration) {
        connectionType = ConnectionType.API;
    }

    @Override
    public void doExecute(final NonJdbcExecutorExecutionUnit executionUnit, final boolean outputQueryResults) {
        try {
            scope.query(executionUnit.getScript(), QueryOptions.queryOptions().scanConsistency(REQUEST_PLUS));

            // Wait for metadata change to propagate before executing the next statement
            // REQUEST_PLUS only applies to data queries. It doesn't help for scenarios like an INSERT follows a CREATE COLLECTION
            Thread.sleep(300);
        } catch (Exception e) {
            throw new FlywayException(e);
        }
    }

    @Override
    public String getDatabaseType() {
        return "Couchbase";
    }


    @Override
    public MetaData getDatabaseMetaData() {
        if (this.metaData != null) {
            return metaData;
        }

        QueryResult result = cluster.query("SELECT VERSION() AS version;");
        String serverVersion = result.rowsAsObject().get(0).getString("version");

        metaData =  new MetaData(
            getDatabaseType(),
            "Couchbase",
            new DatabaseVersionImpl(serverVersion),
            serverVersion,
            getCurrentSchema(),
            connectionType
        );

        return metaData;
    }

    @Override
    public void createSchemaHistoryTable(final Configuration configuration) {
        try {
            String fqcn = String.format("%s.%s", currentFQCNPrefix, configuration.getTable());
            scope.query("CREATE COLLECTION " + fqcn);

            // Allow time for collection to become visible
            Thread.sleep(300);
            
            scope.query("CREATE PRIMARY INDEX ON " + fqcn);
        } catch (Exception e) {
            throw new FlywayException(e);
        }
    }

    @Override
    public boolean schemaHistoryTableExists(final String tableName) {
        String sql = "SELECT * FROM system:keyspaces  WHERE `bucket` = $bucket AND `scope` = $scope AND `name` = $collection";

        QueryResult result = cluster.query(sql, QueryOptions.queryOptions().scanConsistency(REQUEST_PLUS).parameters(JsonObject.create()
                .put("bucket", bucket.name())
                .put("scope", scope.name())
                .put("collection", tableName)));

        return !result.rowsAsObject().isEmpty();
    }

    @Override
    public SchemaHistoryModel getSchemaHistoryModel(final String tableName) {
        String fqcn = String.format("%s.%s", currentFQCNPrefix, tableName);

        String query = "SELECT installed_rank, version, description, type, script, checksum, " +
            "installed_on, installed_by, execution_time, success FROM " + fqcn;

        List<SchemaHistoryItem> items = new ArrayList<>();

        try {
            QueryResult result = cluster.query(query, QueryOptions.queryOptions().scanConsistency(REQUEST_PLUS));

            for (JsonObject row : result.rowsAsObject()) {
                items.add(SchemaHistoryItem.builder()
                    .installedRank(row.getInt("installed_rank"))
                    .version(row.getString("version"))
                    .description(row.getString("description"))
                    .type(row.getString("type"))
                    .script(row.getString("script"))
                    .checksum(row.getInt("checksum"))
                    .installedOn(fromTimestampString(row.getString("installed_on")))
                    .installedBy(row.getString("installed_by"))
                    .executionTime(row.getInt("execution_time"))
                    .success(row.getBoolean("success"))
                    .build());
            }

            return new SchemaHistoryModel(items);
        } catch (Exception ignored) {
            return new SchemaHistoryModel();
        }
    }

    private LocalDateTime fromTimestampString(final String timestamp) {
        String pattern = "yyyy-MM-dd HH:mm:ss.";
        pattern = pattern + "S".repeat(timestamp.length() - pattern.length());
        return LocalDateTime.parse(timestamp, DateTimeFormatter.ofPattern(pattern, Locale.ENGLISH));
    }

    @Override
    public void appendSchemaHistoryItem(final SchemaHistoryItem item, final String tableName) {
        String fqcn = String.format("%s.%s", currentFQCNPrefix, tableName);

        JsonObject params = JsonObject.create()
            .put("installed_rank", item.getInstalledRank())
            .put("version", item.getVersion())
            .put("description", item.getDescription())
            .put("type", item.getType())
            .put("script", item.getScript())
            .put("checksum", item.getChecksum())
            .put("installed_on", Timestamp.from(Instant.now()).toString())
            .put("installed_by", item.getInstalledBy())
            .put("execution_time", item.getExecutionTime())
            .put("success", item.isSuccess());

        String query = "INSERT INTO " + fqcn + " (KEY, VALUE) VALUES (UUID(), " + params.toString() + ");";

        try {
            if (batch.isEmpty()) {
                scope.query(query, QueryOptions.queryOptions().scanConsistency(REQUEST_PLUS));
            } else {
                batch.add(new NonJdbcExecutorExecutionUnit(query, ""));
            }

        } catch (Exception e) {
            throw new FlywayException(e);
        }
    }

    @Override
    public boolean isSchemaEmpty(final String schema) {
        String bucket = getBucketFromSchema(schema);
        String scope = getScopeFromSchema(schema);

        String sql = "SELECT * FROM system:keyspaces  WHERE `bucket` = $bucket AND `scope` = $scope";

        QueryResult result = cluster.query(sql, QueryOptions.queryOptions().scanConsistency(REQUEST_PLUS).parameters(JsonObject.create()
                .put("bucket", bucket)
                .put("scope", scope)));

        return result.rowsAsObject().isEmpty();
    }

    @Override
    public boolean isSchemaExists(final String schema) {
        String bucket = getBucketFromSchema(schema);
        String scope = getScopeFromSchema(schema);

        String sql = "SELECT * FROM system:scopes  WHERE `bucket` = $bucket";

        QueryResult result = cluster.query(sql, QueryOptions.queryOptions().scanConsistency(REQUEST_PLUS).parameters(JsonObject.create()
            .put("bucket", bucket)));

        if (result.rowsAsObject().isEmpty()) {
            return false;
        } else {
            if (DEFAULT_SCOPE.equals(scope)) {
                return true;
            } else {
                return result.rowsAsObject().stream()
                    .anyMatch(row -> scope.equals(row.getObject("scopes").getString("name")));
            }
        }
    }

    @Override
    public void createSchemas(final String... schemas) {
        for (String schema : schemas) {
            String bucket = getBucketFromSchema(schema);
            String scope = getScopeFromSchema(schema);

            try {
                if (!isSchemaExists(bucket)) {
                    throw new FlywayException("Bucket creation is currently not supported");
                }

                if (!DEFAULT_SCOPE.equals(scope)) {
                    String fullScope = String.format("`%s`.`%s`", bucket, scope);
                    cluster.query("CREATE SCOPE " + fullScope, QueryOptions.queryOptions().scanConsistency(REQUEST_PLUS));
                }

            } catch (Exception e) {
                throw new FlywayException(e);
            }
        }
    }

    @Override
    public BiFunction<Configuration, ParsingContext, Parser> getParser() {
        return null;
    }

    @Override
    public void doExecuteBatch() {
        if (batch.isEmpty()) {
            return;
        }

        try {
            cluster.transactions().run(ctx -> {
                for (NonJdbcExecutorExecutionUnit executionUnit : batch) {
                    ctx.query(scope,
                        executionUnit.getScript(),
                        TransactionQueryOptions.queryOptions().scanConsistency(REQUEST_PLUS));
                }
            });
        } catch (Exception e) {
            throw new FlywayException(e);
        } finally {
            batch.clear();
        }
    }

    @Override
    public boolean transactionAsBatch() {
        return true;
    }

    @Override
    public String getCurrentUser() {
        return null;
    }

    @Override
    public void startTransaction() {
        batch.clear();
    }

    @Override
    public void commitTransaction() {
        batch.clear();
    }

    @Override
    public void rollbackTransaction() {
        batch.clear();
    }

    @Override
    public void doCleanSchema(final String schema) {
        String bucket = getBucketFromSchema(schema);
        String scope = getScopeFromSchema(schema);

        try {
            String sql = "SELECT * FROM system:keyspaces  WHERE `bucket` = $bucket AND `scope` = $scope";

            QueryResult result = cluster.query(sql, QueryOptions.queryOptions().scanConsistency(REQUEST_PLUS).parameters(JsonObject.create()
                .put("bucket", bucket)
                .put("scope", scope)));

            for (JsonObject row : result.rowsAsObject()) {
                String collectionName =  row.getObject("keyspaces").getString("name");

                String fqcn = String.format("`%s`.`%s`.%s", bucket, scope, collectionName);

                cluster.query("DROP COLLECTION " + fqcn);
            }
        } catch (Exception e) {
            throw new FlywayException(e);
        }
    }

    @Override
    public void doDropSchema(final String schema) {
        String bucket = getBucketFromSchema(schema);
        String scope = getScopeFromSchema(schema);

        try {
            if (!DEFAULT_SCOPE.equals(scope)) {
                String fullScope = String.format("`%s`.`%s`", bucket, scope);
                cluster.query("DROP SCOPE " + fullScope, QueryOptions.queryOptions().scanConsistency(REQUEST_PLUS));
            }
        } catch (Exception e) {
            throw new FlywayException(e);
        }
    }

    @Override
    public void removeFailedSchemaHistoryItems(final String tableName) {
        try {
            String fqcn = String.format("%s.%s", currentFQCNPrefix, tableName);
            scope.query("DELETE FROM " + fqcn + " WHERE success = false", QueryOptions.queryOptions().scanConsistency(REQUEST_PLUS));
        } catch (Exception e) {
            throw new FlywayException(e);
        }
    }

    @Override
    public void updateSchemaHistoryItem(final SchemaHistoryItem item, final String tableName) {
        try {
            String fqcn = String.format("%s.%s", currentFQCNPrefix, tableName);

            String sql = "UPDATE " + fqcn
                + " SET checksum = $checksum, description = $description, type = $type WHERE installed_rank = $installed_rank LIMIT 1";

            JsonObject params = JsonObject.create()
                .put("installed_rank", item.getInstalledRank())
                .put("checksum", item.getChecksum())
                .put("description", item.getDescription())
                .put("type", item.getType());

            scope.query(sql, QueryOptions.queryOptions().scanConsistency(REQUEST_PLUS).parameters(params));

        } catch (Exception e) {
            throw new FlywayException(e);
        }
    }

    @Override
    public boolean isClosed() {
        return false;
    }

    @Override
    public void close() throws Exception {
        if (cluster != null) {
            cluster.close();
        }
    }

    private String getBucketFromSchema(String schema) {
        int index = schema.indexOf(".");
        if (index != -1) {
            return schema.substring(0, index);
        } else {
            return schema;
        }
    }

    private String getScopeFromSchema(String schema) {
        int index = schema.indexOf(".");
        if (index != -1) {
            return schema.substring(index+1);
        } else {
            return DEFAULT_SCOPE;
        }
    }

}