目录 netty中的reactor线程 新连接的建立 检测到有新连接进入 注册到reactor线程 doReadMessages() pipeline.fireChannelRead(NioSocketChannel) 注册读事件 总结 正文 通读本文,你会了解到 1.netty如何接受新的请求 2.netty如何给新请求分配reactor线程 3.netty如何给每个新连接增加ChannelHandler 回到顶部 netty中的reactor线程 netty中最核心的东西莫过于两种类型的reactor线程,可以看作netty中两种类型的发动机,驱动着netty整个框架的运转 一种类型的reactor线程是boos线程组,专门用来接受新的连接,然后封装成channel对象扔给worker线程组;还有一种类型的reactor线程是worker线程组,专门用来处理连接的读写 不管是boos线程还是worker线程,所做的事情均分为以下三个步骤 轮询注册在selector上的IO事件 处理IO事件 执行异步task 对于boos线程来说,第一步轮询出来的基本都是 accept 事件,表示有新的连接,而worker线程轮询出来的基本都是read/write事件,表示网络的读写事件 回到顶部 新连接的建立 简单来说,新连接的建立可以分为三个步骤 1.检测到有新的连接 2.将新的连接注册到worker线程组 3.注册新连接的读事件 检测到有新连接进入 我们已经知道,当服务端绑启动之后,服务端的channel已经注册到boos reactor线程中,reactor不断检测有新的事件,直到检测出有accept事件发生 NioEventLoop.java 复制代码 private static void processSelectedKey(SelectionKey k, AbstractNioChannel ch) { final NioUnsafe unsafe = ch.unsafe(); //检查该SelectionKey是否有效,如果无效,则关闭channel if (!k.isValid()) { // close the channel if the key is not valid anymore unsafe.close(unsafe.voidPromise()); return; } try { int readyOps = k.readyOps(); // Also check for readOps of 0 to workaround possible JDK bug which may otherwise lead // to a spin loop // 如果准备好READ或ACCEPT则触发unsafe.read() ,检查是否为0,如上面的源码英文注释所说:解决JDK可能会产生死循环的一个bug。 if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) { unsafe.read(); if (!ch.isOpen()) {//如果已经关闭,则直接返回即可,不需要再处理该channel的其他事件 // Connection already closed - no need to handle write. return; } } // 如果准备好了WRITE则将缓冲区中的数据发送出去,如果缓冲区中数据都发送完成,则清除之前关注的OP_WRITE标记 if ((readyOps & SelectionKey.OP_WRITE) != 0) { // Call forceFlush which will also take care of clear the OP_WRITE once there is nothing left to write ch.unsafe().forceFlush(); } // 如果是OP_CONNECT,则需要移除OP_CONNECT否则Selector.select(timeout)将立即返回不会有任何阻塞,这样可能会出现cpu 100% if ((readyOps & SelectionKey.OP_CONNECT) != 0) { // remove OP_CONNECT as otherwise Selector.select(..) will always return without blocking // See https://github.com/netty/netty/issues/924 int ops = k.interestOps(); ops &= ~SelectionKey.OP_CONNECT; k.interestOps(ops); unsafe.finishConnect(); } } catch (CancelledKeyException ignored) { unsafe.close(unsafe.voidPromise()); } } 复制代码 该方法主要是对SelectionKey k进行了检查,有如下几种不同的情况 1)OP_ACCEPT,接受客户端连接 2)OP_READ, 可读事件, 即 Channel 中收到了新数据可供上层读取。 3)OP_WRITE, 可写事件, 即上层可以向 Channel 写入数据。 4)OP_CONNECT, 连接建立事件, 即 TCP 连接已经建立, Channel 处于 active 状态。 本篇博文主要来看下当boss线程 selector检测到OP_ACCEPT事件时,内部干了些什么。 复制代码 if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) { unsafe.read(); if (!ch.isOpen()) {//如果已经关闭,则直接返回即可,不需要再处理该channel的其他事件 // Connection already closed - no need to handle write. return; } } 复制代码 boos reactor线程已经轮询到 SelectionKey.OP_ACCEPT 事件,说明有新的连接进入,此时将调用channel的 unsafe来进行实际的操作,此时的channel为 NioServerSocketChannel,则unsafe为NioServerSocketChannel的属性NioMessageUnsafe 那么,我们进入到它的read方法,进入新连接处理的第二步 注册到reactor线程 NioMessageUnsafe.java 复制代码 private final List readBuf = new ArrayList(); public void read() { assert eventLoop().inEventLoop(); final ChannelPipeline pipeline = pipeline(); final RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle(); do { int localRead = doReadMessages(readBuf); if (localRead == 0) { break; } if (localRead < 0) { closed = true; break; } } while (allocHandle.continueReading()); int size = readBuf.size(); for (int i = 0; i < size; i ++) { pipeline.fireChannelRead(readBuf.get(i)); } readBuf.clear(); pipeline.fireChannelReadComplete(); } 复制代码 调用 doReadMessages 方法不断地读取消息,用 readBuf 作为容器,这里,其实可以猜到读取的是一个个连接,然后调用 pipeline.fireChannelRead(),将每条新连接经过一层服务端channel的洗礼,之后清理容器,触发 pipeline.fireChannelReadComplete() 下面我们具体看下这两个方法 1.doReadMessages(List) 2.pipeline.fireChannelRead(NioSocketChannel) doReadMessages() 复制代码 protected int doReadMessages(List buf) throws Exception { SocketChannel ch = javaChannel().accept(); try { if (ch != null) { buf.add(new NioSocketChannel(this, ch)); return 1; } } catch (Throwable t) { logger.warn("Failed to create a new channel from an accepted socket.", t); try { ch.close(); } catch (Throwable t2) { logger.warn("Failed to close a socket.", t2); } } return 0; } 复制代码 我们终于窥探到netty调用jdk底层nio的边界 javaChannel().accept();,由于netty中reactor线程第一步就扫描到有accept事件发生,因此,这里的accept方法是立即返回的,返回jdk底层nio创建的一条channel ServerSocketChannel有阻塞和非阻塞两种模式: a、阻塞模式:ServerSocketChannel.accept() 方法监听新进来的连接,当 accept()方法返回的时候,它返回一个包含新进来的连接的 SocketChannel。阻塞模式下, accept()方法会一直阻塞到有新连接到达。 b、非阻塞模式:,accept() 方法会立刻返回,如果还没有新进来的连接,返回的将是null。 因此,需要检查返回的SocketChannel是否是null. 在NioServerSocketChannel的构造函数分析中,我们知道,其通过ch.configureBlocking(false);语句设置当前的ServerSocketChannel为非阻塞的。 netty将jdk的 SocketChannel 封装成自定义的 NioSocketChannel,加入到list里面,这样外层就可以遍历该list,做后续处理 从上一篇文章中,我们已经知道服务端的创建过程中会创建netty中一系列的核心组件,包括pipeline,unsafe等等,那么,接受一条新连接的时候是否也会创建这一系列的组件呢? 带着这个疑问,我们跟进去 NioSocketChannel.java 复制代码 public NioSocketChannel(Channel parent, SocketChannel socket) { super(parent, socket); config = new NioSocketChannelConfig(this, socket.socket()); } 复制代码 我们重点分析 super(parent, socket),NioSocketChannel的父类为 AbstractNioByteChannel AbstractNioByteChannel.java 复制代码 protected AbstractNioByteChannel(Channel parent, SelectableChannel ch) { super(parent, ch, SelectionKey.OP_READ); } 复制代码 这里,我们看到jdk nio里面熟悉的影子—— SelectionKey.OP_READ,一般在原生的jdk nio编程中,也会注册这样一个事件,表示对channel的读感兴趣 我们继续往上,追踪到AbstractNioByteChannel的父类 AbstractNioChannel, 这里,我相信读了上一篇文章你对于这部分代码肯定是有印象的 复制代码 protected AbstractNioChannel(Channel parent, SelectableChannel ch, int readInterestOp) { super(parent); this.ch = ch; this.readInterestOp = readInterestOp; try { ch.configureBlocking(false); } catch (IOException e) { try { ch.close(); } catch (IOException e2) { if (logger.isWarnEnabled()) { logger.warn( "Failed to close a partially initialized socket.", e2); } } throw new ChannelException("Failed to enter non-blocking mode.", e); } } 复制代码 在创建服务端channel的时候,最终也会进入到这个方法,super(parent), 便是在AbstractChannel中创建一系列和该channel绑定的组件,如下 复制代码 protected AbstractChannel(Channel parent) { this.parent = parent; id = newId(); unsafe = newUnsafe(); pipeline = newChannelPipeline(); } 复制代码 而这里的 readInterestOp 表示该channel关心的事件是 SelectionKey.OP_READ,后续会将该事件注册到selector,之后设置该通道为非阻塞模式,在channel中创建 unsafe 和一条 pipeline pipeline.fireChannelRead(NioSocketChannel) 对于 pipeline我们前面已经了解过,在netty的各种类型的channel中,都会包含一个pipeline,字面意思是管道,我们可以理解为一条流水线工艺,流水线工艺有起点,有结束,中间还有各种各样的流水线关卡,一件物品,在流水线起点开始处理,经过各个流水线关卡的加工,最终到流水线结束 对应到netty里面,流水线的开始就是HeadContxt,流水线的结束就是TailConext,HeadContxt中调用Unsafe做具体的操作,TailConext中用于向用户抛出pipeline中未处理异常以及对未处理消息的警告 通过前面的文章中,我们已经知道在服务端的channel初始化时,在pipeline中,已经自动添加了一个pipeline处理器 ServerBootstrapAcceptor, 并已经将用户代码中设置的一系列的参数传入了构造函数,接下来,我们就来看下ServerBootstrapAcceptor ServerBootstrapAcceptor.java 复制代码 private static class ServerBootstrapAcceptor extends ChannelInboundHandlerAdapter { private final EventLoopGroup childGroup; private final ChannelHandler childHandler; private final Entry, Object>[] childOptions; private final Entry, Object>[] childAttrs; ServerBootstrapAcceptor( EventLoopGroup childGroup, ChannelHandler childHandler, Entry, Object>[] childOptions, Entry, Object>[] childAttrs) { this.childGroup = childGroup; this.childHandler = childHandler; this.childOptions = childOptions; this.childAttrs = childAttrs; } public void channelRead(ChannelHandlerContext ctx, Object msg) { final Channel child = (Channel) msg; child.pipeline().addLast(childHandler); for (Entry, Object> e: childOptions) { try { if (!child.config().setOption((ChannelOption) e.getKey(), e.getValue())) { logger.warn("Unknown channel option: " + e); } } catch (Throwable t) { logger.warn("Failed to set a channel option: " + child, t); } } for (Entry, Object> e: childAttrs) { child.attr((AttributeKey) e.getKey()).set(e.getValue()); } try { childGroup.register(child).addListener(new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture future) throws Exception { if (!future.isSuccess()) { forceClose(child, future.cause()); } } }); } catch (Throwable t) { forceClose(child, t); } } } 复制代码 前面的 pipeline.fireChannelRead(NioSocketChannel); 最终通过head->unsafe->ServerBootstrapAcceptor的调用链,调用到这里的 ServerBootstrapAcceptor 的channelRead方法,而 channelRead 一上来就把这里的msg强制转换为 Channel 然后,拿到该channel,也就是我们之前new出来的 NioSocketChannel中对应的pipeline,将用户代码中的 childHandler,添加到pipeline,这里的 childHandler 在用户代码中的体现为 复制代码 ServerBootstrap b = new ServerBootstrap(); b.group(bossGroup, workerGroup) .channel(NioServerSocketChannel.class) .childHandler(new ChannelInitializer() { @Override public void initChannel(SocketChannel ch) throws Exception { ChannelPipeline p = ch.pipeline(); p.addLast(new EchoServerHandler()); } }); 复制代码 其实对应的是 ChannelInitializer,到了这里,NioSocketChannel中pipeline对应的处理器为 head->ChannelInitializer->tail,牢记,后面会再次提到! 接着,设置 NioSocketChannel 对应的 attr和option,然后进入到 childGroup.register(child),这里的childGroup就是我们在启动代码中new出来的NioEventLoopGroup 我们进入到NioEventLoopGroup的register方法,代理到其父类MultithreadEventLoopGroup MultithreadEventLoopGroup.java 复制代码 public ChannelFuture register(Channel channel) { return next().register(channel); } 复制代码 这里又扯出来一个 next()方法,我们跟进去 MultithreadEventLoopGroup.java 复制代码 @Override public EventLoop next() { return (EventLoop) super.next(); } 复制代码 回到其父类 MultithreadEventExecutorGroup.java 复制代码 @Override public EventExecutor next() { return chooser.next(); } 复制代码 这里的chooser对应的类为 EventExecutorChooser,字面意思为事件执行器选择器,放到我们这里的上下文中的作用就是从worker reactor线程组中选择一个reactor线程 复制代码 public interface EventExecutorChooserFactory { /** * Returns a new {@link EventExecutorChooser}. */ EventExecutorChooser newChooser(EventExecutor[] executors); /** * Chooses the next {@link EventExecutor} to use. */ @UnstableApi interface EventExecutorChooser { /** * Returns the new {@link EventExecutor} to use. */ EventExecutor next(); } } 复制代码 chooser的实现有两种 复制代码 public final class DefaultEventExecutorChooserFactory implements EventExecutorChooserFactory { public static final DefaultEventExecutorChooserFactory INSTANCE = new DefaultEventExecutorChooserFactory(); private DefaultEventExecutorChooserFactory() { } @SuppressWarnings("unchecked") @Override public EventExecutorChooser newChooser(EventExecutor[] executors) { if (isPowerOfTwo(executors.length)) { return new PowerOfTowEventExecutorChooser(executors); } else { return new GenericEventExecutorChooser(executors); } } private static boolean isPowerOfTwo(int val) { return (val & -val) == val; } private static final class PowerOfTowEventExecutorChooser implements EventExecutorChooser { private final AtomicInteger idx = new AtomicInteger(); private final EventExecutor[] executors; PowerOfTowEventExecutorChooser(EventExecutor[] executors) { this.executors = executors; } @Override public EventExecutor next() { return executors[idx.getAndIncrement() & executors.length - 1]; } } private static final class GenericEventExecutorChooser implements EventExecutorChooser { private final AtomicInteger idx = new AtomicInteger(); private final EventExecutor[] executors; GenericEventExecutorChooser(EventExecutor[] executors) { this.executors = executors; } @Override public EventExecutor next() { return executors[Math.abs(idx.getAndIncrement() % executors.length)]; } } } 复制代码 默认情况下,chooser通过 DefaultEventExecutorChooserFactory被创建,在创建reactor线程选择器的时候,会判断reactor线程的个数,如果是2的幂,就创建PowerOfTowEventExecutorChooser,否则,创建GenericEventExecutorChooser 两种类型的选择器在选择reactor线程的时候,都是通过Round-Robin的方式选择reactor线程,唯一不同的是,PowerOfTowEventExecutorChooser是通过与运算,而GenericEventExecutorChooser是通过取余运算,与运算的效率要高于求余运算 选择完一个reactor线程,即 NioEventLoop 之后,我们回到注册的地方 复制代码 public ChannelFuture register(Channel channel) { return next().register(channel); } 复制代码 SingleThreadEventLoop.java 复制代码 @Override public ChannelFuture register(Channel channel) { return register(new DefaultChannelPromise(channel