文章目录


前言

本节来看看在没有命中缓存的情况下,page级别的内存是如何分配的,还会提及page级别的数据结构(chunk中的内存偏移树图)。命中缓存的分配流程可参考【PooledByteBufAllocator命中缓存的分配流程】

如果对本文的一些基础概念、名词不是很清楚,可以参考【ByteBuf的结构、分类、核心api简介】【内存规格、缓存&结构、chunk、arena、page、subpage等概念介绍】,本文对基础概念也不会再一一赘述了。

Netty Version:4.1.6


实验代码

TestPage.java

import io.netty.buffer.ByteBuf;
import io.netty.buffer.PooledByteBufAllocator;

/**
 * @author WenJie
 */
public class TestPage{
    public static void main(String[] args) {
        int page = 1024 * 8;
        // 获取PooledByteBufAllocator实例。
        PooledByteBufAllocator allocator = PooledByteBufAllocator.DEFAULT;

        // 先创建一个ByteBuf,申请16kb内存,即两个页
        ByteBuf byteBuf1 = allocator.directBuffer(2 * page);
        // 回收ByteBuf,这一步会缓存内存、将ByteBuf对象扔进对象池,详细等后边博客更新。
        byteBuf1.release();

    }
}


跟进源码

把断点打在这里:
打断点.png

跟进directBuffer方法:
io.netty.buffer.AbstractByteBufAllocator#directBuffer(int)

    @Override
    public ByteBuf directBuffer(int initialCapacity) {
        return directBuffer(initialCapacity, Integer.MAX_VALUE);
    }

继续跟进directBuffer方法:
io.netty.buffer.AbstractByteBufAllocator#directBuffer(int, int)

    @Override
    public ByteBuf directBuffer(int initialCapacity, int maxCapacity) {
        if (initialCapacity == 0 && maxCapacity == 0) {
            return emptyBuf;
        }
        // 验证参数
        validate(initialCapacity, maxCapacity);
        return newDirectBuffer(initialCapacity, maxCapacity);
    }

跟进newDirectBuffer方法:
io.netty.buffer.PooledByteBufAllocator#newDirectBuffer

    @Override
    protected ByteBuf newDirectBuffer(int initialCapacity, int maxCapacity) {
        // 对下面两行不熟悉的,可以参考前言提到的博客
        PoolThreadCache cache = threadCache.get();
        PoolArena<ByteBuffer> directArena = cache.directArena;

        ByteBuf buf;
        // 除非特殊情况,否则directArena都不为null
        if (directArena != null) {
            buf = directArena.allocate(cache, initialCapacity, maxCapacity);
        } else {
            // 一般情况下都能拿到jdk的unsafe,返回true。
            if (PlatformDependent.hasUnsafe()) {
                buf = UnsafeByteBufUtil.newUnsafeDirectByteBuf(this, initialCapacity, maxCapacity);
            } else {
                buf = new UnpooledDirectByteBuf(this, initialCapacity, maxCapacity);
            }
        }

        // 检测ByteBuf
        return toLeakAwareBuffer(buf);
    }

跟进directArena.allocate:
io.netty.buffer.PoolArena#allocate(io.netty.buffer.PoolThreadCache, int, int)

    PooledByteBuf<T> allocate(PoolThreadCache cache, int reqCapacity, int maxCapacity) {
        // 创建/复用ByteBuf对象
        PooledByteBuf<T> buf = newByteBuf(maxCapacity);
        // 给ByteBuf分配内存、初始化。
        allocate(cache, buf, reqCapacity);
        return buf;
    }
  • PooledByteBuf buf = newByteBuf(maxCapacity);就是复用/创建ByteBuf对象,在【上一节】讲过了,这里就直接跳。

