Microsoft.IO.RecyclableMemoryStream源码解读

时间:2022-12-03 19:05:09

源码地址:https://github.com/Microsoft/Microsoft.IO.RecyclableMemoryStreamMicrosoft.IO.RecyclableMemoryStream源码解读

小对象池和大对象池管理、RecyclableMemoryStream创建、各场景的ETW消息\事件钩子;线程安全

 

Microsoft.IO.RecyclableMemoryStream源码解读

备注:官方这张图,只是池块增长策略阐述,不能很好理解其内部池具体实现。小对象还好理解,大对象组织分配并不像画的这样。

1.1、构造函数参数

blockSize:小对象池,块大小;默认128KB。

largeBufferMultiple:大对象池,策略被乘数大小;默认1M。

maximumBufferSize:大对象池,块最大大小;默认128M。

useExponentialLargeBuffer:大对象池策略,true:指数、false:线性;一个类实体只能对应一种策略,如果想使用2种策略,就要定义2个类实体;默认线性。

maximumSmallPoolFreeBytes:byte[] 归还小对象池,小对象池最大大小;超过GC回收,否则归还小对象池;默认不限制。

maximumLargePoolFreeBytes:byte[] 归还大对象池,大对象池最大大小;超过GC回收,否则归还大对象池;默认不限制。

1.2、小对象池

private readonly ConcurrentStack<byte[]> smallPool; //小对象池

private long smallPoolFreeSize; //小对象池归还(空闲)byte大小
private long smallPoolInUseSize; //小对象借出(使用)byte大小

smallPool 使用线程安全堆栈,每个元素bye[] 大小一样,对应blockSize
如:blockSize = 128K,则bye[] 大小都是 128K

maximumSmallPoolFreeBytes = 128M,则代表smallPool 所有块大小总和最大值为128M,超出则归还时不保存到最小池中

1.3、大对象池

private readonly ConcurrentStack<byte[]>[] largePools; //大对象池

private readonly long[] largeBufferFreeSize; //大对象每个层级,归还(空闲)byte大小
private readonly long[] largeBufferInUseSize; //大对象每个层级,借出(使用)byte大小

largePools 使用线程安全堆栈的数组,数组每个索引对应指数/线性一个层级大小,每个层级bye[] 大小一样。

如:largeBufferMultiple = 1M、maximumBufferSize = 128M
则,指数 1M、2M、4M、8M、16M、32M、64M、128M,largePools.Length = 8;largePools[2] 下面bye[] 大小都是4M
  线性 1M、2M、3M、4M、5M、6M、7M、......、128M,largePools.Length = 128;largePools[2] 下面bye[] 大小都是3M


largeBufferFreeSize.Length == largePools.Length
largeBufferInUseSize = largePools.Length + 1 ,多出一个元素保存,借出时requiredSize > maximumBufferSize 所有byte[]大小,此byte[] 无法归还到大对象池,会被GC直接回收。

maximumLargePoolFreeBytes = 256M,则代表大池的各个维度块大小总和最大值,超出则归还时不保存到池中,各个维度如:线性有128个维度,指数8个维度,各个维度都是堆栈
线性最大空间:256M * 128 = 32G
指数最大空间:256M * 8 = 2G

1.4、byte[] 借出/归还

借出:

internal byte[] GetBlock() //从小对象池获取byte[],若无则直接创建 new byte[this.BlockSize]
    
internal byte[] GetLargeBuffer(long requiredSize, Guid id, string tag) //从大对象池获取byte[],首先根据requiredSize计算对应大对象索引位置,若无则直接创建 new byte[requiredSize]

归还:

internal void ReturnBlocks(List<byte[]> blocks, Guid id, string tag) //多个块归还小对象池,判断是否maximumSmallPoolFreeBytes超出,不超出则归还
internal void ReturnBlock(byte[] block, Guid id, string tag) //单个块归还小对象池,判断是否maximumSmallPoolFreeBytes超出,不超出则归还

//归还大对象池,首先根据buffer.Length计算对应大对象索引位置,判断索引对应层级大小是否maximumLargePoolFreeBytes超出,不超出则归还
internal void ReturnLargeBuffer(byte[] buffer, Guid id, string tag) 

1.5、RecyclableMemoryStream创建

public MemoryStream GetStream(Guid id, string tag, long requiredSize, bool asContiguousBuffer)

asContiguousBuffer == true && requiredSize > this.BlockSize

请求连续byte[] 且 请求字节大于1个小对象块大小时,则使用大象池创建RecyclableMemoryStream,否则使用小对象池创建RecyclableMemoryStream。

其他重载方法,无asContiguousBuffer参数,默认使用小对象池创建RecyclableMemoryStream。

