ShuffleChannelHandlerContext.java
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.mapred;
import io.netty.channel.group.ChannelGroup;
import org.apache.hadoop.thirdparty.com.google.common.cache.LoadingCache;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.ReadaheadPool;
import org.apache.hadoop.mapreduce.security.token.JobTokenSecretManager;
import org.apache.hadoop.util.Shell;
import static org.apache.hadoop.mapred.ShuffleHandler.DEFAULT_MAX_SHUFFLE_CONNECTIONS;
import static org.apache.hadoop.mapred.ShuffleHandler.DEFAULT_SHUFFLE_BUFFER_SIZE;
import static org.apache.hadoop.mapred.ShuffleHandler.DEFAULT_SHUFFLE_CONNECTION_KEEP_ALIVE_ENABLED;
import static org.apache.hadoop.mapred.ShuffleHandler.DEFAULT_SHUFFLE_CONNECTION_KEEP_ALIVE_TIME_OUT;
import static org.apache.hadoop.mapred.ShuffleHandler.DEFAULT_SHUFFLE_MANAGE_OS_CACHE;
import static org.apache.hadoop.mapred.ShuffleHandler.DEFAULT_SHUFFLE_MAPOUTPUT_META_INFO_CACHE_SIZE;
import static org.apache.hadoop.mapred.ShuffleHandler.DEFAULT_SHUFFLE_MAX_SESSION_OPEN_FILES;
import static org.apache.hadoop.mapred.ShuffleHandler.DEFAULT_SHUFFLE_READAHEAD_BYTES;
import static org.apache.hadoop.mapred.ShuffleHandler.DEFAULT_SHUFFLE_TRANSFERTO_ALLOWED;
import static org.apache.hadoop.mapred.ShuffleHandler.DEFAULT_SUFFLE_SSL_FILE_BUFFER_SIZE;
import static org.apache.hadoop.mapred.ShuffleHandler.MAX_SHUFFLE_CONNECTIONS;
import static org.apache.hadoop.mapred.ShuffleHandler.SHUFFLE_BUFFER_SIZE;
import static org.apache.hadoop.mapred.ShuffleHandler.SHUFFLE_CONNECTION_KEEP_ALIVE_ENABLED;
import static org.apache.hadoop.mapred.ShuffleHandler.SHUFFLE_CONNECTION_KEEP_ALIVE_TIME_OUT;
import static org.apache.hadoop.mapred.ShuffleHandler.SHUFFLE_MANAGE_OS_CACHE;
import static org.apache.hadoop.mapred.ShuffleHandler.SHUFFLE_MAPOUTPUT_META_INFO_CACHE_SIZE;
import static org.apache.hadoop.mapred.ShuffleHandler.SHUFFLE_MAX_SESSION_OPEN_FILES;
import static org.apache.hadoop.mapred.ShuffleHandler.SHUFFLE_READAHEAD_BYTES;
import static org.apache.hadoop.mapred.ShuffleHandler.SHUFFLE_TRANSFERTO_ALLOWED;
import static org.apache.hadoop.mapred.ShuffleHandler.SUFFLE_SSL_FILE_BUFFER_SIZE_KEY;
import static org.apache.hadoop.mapred.ShuffleHandler.WINDOWS_DEFAULT_SHUFFLE_TRANSFERTO_ALLOWED;
@SuppressWarnings("checkstyle:VisibilityModifier")
public class ShuffleChannelHandlerContext {
public final Configuration conf;
public final JobTokenSecretManager secretManager;
public final Map<String, String> userRsrc;
public final LoadingCache<ShuffleHandler.AttemptPathIdentifier,
ShuffleHandler.AttemptPathInfo> pathCache;
public final IndexCache indexCache;
public final ShuffleHandler.ShuffleMetrics metrics;
public final ChannelGroup allChannels;
public final boolean connectionKeepAliveEnabled;
public final int sslFileBufferSize;
public final int connectionKeepAliveTimeOut;
public final int mapOutputMetaInfoCacheSize;
public final AtomicInteger activeConnections = new AtomicInteger();
/**
* Should the shuffle use posix_fadvise calls to manage the OS cache during
* sendfile.
*/
public final boolean manageOsCache;
public final int readaheadLength;
public final int maxShuffleConnections;
public final int shuffleBufferSize;
public final boolean shuffleTransferToAllowed;
public final int maxSessionOpenFiles;
public final ReadaheadPool readaheadPool = ReadaheadPool.getInstance();
public int port = -1;
public ShuffleChannelHandlerContext(Configuration conf,
Map<String, String> userRsrc,
JobTokenSecretManager secretManager,
LoadingCache<ShuffleHandler.AttemptPathIdentifier,
ShuffleHandler.AttemptPathInfo> patCache,
IndexCache indexCache,
ShuffleHandler.ShuffleMetrics metrics,
ChannelGroup allChannels) {
this.conf = conf;
this.userRsrc = userRsrc;
this.secretManager = secretManager;
this.pathCache = patCache;
this.indexCache = indexCache;
this.metrics = metrics;
this.allChannels = allChannels;
sslFileBufferSize = conf.getInt(SUFFLE_SSL_FILE_BUFFER_SIZE_KEY,
DEFAULT_SUFFLE_SSL_FILE_BUFFER_SIZE);
connectionKeepAliveEnabled =
conf.getBoolean(SHUFFLE_CONNECTION_KEEP_ALIVE_ENABLED,
DEFAULT_SHUFFLE_CONNECTION_KEEP_ALIVE_ENABLED);
connectionKeepAliveTimeOut =
Math.max(1, conf.getInt(SHUFFLE_CONNECTION_KEEP_ALIVE_TIME_OUT,
DEFAULT_SHUFFLE_CONNECTION_KEEP_ALIVE_TIME_OUT));
mapOutputMetaInfoCacheSize =
Math.max(1, conf.getInt(SHUFFLE_MAPOUTPUT_META_INFO_CACHE_SIZE,
DEFAULT_SHUFFLE_MAPOUTPUT_META_INFO_CACHE_SIZE));
manageOsCache = conf.getBoolean(SHUFFLE_MANAGE_OS_CACHE,
DEFAULT_SHUFFLE_MANAGE_OS_CACHE);
readaheadLength = conf.getInt(SHUFFLE_READAHEAD_BYTES,
DEFAULT_SHUFFLE_READAHEAD_BYTES);
maxShuffleConnections = conf.getInt(MAX_SHUFFLE_CONNECTIONS,
DEFAULT_MAX_SHUFFLE_CONNECTIONS);
shuffleBufferSize = conf.getInt(SHUFFLE_BUFFER_SIZE,
DEFAULT_SHUFFLE_BUFFER_SIZE);
shuffleTransferToAllowed = conf.getBoolean(SHUFFLE_TRANSFERTO_ALLOWED,
(Shell.WINDOWS)?WINDOWS_DEFAULT_SHUFFLE_TRANSFERTO_ALLOWED:
DEFAULT_SHUFFLE_TRANSFERTO_ALLOWED);
maxSessionOpenFiles = conf.getInt(SHUFFLE_MAX_SESSION_OPEN_FILES,
DEFAULT_SHUFFLE_MAX_SESSION_OPEN_FILES);
}
void setPort(int port) {
this.port = port;
}
}