Mina传输大数组,多路解码,粘包问题的处理

时间:2023-03-09 09:38:21
Mina传输大数组,多路解码,粘包问题的处理

我的实际情况:

1,传递的业务数据种类很多,这就决定了我们要用多路解码器,MINA的中文手册提供的是DemuxingProtocolCodecFactory;

2,,有的数据长度达到8K,网上有资料说Mina在传输数据超过2K的情况下,会分片传输,因此要考虑如何来接收;

3,若数据发送很快,或者网络状况不佳,很容易出现粘包的情况,这也是要解决的问题。

1)针对多路解码:

编码器:

将编码器继承MessageEncoder<T>,T是你编码的对象的类,此中我是要编码Requstwork类;其中GetBytes()是我自己定义的将对象的数据组成字节数组的函数;

public class RequstNetworkEncoder implements MessageEncoder<RequstNetwork>{
@Override
public void encode(IoSession ioSession, RequstNetwork requstNetwork, ProtocolEncoderOutput out)
throws Exception {
if (requstNetwork != null) {
byte[] bytes1 = GetBytes(requstNetwork);
int capacity = bytes1.length;
IoBuffer buffer = IoBuffer.allocate(capacity, false);
buffer.setAutoExpand(true);
buffer.put(bytes1);
buffer.flip();
out.write(buffer);
}
}
}

对应的解码器:

public class RequstNetworkDecoder implements MessageDecoder {
@Override
public MessageDecoderResult decodable(IoSession ioSession, IoBuffer ioBuffer) {
if(ioBuffer.remaining()<2){
//还没有达到不同数据的标志位的地方
return MessageDecoderResult.NEED_DATA;
}
else{
ioBuffer.position(1);
byte b=ioBuffer.get();
if (b==(此处为区分不同数据的标志)){
return MessageDecoderResult.OK; }
else{
return MessageDecoderResult.NOT_OK;
}
}
} @Override
public MessageDecoderResult decode(IoSession ioSession, IoBuffer in, ProtocolDecoderOutput out)
throws Exception {
RequstNetworkReply reply=new RequstNetworkReply();
//自己解码的过程
out.write(reply);
return MessageDecoderResult.OK;
} @Override
public void finishDecode(IoSession ioSession, ProtocolDecoderOutput protocolDecoderOutput) throws Exception { }
}

编解码工厂:

public class MyProtocolCodecFactory extends DemuxingProtocolCodecFactory {

    public MyProtocolCodecFactory(){
super.addMessageEncoder(RequstNetwork.class,RequstNetworkEncoder.class);
super.addMessageDecoder(RequstNetworkDecoder.class); }
}

针对大数组的传输和粘包,修改了一下网上的做法:

public class RequestPlanDecoder extends CumulativeProtocolDecoder {

    private final AttributeKey CONTEXT = new AttributeKey(getClass(),
"context"); @Override
protected boolean doDecode(IoSession session, IoBuffer in, ProtocolDecoderOutput out)
throws Exception { Context ctx =getContext(session);//获取session 的context long matchCount=ctx.getMatchLength();//目前已获取的数据
long length=ctx.getLength();//数据总长度
IoBuffer buffer=ctx.getBuffer();//数据存入buffer //第一次取数据
if(length==0){
length=in.getLong();
//保存第一次获取的长度
ctx.setLength(length);
matchCount=in.remaining();
ctx.setMatchLength(matchCount);
}
else{
matchCount+=in.remaining();
ctx.setMatchLength(matchCount);
}
ctx.setMatchLength(matchCount);
if (in.hasRemaining()) {
// 如果buff中还有数据
buffer.put(in);
// 添加到保存数据的buffer中
if (matchCount >= length) {
////自己解码的部分///////
if(buffer.remaining() > 0) {
//如果读取一个完整包内容后还粘了包,就让父类再调用一次,进行下一次解析
IoBuffer temp = IoBuffer.allocate(1024).setAutoExpand(true);
temp.put(buffer);
temp.flip();
in.sweep();
//清空数据
in.put(temp);
}
ctx.reset();//清空
return true;
} else {
ctx.setBuffer(buffer);
return false;
}
}
return false;
} //获取session的context
public Context getContext(IoSession session) {
Context ctx = (Context) session.getAttribute(CONTEXT);
if (ctx == null) {
ctx = new Context();
session.setAttribute(CONTEXT, ctx);
}
return ctx;
}
/** * 定义一个内部类,用来封转当前解码器中的一些公共数据,主要是用于大数据解析 **/
private class Context { public IoBuffer buffer; public long length = 0; public long matchLength = 0; public Context() {
buffer = IoBuffer.allocate(1024).setAutoExpand(true);
}
public void setBuffer(IoBuffer buffer) {
this.buffer = buffer;
}
public void setLength(long length) {
this.length = length;
}
public void setMatchLength(long matchLength) {
this.matchLength = matchLength;
}
public IoBuffer getBuffer() {
return buffer;
}
public long getLength() {
return length;
}
public long getMatchLength() {
return matchLength;
}
public void reset(){
this.buffer.clear();
this.length=0;
this.matchLength=0;
} }
}