跟进allocate方法(这次全部贴出,代码很长):
io.netty.buffer.PoolArena#allocate(io.netty.buffer.PoolThreadCache, io.netty.buffer.PooledByteBuf, int)

    private void allocate(PoolThreadCache cache, PooledByteBuf<T> buf, final int reqCapacity) {
        // 数值规格化,https://wenjie.store/archives/about-bytebuf-4有跟进过
        final int normCapacity = normalizeCapacity(reqCapacity);
        // 判断是够是tiny或者small规格
        if (isTinyOrSmall(normCapacity)) { // capacity < pageSize
            int tableIdx;
            PoolSubpage<T>[] table;
            boolean tiny = isTiny(normCapacity);
            if (tiny) { // < 512
                // 尝试从tiny规格缓存拿到内存分配
                if (cache.allocateTiny(this, buf, reqCapacity, normCapacity)) {
                    // was able to allocate out of the cache so move on
                    return;
                }
                tableIdx = tinyIdx(normCapacity);
                table = tinySubpagePools;
            } else {
                // 尝试从small规格缓存拿到内存分配
                if (cache.allocateSmall(this, buf, reqCapacity, normCapacity)) {
                    // was able to allocate out of the cache so move on
                    return;
                }
                tableIdx = smallIdx(normCapacity);
                table = smallSubpagePools;
            }

            final PoolSubpage<T> head = table[tableIdx];

            /**
             * 同步锁锁住head节点,防止构造的双向链表指针覆盖
             */
            synchronized (head) {
                // 获取头结点的下一个节点。
                // 如果双向链表中只有head节点,那head节点就是指向自己
                final PoolSubpage<T> s = head.next;
                // 如果双向链表不止head一个元素。
                if (s != head) {
                    assert s.doNotDestroy && s.elemSize == normCapacity;
                    // 拿到内存“偏移量”
                    long handle = s.allocate();
                    assert handle >= 0;
                    // subpage级别内存分配
                    s.chunk.initBufWithSubpage(buf, handle, reqCapacity); 
  
                    // 计数器计数
                    if (tiny) {
                        allocationsTiny.increment();
                    } else {
                        allocationsSmall.increment();
                    }
                    return;
                }
            }
            // subpage级别内存分配(前提是small规格)
            allocateNormal(buf, reqCapacity, normCapacity);
            return;
        }
        // 由于我们的代码申请的内存规格是normal
        // 所以会来到一下代码。
        if (normCapacity <= chunkSize) {
            // 尝试从normal规格缓存分配内存
            // 由于我们代码是第一次,所以不会有缓存,返回false
            if (cache.allocateNormal(this, buf, reqCapacity, normCapacity)) {
                // was able to allocate out of the cache so move on
                return;
            }
            // 通过Arena分配page级别内存
            allocateNormal(buf, reqCapacity, normCapacity);
        } else {
            // Huge allocations are never served via the cache so just call allocateHuge
            allocateHuge(buf, reqCapacity);
        }
    }
  • 如果上面注释的名词看不懂,同样建议看看前言提到的两篇博客。

为了确实证明没有走缓存,放张断点图:
断点图.png


跟进allocateNormal方法,此处【坐标1】
io.netty.buffer.PoolArena#allocateNormal

    private synchronized void allocateNormal(PooledByteBuf<T> buf, int reqCapacity, int normCapacity) {
        // 先尝试从已经获取到系统内存、不同占用率的chunklist中分配内存
        if (q050.allocate(buf, reqCapacity, normCapacity) || q025.allocate(buf, reqCapacity, normCapacity) ||
            q000.allocate(buf, reqCapacity, normCapacity) || qInit.allocate(buf, reqCapacity, normCapacity) ||
            q075.allocate(buf, reqCapacity, normCapacity)) {
            ++allocationsNormal;
            return;
        }

        // 如果没有符合要求的chunklist或者程序刚启动时为空,就会创建一个。
        PoolChunk<T> c = newChunk(pageSize, maxOrder, pageShifts, chunkSize);
        // 尝试获取符合需求的内存“偏移量”
        long handle = c.allocate(normCapacity);
        ++allocationsNormal;
        assert handle > 0;
        // 初始化ByteBuf
        c.initBuf(buf, handle, reqCapacity);
        // 检测chunklist容量、添加chunk至chunklist等。
        qInit.add(c);
    }
  • 这里的q050,q025...是什么意思?在讲PoolArena数据结构的时候有讲过,忘记的话可以回去看看,这里就不一一赘述了。

当还有空闲空间的chunk

