(8)---ChannelPipeline概念理解

nelPipeline不是单独存在,它肯定会和Channel、ChannelHandler、ChannelHandlerContext关联在一起,所以有关概念这里一起讲。 一、ChannelHandler 1、概念 先看图 ChannelHandler下主要是两个子接口 ChannelInboundHandler(入站): 处理输入数据和Channel状态类型改变。 适配器: ChannelInboundHandlerAdapter(适配器设计模式) 常用的: SimpleChannelInboundHandler ChannelOutboundHandler(出站): 处理输出数据 适配器: ChannelOutboundHandlerAdapter 每一个Handler都一定会处理出站或者入站(可能两者都处理数据),例如对于入站的Handler可能会继承SimpleChannelInboundHandler或者ChannelInboundHandlerAdapter, 而SimpleChannelInboundHandler又是继承于ChannelInboundHandlerAdapter,最大的区别在于SimpleChannelInboundHandler会对没有外界引用的资源进行一定的清理, 并且入站的消息可以通过泛型来规定。 这里为什么有设配器模式呢? 我们在写自定义Handel时候,很少会直接实现上面两个接口,因为接口中有很多默认方法需要实现,所以这里就采用了设配器模式,ChannelInboundHandlerAdapter和 ChannelInboundHandlerAdapter就是设配器模式的产物,让它去实现上面接口,实现它所有方法。那么你自己写自定义Handel时,只要继承它,就无须重写上面接口的所有方法了。 2、Channel 生命周期(执行顺序也是从上倒下) (1)channelRegistered: channel注册到一个EventLoop。 (2)channelActive: 变为活跃状态(连接到了远程主机),可以接受和发送数据 (3)channelInactive: channel处于非活跃状态,没有连接到远程主机 (4)channelUnregistered: channel已经创建,但是未注册到一个EventLoop里面,也就是没有和Selector绑定 3、ChannelHandler 生命周期 handlerAdded: 当 ChannelHandler 添加到 ChannelPipeline 调用 handlerRemoved: 当 ChannelHandler 从 ChannelPipeline 移除时调用 exceptionCaught: 当 ChannelPipeline 执行抛出异常时调用 二、ChannelPipeline 1、概念 先看图 ChannelPipeline类是ChannelHandler实例对象的链表,用于处理或截获通道的接收和发送数据。它提供了一种高级的截取过滤模式(类似serverlet中的filter功能),让用 户可以在ChannelPipeline中完全控制一个事件以及如何处理ChannelHandler与ChannelPipeline的交互。 对于每个新的通道Channel,都会创建一个新的ChannelPipeline,并将器pipeline附加到channel中。 下图描述ChannelHandler与pipeline中的关系,一个io操作可以由一个ChannelInboundHandler或ChannelOutboundHandle进行处理,并通过调用ChannelInboundHandler 处理入站io或通过ChannelOutboundHandler处理出站IO。 2、常用方法 复制代码 addFirst(...) //添加ChannelHandler在ChannelPipeline的第一个位置 addBefore(...) //在ChannelPipeline中指定的ChannelHandler名称之前添加ChannelHandler addAfter(...) //在ChannelPipeline中指定的ChannelHandler名称之后添加ChannelHandler addLast(...) //在ChannelPipeline的末尾添加ChannelHandler remove(...) //删除ChannelPipeline中指定的ChannelHandler replace(...) //替换ChannelPipeline中指定的ChannelHandler 复制代码 ChannelPipeline可以动态添加、删除、替换其中的ChannelHandler,这样的机制可以提高灵活性。示例: 复制代码 1. ChannelPipeline pipeline = ch.pipeline(); 2. FirstHandler firstHandler = new FirstHandler(); 3. pipeline.addLast("handler1", firstHandler); 4. pipeline.addFirst("handler2", new SecondHandler()); 5. pipeline.addLast("handler3", new ThirdHandler()); 6. pipeline.remove("“handler3“"); 7. pipeline.remove(firstHandler); 8. pipeline.replace("handler2", "handler4", new FourthHandler()); 复制代码 3、入站出站Handler执行顺序 一般的项目中,inboundHandler和outboundHandler有多个,在Pipeline中的执行顺序? 重点记住: InboundHandler顺序执行,OutboundHandler逆序执行。 问题: 下面的handel的执行顺序? 复制代码 ch.pipeline().addLast(new InboundHandler1()); ch.pipeline().addLast(new OutboundHandler1()); ch.pipeline().addLast(new OutboundHandler2()); ch.pipeline().addLast(new InboundHandler2()); 或者: ch.pipeline().addLast(new OutboundHandler1()); ch.pipeline().addLast(new OutboundHandler2()); ch.pipeline().addLast(new InboundHandler1()); ch.pipeline().addLast(new InboundHandler2()); 复制代码 其实上面的执行顺序都是一样的: InboundHandler1--> InboundHandler2 -->OutboundHandler2 -->OutboundHandler1 结论 1)InboundHandler顺序执行,OutboundHandler逆序执行 2)InboundHandler之间传递数据,通过ctx.fireChannelRead(msg) 3)InboundHandler通过ctx.write(msg),则会传递到outboundHandler 4) 使用ctx.write(msg)传递消息,Inbound需要放在结尾,在Outbound之后,不然outboundhandler会不执行; 但是使用channel.write(msg)、pipline.write(msg)情况会不一致,都会执行,那是因为channel和pipline会贯穿整个流。 5) outBound和Inbound谁先执行,针对客户端和服务端而言,客户端是发起请求再接受数据,先outbound再inbound,服务端则相反。 三、ChannelHandlerContext ChannelPipeline并不是直接管理ChannelHandler,而是通过ChannelHandlerContext来间接管理,这一点通过ChannelPipeline的默认实现DefaultChannelPipeline可以看出来。 DefaultChannelHandlerContext和DefaultChannelPipeline是ChannelHandlerContext和ChannelPipeline的默认实现在DefaultPipeline内部 DefaultChannelHandlerContext组成了一个双向链表。 我们看下DefaultChannelPipeline的构造函数: 复制代码 /** * 可以看到,DefaultChinnelPipeline 内部使用了两个特殊的Hander 来表示Handel链的头和尾。 */ public DefaultChannelPipeline(AbstractChannel channel) { if (channel == null) { throw new NullPointerException("channel"); } this.channel = channel; TailHandler tailHandler = new TailHandler(); tail = new DefaultChannelHandlerContext(this, null, generateName(tailHandler), tailHandler); HeadHandler headHandler = new HeadHandler(channel.unsafe()); head = new DefaultChannelHandlerContext(this, null, generateName(headHandler), headHandler); head.next = tail; tail.prev = head; } 复制代码 所以对于DefaultChinnelPipeline它的Handel头部和尾部的Handel是固定的,我们所添加的Handel是添加在这个头和尾之前的Handel。(下面这个图更加清晰) 四、几者关系 先大致说下什么是Channel 通常来说, 所有的 NIO 的 I/O 操作都是从 Channel 开始的. 一个 channel 类似于一个 stream。在Netty中,Channel是客户端和服务端建立的一个连接通道。 虽然java Stream 和 NIO Channel都是负责I/O操作,但他们还是有许多区别的: 1)我们可以在同一个 Channel 中执行读和写操作, 然而同一个 Stream 仅仅支持读或写。 2)Channel 可以异步地读写, 而 Stream 是阻塞的同步读写。 3)Channel 总是从 Buffer 中读取数据, 或将数据写入到 Buffer 中。 几者的关系图如下: 总结: 一个Channel包含一个ChannelPipeline,创建Channel时会自动创建一个ChannelPipeline,每个Channel都有一个管理它的pipeline,这关联是永久性的。 这点从源码中就可以看出,我之前写的博客里有说到:【Netty】5 源码 Bootstrap。每一个ChannelPipeline中可以包含多个ChannelHandler。所有ChannelHandler 都会顺序加入到ChannelPipeline中,ChannelHandler实例与ChannelPipeline之间的桥梁是ChannelHandlerContext实例。 五、整个传播流程 现在将上面的整个传播流程,通过源码大致走一遍。 为了搞清楚事件如何在Pipeline里传播,让我们从Channel的抽象子类AbstractChannel开始,下面是AbstractChannel#write()方法的实现: 复制代码 public abstract class AbstractChannel extends DefaultAttributeMap implements Channel { // ... @Override public Channel write(Object msg) { return pipeline.write(msg); } // ... } 复制代码 AbstractChannel直接调用了Pipeline的write()方法: 再看DefaultChannelPipeline的write()方法实现: 复制代码 final class DefaultChannelPipeline implements ChannelPipeline { // ... @Override public ChannelFuture write(Object msg) { return tail.write(msg); } // ... } 复制代码 因为write是个outbound事件,所以DefaultChannelPipeline直接找到tail部分的context,调用其write()方法: 接着看DefaultChannelHandlerContext的write()方法 复制代码 final class DefaultChannelHandlerContext extends DefaultAttributeMap implements ChannelHandlerContext { // ... @Override public ChannelFuture write(Object msg) { return write(msg, newPromise()); } @Override public ChannelFuture write(final Object msg, final ChannelPromise promise) { if (msg == null) { throw new NullPointerException("msg"); } validatePromise(promise, true); write(msg, false, promise); return promise; } private void write(Object msg, boolean flush, ChannelPromise promise) { DefaultChannelHandlerContext next = findContextOutbound(); next.invokeWrite(msg, promise); if (flush) { next.invokeFlush(); } } private DefaultChannelHandlerContext findContextOutbound() { DefaultChannelHandlerContext ctx = this; do { ctx = ctx.prev; } while (!ctx.outbound); return ctx; } private void invokeWrite(Object msg, ChannelPromise promise) { try { ((ChannelOutboundHandler) handler).write(this, msg, promise); } catch (Throwable t) { notifyOutboundHandlerException(t, promise); } } // ... } 复制代码 context的write()方法沿着context链往前找,直至找到一个outbound类型的context为止,然后调用其invokeWrite()方法 invokeWrite()接着调用handler的write()方法 最后看看ChannelOutboundHandlerAdapter的write()方法实现: 复制代码 public class ChannelOutboundHandlerAdapter extends ChannelHandlerAdapter implements ChannelOutboundHandler { // ... @Override public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception { ctx.write(msg, promise); } // ... } 复制代码 默认的实现调用了context的write()方法而不做任何处理,这样write事件就沿着outbound链继续传播: 可见,Pipeline的事件传播,是靠Pipeline,Context和Handler共同协作完成的。 参考 Netty4学习笔记-- ChannelPipeline(非常感谢作者分享,让我对事件传播有了更清晰的认识) 如果一个人充满快乐,正面的思想,那么好的人事物就会和他共鸣,而且被他吸引过来。同样,一个人老带悲伤,倒霉的事情也会跟过来。 ——在自己心情低落的时候,告诫自己不要把负能量带给别人。(大校18) https://www.cnblogs.com/qdhxhz/p/10234908.html
50000+
5万行代码练就真实本领
17年
创办于2008年老牌培训机构
1000+
合作企业
98%
就业率

联系我们

电话咨询

0532-85025005

扫码添加微信