netty源码解解析(4.0)-6 线程模型-IO线程EventLoopGroup和NIO实现(一)

接口定义 io.netty.channel.EventLoopGroup extends EventExecutorGroup 方法 说明 ChannelFuture register(Channel channel) 把一个channel注册到一个EventLoop ChannelFuture register(Channel channel, ChannelPromise promise); 同上 io.netty.channel.EventLoop extends OrderedEventExecutor, EventLoopGroup 方法 说明 EventLoopGroup parent() 得到创建这个eventLoop的EventLoopGroup EventLoopGroup定义的主要方法是register, 这个方法的语义是把channel和eventLoop绑定在一起。一个channel对应一个eventLoop, 一个eventLoop会持有多个channel。 I/O线程EventLoopGroup的抽象实现 io.netty.channel.MultithreadEventLoopGroup extends MultithreadEventExecutorGroup implements EventLoopGroup io.netty.channel.SingleThreadEventLoop extends SingleThreadEventExecutor implements EventLoop 两个类主功能都是实现了EventLoopGroup定义的register方法 MultithreadEventLoopGroup public ChannelFuture register(Channel channel) { return next().register(channel); } public ChannelFuture register(Channel channel, ChannelPromise promise) { return next().register(channel, promise); } SingleThreadEventLoop public ChannelFuture register(Channel channel) { return register(channel, new DefaultChannelPromise(channel, this)); } public ChannelFuture register(final Channel channel, final ChannelPromise promise) { channel.unsafe().register(this, promise); return promise; } register的实现主要是为了调用Channel.Unsafe实例的register方法。 NIO实现 io.netty.channel.nio.NioEventLoopGroup extends MultithreadEventLoopGroup io.netty.channel.nio.NioEventLoop extends SingleThreadEventLoop NioEventLoopGroup是在MultithreadEventLoopGroup基础上实现了对JDK NIO Selector的封装, 它实现以下几个功能: 创建selector 在selector上注册channel感兴趣的NIO事件 实现EventExecutor的run方法,定义NIO事件和Executor任务的处理流程。 把NIO事件转换成对channel unsafe的调用或NioTask的调用 控制线程执行I/O操作和排队任务的用时比例 处理epoll selector cpu 100%的bug 下面来具体分析这几个功能的实现。 创建Selector NioEventLoop#openSelector()实现了创建selector的功能,默认情况下,使用SelectorProvider#openSelector()方法创建一个新个selector: final Selector unwrappedSelector = provider.openSelector(); 如果设置环境变量io.netty.noKeySetOptimization=true, 会创建一个selectedKeySet = new SelectedSelectionKeySet(), 然后使用java的反射机制把selector的selectedKeys和publicSelectedKeys替换成selectedKeySet,具体步骤是: 1.得到selector的真正类型: sun.nio.ch.SelectorImpl Object maybeSelectorImplClass = AccessController.doPrivileged(new PrivilegedAction() { @Override public Object run() { try { return Class.forName( "sun.nio.ch.SelectorImpl", false, PlatformDependent.getSystemClassLoader()); } catch (Throwable cause) { return cause; } } }); final Class selectorImplClass = (Class) maybeSelectorImplClass; 2.替换selector是属性unwrappedSelector Field selectedKeysField = selectorImplClass.getDeclaredField("selectedKeys"); Field publicSelectedKeysField = selectorImplClass.getDeclaredField("publicSelectedKeys"); selectedKeysField.set(unwrappedSelector, selectedKeySet); publicSelectedKeysField.set(unwrappedSelector, selectedKeySet); 之所以会设计一个这样的优化选项,是因为一般情况下调用完selector的select或selectNow方法后需要调用Selector#selectedKeys()得到触发NIO事件的的SelectableChannel,这样优化之后,可以直接从selectedKeySet中得到已经触发了NIO事件的SelectableChannel。 在selector上注册channel感兴趣的NIO事件 NioEventLoop提供了unwrappedSelector方法,这个方法返回了它创建好的Selector实例。这样任何的外部类都可以把任意的SelectableChannel注册到这selector上。在AbstractNioChannel中, doRegister方法的实现就是使用了这个方法: selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this); 另外,它还提供了一个register方法: public void register(final SelectableChannel ch, final int interestOps, final NioTask task) 这个方法会把task当成SelectableChannel的附件注册到selector上: ch.register(selector, interestOps, task); 实现EventExecutor的run方法,定义NIO事件和Executor任务的处理流程 在NioEventLoop的run方法中实现NIO事件和EventExecutor的任务处理逻辑,这个run方法在io.netty.util.concurrent.SingleThreadEventExecutor中定义。在上一章中,我们看到了DefaultEventExecutor中是如何实现这个run方法的,这里我们将要看到这run方法的另一个实现。和SingleThreadEventExecutor中的run方法相比,NioEventLoop的run方法不仅要及时地执行taskQueue中的任务,还要能及时地处理NIO事件,因此它会同时检查selector中的NIO事件和和taskQueue队列,任何一个中有事件需要处理或有任务需要执行,它不会阻塞线程。同时它也保证了在没有NIO事件和任务的情况下线程不会无谓的空转浪费CUP资源。 run主要实现如下,为了更清晰的说明它的主要功能,我对原来的代码进行了一些删减。 for(;;){ try{ //phase1: 同时检查NIO事件和任务 switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) { case SelectStrategy.CONTINUE: continue; case SelectStrategy.SELECT: select(wakenUp.getAndSet(false)); //在taskQueue中没有任务的时候执行select } //phase2: 进入处理NIO事件,执行executor任务 try{ //处理NIO事件 processSelectedKeys(); }finally{ //处理taskQueu中的任务 runAllTasks(); } }catch(Throwable t){ handleLoopException(t); } } run方法有两个阶段构成: phase1: 检查NIO事件或executor任务,如果有任何的NIO事件或executor任务进入phase2。 这样阶段的主要工作在selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())和select中完成。 selectStrategy.calculateStrategy实现 selectStrategy.calculateStrategy(selectNowSupplier, hasTasks()) 这行代码的含义是: 如果hasTasks() == true, 调用以下selector#selectNow, 然后进入phase2。 否则调用select。这里使用了strategy模式,默认的strategy实现是io.netty.channe.DefaultSelectStrategy implements SelectStrategy @Override public int calculateStrategy(IntSupplier selectSupplier, boolean hasTasks) throws Exception { return hasTasks ? selectSupplier.get() : SelectStrategy.SELECT; } DefaultSelectStrategy实现了SelectStrategy接口,这接口定义了两个常量: int SELECT = -1; int CONTINUE = -2; 运行时selectSuppler参数传入的是selectNowSupplier, 它的实现如下: private final IntSupplier selectNowSupplier = new IntSupplier() { @Override public int get() throws Exception { return selectNow(); } }; 这里的get方法调用了selectNow, selectNow调用的是Selector#selectNew方法,这个方法的返回值是>=0。 hashTasks的传入的参数是hasTask()的返回值: return !taskQueue.isEmpty(); 代码读到这里就会发现,使用默认的的SelectStrategy实现,calculateStrategy在hasTasks()==true时返回值>=0, hasTasks() == false时返回值是SelectStrategy.SELECT,不会返回SelectStrategy.CONTINUE。 select实现 select的执行逻辑是: 1. 计算超select方法的结束时间selectDeadLineNanos long currentTimeNanos = System.nanoTime(); long selectDeadLineNanos = currentTimeNanos + delayNanos(currentTimeNanos); 2. 进入循环,检查超时--超时跳出循环。 long timeoutMillis = (selectDeadLineNanos - currentTimeNanos + 500000L) / 1000000L; if (timeoutMillis <= 0) { if (selectCnt == 0) { selector.selectNow(); selectCnt = 1; } break; } 3. 如果在select执行过程中有executor任务提交或可以当前的wakeUp由false变成true, 跳出循环 if (hasTasks() && wakenUp.compareAndSet(false, true)) { selector.selectNow(); selectCnt = 1; break; } 4. 调用selector#select等待NIO事件。 int selectedKeys = selector.select(timeoutMillis); selectCnt ++; 5. 如果满足这些条件的任何一个,跳出循环: 有NIO事件、wakeUp的新旧值都是true、taskQueue中有任务、有定时任务到期。 if (selectedKeys != 0 || oldWakenUp || wakenUp.get() || hasTasks() || hasScheduledTasks()) { break; } 6. 如果线程被中断,跳出循环。 if (Thread.interrupted()) { break; } 7. 如果selector.select超时,没有检查到任何NIO事件, 会在下次循环开始时跳出循环。 如果每次超时,跳到第2步继续下一次循环。 long time = System.nanoTime(); if (time - TimeUnit.MILLISECONDS.toNanos(timeoutMillis) >= currentTimeNanos) { selectCnt = 1; } currentTimeNanos = time; select 最迟会在当前时间>= selectDeadLineNanos时返回,这个时间是最近一个到期的定时任务执行的时间,换言之如果没有任何的NIO事件或executor任务,select会在定时任务到期时返回。如果没有定时任务,delayNanos(currentTimeNanos)返回的值是 TimeUnit.SECONDS.toNanos(1),即1秒。 select会在检查到任何NIO事件或executor任务时返回,为了保证这点,在selector.select(timeoutMillis)前后都会调用hasTasks检查executor任务,为了能在调用executet提交任务时唤醒selector.select,NioEventLoop覆盖了SingleThreadEventExecutor的wake方法: protected void wakeup(boolean inEventLoop) { if (!inEventLoop && wakenUp.compareAndSet(false, true)) { selector.wakeup(); } } 这个方法会及时的唤醒selector.select, 保证新提交的任务可以得到及时的执行。 phase2: 进入处理NIO事件,执行executor任务 这个阶段是先调用processSelectedKeys()处理NIO事件,然后掉用 runAllTasks()处理所有已经到期的定时任务和已经在排队的任务。这个阶段还实现了NIO事件和executor任务的用时比例管理,这个特性稍后会详细分析。 标签: java, netty, 后台开发, 架构设计https://www.cnblogs.com/brandonli/p/10100139.html
50000+
5万行代码练就真实本领
17年
创办于2008年老牌培训机构
1000+
合作企业
98%
就业率

联系我们

电话咨询

0532-85025005

扫码添加微信