假设当前有符合要求的点chunkList,那么就会从中取一个chunk分配内存,我们这里取q050.allocate方法跟进看看,其它原理都一样:
io.netty.buffer.PoolChunkList#allocate

    boolean allocate(PooledByteBuf<T> buf, int reqCapacity, int normCapacity) {
        if (head == null || normCapacity > maxCapacity) {
            // Either this PoolChunkList is empty or the requested capacity is larger then the capacity which can
            // be handled by the PoolChunks that are contained in this PoolChunkList.
            return false;
        }

        for (PoolChunk<T> cur = head;;) {
            // 尝试获取符合需求的内存“偏移量”
            // 这个allocate方法等下也会遇到,下面一块讲
            long handle = cur.allocate(normCapacity);
            // < 0 说明没有
            if (handle < 0) {
                // 取下一个chunk
                cur = cur.next;
                // 没有chunk说明到双向链表结尾了,返回false
                if (cur == null) {
                    return false;
                }
            } else {
                // 获取成功,分配内存初始化ByteBuf并返回给客户端。
                // 这个initBuf其实和【坐标1】的initBuf一样,同样到下面一块讲
                cur.initBuf(buf, handle, reqCapacity);
                // 双向链表满了,放到下一个chunklist。
                if (cur.usage() >= maxUsage) {
                    remove(cur);
                    nextList.add(cur);
                }
                return true;
            }
        }
    }
  • 如果成功拿到chunk并且chunk的剩余内存能满足需求,那就在这里初始化ByteBuf并返回。

如果拿不到可用的chunk,就会进入一下流程。


当需要新建chunk

视角重新转回【坐标1】的代码,来继续看剩余的代码:
io.netty.buffer.PoolArena#allocateNormal

        // 如果没有符合要求的chunklist或者程序刚启动时为空,就会创建一个。
        PoolChunk<T> c = newChunk(pageSize, maxOrder, pageShifts, chunkSize);
        // 尝试获取符合需求的内存“偏移量”
        long handle = c.allocate(normCapacity);
        ++allocationsNormal;
        assert handle > 0;
        // 初始化ByteBuf
        c.initBuf(buf, handle, reqCapacity);
        // 检测chunklist容量、添加chunk至chunklist等。
        qInit.add(c);

创建chunk流程

先来看看newChunk方法:
io.netty.buffer.PoolArena.DirectArena#newChunk

        @Override
        protected PoolChunk<ByteBuffer> newChunk(int pageSize, int maxOrder, int pageShifts, int chunkSize) {
            return new PoolChunk<ByteBuffer>(
                    this, allocateDirect(chunkSize),
                    pageSize, maxOrder, pageShifts, chunkSize);
        }
  • 这里的allocateDirect(chunkSize)就是返回jdk底层的DirectByteBuffer对象,即jdk底层向操作系统申请直接内存,默认是16MB,直接验证了【chunk等概念介绍】的结论。
  • allocateDirect方法我就不跟进了,有兴趣的自己补充下,因为底层的底层是native方法,c++比较??的可以继续跟下去。

跟进PoolChunk的构造方法,接下来有重头戏了:
io.netty.buffer.PoolChunk#PoolChunk(io.netty.buffer.PoolArena, T, int, int, int, int)

    PoolChunk(PoolArena<T> arena, T memory, int pageSize, int maxOrder, int pageShifts, int chunkSize) {
        // 是否Pooled的标记
        unpooled = false;
        // 记录所属Arena
        this.arena = arena;
        // 刚刚向操作系统申请到的直接内存的对象
        this.memory = memory;
        // 页大小,默认8192B
        this.pageSize = pageSize;
        // 参与位运算的的一个单位,2^13就是8192B
        this.pageShifts = pageShifts;
        // “树”高度,默认为11,等下你就明白了
        this.maxOrder = maxOrder;
        // chunk的大小,默认为16MB
        this.chunkSize = chunkSize;
        // 不可用标记,即“树”的第十二层
        unusable = (byte) (maxOrder + 1);
        log2ChunkSize = log2(chunkSize);
        // 也是用于参与位运算,默认是-8192
        subpageOverflowMask = ~(pageSize - 1);
        // 当前chunk剩余内存
        freeBytes = chunkSize;

        assert maxOrder < 30 : "maxOrder should be < 30, but is: " + maxOrder;
        // 1 << 11 = 2048
        maxSubpageAllocs = 1 << maxOrder;

        // 构建内存“偏移量”的“树”
        memoryMap = new byte[maxSubpageAllocs << 1];
        depthMap = new byte[memoryMap.length];
        int memoryMapIndex = 1;
        for (int d = 0; d <= maxOrder; ++ d) { // move down the tree one level at a time
            int depth = 1 << d;
            // for循环从左到右构建“树”
            for (int p = 0; p < depth; ++ p) {
                // in each level traverse left to right and set value to the depth of subtree
                memoryMap[memoryMapIndex] = (byte) d;
                // 用于memoryMap改变以后,统计“树”
                depthMap[memoryMapIndex] = (byte) d;
                memoryMapIndex ++;
            }
        }
        // 每个chunk都有2048个Page
        subpages = newSubpageArray(maxSubpageAllocs);
    }