我想让传大数组的解码器能和其他解码器一起共用,通过查看官方的MINA API直到MessageDecoder就是继承了CumulativeProtocolDecoder,于是就做了如下结合:

public class RequestPlanDecode implements MessageDecoder  {
private final AttributeKey CONTEXT = new AttributeKey(getClass(),
"context");
@Override
public MessageDecoderResult decodable(IoSession ioSession, IoBuffer in) {
if(in.remaining()<2){
return MessageDecoderResult.NEED_DATA;
}
else{
byte b1=in.get();
byte b2=in.get();
if(b2==<span style="font-family: Arial, Helvetica, sans-serif;">(此处为区分不同数据的标志)</span>){
return MessageDecoderResult.OK;
}
else {
return MessageDecoderResult.NOT_OK;
}
}
} @Override
public MessageDecoderResult decode(IoSession session, IoBuffer in, ProtocolDecoderOutput out)
throws Exception {
//=================结合CumulativeProtocolDecoder================//
Context ctx =getContext(session);//获取session 的context long matchCount=ctx.getMatchLength();//目前已获取的数据
long length=ctx.getLength();//数据总长度
IoBuffer buffer=ctx.getBuffer();//数据存入buffer //第一次取数据
if(length==0){
length=in.getLong();
//保存第一次获取的长度
ctx.setLength(length);
matchCount=in.remaining();
ctx.setMatchLength(matchCount);
}
else{
matchCount+=in.remaining();
ctx.setMatchLength(matchCount);
}
if (in.hasRemaining()) {// 如果buff中还有数据
buffer.put(in);// 添加到保存数据的buffer中
if (matchCount >= length) {// 如果已经发送的数据的长度>=目标数据的长度,则进行解码
////自己解码的部分/////// if(buffer.remaining() > 0) {////解决粘包
IoBuffer temp = IoBuffer.allocate(1024).setAutoExpand(true);
temp.put(buffer);
temp.flip();
in.sweep();
in.put(temp);
}
ctx.reset();
return MessageDecoderResult.OK; } else {
ctx.setBuffer(buffer);
return MessageDecoderResult.NEED_DATA;
}
}
return MessageDecoderResult.NEED_DATA;
} @Override
public void finishDecode(IoSession ioSession, ProtocolDecoderOutput protocolDecoderOutput)
throws Exception { }
/////////////////////////////////////结合CumulativeProtocolDecoder/////////////////////////////////////////////////
//获取session的context
public Context getContext(IoSession session) {
Context ctx = (Context) session.getAttribute(CONTEXT);
if (ctx == null) {
ctx = new Context();
session.setAttribute(CONTEXT, ctx);
}
return ctx;
}
/**
* 定义一个内部类,用来封转当前解码器中的一些公共数据,主要是用于大数据解析
*
*/
private class Context {
public IoBuffer buffer;
public long length = 0;
public long matchLength = 0; public Context() {
buffer = IoBuffer.allocate(1024).setAutoExpand(true);
} public void setBuffer(IoBuffer buffer) {
this.buffer = buffer;
} public void setLength(long length) {
this.length = length;
}
public void setMatchLength(long matchLength) {
this.matchLength = matchLength;
} public IoBuffer getBuffer() { return buffer;
} public long getLength() {
return length;
} public long getMatchLength() {
return matchLength;
} public void reset(){
this.buffer.clear();
this.length=0;
this.matchLength=0;
}
} }