Solving TCP Packet Sticking/Splitting in Netty


This post covers:
1. The basics of TCP packet sticking/splitting
2. A failure case caused by ignoring TCP packet sticking/splitting
3. How Netty solves TCP packet sticking/splitting

1. How TCP packet sticking/splitting happens

TCP is a "stream" protocol: a stream is an unbounded sequence of bytes with no message boundaries in it. TCP knows nothing about how the upper layer defines its business data; it divides the byte stream into segments purely according to the actual state of its buffers. So, from the application's point of view, one complete message may be split by TCP into several packets for transmission (splitting), and several small messages may be bundled into one larger packet and sent together (sticking).

1) The sticking/splitting problem illustrated

The diagram below illustrates TCP packet sticking and splitting:
[Figure: a client sends two packets, D1 and D2, to a server, which may receive them intact, coalesced into one read, or split across reads]

As shown above, the client sends two packets, D1 and D2, to the server. Because the number of bytes the server gets in a single read is not fixed, there are four possible cases:
(1) The server reads D1 and D2 in two separate reads, each a complete, independent packet. No sticking or splitting occurs; this is the ideal case.

(2) The server reads D1 and D2 together in a single read, glued into one buffer. This is TCP packet sticking.

(3) The server needs two reads: the first returns all of D1 plus the first part of D2 (D2_1); the second returns the remainder of D2 (D2_2). This is TCP packet splitting.

(4) The server again needs two reads: the first returns only part of D1 (D1_1); the second returns the rest of D1 (D1_2) plus all of D2. This is also TCP packet splitting.

In addition, if the server's TCP receive window is small while D1 and D2 are large, the server may need many more reads before it has received D1 and D2 completely.

2) Common strategies for handling sticking/splitting

As noted above, TCP cannot understand the upper layer's business data, so sticking and splitting cannot be avoided at the transport layer; they must be handled by the application protocol stack. Common strategies include (see the sketch after this list):
(1) Fixed-length messages: every frame is, say, exactly 200 bytes, padded with spaces when shorter.
(2) A delimiter appended to every frame, such as a carriage return/line feed.
(3) A header/body split, where the header carries a field holding the total message length (or the body length). A common design uses an int32 length field as the first header field.
(4) A more sophisticated application-layer protocol stack.
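As a concrete sketch of option (3), the snippet below wires Netty's built-in LengthFieldBasedFrameDecoder and LengthFieldPrepender into a pipeline so that every message is framed by a 4-byte length header. This initializer is illustrative only and is not part of the time-server example that follows; the class name and the parameter values are assumptions.

package netty.quanwei.p4;

import io.netty.channel.ChannelInitializer;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
import io.netty.handler.codec.LengthFieldPrepender;

/**
 * Hypothetical initializer sketching option (3): length-field framing.
 */
public class LengthFieldChannelInitializer extends ChannelInitializer<SocketChannel> {

    @Override
    protected void initChannel(SocketChannel channel) throws Exception {
        // Inbound: read a 4-byte length field at offset 0, strip it,
        // and emit exactly one complete message per decode.
        channel.pipeline().addLast(new LengthFieldBasedFrameDecoder(
                65536, // maxFrameLength: reject frames larger than this
                0,     // lengthFieldOffset: the length field opens the frame
                4,     // lengthFieldLength: int32, as described in option (3)
                0,     // lengthAdjustment: the length field counts only the body
                4));   // initialBytesToStrip: drop the length field itself

        // Outbound: prepend a 4-byte length field to every outgoing message.
        channel.pipeline().addLast(new LengthFieldPrepender(4));
    }
}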

2. A failure case caused by ignoring TCP packet sticking/splitting

This example builds on an earlier post, a Netty-based time server:
http://blog.csdn.net/u010853261/article/details/54799089

That time server never considered half-package reads. It works fine in functional tests, but once the load rises and larger batches of data are sent, sticking and splitting can occur. The example below demonstrates this.

Modifying the TimeServer

We modify the server-side handler:

package netty.quanwei.p4;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import utils.DateUtil;
import utils.LogUtil;

import java.util.Date;

/**
 * Created by louyuting on 17/1/31.
 */
public class TimeServerHandler extends ChannelInboundHandlerAdapter {

    private int counter;

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        LogUtil.log_debug("Server -> read");

        ByteBuf buf = (ByteBuf) msg;
        byte[] req = new byte[buf.readableBytes()];
        buf.readBytes(req);

        // strip the trailing line separator; note this assumes each read
        // contains exactly one complete message
        String body = new String(req, "UTF-8")
                .substring(0, req.length - System.getProperty("line.separator").length());

        LogUtil.log_debug("timeServer received order: " + body + "the counter is:" + (++counter));

        String currentTime = "QUERY TIME ORDER".equalsIgnoreCase(body)
                ? DateUtil.fmtDateToMillisecond(new Date()) : "BAD ORDER";
        currentTime = currentTime + System.getProperty("line.separator");

        // send the reply to the client asynchronously; writeAndFlush queues the
        // message in the outbound buffer and flushes it to the SocketChannel
        ByteBuf resp = Unpooled.copiedBuffer(currentTime.getBytes());
        ctx.writeAndFlush(resp);
    }

    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        LogUtil.log_debug("Server -> read complete");

        // would flush the remaining outbound buffer to the SocketChannel
        //ctx.flush();
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        // release resources
        ctx.close();
    }
}

The changes: every received message increments a counter, then a reply is sent back to the client. By design, the server should receive exactly as many messages as the client sends, and every request, once its trailing line separator is stripped, should read "QUERY TIME ORDER". Note that the handler blindly strips one separator's worth of bytes from the end of whatever it read, which is only correct if each read holds exactly one message.

Modifying the TimeClient

package netty.quanwei.p4;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import utils.LogUtil;

/**
 * Created by louyuting on 17/1/31.
 */
public class TimeCLientHandler extends ChannelInboundHandlerAdapter {

    private byte[] req;

    private int counter;

    public TimeCLientHandler() {
        this.req = ("QUERY TIME ORDER" + System.getProperty("line.separator")).getBytes();
    }

    /**
     * When the link comes up, send 100 messages to the server,
     * flushing each one to the SocketChannel as it is written.
     */
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        LogUtil.log_debug("client -> active");
        ByteBuf message = null;

        for (int i = 0; i < 100; i++) {
            message = Unpooled.buffer(req.length);
            message.writeBytes(req);
            ctx.writeAndFlush(message);
        }
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        LogUtil.log_debug("client -> read");

        ByteBuf buf = (ByteBuf) msg;
        byte[] req = new byte[buf.readableBytes()];
        buf.readBytes(req);

        String body = new String(req, "UTF-8");

        LogUtil.log_debug("NOW is: " + body + " the counter is:" + (++counter));
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        ctx.close();
    }
}

The changes:
1) When the link comes up, the client sends 100 messages to the server, flushing after each write so that every message is handed to the channel promptly.
2) Logically, the server should therefore receive 100 time-query commands.
3) In addition, the client logs and counts every response it receives.

Results:

Server output:

2017-01-31 13:30:51Server -> read complete
2017-01-31 13:30:57Server -> read
2017-01-31 13:30:57:timeServer received order: QUERY TIME ORDERthe counter is:1
2017-01-31 13:30:57Server -> read complete
2017-01-31 13:30:57Server -> read
2017-01-31 13:30:57:timeServer received order: QUERY TIME ORDERthe counter is:2
2017-01-31 13:30:57Server -> read complete
2017-01-31 13:30:57Server -> read
2017-01-31 13:30:57:timeServer received order: QUERY TIME ORDER
QUERY TIME ORDER
QUERY TIME ORDER
QUERY TIME ORDER
QUERY TIME ORDER
QUERY TIME ORDER
QUERY TIME ORDER
QUERY TIME ORDER
QUERY TIME ORDER
QUERY TIME ORDERthe counter is:3
2017-01-31 13:30:57Server -> read complete
2017-01-31 13:30:57Server -> read
2017-01-31 13:30:57:timeServer received order: QUERY TIME ORDER
QUERY TIME ORDER
QUERY TIME ORDER
QUERY TIME ORDER
QUERY TIME ORDER
QUERY TIME ORDER
QUERY TIME ORDER
QUERY TIME ORDERthe counter is:4
2017-01-31 13:30:57Server -> read complete
2017-01-31 13:30:57Server -> read
2017-01-31 13:30:57:timeServer received order: QUERY TIME ORDER
QUERY TIME ORDER
QUERY TIME ORDER
QUERY TIME ORDER
QUERY TIME ORDER
QUERY TIME ORDER
QUERY TIME ORDER
QUERY TIME ORDERthe counter is:5
2017-01-31 13:30:57Server -> read complete
2017-01-31 13:30:57Server -> read
2017-01-31 13:30:57:timeServer received order: QUERY TIME ORDER
QUERY TIME ORDER
QUERY TIME ORDER
QUERY TIME ORDER
QUERY TIME ORDER
QUERY TIME ORDER
QUERY TIME ORDER
QUERY TIME ORDER
QUERY TIME ORDER
QUERY TIME ORDERthe counter is:6
2017-01-31 13:30:57Server -> read complete
2017-01-31 13:30:57Server -> read
2017-01-31 13:30:57:timeServer received order: QUERY TIME ORDER
QUERY TIME ORDER
QUERY TIME ORDER
QUERY TIME ORDER
QUERY TIME ORDER
QUERY TIME ORDER
QUERY TIME ORDER
QUERY TIME ORDERthe counter is:7
2017-01-31 13:30:57Server -> read complete
2017-01-31 13:30:57Server -> read
2017-01-31 13:30:57:timeServer received order: QUERY TIME ORDER
QUERY TIME ORDER
QUERY TIME ORDER
QUERY TIME ORDER
QUERY TIME ORDER
QUERY TIME ORDER
QUERY TIME ORDER
QUERY TIME ORDER
QUERY TIME ORDER
QUERY TIME ORDER
QUERY TIME ORDER
QUERY TIME ORDER
QUERY TIME ORDER
QUERY TIME ORDER
QUERY TIME ORDER
QUERY TIME ORDERthe counter is:8
2017-01-31 13:30:57Server -> read complete
2017-01-31 13:30:57Server -> read
2017-01-31 13:30:57:timeServer received order: QUERY TIME ORDER
QUERY TIME ORDER
QUERY TIME ORDER
QUERY TIME ORDER
QUERY TIME ORDER
QUERY TIME ORDER
QUERY TIME ORDER
QUERY TIME ORDERthe counter is:9
2017-01-31 13:30:57Server -> read complete
2017-01-31 13:30:57Server -> read
2017-01-31 13:30:57:timeServer received order: QUERY TIME ORDER
QUERY TIME ORDER
QUERY TIME ORDER
QUERY TIME ORDER
QUERY TIME ORDER
QUERY TIME ORDER
QUERY TIME ORDER
QUERY TIME ORDER
QUERY TIME ORDERthe counter is:10
2017-01-31 13:30:57Server -> read complete
2017-01-31 13:30:57Server -> read
2017-01-31 13:30:57:timeServer received order: QUERY TIME ORDER
QUERY TIME ORDER
QUERY TIME ORDER
QUERY TIME ORDER
QUERY TIME ORDER
QUERY TIME ORDER
QUERY TIME ORDER
QUERY TIME ORDER
QUERY TIME ORDERthe counter is:11
2017-01-31 13:30:57Server -> read complete
2017-01-31 13:30:57Server -> read
2017-01-31 13:30:57:timeServer received order: QUERY TIME ORDER
QUERY TIME ORDER
QUERY TIME ORDER
QUERY TIME ORDER
QUERY TIME ORDER
QUERY TIME ORDER
QUERY TIME ORDER
QUERY TIME ORDERthe counter is:12
2017-01-31 13:30:57Server -> read complete
2017-01-31 13:30:57Server -> read
2017-01-31 13:30:57:timeServer received order: QUERY TIME ORDER
QUERY TIME ORDER
QUERY TIME ORDER
QUERY TIME ORDERthe counter is:13
2017-01-31 13:30:57Server -> read complete

The counter reached only 13: the server got 13 reads where 100 messages were sent, because TCP stuck multiple "QUERY TIME ORDER" commands together into single reads. This is TCP packet sticking.

Client output:

2017-01-31 13:30:57:client -> active
2017-01-31 13:30:57:client -> read
2017-01-31 13:30:57:NOW is: 2017-01-31 13:30:57-939
2017-01-31 13:30:57-941
BAD ORDER
BAD ORDER
BAD ORDER
BAD ORDER
BAD ORDER
BAD ORDER
BAD ORDER
BAD ORDER
BAD ORDER
BAD ORDER
BAD ORDER
the counter is:1

Ideally the client would receive 100 responses, yet the counter shows it got everything in a single read. The server did answer each of its 13 reads: the first two requests arrived intact and produced two timestamps, while the other 11 coalesced requests failed the string comparison and produced 11 "BAD ORDER" replies. All of those replies then reached the client stuck together in one buffer; sticking occurred in both directions.

This amply demonstrates that a server that ignores sticking and splitting may well fail to work correctly under real traffic.

3. How Netty solves TCP packet sticking/splitting

Next, Netty's built-in LineBasedFrameDecoder and StringDecoder are used to solve the TCP sticking problem.

We fix the same time server so that it handles TCP sticking and splitting correctly:

The server

Modify the server-side ChildChannelInitializer class to add a LineBasedFrameDecoder and a StringDecoder:

package netty.quanwei.p4_2;

import io.netty.channel.ChannelInitializer;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.LineBasedFrameDecoder;
import io.netty.handler.codec.string.StringDecoder;

/**
 * Created by louyuting on 17/1/31.
 */
public class ChildChannelInitializer extends ChannelInitializer<SocketChannel> {

    @Override
    protected void initChannel(SocketChannel channel) throws Exception {
        channel.pipeline().addLast(new LineBasedFrameDecoder(1024));
        channel.pipeline().addLast(new StringDecoder());
        channel.pipeline().addLast("timeServerHandler", new TimeServerHandler());
    }
}

Then update TimeServerHandler. Because the message has already passed through LineBasedFrameDecoder and StringDecoder (both of which sit before the handler in the pipeline), msg can be cast directly to String:

package netty.quanwei.p4_2;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import utils.DateUtil;
import utils.LogUtil;

import java.util.Date;

/**
 * Created by louyuting on 17/1/31.
 *
 * Every received message increments a counter, then a reply is sent to the client.
 * By design the server should receive exactly as many messages as the client sends,
 * and every request, stripped of its line separator, should read "QUERY TIME ORDER".
 */
public class TimeServerHandler extends ChannelInboundHandlerAdapter {

    private int counter;

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        LogUtil.log_debug("Server -> read");

        // already decoded by LineBasedFrameDecoder + StringDecoder
        String body = (String) msg;

        LogUtil.log_debug("timeServer received order: " + body + "the counter is:" + (++counter));

        String currentTime = "QUERY TIME ORDER".equalsIgnoreCase(body)
                ? DateUtil.fmtDateToMillisecond(new Date()) : "BAD ORDER";
        currentTime = currentTime + System.getProperty("line.separator");

        // send the reply to the client asynchronously
        ByteBuf resp = Unpooled.copiedBuffer(currentTime.getBytes());
        ctx.writeAndFlush(resp);
    }

    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        LogUtil.log_debug("Server -> read complete");

        // would flush the remaining outbound buffer to the SocketChannel
        //ctx.flush();
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        // release resources
        ctx.close();
    }
}

The client

The client-side changes closely mirror the server's:

package netty.quanwei.p4_2;

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.LineBasedFrameDecoder;
import io.netty.handler.codec.string.StringDecoder;

/**
 * Created by louyuting on 17/1/31.
 * Netty time server: client side.
 */
public class TimeClient {

    public void connect(int port, String host) throws Exception {
        // configure the client NIO thread group
        EventLoopGroup group = new NioEventLoopGroup();

        Bootstrap client = new Bootstrap();

        try {
            client.group(group)
                    .channel(NioSocketChannel.class)
                    .option(ChannelOption.TCP_NODELAY, true)
                    .handler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel channel) throws Exception {
                            channel.pipeline().addLast(new LineBasedFrameDecoder(1024));
                            channel.pipeline().addLast(new StringDecoder());
                            channel.pipeline().addLast(new TimeCLientHandler());
                        }
                    });

            // initiate the connection asynchronously and wait for it to complete
            ChannelFuture future = client.connect(host, port).sync();

            // wait until the client channel is closed
            future.channel().closeFuture().sync();
        } finally {
            // shut the thread group down gracefully
            group.shutdownGracefully();
        }
    }

    /**
     * main
     */
    public static void main(String[] args) {
        TimeClient client = new TimeClient();
        try {
            client.connect(18888, "127.0.0.1");
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

And the handler:

package netty.quanwei.p4_2;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import utils.LogUtil;

/**
 * Created by louyuting on 17/1/31.
 * When the link comes up, send 100 messages to the server, flushing each one
 * to the SocketChannel as it is written, so every message goes out promptly.
 * The server should therefore receive 100 time-query commands.
 *
 * The client also logs every response it receives.
 */
public class TimeCLientHandler extends ChannelInboundHandlerAdapter {

    private byte[] req;

    private int counter;

    public TimeCLientHandler() {
        this.req = ("QUERY TIME ORDER" + System.getProperty("line.separator")).getBytes();
    }

    /**
     * When the link comes up, send 100 messages, flushing after each write.
     */
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        LogUtil.log_debug("client -> active");
        ByteBuf message = null;

        for (int i = 0; i < 100; i++) {
            message = Unpooled.buffer(req.length);
            message.writeBytes(req);
            ctx.writeAndFlush(message);
        }
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        LogUtil.log_debug("client -> read");

        // already decoded into one line per message
        String body = (String) msg;

        LogUtil.log_debug("NOW is: " + body + " the counter is:" + (++counter));
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        ctx.close();
    }
}

Results:

With Netty's built-in sticking/splitting decoders in the pipeline, the program now runs exactly as expected:
Server output:

2017-01-31 14:05:34Server -> read
2017-01-31 14:05:34timeServer received order: QUERY TIME ORDERthe counter is:1
2017-01-31 14:05:34Server -> read complete
2017-01-31 14:05:34Server -> read
2017-01-31 14:05:34timeServer received order: QUERY TIME ORDERthe counter is:2
..........
..........
2017-01-31 14:06:21Server -> read
2017-01-31 14:06:21timeServer received order: QUERY TIME ORDERthe counter is:99
2017-01-31 14:06:21Server -> read
2017-01-31 14:06:21timeServer received order: QUERY TIME ORDERthe counter is:100
2017-01-31 14:06:21Server -> read complete

Client output:

2017-01-31 14:06:21client -> active
2017-01-31 14:06:21client -> read
2017-01-31 14:06:21NOW is: 2017-01-31 14:06:21-082 the counter is:1
2017-01-31 14:06:21client -> read
2017-01-31 14:06:21NOW is: 2017-01-31 14:06:21-083 the counter is:2
........
........
2017-01-31 14:06:21client -> read
2017-01-31 14:06:21NOW is: 2017-01-31 14:06:21-157 the counter is:98
2017-01-31 14:06:21client -> read
2017-01-31 14:06:21NOW is: 2017-01-31 14:06:21-158 the counter is:99
2017-01-31 14:06:21client -> read
2017-01-31 14:06:21NOW is: 2017-01-31 14:06:21-159 the counter is:100

The client sent 100 requests, the server received and answered all 100, and the client received all 100 responses.

How LineBasedFrameDecoder and StringDecoder work

LineBasedFrameDecoder works by walking the readable bytes of the ByteBuf in order, looking for "\n" or "\r\n"; the position found marks the end of a frame, and the bytes from the reader index up to that position form one line. It is a decoder that uses the line ending as its frame delimiter, supports emitting frames with or without the terminator, and takes a configurable maximum line length: if it reads that many bytes without finding a line ending, it throws an exception and discards the bytes read so far.
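To make this scanning logic concrete, here is a minimal sketch of the same idea written as a custom ByteToMessageDecoder. It is an illustration of the technique, not Netty's actual implementation: the class name SimpleLineDecoder, the 1024-byte limit, and the simplified over-length handling (the real LineBasedFrameDecoder keeps a discarding state across reads) are all assumptions.

package netty.quanwei.p4_2;

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.ByteToMessageDecoder;
import io.netty.handler.codec.TooLongFrameException;

import java.util.List;

public class SimpleLineDecoder extends ByteToMessageDecoder {

    private static final int MAX_LENGTH = 1024;

    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
        // walk the readable bytes looking for '\n'
        int eol = -1;
        for (int i = in.readerIndex(); i < in.writerIndex(); i++) {
            if (in.getByte(i) == '\n') {
                eol = i;
                break;
            }
        }

        if (eol < 0) {
            // no complete line yet: leave the bytes in the cumulation buffer
            // and wait for the next read, unless the frame is already too long
            if (in.readableBytes() > MAX_LENGTH) {
                in.skipBytes(in.readableBytes()); // discard the oversized stream
                throw new TooLongFrameException("frame exceeds " + MAX_LENGTH + " bytes");
            }
            return;
        }

        // frame length excluding the trailing '\n' (and a preceding '\r', if any)
        int length = eol - in.readerIndex();
        if (length > 0 && in.getByte(eol - 1) == '\r') {
            length--;
        }

        ByteBuf frame = in.readSlice(length).retain(); // one decoded line
        in.skipBytes(eol + 1 - in.readerIndex());      // consume the delimiter
        out.add(frame);
    }
}

ByteToMessageDecoder calls decode repeatedly as long as it keeps producing output, so several lines stuck together in one read are decoded into several frames, while a half line simply stays in the cumulation buffer until the rest arrives.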

StringDecoder is far simpler: it just converts the decoded frame it receives into a String.

LineBasedFrameDecoder + StringDecoder together form a line-delimited text decoder.

One question remains: what if messages do not end with a line separator? No need to worry: Netty ships a family of sticking/splitting decoders for different framing schemes, as the sketch below illustrates.
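For example, DelimiterBasedFrameDecoder frames on an arbitrary delimiter and FixedLengthFrameDecoder frames on a fixed byte count (LengthFieldBasedFrameDecoder, sketched in section 1, frames on a length header). Below is a minimal, hypothetical initializer assuming a "$_" delimiter, which is not part of the time-server protocol:

package netty.quanwei.p4_2;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.DelimiterBasedFrameDecoder;
import io.netty.handler.codec.string.StringDecoder;

/**
 * Hypothetical initializer, for illustration only: frames end with "$_"
 * instead of a line separator.
 */
public class DelimiterChannelInitializer extends ChannelInitializer<SocketChannel> {

    @Override
    protected void initChannel(SocketChannel channel) throws Exception {
        // every frame ends with "$_"; frames over 1024 bytes are rejected
        ByteBuf delimiter = Unpooled.copiedBuffer("$_".getBytes());
        channel.pipeline().addLast(new DelimiterBasedFrameDecoder(1024, delimiter));

        // alternatively, for fixed-size 20-byte messages (length is an assumption):
        // channel.pipeline().addLast(new FixedLengthFrameDecoder(20));

        channel.pipeline().addLast(new StringDecoder());
        // the business handler would follow here
    }
}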

All source code for this post is on GitHub:
1) The failure case that ignores TCP sticking/splitting:
https://github.com/leetcode-hust/leetcode/tree/master/louyuting/src/netty/quanwei/p4

2) The Netty fix for TCP sticking/splitting:
https://github.com/leetcode-hust/leetcode/tree/master/louyuting/src/netty/quanwei/p4_2