Netty源码分析 (九)----- 拆包器的奥秘
目录
拆包的原理
netty中拆包的基类
channelRead 方法
总结
正文
Netty 的解码器有很多种,比如基于长度的,基于分割符的,私有协议的。但是,总体的思路都是一致的。
拆包思路:当数据满足了 解码条件时,将其拆开。放到数组。然后发送到业务 handler 处理。
半包思路: 当读取的数据不够时,先存起来,直到满足解码条件后,放进数组。送到业务 handler 处理。
回到顶部
拆包的原理
在没有netty的情况下,用户如果自己需要拆包,基本原理就是不断从TCP缓冲区中读取数据,每次读取完都需要判断是否是一个完整的数据包
1.如果当前读取的数据不足以拼接成一个完整的业务数据包,那就保留该数据,继续从tcp缓冲区中读取,直到得到一个完整的数据包
2.如果当前读到的数据加上已经读取的数据足够拼接成一个数据包,那就将已经读取的数据拼接上本次读取的数据,够成一个完整的业务数据包传递到业务逻辑,多余的数据仍然保留,以便和下次读到的数据尝试拼接
回到顶部
netty中拆包的基类
netty 中的拆包也是如上这个原理,在每个SocketChannel中会一个 pipeline ,pipeline 内部会加入解码器,解码器都继承基类 ByteToMessageDecoder,其内部会有一个累加器,每次从当前SocketChannel读取到数据都会不断累加,然后尝试对累加到的数据进行拆包,拆成一个完整的业务数据包,下面我们先详细分析下这个类
看名字的意思是:将字节转换成消息的解码器。人如其名。而他本身也是一个入站 handler,所以,我们还是从他的 channelRead 方法入手。
channelRead 方法
我们先看看基类中的属性,cumulation是此基类中的一个 ByteBuf 类型的累积区,每次从当前SocketChannel读取到数据都会不断累加,然后尝试对累加到的数据进行拆包,拆成一个完整的业务数据包,如果不够一个完整的数据包,则等待下一次从TCP的数据到来,继续累加到此cumulation中
复制代码
public abstract class ByteToMessageDecoder extends ChannelInboundHandlerAdapter {
//累积区
ByteBuf cumulation;
private ByteToMessageDecoder.Cumulator cumulator;
private boolean singleDecode;
private boolean decodeWasNull;
private boolean first;
private int discardAfterReads;
private int numReads;
.
.
.
}
复制代码
channelRead方法是每次从TCP缓冲区读到数据都会调用的方法,触发点在AbstractNioByteChannel的read方法中,里面有个while循环不断读取,读取到一次就触发一次channelRead
复制代码
1 @Override
2 public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
3 if (msg instanceof ByteBuf) {
4 // 从对象池中取出一个List
5 CodecOutputList out = CodecOutputList.newInstance();
6 try {
7 ByteBuf data = (ByteBuf) msg;
8 first = cumulation == null;
9 if (first) {
10 // 第一次解码
11 cumulation = data;//直接赋值
12 } else {
13 // 第二次解码,就将 data 向 cumulation 追加,并释放 data
14 cumulation = cumulator.cumulate(ctx.alloc(), cumulation, data);
15 }
16 // 得到追加后的 cumulation 后,调用 decode 方法进行解码
17 // 主要目的是将累积区cumulation的内容 decode 到 out数组中
18 callDecode(ctx, cumulation, out);
19 } catch (DecoderException e) {
20 throw e;
21 } catch (Throwable t) {
22 throw new DecoderException(t);
23 } finally {
24 // 如果累计区没有可读字节了,有可能在上面callDecode方法中已经将cumulation全部读完了,此时writerIndex==readerIndex
25 // 每读一个字节,readerIndex会+1
26 if (cumulation != null && !cumulation.isReadable()) {
27 // 将次数归零
28 numReads = 0;
29 // 释放累计区,因为累计区里面的字节都全部读完了
30 cumulation.release();
31 // 便于 gc
32 cumulation = null;
33 // 如果超过了 16 次,还有字节没有读完,就将已经读过的数据丢弃,将 readIndex 归零。
34 } else if (++ numReads >= discardAfterReads) {
35 numReads = 0;
36 //将已经读过的数据丢弃,将 readIndex 归零。
37 discardSomeReadBytes();
38 }
39
40 int size = out.size();
41 decodeWasNull = !out.insertSinceRecycled();
42 //循环数组,向后面的 handler 发送数据
43 fireChannelRead(ctx, out, size);
44 out.recycle();
45 }
46 } else {
47 ctx.fireChannelRead(msg);
48 }
49 }
复制代码
从对象池中取出一个空的数组。
判断成员变量是否是第一次使用,将 unsafe 中传递来的数据写入到这个 cumulation 累积区中。
写到累积区后,在callDecode方法中调用子类的 decode 方法,尝试将累积区的内容解码,每成功解码一个,就调用后面节点的 channelRead 方法。若没有解码成功,什么都不做。
如果累积区没有未读数据了,就释放累积区。
如果还有未读数据,且解码超过了 16 次(默认),就对累积区进行压缩。将读取过的数据清空,也就是将 readIndex 设置为0.
调用 fireChannelRead 方法,将数组中的元素发送到后面的 handler 中。
将数组清空。并还给对象池。
下面来说说详细的步骤。
写入累积区
如果当前累加器没有数据,就直接跳过内存拷贝,直接将字节容器的指针指向新读取的数据,否则,调用累加器累加数据至字节容器
复制代码
ByteBuf data = (ByteBuf) msg;
first = cumulation == null;
if (first) {
cumulation = data;
} else {
cumulation = cumulator.cumulate(ctx.alloc(), cumulation, data);
}
复制代码
我们看看构造方法
复制代码
protected ByteToMessageDecoder() {
this.cumulator = MERGE_CUMULATOR;
this.discardAfterReads = 16;
CodecUtil.ensureNotSharable(this);
}
复制代码
可以看到 this.cumulator = MERGE_CUMULATOR;,那我们接下来看看 MERGE_CUMULATOR
复制代码
public static final ByteToMessageDecoder.Cumulator MERGE_CUMULATOR = new ByteToMessageDecoder.Cumulator() {
public ByteBuf cumulate(ByteBufAllocator alloc, ByteBuf cumulation, ByteBuf in) {
ByteBuf buffer;
if (cumulation.writerIndex() <= cumulation.maxCapacity() - in.readableBytes() && cumulation.refCnt() <= 1) {
buffer = cumulation;
} else {
buffer = ByteToMessageDecoder.expandCumulation(alloc, cumulation, in.readableBytes());
}
buffer.writeBytes(in);
in.release();
return buffer;
}
};
复制代码
MERGE_CUMULATOR是基类ByteToMessageDecoder中的一个静态常量,其重写了cumulate方法,下面我们看一下 MERGE_CUMULATOR 是如何将新读取到的数据累加到字节容器里的
netty 中ByteBuf的抽象,使得累加非常简单,通过一个简单的api调用 buffer.writeBytes(in); 便将新数据累加到字节容器中,为了防止字节容器大小不够,在累加之前还进行了扩容处理
复制代码
static ByteBuf expandCumulation(ByteBufAllocator alloc, ByteBuf cumulation, int readable) {
ByteBuf oldCumulation = cumulation;
cumulation = alloc.buffer(oldCumulation.readableBytes() + readable);
cumulation.writeBytes(oldCumulation);
oldCumulation.release();
return cumulation;
}
复制代码
扩容也是一个内存拷贝操作,新增的大小即是新读取数据的大小
将累加到的数据传递给业务进行拆包
当数据追加到累积区之后,需要调用 decode 方法进行解码,代码如下:
复制代码
public boolean isReadable() {
//写的坐标大于读的坐标则说明还有数据可读
return this.writerIndex > this.readerIndex;
}
protected void callDecode(ChannelHandlerContext ctx, ByteBuf in, List