方法中,byte[] buffer、Memory<byte> buffer、ReadOnlySpan<byte> buffer 参数,会把其中的byte 数据写入新申请的小对象blocks里面,不会复用这些对象。

二、RecyclableMemoryStream

非线程安全

2.1、构造函数

internal RecyclableMemoryStream(RecyclableMemoryStreamManager memoryManager, Guid id, string tag, long requestedSize, byte[] initialLargeBuffer)

memoryManager:RecyclableMemoryStreamManager引用,调用其提供借出\归还\通知状态等方法或者属性。

initialLargeBuffer:不为null代表使用大对象池,否则小对象池。GetBuffer()方法也会影响小对象池转为大对象池。

requestedSize:根据请求大小,分配1个大对象池bye[] 或者多个小对象池Block byte[]

2.2、属性

private readonly List<byte[]> blocks = new List<byte[]>(); //如果使用小对象池,则保存借出的多个小对象

private byte[] largeBuffer; //如果使用大对象池,则保存借出的大对象

/*
* 如果使用大对象池,Capacity调整需要更换更大的大对象;
* 老的大对象归还,大对象池超出暂时无法回收大对象,则保存到此,在对象Dispose时再次尝试归还。
* 因为可能有多次此情况发生,所有为List<>
*/
private List<byte[]> dirtyBuffers;

 2.3、释放/关闭/析构方法

/// <summary>
/// The finalizer will be called when a stream is not disposed properly.
/// </summary>
/// <remarks>Failing to dispose indicates a bug in the code using streams. Care should be taken to properly account for stream lifetime.</remarks>
~RecyclableMemoryStream()
{
    // 析构方法,兜底释放
	this.Dispose(false);
}

//非公开方法
/// <summary>
/// Returns the memory used by this stream back to the pool.
/// </summary>
/// <param name="disposing">Whether we're disposing (true), or being called by the finalizer (false).</param>
//disposing 区分是否析构函数调用
protected override void Dispose(bool disposing)
{
	if (this.disposed)
	{
		// 已释放不在释放,记录通知事件
		string doubleDisposeStack = null;
		if (this.memoryManager.GenerateCallStacks)
		{
			doubleDisposeStack = Environment.StackTrace;
		}

		this.memoryManager.ReportStreamDoubleDisposed(this.id, this.tag, this.AllocationStack, this.DisposeStack, doubleDisposeStack);
		return;
	}

	//标记已释放
	this.disposed = true;

	if (this.memoryManager.GenerateCallStacks)
	{
		this.DisposeStack = Environment.StackTrace;
	}

	this.memoryManager.ReportStreamDisposed(this.id, this.tag, this.AllocationStack, this.DisposeStack);

	if (disposing)
	{
		//已释放,不用进入析构队列,不会触发析构函数。
		GC.SuppressFinalize(this);
	}
	else
	{
		// We're being finalized.
		this.memoryManager.ReportStreamFinalized(this.id, this.tag, this.AllocationStack);

		//如果此应用程序域正在卸载,并且公共语言运行时已开始调用终止程序,则不执行归还池逻辑。
		if (AppDomain.CurrentDomain.IsFinalizingForUnload())
		{
			// If we're being finalized because of a shutdown, don't go any further.
			// We have no idea what's already been cleaned up. Triggering events may cause
			// a crash.
			base.Dispose(disposing);
			return;
		}
	}

	this.memoryManager.ReportStreamLength(this.length);

	if (this.largeBuffer != null)
	{
		//归还大对象
		this.memoryManager.ReturnLargeBuffer(this.largeBuffer, this.id, this.tag);
	}

	if (this.dirtyBuffers != null)
	{
		//再次尝试归还老的大对象列表
		foreach (var buffer in this.dirtyBuffers)
		{
			this.memoryManager.ReturnLargeBuffer(buffer, this.id, this.tag);
		}
	}

	//归还小对象块列表
	this.memoryManager.ReturnBlocks(this.blocks, this.id, this.tag);
	this.blocks.Clear();

	base.Dispose(disposing);
}

//公共方法
/// <summary>
/// Equivalent to <c>Dispose</c>.
/// </summary>
public override void Close()
{
	this.Dispose(true);
}

 2.4、小对象使用内部类

标识位置信息,方便参数传递,操作blocks属性。

private struct BlockAndOffset
{
	public int Block; //小对象块所在整体位置索引
	public int Offset; //小对象块中未使用字节开始位置\已使用字节结束位置

	public BlockAndOffset(int block, int offset)
	{
		this.Block = block;
		this.Offset = offset;
	}
}

 2.5、不连续字节流