chunk中的内存偏移树图

下面来看看上面for循环构建完成的“树”是什么样的,相信看完你也能明白树字为什么要打双引号了:
menoryMap结构.png

  • 你可以把它当做是一颗很高的二叉树去理解,因为Netty确实是通过位运算技巧把这个数组当成二叉树一样遍历(只能说太diao了),但务必不要真的当做二叉树,因为源码的数据结构仅仅是一个数组,只是画成类似二叉树的模样方便理解。
  • 相信看完这个图,你应该能理解maxOrder为什么=11了。
  • 第n层的最左节点值为2^n,然后从左到右依次增加,比如第11层的[0 ~ 8]KB节点值为4096,[8 ~ 16]KB的节点的值为4097。
  • 后面根据申请的内存大小,从这里面就取出对应的块,并打上标记,具体过程在下面向chunk申请内存的时候讲到。

既然chunk已经成功向操作系统申请到内存,并且完成初始化了,那么接下来就是要从chunk中取内存了。

向chunk申请page级别内存

视角再转回【坐标1】中的代码,我们接下来要跟这一段:
io.netty.buffer.PoolArena#allocateNormal

// 向chunk申请内存并返回“偏移量”,申请失败则返回-1
long handle = c.allocate(normCapacity);

跟进c.allocate方法:

    long allocate(int normCapacity) {
        if ((normCapacity & subpageOverflowMask) != 0) { // >= pageSize
            // page级别分页
            return allocateRun(normCapacity);
        } else {
            // subpage级别分页,下篇博客重点讲解。
            return allocateSubpage(normCapacity);
        }
    }
  • 这里说一下,jemalloc将多个page称为run,我的塑料英语又被刷新了~

跟进allocateRun方法:
io.netty.buffer.PoolChunk#allocateRun

    private long allocateRun(int normCapacity) {
        // 根据需要申请的内存大小计算出“树”的第几层
        // 这里d=10,因为实验代码申请16kb,对应“树”的第十层
        int d = maxOrder - (log2(normCapacity) - pageShifts);
        // 取“树”第十层的可用节点,如果实验代码首次执行,则id=1024。
        int id = allocateNode(d);
        if (id < 0) {
            return id;
        }
        // 更新chunk的剩余内存大小
        freeBytes -= runLength(id);
        // 返回节点下标
        return id;
    }

跟进allocateNode方法看看,这个方法会更新“树”,还是比较重要的:
io.netty.buffer.PoolChunk#allocateNode

    private int allocateNode(int d) {
        int id = 1;
        int initial = - (1 << d); // has last d bits = 0 and rest all = 1
        byte val = value(id);
        if (val > d) { // unusable
            return -1;
        }
        while (val < d || (id & initial) == 0) { // id & initial == 1 << d for all ids at depth d, for < d it is 0
            id <<= 1;
            val = value(id);
            if (val > d) {
                id ^= 1;
                val = value(id);
            }
        }
        byte value = value(id);
        assert value == d && (id & initial) == 1 << d : String.format("val = %d, id & initial = %d, d = %d",
                value, id & initial, d);
        // 以上运算都是在找指定层的可用节点。
        // 取到有效节点后,将节点标记为unusable,也可以理解为放到“树”的12层(见下图)
        setValue(id, unusable); // mark as unusable
        // 更新当前节点的父节点状态(见下图)。
        updateParentsAlloc(id);
        // 返回节点坐标
        return id;
    }

