RatisServer.java

/*
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package com.facebook.presto.resourcemanager;

import com.facebook.presto.metadata.InternalNode;
import com.facebook.presto.metadata.InternalNodeManager;
import org.apache.ratis.conf.RaftProperties;
import org.apache.ratis.grpc.GrpcConfigKeys;
import org.apache.ratis.protocol.RaftGroup;
import org.apache.ratis.protocol.RaftGroupId;
import org.apache.ratis.protocol.RaftPeer;
import org.apache.ratis.protocol.RaftPeerId;
import org.apache.ratis.server.RaftServer;
import org.apache.ratis.server.RaftServerConfigKeys;
import org.apache.ratis.statemachine.impl.BaseStateMachine;

import javax.annotation.PostConstruct;
import javax.inject.Inject;

import java.io.File;
import java.util.Collections;
import java.util.Set;
import java.util.UUID;

import static java.util.Objects.requireNonNull;

public class RatisServer
{
    private final InternalNodeManager internalNodeManager;
    private final String id;
    private final String groupId;
    private final int port;
    private final String storageDir;

    @Inject
    public RatisServer(InternalNodeManager internalNodeManager, RaftConfig raftConfig)
    {
        requireNonNull(internalNodeManager, "internalNodeManager is null");
        this.internalNodeManager = internalNodeManager;
        this.id = internalNodeManager.getCurrentNode().getNodeIdentifier();
        requireNonNull(raftConfig, "raftConfig is null");
        this.groupId = raftConfig.getGroupId();
        this.port = raftConfig.getPort();
        this.storageDir = raftConfig.getStorageDir();
    }

    public RaftPeer[] getPeers()
    {
        Set<InternalNode> resourceManagers = internalNodeManager.getResourceManagers();
        return resourceManagers.stream()
                .map(resourceManager -> {
                    RaftPeer.Builder builder = RaftPeer.newBuilder();
                    builder.setId(RaftPeerId.valueOf(resourceManager.getNodeIdentifier()))
                        .setAddress(resourceManager.getHost() + ":" + resourceManager.getRaftPort().getAsInt());
                    return builder.build();
                }).toArray(RaftPeer[]::new);
    }

    @PostConstruct
    public void start()
            throws Exception
    {
        run();
    }

    public void run() throws Exception
    {
        RaftProperties properties = new RaftProperties();
        GrpcConfigKeys.Server.setPort(properties, port);
        File storage = new File(storageDir + "/" + id);
        RaftServerConfigKeys.setStorageDir(properties, Collections.singletonList(storage));
        final RaftGroup raftGroup = RaftGroup.valueOf(RaftGroupId.valueOf(UUID.nameUUIDFromBytes(groupId.getBytes())), getPeers());

        RaftServer raftServer = RaftServer.newBuilder()
                .setServerId(RaftPeerId.valueOf(id))
                .setProperties(properties)
                .setGroup(raftGroup)
                .setStateMachine(new BaseStateMachine())
                .build();
        raftServer.start();
    }
}