A Look at Flink TaskManager's Managed Memory
Published: 2019-06-29


This post looks at how the Flink TaskManager configures and sizes its managed memory.

TaskManagerOptions

flink-core-1.7.2-sources.jar!/org/apache/flink/configuration/TaskManagerOptions.java

    @PublicEvolving
    public class TaskManagerOptions {
        //......

        /**
         * JVM heap size for the TaskManagers with memory size.
         */
        @Documentation.CommonOption(position = Documentation.CommonOption.POSITION_MEMORY)
        public static final ConfigOption<String> TASK_MANAGER_HEAP_MEMORY =
                key("taskmanager.heap.size")
                .defaultValue("1024m")
                .withDescription("JVM heap size for the TaskManagers, which are the parallel workers of" +
                    " the system. On YARN setups, this value is automatically configured to the size of the TaskManager's" +
                    " YARN container, minus a certain tolerance value.");

        /**
         * Amount of memory to be allocated by the task manager's memory manager. If not
         * set, a relative fraction will be allocated, as defined by {@link #MANAGED_MEMORY_FRACTION}.
         */
        public static final ConfigOption<String> MANAGED_MEMORY_SIZE =
                key("taskmanager.memory.size")
                .defaultValue("0")
                .withDescription("Amount of memory to be allocated by the task manager's memory manager." +
                    " If not set, a relative fraction will be allocated.");

        /**
         * Fraction of free memory allocated by the memory manager if {@link #MANAGED_MEMORY_SIZE} is
         * not set.
         */
        public static final ConfigOption<Float> MANAGED_MEMORY_FRACTION =
                key("taskmanager.memory.fraction")
                .defaultValue(0.7f)
                .withDescription("The relative amount of memory (after subtracting the amount of memory used by network" +
                    " buffers) that the task manager reserves for sorting, hash tables, and caching of intermediate results." +
                    " For example, a value of `0.8` means that a task manager reserves 80% of its memory" +
                    " for internal data buffers, leaving 20% of free memory for the task manager's heap for objects" +
                    " created by user-defined functions. This parameter is only evaluated, if " + MANAGED_MEMORY_SIZE.key() +
                    " is not set.");

        /**
         * Memory allocation method (JVM heap or off-heap), used for managed memory of the TaskManager
         * as well as the network buffers.
         **/
        public static final ConfigOption<Boolean> MEMORY_OFF_HEAP =
                key("taskmanager.memory.off-heap")
                .defaultValue(false)
                .withDescription("Memory allocation method (JVM heap or off-heap), used for managed memory of the" +
                    " TaskManager as well as the network buffers.");

        /**
         * Whether TaskManager managed memory should be pre-allocated when the TaskManager is starting.
         */
        public static final ConfigOption<Boolean> MANAGED_MEMORY_PRE_ALLOCATE =
                key("taskmanager.memory.preallocate")
                .defaultValue(false)
                .withDescription("Whether TaskManager managed memory should be pre-allocated when the TaskManager is starting.");

        //......
    }

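  • taskmanager.memory.size sets the amount of memory managed by the TaskManager's memory manager (used mainly for sorting, hashing, and caching) and defaults to 0; taskmanager.heap.size sets the total memory available to the TaskManager, covering both heap and off-heap.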

TaskManagerServices.calculateHeapSizeMB

flink-runtime_2.11-1.7.2-sources.jar!/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java

    public class TaskManagerServices {
        //......

        /**
         * Calculates the amount of heap memory to use (to set via -Xmx and -Xms)
         * based on the total memory to use and the given configuration parameters.
         *
         * @param totalJavaMemorySizeMB
         *         overall available memory to use (heap and off-heap)
         * @param config
         *         configuration object
         *
         * @return heap memory to use (in megabytes)
         */
        public static long calculateHeapSizeMB(long totalJavaMemorySizeMB, Configuration config) {
            Preconditions.checkArgument(totalJavaMemorySizeMB > 0);

            // subtract the Java memory used for network buffers (always off-heap)
            final long networkBufMB =
                calculateNetworkBufferMemory(
                    totalJavaMemorySizeMB << 20, // megabytes to bytes
                    config) >> 20; // bytes to megabytes
            final long remainingJavaMemorySizeMB = totalJavaMemorySizeMB - networkBufMB;

            // split the available Java memory between heap and off-heap
            final boolean useOffHeap = config.getBoolean(TaskManagerOptions.MEMORY_OFF_HEAP);

            final long heapSizeMB;
            if (useOffHeap) {
                long offHeapSize;
                String managedMemorySizeDefaultVal = TaskManagerOptions.MANAGED_MEMORY_SIZE.defaultValue();
                if (!config.getString(TaskManagerOptions.MANAGED_MEMORY_SIZE).equals(managedMemorySizeDefaultVal)) {
                    try {
                        offHeapSize = MemorySize.parse(config.getString(TaskManagerOptions.MANAGED_MEMORY_SIZE), MEGA_BYTES).getMebiBytes();
                    } catch (IllegalArgumentException e) {
                        throw new IllegalConfigurationException(
                            "Could not read " + TaskManagerOptions.MANAGED_MEMORY_SIZE.key(), e);
                    }
                } else {
                    offHeapSize = Long.valueOf(managedMemorySizeDefaultVal);
                }

                if (offHeapSize <= 0) {
                    // calculate off-heap section via fraction
                    double fraction = config.getFloat(TaskManagerOptions.MANAGED_MEMORY_FRACTION);
                    offHeapSize = (long) (fraction * remainingJavaMemorySizeMB);
                }

                TaskManagerServicesConfiguration
                    .checkConfigParameter(offHeapSize < remainingJavaMemorySizeMB, offHeapSize,
                        TaskManagerOptions.MANAGED_MEMORY_SIZE.key(),
                        "Managed memory size too large for " + networkBufMB +
                            " MB network buffer memory and a total of " + totalJavaMemorySizeMB +
                            " MB JVM memory");

                heapSizeMB = remainingJavaMemorySizeMB - offHeapSize;
            } else {
                heapSizeMB = remainingJavaMemorySizeMB;
            }

            return heapSizeMB;
        }

        //......
    }

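  • If taskmanager.memory.size is less than or equal to 0, managed memory is sized from taskmanager.memory.fraction instead, which defaults to 0.7.
  • With taskmanager.memory.off-heap enabled and no explicit size set, taskmanager.memory.fraction * (taskmanager.heap.size - networkBufMB) becomes the offHeapSize handled by the TaskManager's memory manager.
  • With taskmanager.memory.off-heap enabled, the TaskManager's Xmx is taskmanager.heap.size - networkBufMB - offHeapSize; with it disabled, the Xmx is simply taskmanager.heap.size - networkBufMB.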
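
To make the arithmetic concrete, here is a minimal standalone sketch of the same calculation. It is not Flink's code: the class name HeapSizeSketch is made up, and the network buffer size is passed in directly as an assumed input instead of being derived from the configuration via calculateNetworkBufferMemory.

```java
/** Hypothetical sketch of the heap-size arithmetic; not Flink's actual class. */
final class HeapSizeSketch {

    /**
     * @param totalJavaMemorySizeMB taskmanager.heap.size in MB
     * @param networkBufMB          MB used by network buffers (assumed input in this sketch)
     * @param useOffHeap            taskmanager.memory.off-heap
     * @param managedMemorySizeMB   taskmanager.memory.size in MB, 0 when not set
     * @param fraction              taskmanager.memory.fraction
     * @return heap size in MB, i.e. the value used for -Xmx and -Xms
     */
    static long calculateHeapSizeMB(long totalJavaMemorySizeMB, long networkBufMB,
                                    boolean useOffHeap, long managedMemorySizeMB,
                                    double fraction) {
        long remainingMB = totalJavaMemorySizeMB - networkBufMB;
        if (!useOffHeap) {
            // on-heap managed memory: the whole remainder becomes JVM heap
            return remainingMB;
        }
        long offHeapMB = managedMemorySizeMB;
        if (offHeapMB <= 0) {
            // no explicit size: take a fraction of what is left after network buffers
            offHeapMB = (long) (fraction * remainingMB);
        }
        if (offHeapMB >= remainingMB) {
            throw new IllegalArgumentException(
                "Managed memory size too large: " + offHeapMB + " MB");
        }
        return remainingMB - offHeapMB;
    }

    public static void main(String[] args) {
        // 1024 MB total, 100 MB assumed for network buffers
        System.out.println(calculateHeapSizeMB(1024, 100, false, 0, 0.7));  // 924
        System.out.println(calculateHeapSizeMB(1024, 100, true, 0, 0.7));   // 278
        System.out.println(calculateHeapSizeMB(1024, 100, true, 300, 0.7)); // 624
    }
}
```

In the second call the off-heap share is (long) (0.7 * 924) = 646 MB, which leaves 924 - 646 = 278 MB of heap.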

TaskManagerServices.createMemoryManager

flink-runtime_2.11-1.7.2-sources.jar!/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java

    public class TaskManagerServices {
        //......

        /**
         * Creates a {@link MemoryManager} from the given {@link TaskManagerServicesConfiguration}.
         *
         * @param taskManagerServicesConfiguration to create the memory manager from
         * @param freeHeapMemoryWithDefrag an estimate of the size of the free heap memory
         * @param maxJvmHeapMemory the maximum JVM heap size
         * @return Memory manager
         * @throws Exception
         */
        private static MemoryManager createMemoryManager(
                TaskManagerServicesConfiguration taskManagerServicesConfiguration,
                long freeHeapMemoryWithDefrag,
                long maxJvmHeapMemory) throws Exception {
            // computing the amount of memory to use depends on how much memory is available
            // it strictly needs to happen AFTER the network stack has been initialized

            // check if a value has been configured
            long configuredMemory = taskManagerServicesConfiguration.getConfiguredMemory();

            MemoryType memType = taskManagerServicesConfiguration.getMemoryType();

            final long memorySize;

            boolean preAllocateMemory = taskManagerServicesConfiguration.isPreAllocateMemory();

            if (configuredMemory > 0) {
                if (preAllocateMemory) {
                    LOG.info("Using {} MB for managed memory.", configuredMemory);
                } else {
                    LOG.info("Limiting managed memory to {} MB, memory will be allocated lazily.", configuredMemory);
                }
                memorySize = configuredMemory << 20; // megabytes to bytes
            } else {
                // similar to #calculateNetworkBufferMemory(TaskManagerServicesConfiguration tmConfig)
                float memoryFraction = taskManagerServicesConfiguration.getMemoryFraction();

                if (memType == MemoryType.HEAP) {
                    // network buffers allocated off-heap -> use memoryFraction of the available heap:
                    long relativeMemSize = (long) (freeHeapMemoryWithDefrag * memoryFraction);
                    if (preAllocateMemory) {
                        LOG.info("Using {} of the currently free heap space for managed heap memory ({} MB).",
                            memoryFraction, relativeMemSize >> 20);
                    } else {
                        LOG.info("Limiting managed memory to {} of the currently free heap space ({} MB), " +
                            "memory will be allocated lazily.", memoryFraction, relativeMemSize >> 20);
                    }
                    memorySize = relativeMemSize;
                } else if (memType == MemoryType.OFF_HEAP) {
                    // The maximum heap memory has been adjusted according to the fraction (see
                    // calculateHeapSizeMB(long totalJavaMemorySizeMB, Configuration config)), i.e.
                    // maxJvmHeap = jvmTotalNoNet - jvmTotalNoNet * memoryFraction = jvmTotalNoNet * (1 - memoryFraction)
                    // directMemorySize = jvmTotalNoNet * memoryFraction
                    long directMemorySize = (long) (maxJvmHeapMemory / (1.0 - memoryFraction) * memoryFraction);
                    if (preAllocateMemory) {
                        LOG.info("Using {} of the maximum memory size for managed off-heap memory ({} MB).",
                            memoryFraction, directMemorySize >> 20);
                    } else {
                        LOG.info("Limiting managed memory to {} of the maximum memory size ({} MB)," +
                            " memory will be allocated lazily.", memoryFraction, directMemorySize >> 20);
                    }
                    memorySize = directMemorySize;
                } else {
                    throw new RuntimeException("No supported memory type detected.");
                }
            }

            // now start the memory manager
            final MemoryManager memoryManager;
            try {
                memoryManager = new MemoryManager(
                    memorySize,
                    taskManagerServicesConfiguration.getNumberOfSlots(),
                    taskManagerServicesConfiguration.getNetworkConfig().networkBufferSize(),
                    memType,
                    preAllocateMemory);
            } catch (OutOfMemoryError e) {
                if (memType == MemoryType.HEAP) {
                    throw new Exception("OutOfMemory error (" + e.getMessage() +
                        ") while allocating the TaskManager heap memory (" + memorySize + " bytes).", e);
                } else if (memType == MemoryType.OFF_HEAP) {
                    throw new Exception("OutOfMemory error (" + e.getMessage() +
                        ") while allocating the TaskManager off-heap memory (" + memorySize +
                        " bytes).Try increasing the maximum direct memory (-XX:MaxDirectMemorySize)", e);
                } else {
                    throw e;
                }
            }
            return memoryManager;
        }

        //......
    }

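  • TaskManagerServices has a private static method, createMemoryManager, that builds a MemoryManager from the configuration. If an explicit size is configured (configuredMemory > 0), that size is used as is; otherwise memorySize is computed according to the MemoryType and then passed to the MemoryManager constructor.
  • When memType is MemoryType.HEAP, memorySize is relativeMemSize, where relativeMemSize = (long) (freeHeapMemoryWithDefrag * memoryFraction).
  • When memType is MemoryType.OFF_HEAP, memorySize is directMemorySize. By definition directMemorySize = jvmTotalNoNet * memoryFraction, and since maxJvmHeap = jvmTotalNoNet - jvmTotalNoNet * memoryFraction = jvmTotalNoNet * (1 - memoryFraction), it follows that jvmTotalNoNet = maxJvmHeap / (1 - memoryFraction), hence directMemorySize = (long) (maxJvmHeapMemory / (1.0 - memoryFraction) * memoryFraction).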
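
The three sizing branches can be checked with round numbers. The sketch below is only an illustration under assumptions: the class ManagedMemorySizeSketch and its nested enum are hypothetical stand-ins, and the fraction 0.75 is chosen because it is exactly representable as a float, so the results come out exact.

```java
/** Hypothetical sketch of the managed-memory sizing branches; not Flink's actual class. */
final class ManagedMemorySizeSketch {

    enum MemoryType { HEAP, OFF_HEAP }

    /**
     * @param configuredMemoryMB explicit taskmanager.memory.size in MB, 0 when not set
     * @param memType            HEAP or OFF_HEAP
     * @param memoryFraction     taskmanager.memory.fraction
     * @param freeHeapBytes      estimate of the currently free heap, in bytes
     * @param maxJvmHeapBytes    maximum JVM heap size, in bytes
     * @return managed memory size in bytes
     */
    static long managedMemoryBytes(long configuredMemoryMB, MemoryType memType,
                                   float memoryFraction, long freeHeapBytes,
                                   long maxJvmHeapBytes) {
        if (configuredMemoryMB > 0) {
            return configuredMemoryMB << 20; // megabytes to bytes
        }
        if (memType == MemoryType.HEAP) {
            // a fraction of the heap that is free right now
            return (long) (freeHeapBytes * memoryFraction);
        }
        // OFF_HEAP: recover jvmTotalNoNet from the already reduced max heap,
        // then take the managed fraction of it
        return (long) (maxJvmHeapBytes / (1.0 - memoryFraction) * memoryFraction);
    }

    public static void main(String[] args) {
        long mb = 1L << 20;
        // explicit size of 128 MB wins over everything else
        System.out.println(managedMemoryBytes(128, MemoryType.HEAP, 0.75f, 512 * mb, 512 * mb) / mb);   // 128
        // heap: 0.75 of 512 MB free heap
        System.out.println(managedMemoryBytes(0, MemoryType.HEAP, 0.75f, 512 * mb, 512 * mb) / mb);     // 384
        // off-heap: max heap of 250 MB implies jvmTotalNoNet = 1000 MB, of which 0.75 is managed
        System.out.println(managedMemoryBytes(0, MemoryType.OFF_HEAP, 0.75f, 200 * mb, 250 * mb) / mb); // 750
    }
}
```

The off-heap case shows why the formula divides by (1 - memoryFraction): the JVM heap was already shrunk to the non-managed share, so the full pre-split amount has to be reconstructed before the fraction is applied.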

TaskManagerServicesConfiguration

flink-runtime_2.11-1.7.2-sources.jar!/org/apache/flink/runtime/taskexecutor/TaskManagerServicesConfiguration.java

    public class TaskManagerServicesConfiguration {
        //......

        /**
         * Utility method to extract TaskManager config parameters from the configuration and to
         * sanity check them.
         *
         * @param configuration The configuration.
         * @param remoteAddress identifying the IP address under which the TaskManager will be accessible
         * @param localCommunication True, to skip initializing the network stack.
         *                                      Use only in cases where only one task manager runs.
         * @return TaskExecutorConfiguration that wrappers InstanceConnectionInfo, NetworkEnvironmentConfiguration, etc.
         */
        public static TaskManagerServicesConfiguration fromConfiguration(
                Configuration configuration,
                InetAddress remoteAddress,
                boolean localCommunication) throws Exception {

            // we need this because many configs have been written with a "-1" entry
            int slots = configuration.getInteger(TaskManagerOptions.NUM_TASK_SLOTS, 1);
            if (slots == -1) {
                slots = 1;
            }

            final String[] tmpDirs = ConfigurationUtils.parseTempDirectories(configuration);
            String[] localStateRootDir = ConfigurationUtils.parseLocalStateDirectories(configuration);

            if (localStateRootDir.length == 0) {
                // default to temp dirs.
                localStateRootDir = tmpDirs;
            }

            boolean localRecoveryMode = configuration.getBoolean(
                CheckpointingOptions.LOCAL_RECOVERY.key(),
                CheckpointingOptions.LOCAL_RECOVERY.defaultValue());

            final NetworkEnvironmentConfiguration networkConfig = parseNetworkEnvironmentConfiguration(
                configuration,
                localCommunication,
                remoteAddress,
                slots);

            final QueryableStateConfiguration queryableStateConfig =
                    parseQueryableStateConfiguration(configuration);

            // extract memory settings
            long configuredMemory;
            String managedMemorySizeDefaultVal = TaskManagerOptions.MANAGED_MEMORY_SIZE.defaultValue();
            if (!configuration.getString(TaskManagerOptions.MANAGED_MEMORY_SIZE).equals(managedMemorySizeDefaultVal)) {
                try {
                    configuredMemory = MemorySize.parse(configuration.getString(TaskManagerOptions.MANAGED_MEMORY_SIZE), MEGA_BYTES).getMebiBytes();
                } catch (IllegalArgumentException e) {
                    throw new IllegalConfigurationException(
                        "Could not read " + TaskManagerOptions.MANAGED_MEMORY_SIZE.key(), e);
                }
            } else {
                configuredMemory = Long.valueOf(managedMemorySizeDefaultVal);
            }

            checkConfigParameter(
                configuration.getString(TaskManagerOptions.MANAGED_MEMORY_SIZE).equals(TaskManagerOptions.MANAGED_MEMORY_SIZE.defaultValue()) ||
                    configuredMemory > 0, configuredMemory,
                TaskManagerOptions.MANAGED_MEMORY_SIZE.key(),
                "MemoryManager needs at least one MB of memory. " +
                    "If you leave this config parameter empty, the system automatically " +
                    "pick a fraction of the available memory.");

            // check whether we use heap or off-heap memory
            final MemoryType memType;
            if (configuration.getBoolean(TaskManagerOptions.MEMORY_OFF_HEAP)) {
                memType = MemoryType.OFF_HEAP;
            } else {
                memType = MemoryType.HEAP;
            }

            boolean preAllocateMemory = configuration.getBoolean(TaskManagerOptions.MANAGED_MEMORY_PRE_ALLOCATE);

            float memoryFraction = configuration.getFloat(TaskManagerOptions.MANAGED_MEMORY_FRACTION);
            checkConfigParameter(memoryFraction > 0.0f && memoryFraction < 1.0f, memoryFraction,
                TaskManagerOptions.MANAGED_MEMORY_FRACTION.key(),
                "MemoryManager fraction of the free memory must be between 0.0 and 1.0");

            long timerServiceShutdownTimeout = AkkaUtils.getTimeout(configuration).toMillis();

            return new TaskManagerServicesConfiguration(
                remoteAddress,
                tmpDirs,
                localStateRootDir,
                localRecoveryMode,
                networkConfig,
                queryableStateConfig,
                slots,
                configuredMemory,
                memType,
                preAllocateMemory,
                memoryFraction,
                timerServiceShutdownTimeout,
                ConfigurationUtils.getSystemResourceMetricsProbingInterval(configuration));
        }

        //......
    }

  • TaskManagerServicesConfiguration has a static method, fromConfiguration, that builds a TaskManagerServicesConfiguration from a Configuration. The memType is derived from taskmanager.memory.off-heap: true yields MemoryType.OFF_HEAP, anything else yields MemoryType.HEAP.
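
The memory-related checks in fromConfiguration reduce to three small rules, sketched below as plain predicates. The class and method names are hypothetical, and the sketch skips the MemorySize string parsing: it assumes the caller already knows whether taskmanager.memory.size was left at its default of "0" and, if not, its value in MB.

```java
/** Hypothetical sketch of the memory-related checks; not Flink's actual class. */
final class ManagedMemoryConfigSketch {

    enum MemoryType { HEAP, OFF_HEAP }

    /** taskmanager.memory.off-heap = true selects OFF_HEAP, anything else HEAP. */
    static MemoryType memoryType(boolean offHeap) {
        return offHeap ? MemoryType.OFF_HEAP : MemoryType.HEAP;
    }

    /**
     * An explicitly set taskmanager.memory.size must be at least 1 MB;
     * a value left at the default is always accepted.
     */
    static boolean isValidConfiguredMemory(boolean leftAtDefault, long configuredMemoryMB) {
        return leftAtDefault || configuredMemoryMB > 0;
    }

    /** taskmanager.memory.fraction must lie strictly between 0.0 and 1.0. */
    static boolean isValidFraction(float memoryFraction) {
        return memoryFraction > 0.0f && memoryFraction < 1.0f;
    }

    public static void main(String[] args) {
        System.out.println(memoryType(true));                  // OFF_HEAP
        System.out.println(memoryType(false));                 // HEAP
        System.out.println(isValidConfiguredMemory(true, 0));  // true
        System.out.println(isValidConfiguredMemory(false, 0)); // false
        System.out.println(isValidFraction(0.7f));             // true
        System.out.println(isValidFraction(1.0f));             // false
    }
}
```

In the real method a failed check raises a configuration exception through checkConfigParameter; the sketch returns booleans only to keep the rules easy to read.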

Summary

  • The TaskManager's managed memory comes in two types, heap and off-heap. taskmanager.memory.size sets the amount of memory managed by the TaskManager's memory manager (used mainly for sorting, hashing, and caching) and defaults to 0; taskmanager.heap.size sets the total memory available to the TaskManager, covering both heap and off-heap. If taskmanager.memory.size is less than or equal to 0, managed memory is sized from taskmanager.memory.fraction, which defaults to 0.7. With taskmanager.memory.off-heap enabled and no explicit size set, taskmanager.memory.fraction * (taskmanager.heap.size - networkBufMB) becomes the offHeapSize handled by the memory manager, and the TaskManager's Xmx is taskmanager.heap.size - networkBufMB - offHeapSize.
  • TaskManagerServices has a private static method, createMemoryManager, that builds a MemoryManager from the configuration. When no explicit size is configured, memorySize is computed according to the MemoryType and passed to the MemoryManager constructor. For MemoryType.HEAP, memorySize is relativeMemSize = (long) (freeHeapMemoryWithDefrag * memoryFraction). For MemoryType.OFF_HEAP, memorySize is directMemorySize = jvmTotalNoNet * memoryFraction; since maxJvmHeap = jvmTotalNoNet - jvmTotalNoNet * memoryFraction = jvmTotalNoNet * (1 - memoryFraction), this gives directMemorySize = (long) (maxJvmHeapMemory / (1.0 - memoryFraction) * memoryFraction).
  • TaskManagerServicesConfiguration has a static method, fromConfiguration, that builds a TaskManagerServicesConfiguration from a Configuration. The memType is derived from taskmanager.memory.off-heap: true yields MemoryType.OFF_HEAP, anything else yields MemoryType.HEAP.

Reposted from: http://asgfm.baihongyu.com/
