文章目录
前言
本节来看看在没有命中缓存的情况下,page级别的内存是如何分配的,还会提及page级别的数据结构(chunk中的内存偏移树图)。命中缓存的分配流程可参考【PooledByteBufAllocator命中缓存的分配流程】。
如果对本文的一些基础概念、名词不是很清楚,可以参考【ByteBuf的结构、分类、核心api简介】和【内存规格、缓存&结构、chunk、arena、page、subpage等概念介绍】,本文对基础概念也不会再一一赘述了。
Netty Version:4.1.6
实验代码
TestPage.java
import io.netty.buffer.ByteBuf;
import io.netty.buffer.PooledByteBufAllocator;
/**
* @author WenJie
*/
public class TestPage{
public static void main(String[] args) {
int page = 1024 * 8;
// 获取PooledByteBufAllocator实例。
PooledByteBufAllocator allocator = PooledByteBufAllocator.DEFAULT;
// 先创建一个ByteBuf,申请16kb内存,即两个页
ByteBuf byteBuf1 = allocator.directBuffer(2 * page);
// 回收ByteBuf,这一步会缓存内存、将ByteBuf对象扔进对象池,详细等后边博客更新。
byteBuf1.release();
}
}
跟进源码
把断点打在这里:
跟进directBuffer方法:
io.netty.buffer.AbstractByteBufAllocator#directBuffer(int)
@Override
public ByteBuf directBuffer(int initialCapacity) {
return directBuffer(initialCapacity, Integer.MAX_VALUE);
}
继续跟进directBuffer方法:
io.netty.buffer.AbstractByteBufAllocator#directBuffer(int, int)
@Override
public ByteBuf directBuffer(int initialCapacity, int maxCapacity) {
if (initialCapacity == 0 && maxCapacity == 0) {
return emptyBuf;
}
// 验证参数
validate(initialCapacity, maxCapacity);
return newDirectBuffer(initialCapacity, maxCapacity);
}
跟进newDirectBuffer方法:
io.netty.buffer.PooledByteBufAllocator#newDirectBuffer
@Override
protected ByteBuf newDirectBuffer(int initialCapacity, int maxCapacity) {
// 对下面两行不熟悉的,可以参考前言提到的博客
PoolThreadCache cache = threadCache.get();
PoolArena<ByteBuffer> directArena = cache.directArena;
ByteBuf buf;
// 除非特殊情况,否则directArena都不为null
if (directArena != null) {
buf = directArena.allocate(cache, initialCapacity, maxCapacity);
} else {
// 一般情况下都能拿到jdk的unsafe,返回true。
if (PlatformDependent.hasUnsafe()) {
buf = UnsafeByteBufUtil.newUnsafeDirectByteBuf(this, initialCapacity, maxCapacity);
} else {
buf = new UnpooledDirectByteBuf(this, initialCapacity, maxCapacity);
}
}
// 检测ByteBuf
return toLeakAwareBuffer(buf);
}
跟进directArena.allocate:
io.netty.buffer.PoolArena#allocate(io.netty.buffer.PoolThreadCache, int, int)
PooledByteBuf<T> allocate(PoolThreadCache cache, int reqCapacity, int maxCapacity) {
// 创建/复用ByteBuf对象
PooledByteBuf<T> buf = newByteBuf(maxCapacity);
// 给ByteBuf分配内存、初始化。
allocate(cache, buf, reqCapacity);
return buf;
}
- PooledByteBuf
buf = newByteBuf(maxCapacity);就是复用/创建ByteBuf对象,在【上一节】讲过了,这里就直接跳。
跟进allocate方法(这次全部贴出,代码很长):
io.netty.buffer.PoolArena#allocate(io.netty.buffer.PoolThreadCache, io.netty.buffer.PooledByteBuf
private void allocate(PoolThreadCache cache, PooledByteBuf<T> buf, final int reqCapacity) {
// 数值规格化,https://wenjie.store/archives/about-bytebuf-4有跟进过
final int normCapacity = normalizeCapacity(reqCapacity);
// 判断是够是tiny或者small规格
if (isTinyOrSmall(normCapacity)) { // capacity < pageSize
int tableIdx;
PoolSubpage<T>[] table;
boolean tiny = isTiny(normCapacity);
if (tiny) { // < 512
// 尝试从tiny规格缓存拿到内存分配
if (cache.allocateTiny(this, buf, reqCapacity, normCapacity)) {
// was able to allocate out of the cache so move on
return;
}
tableIdx = tinyIdx(normCapacity);
table = tinySubpagePools;
} else {
// 尝试从small规格缓存拿到内存分配
if (cache.allocateSmall(this, buf, reqCapacity, normCapacity)) {
// was able to allocate out of the cache so move on
return;
}
tableIdx = smallIdx(normCapacity);
table = smallSubpagePools;
}
final PoolSubpage<T> head = table[tableIdx];
/**
* 同步锁锁住head节点,防止构造的双向链表指针覆盖
*/
synchronized (head) {
// 获取头结点的下一个节点。
// 如果双向链表中只有head节点,那head节点就是指向自己
final PoolSubpage<T> s = head.next;
// 如果双向链表不止head一个元素。
if (s != head) {
assert s.doNotDestroy && s.elemSize == normCapacity;
// 拿到内存“偏移量”
long handle = s.allocate();
assert handle >= 0;
// subpage级别内存分配
s.chunk.initBufWithSubpage(buf, handle, reqCapacity);
// 计数器计数
if (tiny) {
allocationsTiny.increment();
} else {
allocationsSmall.increment();
}
return;
}
}
// subpage级别内存分配(前提是small规格)
allocateNormal(buf, reqCapacity, normCapacity);
return;
}
// 由于我们的代码申请的内存规格是normal
// 所以会来到一下代码。
if (normCapacity <= chunkSize) {
// 尝试从normal规格缓存分配内存
// 由于我们代码是第一次,所以不会有缓存,返回false
if (cache.allocateNormal(this, buf, reqCapacity, normCapacity)) {
// was able to allocate out of the cache so move on
return;
}
// 通过Arena分配page级别内存
allocateNormal(buf, reqCapacity, normCapacity);
} else {
// Huge allocations are never served via the cache so just call allocateHuge
allocateHuge(buf, reqCapacity);
}
}
- 如果上面注释的名词看不懂,同样建议看看前言提到的两篇博客。
为了确实证明没有走缓存,放张断点图:
跟进allocateNormal方法,此处【坐标1】:
io.netty.buffer.PoolArena#allocateNormal
private synchronized void allocateNormal(PooledByteBuf<T> buf, int reqCapacity, int normCapacity) {
// 先尝试从已经获取到系统内存、不同占用率的chunklist中分配内存
if (q050.allocate(buf, reqCapacity, normCapacity) || q025.allocate(buf, reqCapacity, normCapacity) ||
q000.allocate(buf, reqCapacity, normCapacity) || qInit.allocate(buf, reqCapacity, normCapacity) ||
q075.allocate(buf, reqCapacity, normCapacity)) {
++allocationsNormal;
return;
}
// 如果没有符合要求的chunklist或者程序刚启动时为空,就会创建一个。
PoolChunk<T> c = newChunk(pageSize, maxOrder, pageShifts, chunkSize);
// 尝试获取符合需求的内存“偏移量”
long handle = c.allocate(normCapacity);
++allocationsNormal;
assert handle > 0;
// 初始化ByteBuf
c.initBuf(buf, handle, reqCapacity);
// 检测chunklist容量、添加chunk至chunklist等。
qInit.add(c);
}
- 这里的q050,q025...是什么意思?在讲PoolArena数据结构的时候有讲过,忘记的话可以回去看看,这里就不一一赘述了。
当还有空闲空间的chunk
假设当前有符合要求的点chunkList,那么就会从中取一个chunk分配内存,我们这里取q050.allocate方法跟进看看,其它原理都一样:
io.netty.buffer.PoolChunkList#allocate
boolean allocate(PooledByteBuf<T> buf, int reqCapacity, int normCapacity) {
if (head == null || normCapacity > maxCapacity) {
// Either this PoolChunkList is empty or the requested capacity is larger then the capacity which can
// be handled by the PoolChunks that are contained in this PoolChunkList.
return false;
}
for (PoolChunk<T> cur = head;;) {
// 尝试获取符合需求的内存“偏移量”
// 这个allocate方法等下也会遇到,下面一块讲
long handle = cur.allocate(normCapacity);
// < 0 说明没有
if (handle < 0) {
// 取下一个chunk
cur = cur.next;
// 没有chunk说明到双向链表结尾了,返回false
if (cur == null) {
return false;
}
} else {
// 获取成功,分配内存初始化ByteBuf并返回给客户端。
// 这个initBuf其实和【坐标1】的initBuf一样,同样到下面一块讲
cur.initBuf(buf, handle, reqCapacity);
// 双向链表满了,放到下一个chunklist。
if (cur.usage() >= maxUsage) {
remove(cur);
nextList.add(cur);
}
return true;
}
}
}
- 如果成功拿到chunk并且chunk的剩余内存能满足需求,那就在这里初始化ByteBuf并返回。
如果拿不到可用的chunk,就会进入一下流程。
当需要新建chunk
视角重新转回【坐标1】的代码,来继续看剩余的代码:
io.netty.buffer.PoolArena#allocateNormal
// 如果没有符合要求的chunklist或者程序刚启动时为空,就会创建一个。
PoolChunk<T> c = newChunk(pageSize, maxOrder, pageShifts, chunkSize);
// 尝试获取符合需求的内存“偏移量”
long handle = c.allocate(normCapacity);
++allocationsNormal;
assert handle > 0;
// 初始化ByteBuf
c.initBuf(buf, handle, reqCapacity);
// 检测chunklist容量、添加chunk至chunklist等。
qInit.add(c);
创建chunk流程
先来看看newChunk方法:
io.netty.buffer.PoolArena.DirectArena#newChunk
@Override
protected PoolChunk<ByteBuffer> newChunk(int pageSize, int maxOrder, int pageShifts, int chunkSize) {
return new PoolChunk<ByteBuffer>(
this, allocateDirect(chunkSize),
pageSize, maxOrder, pageShifts, chunkSize);
}
- 这里的allocateDirect(chunkSize)就是返回jdk底层的DirectByteBuffer对象,即jdk底层向操作系统申请直接内存,默认是16MB,直接验证了【chunk等概念介绍】的结论。
- allocateDirect方法我就不跟进了,有兴趣的自己补充下,因为底层的底层是native方法,c++比较??的可以继续跟下去。
跟进PoolChunk的构造方法,接下来有重头戏了:
io.netty.buffer.PoolChunk#PoolChunk(io.netty.buffer.PoolArena
PoolChunk(PoolArena<T> arena, T memory, int pageSize, int maxOrder, int pageShifts, int chunkSize) {
// 是否Pooled的标记
unpooled = false;
// 记录所属Arena
this.arena = arena;
// 刚刚向操作系统申请到的直接内存的对象
this.memory = memory;
// 页大小,默认8192B
this.pageSize = pageSize;
// 参与位运算的的一个单位,2^13就是8192B
this.pageShifts = pageShifts;
// “树”高度,默认为11,等下你就明白了
this.maxOrder = maxOrder;
// chunk的大小,默认为16MB
this.chunkSize = chunkSize;
// 不可用标记,即“树”的第十二层
unusable = (byte) (maxOrder + 1);
log2ChunkSize = log2(chunkSize);
// 也是用于参与位运算,默认是-8192
subpageOverflowMask = ~(pageSize - 1);
// 当前chunk剩余内存
freeBytes = chunkSize;
assert maxOrder < 30 : "maxOrder should be < 30, but is: " + maxOrder;
// 1 << 11 = 2048
maxSubpageAllocs = 1 << maxOrder;
// 构建内存“偏移量”的“树”
memoryMap = new byte[maxSubpageAllocs << 1];
depthMap = new byte[memoryMap.length];
int memoryMapIndex = 1;
for (int d = 0; d <= maxOrder; ++ d) { // move down the tree one level at a time
int depth = 1 << d;
// for循环从左到右构建“树”
for (int p = 0; p < depth; ++ p) {
// in each level traverse left to right and set value to the depth of subtree
memoryMap[memoryMapIndex] = (byte) d;
// 用于memoryMap改变以后,统计“树”
depthMap[memoryMapIndex] = (byte) d;
memoryMapIndex ++;
}
}
// 每个chunk都有2048个Page
subpages = newSubpageArray(maxSubpageAllocs);
}
- 最后一行验证了前面博客【chunk、page等介绍】中chunk有2048个Page的结论。
chunk中的内存偏移树图
下面来看看上面for循环构建完成的“树”是什么样的,相信看完你也能明白树字为什么要打双引号了:
- 你可以把它当做是一颗很高的二叉树去理解,因为Netty确实是通过位运算技巧把这个数组当成二叉树一样遍历(只能说太diao了),但务必不要真的当做二叉树,因为源码的数据结构仅仅是一个数组,只是画成类似二叉树的模样方便理解。
- 相信看完这个图,你应该能理解maxOrder为什么=11了。
- 第n层的最左节点值为2^n,然后从左到右依次增加,比如第11层的[0 ~ 8]KB节点值为4096,[8 ~ 16]KB的节点的值为4097。
- 后面根据申请的内存大小,从这里面就取出对应的块,并打上标记,具体过程在下面向chunk申请内存的时候讲到。
既然chunk已经成功向操作系统申请到内存,并且完成初始化了,那么接下来就是要从chunk中取内存了。
向chunk申请page级别内存
视角再转回【坐标1】中的代码,我们接下来要跟这一段:
io.netty.buffer.PoolArena#allocateNormal
// 向chunk申请内存并返回“偏移量”,申请失败则返回-1
long handle = c.allocate(normCapacity);
跟进c.allocate方法:
long allocate(int normCapacity) {
if ((normCapacity & subpageOverflowMask) != 0) { // >= pageSize
// page级别分页
return allocateRun(normCapacity);
} else {
// subpage级别分页,下篇博客重点讲解。
return allocateSubpage(normCapacity);
}
}
- 这里说一下,jemalloc将多个page称为run,我的塑料英语又被刷新了~
跟进allocateRun方法:
io.netty.buffer.PoolChunk#allocateRun
private long allocateRun(int normCapacity) {
// 根据需要申请的内存大小计算出“树”的第几层
// 这里d=10,因为实验代码申请16kb,对应“树”的第十层
int d = maxOrder - (log2(normCapacity) - pageShifts);
// 取“树”第十层的可用节点,如果实验代码首次执行,则id=1024。
int id = allocateNode(d);
if (id < 0) {
return id;
}
// 更新chunk的剩余内存大小
freeBytes -= runLength(id);
// 返回节点下标
return id;
}
跟进allocateNode方法看看,这个方法会更新“树”,还是比较重要的:
io.netty.buffer.PoolChunk#allocateNode
private int allocateNode(int d) {
int id = 1;
int initial = - (1 << d); // has last d bits = 0 and rest all = 1
byte val = value(id);
if (val > d) { // unusable
return -1;
}
while (val < d || (id & initial) == 0) { // id & initial == 1 << d for all ids at depth d, for < d it is 0
id <<= 1;
val = value(id);
if (val > d) {
id ^= 1;
val = value(id);
}
}
byte value = value(id);
assert value == d && (id & initial) == 1 << d : String.format("val = %d, id & initial = %d, d = %d",
value, id & initial, d);
// 以上运算都是在找指定层的可用节点。
// 取到有效节点后,将节点标记为unusable,也可以理解为放到“树”的12层(见下图)
setValue(id, unusable); // mark as unusable
// 更新当前节点的父节点状态(见下图)。
updateParentsAlloc(id);
// 返回节点坐标
return id;
}
内存偏移树图的变化
// 取到有效节点后,将节点标记为unusable,也可以理解为放到“树”的12层(见下图)
setValue(id, unusable); // mark as unusable
// 更新当前节点的父节点状态(见下图)。
updateParentsAlloc(id);
以下为上面两行代码的流程体现:
- id=1024的块设置成不可用之后,根据Netty自顶向下的算法,下一层0 ~ 8kb和8 ~ 16kb是不会被遍历到的,所以当前层的下层不用更新状态。
为什么当前节点不可用就要自底向上的更新父节点的状态呢?这是因为:假如我这个chunk一开始是完整的16MB,现在有个bytebuf1拿走了16KB。之后有个bytebuf2需要申请完整的16MB内存,如果前面chunk的状态没有改变,那就会拿走整块chunk,这样就造成了两个(或多个)bytebuf拿到部分相同内存,之后bytebuf读写的时候就“撞车”了。
好了,拿到chunk分配的内存后,就需要拿这个内存初始化ByteBuf了。
初始化ByteBuf
其实这里初始化ByteBuf和之前博客【命中缓存】中的初始化ByteBuf是一样的,下面再跟一遍就当是温故而知新吧。
chunk分配完内存后,我们就拿到一个“偏移量”值handle了,返回【坐标1】的代码,开始初始化ByteBuf。
回到如下代码:
// 尝试获取符合需求的内存“偏移量”
long handle = c.allocate(normCapacity);
++allocationsNormal;
assert handle > 0;
// 上面的代码刚刚执行完。
// 初始化ByteBuf
c.initBuf(buf, handle, reqCapacity);
// 检测chunklist容量、添加chunk至chunklist等。
qInit.add(c);
跟进initBuf方法:
io.netty.buffer.PoolChunk#initBuf
void initBuf(PooledByteBuf<T> buf, long handle, int reqCapacity) {
// subpage、page级别都有的"偏移量",相对于chunk的"偏移量"
// 此处为1024(结合实验代码)
int memoryMapIdx = memoryMapIdx(handle);
// subpage级别的"偏移量",相对于page的"偏移量"
// 因为这个偏移量是相对于page的,所以page级别下这个值必定为0
int bitmapIdx = bitmapIdx(handle);
// page级别
if (bitmapIdx == 0) {
byte val = value(memoryMapIdx);
assert val == unusable : String.valueOf(val);
// 初始化
buf.init(this, handle, runOffset(memoryMapIdx), reqCapacity, runLength(memoryMapIdx),
arena.parent.threadCache());
} else {
// subpage级别
initBufWithSubpage(buf, handle, bitmapIdx, reqCapacity);
}
}
跟进init方法:
io.netty.buffer.PooledUnsafeDirectByteBuf#init
@Override
void init(PoolChunk<ByteBuffer> chunk, long handle, int offset, int length, int maxLength,
PoolThreadCache cache) {
super.init(chunk, handle, offset, length, maxLength, cache);
// unsafe类型的ByteBuf特有,需要记录内存相关信息。
initMemoryAddress();
}
跟进super.init:
io.netty.buffer.PooledByteBuf#init
void init(PoolChunk<T> chunk, long handle, int offset, int length, int maxLength, PoolThreadCache cache) {
assert handle >= 0;
assert chunk != null;
// 保存所属chunk
this.chunk = chunk;
// subpage
this.handle = handle;
// 内存对象
memory = chunk.memory;
// subpage相对于page的偏移量
this.offset = offset;
this.length = length;
this.maxLength = maxLength;
tmpNioBuf = null;
// 保存所属PoolThreadCache
this.cache = cache;
}
当ByteBuf初始化完后,还会调整当前chunk的位置到chunkList的头结点(类似LRU),即【坐标1】的qInit.add(c)
所做的事情,这个代码很简单,有兴趣的自己跟进下吧。