目录 allocHandler的实例化过程 byteBuf = allocHandle.allocate(allocator); doReadBytes方法 正文 在上一篇文章中,我们分析了processSelectedKey这个方法中的accept过程,本文将分析一下work线程中的read过程。 复制代码 private static void processSelectedKey(SelectionKey k, AbstractNioChannel ch) { final NioUnsafe unsafe = ch.unsafe(); //检查该SelectionKey是否有效,如果无效,则关闭channel if (!k.isValid()) { // close the channel if the key is not valid anymore unsafe.close(unsafe.voidPromise()); return; } try { int readyOps = k.readyOps(); // Also check for readOps of 0 to workaround possible JDK bug which may otherwise lead // to a spin loop // 如果准备好READ或ACCEPT则触发unsafe.read() ,检查是否为0,如上面的源码英文注释所说:解决JDK可能会产生死循环的一个bug。 if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) { unsafe.read(); if (!ch.isOpen()) {//如果已经关闭,则直接返回即可,不需要再处理该channel的其他事件 // Connection already closed - no need to handle write. return; } } // 如果准备好了WRITE则将缓冲区中的数据发送出去,如果缓冲区中数据都发送完成,则清除之前关注的OP_WRITE标记 if ((readyOps & SelectionKey.OP_WRITE) != 0) { // Call forceFlush which will also take care of clear the OP_WRITE once there is nothing left to write ch.unsafe().forceFlush(); } // 如果是OP_CONNECT,则需要移除OP_CONNECT否则Selector.select(timeout)将立即返回不会有任何阻塞,这样可能会出现cpu 100% if ((readyOps & SelectionKey.OP_CONNECT) != 0) { // remove OP_CONNECT as otherwise Selector.select(..) will always return without blocking // See https://github.com/netty/netty/issues/924 int ops = k.interestOps(); ops &= ~SelectionKey.OP_CONNECT; k.interestOps(ops); unsafe.finishConnect(); } } catch (CancelledKeyException ignored) { unsafe.close(unsafe.voidPromise()); } } 复制代码 该方法主要是对SelectionKey k进行了检查,有如下几种不同的情况 1)OP_ACCEPT,接受客户端连接 2)OP_READ, 可读事件, 即 Channel 中收到了新数据可供上层读取。 3)OP_WRITE, 可写事件, 即上层可以向 Channel 写入数据。 4)OP_CONNECT, 连接建立事件, 即 TCP 连接已经建立, Channel 处于 active 状态。 本篇博文主要来看下当work 线程 selector检测到OP_READ事件时,内部干了些什么。 复制代码 if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) { unsafe.read(); if (!ch.isOpen()) {//如果已经关闭,则直接返回即可,不需要再处理该channel的其他事件 // Connection already closed - no need to handle write. return; } } 复制代码 从代码中可以看到,当selectionKey发生的事件是SelectionKey.OP_READ,执行unsafe的read方法。注意这里的unsafe是NioByteUnsafe的实例 为什么说这里的unsafe是NioByteUnsafe的实例呢?在上篇博文Netty源码分析:accept中我们知道Boss NioEventLoopGroup中的NioEventLoop只负责accpt客户端连接,然后将该客户端注册到Work NioEventLoopGroup中的NioEventLoop中,即最终是由work线程对应的selector来进行read等时间的监听,即work线程中的channel为SocketChannel,SocketChannel的unsafe就是NioByteUnsafe的实例 下面来看下NioByteUnsafe中的read方法 复制代码 @Override public void read() { final ChannelConfig config = config(); if (!config.isAutoRead() && !isReadPending()) { // ChannelConfig.setAutoRead(false) was called in the meantime removeReadOp(); return; } final ChannelPipeline pipeline = pipeline(); final ByteBufAllocator allocator = config.getAllocator(); final int maxMessagesPerRead = config.getMaxMessagesPerRead(); RecvByteBufAllocator.Handle allocHandle = this.allocHandle; if (allocHandle == null) { this.allocHandle = allocHandle = config.getRecvByteBufAllocator().newHandle(); } ByteBuf byteBuf = null; int messages = 0; boolean close = false; try { int totalReadAmount = 0; boolean readPendingReset = false; do { //1、分配缓存 byteBuf = allocHandle.allocate(allocator); int writable = byteBuf.writableBytes();//可写的字节容量 //2、将socketChannel数据写入缓存 int localReadAmount = doReadBytes(byteBuf); if (localReadAmount <= 0) { // not was read release the buffer byteBuf.release(); close = localReadAmount < 0; break; } if (!readPendingReset) { readPendingReset = true; setReadPending(false); } //3、触发pipeline的ChannelRead事件来对byteBuf进行后续处理 pipeline.fireChannelRead(byteBuf); byteBuf = null; if (totalReadAmount >= Integer.MAX_VALUE - localReadAmount) { // Avoid overflow. totalReadAmount = Integer.MAX_VALUE; break; } totalReadAmount += localReadAmount; // stop reading if (!config.isAutoRead()) { break; } if (localReadAmount < writable) { // Read less than what the buffer can hold, // which might mean we drained the recv buffer completely. break; } } while (++ messages < maxMessagesPerRead); pipeline.fireChannelReadComplete(); allocHandle.record(totalReadAmount); if (close) { closeOnRead(pipeline); close = false; } } catch (Throwable t) { handleReadException(pipeline, byteBuf, t, close); } finally { if (!config.isAutoRead() && !isReadPending()) { removeReadOp(); } } } } 复制代码 下面一一介绍比较重要的代码 回到顶部 allocHandler的实例化过程 allocHandle负责自适应调整当前缓存分配的大小,以防止缓存分配过多或过少,先看allocHandler的实例化过程 复制代码 RecvByteBufAllocator.Handle allocHandle = this.allocHandle; if (allocHandle == null) { this.allocHandle = allocHandle = config.getRecvByteBufAllocator().newHandle(); } 复制代码 其中, config.getRecvByteBufAllocator()得到的是一个 AdaptiveRecvByteBufAllocator实例DEFAULT。 public static final AdaptiveRecvByteBufAllocator DEFAULT = new AdaptiveRecvByteBufAllocator(); 而AdaptiveRecvByteBufAllocator中的newHandler()方法的代码如下: 复制代码 @Override public Handle newHandle() { return new HandleImpl(minIndex, maxIndex, initial); } HandleImpl(int minIndex, int maxIndex, int initial) { this.minIndex = minIndex; this.maxIndex = maxIndex; index = getSizeTableIndex(initial); nextReceiveBufferSize = SIZE_TABLE[index]; } 复制代码 其中,上面方法中所用到参数:minIndex maxIndex initial是什么意思呢?含义如下:minIndex是最小缓存在SIZE_TABLE中对应的下标。maxIndex是最大缓存在SIZE_TABLE中对应的下标,initial为初始化缓存大小。 AdaptiveRecvByteBufAllocator的相关常量字段 复制代码 public class AdaptiveRecvByteBufAllocator implements RecvByteBufAllocator { static final int DEFAULT_MINIMUM = 64; static final int DEFAULT_INITIAL = 1024; static final int DEFAULT_MAXIMUM = 65536; private static final int INDEX_INCREMENT = 4; private static final int INDEX_DECREMENT = 1; private static final int[] SIZE_TABLE; 复制代码 上面这些字段的具体含义说明如下: 1)、SIZE_TABLE:按照从小到大的顺序预先存储可以分配的缓存大小。 从16开始,每次累加16,直到496,接着从512开始,每次增大一倍,直到溢出。SIZE_TABLE初始化过程如下。 复制代码 static { List sizeTable = new ArrayList(); for (int i = 16; i < 512; i += 16) { sizeTable.add(i); } for (int i = 512; i > 0; i <<= 1) { sizeTable.add(i); } SIZE_TABLE = new int[sizeTable.size()]; for (int i = 0; i < SIZE_TABLE.length; i ++) { SIZE_TABLE[i] = sizeTable.get(i); } } 复制代码 2)、DEFAULT_MINIMUM:最小缓存(64),在SIZE_TABLE中对应的下标为3。 3)、DEFAULT_MAXIMUM :最大缓存(65536),在SIZE_TABLE中对应的下标为38。 4)、DEFAULT_INITIAL :初始化缓存大小,第一次分配缓存时,由于没有上一次实际收到的字节数做参考,需要给一个默认初始值。 5)、INDEX_INCREMENT:上次预估缓存偏小,下次index的递增值。 6)、INDEX_DECREMENT :上次预估缓存偏大,下次index的递减值。 构造函数: 复制代码 private AdaptiveRecvByteBufAllocator() { this(DEFAULT_MINIMUM, DEFAULT_INITIAL, DEFAULT_MAXIMUM); } public AdaptiveRecvByteBufAllocator(int minimum, int initial, int maximum) { if (minimum <= 0) { throw new IllegalArgumentException("minimum: " + minimum); } if (initial < minimum) { throw new IllegalArgumentException("initial: " + initial); } if (maximum < initial) { throw new IllegalArgumentException("maximum: " + maximum); } int minIndex = getSizeTableIndex(minimum); if (SIZE_TABLE[minIndex] < minimum) { this.minIndex = minIndex + 1; } else { this.minIndex = minIndex; } int maxIndex = getSizeTableIndex(maximum); if (SIZE_TABLE[maxIndex] > maximum) { this.maxIndex = maxIndex - 1; } else { this.maxIndex = maxIndex; } this.initial = initial; } 复制代码 该构造函数对参数进行了有效性检查,然后初始化了如下3个字段,这3个字段就是上面用于产生allocHandle对象所要用到的参数。 复制代码 private final int minIndex; private final int maxIndex; private final int initial; 复制代码 其中,getSizeTableIndex函数的代码如下,该函数的功能为:找到SIZE_TABLE中的元素刚好大于或等于size的位置。 复制代码 private static int getSizeTableIndex(final int size) { for (int low = 0, high = SIZE_TABLE.length - 1;;) { if (high < low) { return low; } if (high == low) { return high; } int mid = low + high >>> 1; int a = SIZE_TABLE[mid]; int b = SIZE_TABLE[mid + 1]; if (size > b) { low = mid + 1; } else if (size < a) { high = mid - 1; } else if (size == a) { return mid; } else { //这里的情况就是 a < size <= b 的情况 return mid + 1; } } } 复制代码 回到顶部 byteBuf = allocHandle.allocate(allocator); 申请一块指定大小的内存 AdaptiveRecvByteBufAllocator#HandlerImpl 复制代码 @Override public ByteBuf allocate(ByteBufAllocator alloc) { return alloc.ioBuffer(nextReceiveBufferSize); } 复制代码 直接调用了ioBuffer方法,继续看 AbstractByteBufAllocator.java 复制代码 @Override public ByteBuf ioBuffer(int initialCapacity) { if (PlatformDependent.hasUnsafe()) { return directBuffer(initialCapacity); } return heapBuffer(initialCapacity); } 复制代码 ioBuffer函数中主要逻辑为:看平台是否支持unsafe,选择使用直接物理内存还是堆上内存。先看 heapBuffer AbstractByteBufAllocator.java 复制代码 @Override public ByteBuf heapBuffer(int initialCapacity) { return heapBuffer(initialCapacity, Integer.MAX_VALUE); } @Override public ByteBuf heapBuffer(int initialCapacity, int maxCapacity) { if (initialCapacity == 0 && maxCapacity == 0) { return emptyBuf; } validate(initialCapacity, maxCapacity); return newHeapBuffer(initialCapacity, maxCapacity); } 复制代码 这里的newHeapBuffer有两种实现:至于具体用哪一种,取决于我们对系统属性io.netty.allocator.type的设置,如果设置为: “pooled”,则缓存分配器就为:PooledByteBufAllocator,进而利用对象池技术进行内存分配。如果不设置或者设置为其他,则缓存分配器为:UnPooledByteBufAllocator,则直接返回一个UnpooledHeapByteBuf对象。 UnpooledByteBufAllocator.java 复制代码 @Override protected ByteBuf newHeapBuffer(int initialCapacity, int maxCapacity) { return new UnpooledHeapByteBuf(this, initialCapacity, maxCapacity); } 复制代码 PooledByteBufAllocator.java 复制代码 @Override protected ByteBuf newHeapBuffer(int initialCapacity, int maxCapacity) { PoolThreadCache cache = threadCache.get(); PoolArena heapArena = cache.heapArena; ByteBuf buf; if (heapArena != null) { buf = heapArena.allocate(cache, initialCapacity, maxCapacity); } else { buf = new UnpooledHeapByteBuf(this, initialCapacity, maxCapacity); } return toLeakAwareBuffer(buf); } 复制代码 再看directBuffer AbstractByteBufAllocator.java 复制代码 @Override public ByteBuf directBuffer(int initialCapacity) { return directBuffer(initialCapacity, Integer.MAX_VALUE); } @Override public ByteBuf directBuffer(int initialCapacity, int maxCapacity) { if (initialCapacity == 0 && maxCapacity == 0) { return emptyBuf; } validate(initialCapacity, maxCapacity);//参数的有效性检查 return newDirectBuffer(initialCapacity, maxCapacity); } 复制代码 与newHeapBuffer一样,这里的newDirectBuffer方法也有两种实现:至于具体用哪一种,取决于我们对系统属性io.netty.allocator.type的设置,如果设置为: “pooled”,则缓存分配器就为:PooledByteBufAllocator,进而利用对象池技术进行内存分配。如果不设置或者设置为其他,则缓存分配器为:UnPooledByteBufAllocator。这里主要看下UnpooledByteBufAllocator. newDirectBuffer的内部实现 UnpooledByteBufAllocator.java 复制代码 @Override protected ByteBuf newDirectBuffer(int initialCapacity, int maxCapacity) { ByteBuf buf; if (PlatformDependent.hasUnsafe()) { buf = new UnpooledUnsafeDirectByteBuf(this, initialCapacity, maxCapacity); } else { buf = new UnpooledDirectByteBuf(this, initialCapacity, maxCapacity); } return toLeakAwareBuffer(buf); } 复制代码 UnpooledUnsafeDirectByteBuf是如何实现缓存管理的?对Nio的ByteBuffer进行了封装,通过ByteBuffer的allocateDirect方法实现缓存的申请。 复制代码 protected UnpooledUnsafeDirectByteBuf(ByteBufAllocator alloc, int initialCapacity, int maxCapacity) { super(maxCapacity); //省略了部分参数检查的代码 this.alloc = alloc; setByteBuffer(allocateDirect(initialCapacity)); } 复制代码 复制代码 protected ByteBuffer allocateDirect(int initialCapacity) { return ByteBuffer.allocateDirect(initialCapacity); } private void setByteBuffer(ByteBuffer buffer) { ByteBuffer oldBuffer = this.buffer; if (oldBuffer != null) { if (doNotFree) { doNotFree = false; } else { freeDirect(oldBuffer); } } this.buffer = buffer; memoryAddress = PlatformDependent.directBufferAddress(buffer); tmpNioBuf = null; capacity = buffer.remaining(); } 复制代码 上面代码的主要逻辑为: 1、先利用ByteBuffer的allocateDirect方法分配了大小为initialCapacity的缓存 2、然后判断将旧缓存给free掉 3、最后将新缓存赋给字段buffer上 其中:memoryAddress = PlatformDependent.directBufferAddress(buffer