ProportionRouterRpcFairnessPolicyController.java

/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.hdfs.server.federation.fairness;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.server.federation.router.FederationUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Set;

import static org.apache.hadoop.hdfs.server.federation.fairness.RouterRpcFairnessConstants.CONCURRENT_NS;
import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_FAIR_HANDLER_PROPORTION_DEFAULT;
import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_FAIR_HANDLER_PROPORTION_KEY_PREFIX;
import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_HANDLER_COUNT_DEFAULT;
import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_HANDLER_COUNT_KEY;

/**
 * Proportion fairness policy extending {@link AbstractRouterRpcFairnessPolicyController}
 * and fetching proportion of handlers from configuration for all available name services,
 * based on the proportion and the total number of handlers, calculate the handlers of all ns.
 * The handlers count will not change for this controller.
 */
public class ProportionRouterRpcFairnessPolicyController extends
    AbstractRouterRpcFairnessPolicyController{

  private static final Logger LOG =
      LoggerFactory.getLogger(ProportionRouterRpcFairnessPolicyController.class);
  // For unregistered ns, the default ns is used,
  // so the configuration can be simplified if the handler ratio of all ns is 1,
  // and transparent expansion of new ns can be supported.
  private static final String DEFAULT_NS = "default_ns";

  public ProportionRouterRpcFairnessPolicyController(Configuration conf){
    init(conf);
  }

  @Override
  public void init(Configuration conf) {
    super.init(conf);
    // Total handlers configured to process all incoming Rpc.
    int handlerCount = conf.getInt(DFS_ROUTER_HANDLER_COUNT_KEY, DFS_ROUTER_HANDLER_COUNT_DEFAULT);

    LOG.info("Handlers available for fairness assignment {} ", handlerCount);

    // Get all name services configured
    Set<String> allConfiguredNS = FederationUtil.getAllConfiguredNS(conf);

    // Insert the concurrent nameservice into the set to process together
    allConfiguredNS.add(CONCURRENT_NS);

    // Insert the default nameservice into the set to process together
    allConfiguredNS.add(DEFAULT_NS);
    for (String nsId : allConfiguredNS) {
      double dedicatedHandlerProportion = conf.getDouble(
          DFS_ROUTER_FAIR_HANDLER_PROPORTION_KEY_PREFIX + nsId,
                DFS_ROUTER_FAIR_HANDLER_PROPORTION_DEFAULT);
      int dedicatedHandlers = (int) (dedicatedHandlerProportion * handlerCount);
      LOG.info("Dedicated handlers {} for ns {} ", dedicatedHandlers, nsId);
      // Each NS should have at least one handler assigned.
      if (dedicatedHandlers <= 0) {
        dedicatedHandlers = 1;
      }
      insertNameServiceWithPermits(nsId, dedicatedHandlers);
      LOG.info("Assigned {} handlers to nsId {} ", dedicatedHandlers, nsId);
    }
  }

  @Override
  public boolean acquirePermit(String nsId) {
    if (contains(nsId)) {
      return super.acquirePermit(nsId);
    }
    return super.acquirePermit(DEFAULT_NS);
  }

  @Override
  public void releasePermit(String nsId) {
    if (contains(nsId)) {
      super.releasePermit(nsId);
    } else {
      super.releasePermit(DEFAULT_NS);
    }
  }
}