内存偏移树图的变化

        // 取到有效节点后,将节点标记为unusable,也可以理解为放到“树”的12层(见下图)
        setValue(id, unusable); // mark as unusable
        // 更新当前节点的父节点状态(见下图)。
        updateParentsAlloc(id);

以下为上面两行代码的流程体现:

流程.png

  • id=1024的块设置成不可用之后,根据Netty自顶向下的算法,下一层0 ~ 8kb和8 ~ 16kb是不会被遍历到的,所以当前层的下层不用更新状态。

为什么当前节点不可用就要自底向上的更新父节点的状态呢?这是因为:假如我这个chunk一开始是完整的16MB,现在有个bytebuf1拿走了16KB。之后有个bytebuf2需要申请完整的16MB内存,如果前面chunk的状态没有改变,那就会拿走整块chunk,这样就造成了两个(或多个)bytebuf拿到部分相同内存,之后bytebuf读写的时候就“撞车”了。

好了,拿到chunk分配的内存后,就需要拿这个内存初始化ByteBuf了。


初始化ByteBuf

其实这里初始化ByteBuf和之前博客【命中缓存】中的初始化ByteBuf是一样的,下面再跟一遍就当是温故而知新吧。

chunk分配完内存后,我们就拿到一个“偏移量”值handle了,返回【坐标1】的代码,开始初始化ByteBuf。

回到如下代码:

        // 尝试获取符合需求的内存“偏移量”
        long handle = c.allocate(normCapacity);
        ++allocationsNormal;
        assert handle > 0;
        // 上面的代码刚刚执行完。
        // 初始化ByteBuf
        c.initBuf(buf, handle, reqCapacity);
        // 检测chunklist容量、添加chunk至chunklist等。
        qInit.add(c);

跟进initBuf方法:
io.netty.buffer.PoolChunk#initBuf

    void initBuf(PooledByteBuf<T> buf, long handle, int reqCapacity) {
        // subpage、page级别都有的"偏移量",相对于chunk的"偏移量"
        // 此处为1024(结合实验代码)
        int memoryMapIdx = memoryMapIdx(handle);
        // subpage级别的"偏移量",相对于page的"偏移量"
        // 因为这个偏移量是相对于page的,所以page级别下这个值必定为0
        int bitmapIdx = bitmapIdx(handle);
        // page级别
        if (bitmapIdx == 0) {
            byte val = value(memoryMapIdx);
            assert val == unusable : String.valueOf(val);
            // 初始化
            buf.init(this, handle, runOffset(memoryMapIdx), reqCapacity, runLength(memoryMapIdx),
                     arena.parent.threadCache());
        } else {
            // subpage级别
            initBufWithSubpage(buf, handle, bitmapIdx, reqCapacity);
        }
    }

跟进init方法:
io.netty.buffer.PooledUnsafeDirectByteBuf#init

    @Override
    void init(PoolChunk<ByteBuffer> chunk, long handle, int offset, int length, int maxLength,
              PoolThreadCache cache) {
        super.init(chunk, handle, offset, length, maxLength, cache);
        // unsafe类型的ByteBuf特有,需要记录内存相关信息。
        initMemoryAddress();
    }

跟进super.init:
io.netty.buffer.PooledByteBuf#init

    void init(PoolChunk<T> chunk, long handle, int offset, int length, int maxLength, PoolThreadCache cache) {
        assert handle >= 0;
        assert chunk != null;
        // 保存所属chunk
        this.chunk = chunk;
        // subpage
        this.handle = handle;
        // 内存对象
        memory = chunk.memory;
        // subpage相对于page的偏移量
        this.offset = offset;
        this.length = length;
        this.maxLength = maxLength;
        tmpNioBuf = null;
        // 保存所属PoolThreadCache
        this.cache = cache;
    }

当ByteBuf初始化完后,还会调整当前chunk的位置到chunkList的头结点(类似LRU),即【坐标1】qInit.add(c)所做的事情,这个代码很简单,有兴趣的自己跟进下吧。