Netty源码分析 (四)----- ChannelPipeline
2019-09-10 15:47
目录
pipeline 初始化
pipeline添加节点
检查是否有重复handler
创建节点
添加节点
回调用户方法
pipeline删除节点
找到待删除的节点
调整双向链表指针删除
回调用户函数
总结
正文
netty在服务端端口绑定和新连接建立的过程中会建立相应的channel,而与channel的动作密切相关的是pipeline这个概念,pipeline像是可以看作是一条流水线,原始的原料(字节流)进来,经过加工,最后输出
回到顶部
pipeline 初始化
在上一篇文章中,我们已经知道了创建NioSocketChannel的时候会将netty的核心组件创建出来
pipeline是其中的一员,在下面这段代码中被创建
复制代码
protected AbstractChannel(Channel parent) {
this.parent = parent;
id = newId();
unsafe = newUnsafe();
pipeline = newChannelPipeline();
}
复制代码
复制代码
protected DefaultChannelPipeline newChannelPipeline() {
return new DefaultChannelPipeline(this);
}
复制代码
NioSocketChannel中保存了pipeline的引用
DefaultChannelPipeline
复制代码
protected DefaultChannelPipeline(Channel channel) {
this.channel = ObjectUtil.checkNotNull(channel, "channel");
tail = new TailContext(this);
head = new HeadContext(this);
head.next = tail;
tail.prev = head;
}
复制代码
pipeline中保存了channel的引用,创建完pipeline之后,整个pipeline是这个样子的
pipeline中的每个节点是一个ChannelHandlerContext对象,每个context节点保存了它包裹的执行器 ChannelHandler 执行操作所需要的上下文,其实就是pipeline,因为pipeline包含了channel的引用,可以拿到所有的context信息
回到顶部
pipeline添加节点
下面是一段非常常见的客户端代码
复制代码
bootstrap.childHandler(new ChannelInitializer() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline p = ch.pipeline();
p.addLast(new Spliter())
p.addLast(new Decoder());
p.addLast(new BusinessHandler())
p.addLast(new Encoder());
}
});
复制代码
首先,用一个spliter将来源TCP数据包拆包,然后将拆出来的包进行decoder,传入业务处理器BusinessHandler,业务处理完encoder,输出
整个pipeline结构如下
我用两种颜色区分了一下pipeline中两种不同类型的节点,一个是 ChannelInboundHandler,处理inBound事件,最典型的就是读取数据流,加工处理;还有一种类型的Handler是 ChannelOutboundHandler, 处理outBound事件,比如当调用writeAndFlush()类方法时,就会经过该种类型的handler
不管是哪种类型的handler,其外层对象 ChannelHandlerContext 之间都是通过双向链表连接,而区分一个 ChannelHandlerContext到底是in还是out,在添加节点的时候我们就可以看到netty是怎么处理的
DefaultChannelPipeline
复制代码
@Override
public final ChannelPipeline addLast(ChannelHandler... handlers) {
return addLast(null, handlers);
}
复制代码
复制代码
@Override
public final ChannelPipeline addLast(EventExecutorGroup executor, ChannelHandler... handlers) {
for (ChannelHandler h: handlers) {
addLast(executor, null, h);
}
return this;
}
复制代码
复制代码
public final ChannelPipeline addLast(EventExecutorGroup group, String name, ChannelHandler handler) {
final AbstractChannelHandlerContext newCtx;
synchronized (this) {
// 1.检查是否有重复handler
checkMultiplicity(handler);
// 2.创建节点
newCtx = newContext(group, filterName(name, handler), handler);
// 3.添加节点
addLast0(newCtx);
}
// 4.回调用户方法
callHandlerAdded0(handler);
return this;
}
复制代码
这里简单地用synchronized方法是为了防止多线程并发操作pipeline底层的双向链表
我们还是逐步分析上面这段代码
检查是否有重复handler
在用户代码添加一条handler的时候,首先会查看该handler有没有添加过
复制代码
private static void checkMultiplicity(ChannelHandler handler) {
if (handler instanceof ChannelHandlerAdapter) {
ChannelHandlerAdapter h = (ChannelHandlerAdapter) handler;
if (!h.isSharable() && h.added) {
throw new ChannelPipelineException(
h.getClass().getName() +
" is not a @Sharable handler, so can't be added or removed multiple times.");
}
h.added = true;
}
}
复制代码
netty使用一个成员变量added标识一个channel是否已经添加,上面这段代码很简单,如果当前要添加的Handler是非共享的,并且已经添加过,那就抛出异常,否则,标识该handler已经添加
由此可见,一个Handler如果是sharable的,就可以无限次被添加到pipeline中,我们客户端代码如果要让一个Handler被共用,只需要加一个@Sharable标注即可,如下
复制代码
@Sharable
public class BusinessHandler {
}
复制代码
而如果Handler是sharable的,一般就通过spring的注入的方式使用,不需要每次都new 一个
isSharable() 方法正是通过该Handler对应的类是否标注@Sharable来实现的
ChannelHandlerAdapter
复制代码
public boolean isSharable() {
Class> clazz = getClass();
Map, Boolean> cache = InternalThreadLocalMap.get().handlerSharableCache();
Boolean sharable = cache.get(clazz);
if (sharable == null) {
sharable = clazz.isAnnotationPresent(Sharable.class);
cache.put(clazz, sharable);
}
return sharable;
}
复制代码
通过反射判断是否有Sharable.class注解
创建节点
回到主流程,看创建上下文这段代码
newCtx = newContext(group, filterName(name, handler), handler);
这里我们需要先分析 filterName(name, handler) 这段代码,这个函数用于给handler创建一个唯一性的名字
复制代码
private String filterName(String name, ChannelHandler handler) {
if (name == null) {
return generateName(handler);
}
checkDuplicateName(name);
return name;
}
复制代码
显然,我们传入的name为null,netty就给我们生成一个默认的name,否则,检查是否有重名,检查通过的话就返回
netty创建默认name的规则为 简单类名#0,下面我们来看些具体是怎么实现的
复制代码
private static final FastThreadLocal