HiveFileRenamer.java

/*
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package com.facebook.presto.hive;

import com.facebook.presto.spi.ConnectorMetadataUpdateHandle;
import com.facebook.presto.spi.QueryId;
import com.facebook.presto.spi.SchemaTableName;
import com.google.common.collect.ImmutableList;
import org.weakref.jmx.Managed;

import javax.annotation.PreDestroy;

import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;

import static com.google.common.base.Verify.verify;

public class HiveFileRenamer
{
    private final Map<QueryId, Map<HiveMetadataUpdateKey, AtomicLong>> queryPartitionFileCounterMap = new ConcurrentHashMap<>();
    private final Map<QueryId, Map<HiveMetadataUpdateHandle, String>> queryHiveMetadataResultMap = new ConcurrentHashMap<>();

    public List<ConnectorMetadataUpdateHandle> getMetadataUpdateResults(List<ConnectorMetadataUpdateHandle> metadataUpdateRequests, QueryId queryId)
    {
        ImmutableList.Builder<ConnectorMetadataUpdateHandle> metadataUpdateResults = ImmutableList.builder();

        for (ConnectorMetadataUpdateHandle connectorMetadataUpdateHandle : metadataUpdateRequests) {
            HiveMetadataUpdateHandle request = (HiveMetadataUpdateHandle) connectorMetadataUpdateHandle;
            String fileName = getFileName(request, queryId);
            metadataUpdateResults.add(new HiveMetadataUpdateHandle(request.getRequestId(), request.getSchemaTableName(), request.getPartitionName(), Optional.of(fileName)));
        }
        return metadataUpdateResults.build();
    }

    public void cleanup(QueryId queryId)
    {
        queryPartitionFileCounterMap.remove(queryId);
        queryHiveMetadataResultMap.remove(queryId);
    }

    private String getFileName(HiveMetadataUpdateHandle request, QueryId queryId)
    {
        if (!queryPartitionFileCounterMap.containsKey(queryId) || !queryHiveMetadataResultMap.containsKey(queryId)) {
            queryPartitionFileCounterMap.putIfAbsent(queryId, new ConcurrentHashMap<>());
            queryHiveMetadataResultMap.putIfAbsent(queryId, new ConcurrentHashMap<>());
        }

        // To keep track of the file counter per query per partition
        Map<HiveMetadataUpdateKey, AtomicLong> partitionFileCounterMap = queryPartitionFileCounterMap.get(queryId);

        // To keep track of the file name result per query per request
        // This is to make sure that request - fileName mapping is 1:1
        Map<HiveMetadataUpdateHandle, String> hiveMetadataResultMap = queryHiveMetadataResultMap.get(queryId);

        // If we have seen this request before then directly return the result.
        if (hiveMetadataResultMap.containsKey(request)) {
            // We come here if for some reason the worker did not receive the fileName and it retried the request.
            return hiveMetadataResultMap.get(request);
        }

        HiveMetadataUpdateKey key = new HiveMetadataUpdateKey(request);
        // File names start from 0
        partitionFileCounterMap.putIfAbsent(key, new AtomicLong(0));

        AtomicLong fileCount = partitionFileCounterMap.get(key);
        String fileName = Long.valueOf(fileCount.getAndIncrement()).toString();

        // Store the request - fileName mapping
        hiveMetadataResultMap.put(request, fileName);

        return fileName;
    }

    @PreDestroy
    public void stop()
    {
        // Mappings should be deleted when query finishes. So verify that map is empty before its closed.
        verify(queryPartitionFileCounterMap.isEmpty(), "Query partition file counter map has %s entries left behind", queryPartitionFileCounterMap.size());
        verify(queryHiveMetadataResultMap.isEmpty(), "Query hive metadata result map has %s entries left behind", queryHiveMetadataResultMap.size());
    }

    @Managed
    public int getQueryPartitionFileCounterMapSize()
    {
        return queryPartitionFileCounterMap.size();
    }

    private static class HiveMetadataUpdateKey
    {
        private final SchemaTableName schemaTableName;
        private final Optional<String> partitionName;

        private HiveMetadataUpdateKey(HiveMetadataUpdateHandle hiveMetadataUpdateHandle)
        {
            this.schemaTableName = hiveMetadataUpdateHandle.getSchemaTableName();
            this.partitionName = hiveMetadataUpdateHandle.getPartitionName();
        }

        @Override
        public boolean equals(Object obj)
        {
            if (this == obj) {
                return true;
            }
            if (obj == null || getClass() != obj.getClass()) {
                return false;
            }
            HiveMetadataUpdateKey o = (HiveMetadataUpdateKey) obj;
            return schemaTableName.equals(o.schemaTableName) &&
                    partitionName.equals(o.partitionName);
        }

        @Override
        public int hashCode()
        {
            return Objects.hash(schemaTableName, partitionName);
        }
    }
}