public ReadOnlySequence<byte> GetReadOnlySequence()
{
	this.CheckDisposed();

	if (this.largeBuffer != null)
	{
		//大对象,只有1个字节数组,连续的
		AssertLengthIsSmall();
		return new ReadOnlySequence<byte>(this.largeBuffer, 0, (int)this.length);
	}

	if (this.blocks.Count == 1)
	{
		//小对象1个块,只有1个字节数组,连续的
		AssertLengthIsSmall();
		return new ReadOnlySequence<byte>(this.blocks[0], 0, (int)this.length);
	}

	//小对象多个块,多个字节数据,不连续的
	var first = new BlockSegment(this.blocks[0]);
	var last = first;

	//创建关联下一个块对象
	for (int blockIdx = 1; last.RunningIndex + last.Memory.Length < this.length; blockIdx++)
	{
		last = last.Append(this.blocks[blockIdx]);
	}

	//首尾对象
	return new ReadOnlySequence<byte>(first, 0, last, (int)(this.length - last.RunningIndex));
}

private sealed class BlockSegment : ReadOnlySequenceSegment<byte>
{
	public BlockSegment(Memory<byte> memory) => Memory = memory;

	public BlockSegment Append(Memory<byte> memory)
	{
		var nextSegment = new BlockSegment(memory) { RunningIndex = RunningIndex + Memory.Length };
		Next = nextSegment;
		return nextSegment;
	}
}

 2.6、IBufferWriter<T>接口实现

private byte[] bufferWriterTempBuffer;

private ArraySegment<byte> GetWritableBuffer(int sizeHint)
{
	this.CheckDisposed();
	if (sizeHint < 0)
	{
		throw new ArgumentOutOfRangeException(nameof(sizeHint), $"{nameof(sizeHint)} must be non-negative.");
	}

	var minimumBufferSize = Math.Max(sizeHint, 1);

	this.EnsureCapacity(this.position + minimumBufferSize);
	if (this.bufferWriterTempBuffer != null)
	{
		this.ReturnTempBuffer(this.bufferWriterTempBuffer);
		this.bufferWriterTempBuffer = null;
	}

	if (this.largeBuffer != null)
	{
		return new ArraySegment<byte>(this.largeBuffer, (int)this.position, this.largeBuffer.Length - (int)this.position);
	}

	BlockAndOffset blockAndOffset = this.GetBlockAndRelativeOffset(this.position);
	int remainingBytesInBlock = this.MemoryManager.BlockSize - blockAndOffset.Offset;
	if (remainingBytesInBlock >= minimumBufferSize)
	{
		//分配小对象,范围属于一个block,则返回block连续段
		return new ArraySegment<byte>(this.blocks[blockAndOffset.Block], blockAndOffset.Offset, this.MemoryManager.BlockSize - blockAndOffset.Offset);
	}

	//分配小对象,单位大于一个block块,则通过大对象/小对象分配byte[];记录赋值给属性bufferWriterTempBuffer;规避不好返回多个blocks中byte
	this.bufferWriterTempBuffer = minimumBufferSize > this.memoryManager.BlockSize ?
		this.memoryManager.GetLargeBuffer(minimumBufferSize, this.id, this.tag) :
		this.memoryManager.GetBlock();

	return new ArraySegment<byte>(this.bufferWriterTempBuffer);
}

public void Advance(int count)
{
	this.CheckDisposed();
	if (count < 0)
	{
		throw new ArgumentOutOfRangeException(nameof(count), $"{nameof(count)} must be non-negative.");
	}

	byte[] buffer = this.bufferWriterTempBuffer;
	if (buffer != null)
	{
		if (count > buffer.Length)
		{
			throw new InvalidOperationException($"Cannot advance past the end of the buffer, which has a size of {buffer.Length}.");
		}
        
		//把bufferWriterTempBuffer属性中数据,写回小对象blocks
		this.Write(buffer, 0, count);
		this.ReturnTempBuffer(buffer);
		this.bufferWriterTempBuffer = null;
	}
	else
	{
		long bufferSize = this.largeBuffer == null
			? this.memoryManager.BlockSize - this.GetBlockAndRelativeOffset(this.position).Offset
			: this.largeBuffer.Length - this.position;

		if (count > bufferSize)
		{
			throw new InvalidOperationException($"Cannot advance past the end of the buffer, which has a size of {bufferSize}.");
		}

		this.position += count;
		this.length = Math.Max(this.position, this.length);
	}
}

 2.7、GetBuffer()

如果使用大对象,则返回大对象数据

如果使用小对象+块1个,则直接返回这个块。如果大于1个,则申请新大对象返回,之前小对象归还。升级为使用大对象。

2.8、ToArray()

通过new byte[this.Length] 创建新byte数组,拷贝大对象/小对象数据过来。