Mina - 模拟同步请求

时间:2023-03-09 03:41:12
Mina - 模拟同步请求

这篇博客主要就铺代码吧,Mina的一些基础知识可以参考:

http://www.cnblogs.com/huangfox/p/3458272.html

场景假设:

1.客户端发送用户信息,服务端根据用户名查询用户年龄。(模拟查询)

2.同步请求

3.协议:直接采用字段类型编码解码。


具体代码结构:

Mina - 模拟同步请求

codec负责编码解码,TCPAcceptor服务端,TCPConnector客户端,User业务对象。


User

package com.fox.mina.base.c2;

/**
 * Business object exchanged between client and server: a user name together
 * with an age.
 *
 * @author huangfox
 * @date 2013-12-03 11:23:55 AM
 */
public class User {
    String name;
    int age;

    /**
     * @return the user's name, or {@code null} when none has been set
     */
    public String getName() {
        return this.name;
    }

    /**
     * @param name the user's name
     */
    public void setName(String name) {
        this.name = name;
    }

    /**
     * @return the user's age ({@code 0} until one has been set)
     */
    public int getAge() {
        return this.age;
    }

    /**
     * @param age the user's age
     */
    public void setAge(int age) {
        this.age = age;
    }

    /**
     * @return a text form such as {@code User [name=fox, age=7]}
     */
    @Override
    public String toString() {
        StringBuilder text = new StringBuilder("User [name=");
        text.append(name).append(", age=").append(age).append("]");
        return text.toString();
    }
}

  

编码、解码工厂

DefaultMinaCodecFactory

package com.fox.mina.base.c2.codec;

import org.apache.mina.core.session.IoSession;
import org.apache.mina.filter.codec.ProtocolCodecFactory;
import org.apache.mina.filter.codec.ProtocolDecoder;
import org.apache.mina.filter.codec.ProtocolEncoder; /**
* @author huangfox
* @date 2013年12月5日 下午7:48:47
*
*/
// Codec factory that simply hands back the encoder/decoder pair it was
// configured with; the session argument is ignored, so every session gets the
// same two instances.
public class DefaultMinaCodecFactory implements ProtocolCodecFactory {
    ProtocolEncoder encoder;
    ProtocolDecoder decoder;

    /**
     * Creates a factory with no codec configured; both getters return
     * {@code null} until the setters are called.
     */
    public DefaultMinaCodecFactory() {
    }

    /**
     * @param encoder encoder returned by {@link #getEncoder(IoSession)}
     * @param decoder decoder returned by {@link #getDecoder(IoSession)}
     */
    public DefaultMinaCodecFactory(ProtocolEncoder encoder,
            ProtocolDecoder decoder) {
        this.encoder = encoder;
        this.decoder = decoder;
    }

    /**
     * @param session ignored
     * @return the configured encoder
     */
    @Override
    public ProtocolEncoder getEncoder(IoSession session) throws Exception {
        return this.encoder;
    }

    /**
     * @param session ignored
     * @return the configured decoder
     */
    @Override
    public ProtocolDecoder getDecoder(IoSession session) throws Exception {
        return this.decoder;
    }

    /** @return the configured encoder */
    public ProtocolEncoder getEncoder() {
        return this.encoder;
    }

    /** @param encoder the encoder to hand out from now on */
    public void setEncoder(ProtocolEncoder encoder) {
        this.encoder = encoder;
    }

    /** @return the configured decoder */
    public ProtocolDecoder getDecoder() {
        return this.decoder;
    }

    /** @param decoder the decoder to hand out from now on */
    public void setDecoder(ProtocolDecoder decoder) {
        this.decoder = decoder;
    }
}

  

编码器

FEncoder

package com.fox.mina.base.c2.codec;

import org.apache.commons.lang.ArrayUtils;
import org.apache.mina.core.buffer.IoBuffer;
import org.apache.mina.core.session.IoSession;
import org.apache.mina.filter.codec.ProtocolEncoderAdapter;
import org.apache.mina.filter.codec.ProtocolEncoderOutput; import com.fox.mina.base.c2.User; /**
* @author huangfox
* @date 2013年12月5日 下午7:49:21
*
*/
// Encodes a User into a single binary frame:
//   [name length: 4-byte big-endian int][name bytes][age: 4-byte big-endian int]
// Anything that cannot be encoded is rejected with an exception instead of
// being silently dropped, so the failure reaches the caller of session.write().
public class FEncoder extends ProtocolEncoderAdapter {
    public static final NumberCodec numberCodec = DefaultNumberCodecs
            .getBigEndianNumberCodec();

    /** Width in bytes of each integer field in the frame. */
    private static final int INT_SIZE = 4;

    /**
     * Writes one frame for the given {@link User} to {@code out}.
     *
     * @param session the session the message is written to (not used)
     * @param message the object to encode; must be a {@link User} with a
     *                non-null name
     * @param out     receives the encoded frame as a single buffer
     * @throws IllegalArgumentException if {@code message} is not a
     *         {@link User} or its name is {@code null}
     */
    @Override
    public void encode(IoSession session, Object message,
            ProtocolEncoderOutput out) throws Exception {
        if (!(message instanceof User)) {
            // Previously this only printed a line and wrote nothing, leaving
            // the sender waiting for a reply to a request that was never sent.
            throw new IllegalArgumentException(
                    "encoder error! unsupported message type: "
                            + (message == null ? "null" : message.getClass()
                                    .getName()));
        }
        User user = (User) message;
        if (user.getName() == null) {
            throw new IllegalArgumentException(
                    "encoder error! user name must not be null");
        }
        // NOTE(review): getBytes() uses the platform default charset, and the
        // decoder rebuilds the String with the platform default as well, so
        // both peers must run with the same default charset.
        byte[] name = user.getName().getBytes();

        // Assemble the frame in one pre-sized array instead of re-allocating
        // on every appended field.
        byte[] frame = new byte[INT_SIZE + name.length + INT_SIZE];
        System.arraycopy(numberCodec.int2Bytes(name.length, INT_SIZE), 0,
                frame, 0, INT_SIZE);
        System.arraycopy(name, 0, frame, INT_SIZE, name.length);
        System.arraycopy(numberCodec.int2Bytes(user.getAge(), INT_SIZE), 0,
                frame, INT_SIZE + name.length, INT_SIZE);
        out.write(IoBuffer.wrap(frame));
    }
}

编码的协议:

用户姓名的长度(int 4字节)

用户姓名(byte[])

用户年龄(int 4字节)

解码器

FDecoder

package com.fox.mina.base.c2.codec;

import org.apache.mina.core.buffer.IoBuffer;
import org.apache.mina.core.session.IoSession;
import org.apache.mina.filter.codec.CumulativeProtocolDecoder;
import org.apache.mina.filter.codec.ProtocolDecoderOutput; import com.fox.mina.base.c2.User; /**
* @author huangfox
* @date 2013年12月3日 上午11:35:46
*
*/
// Decodes frames of the form
//   [name length: 4-byte big-endian int][name bytes][age: 4-byte big-endian int]
// into User objects. A frame is only consumed once all of its bytes have
// arrived; until then the buffer position is rewound so the cumulative decoder
// keeps the partial frame, and no per-session attributes are needed.
public class FDecoder extends CumulativeProtocolDecoder {
    public static final NumberCodec numberCodec = DefaultNumberCodecs
            .getBigEndianNumberCodec();

    /** Width in bytes of each integer field in the frame. */
    private static final int INT_SIZE = 4;

    /**
     * Tries to decode exactly one frame from {@code in}.
     *
     * @param session the session the data arrived on (not used)
     * @param in      accumulated, not yet decoded bytes
     * @param out     receives the decoded {@link User}
     * @return {@code true} if a frame was decoded and written to {@code out},
     *         {@code false} if more bytes are needed
     * @throws IllegalArgumentException if the frame announces a negative name
     *         length
     */
    @Override
    protected boolean doDecode(IoSession session, IoBuffer in,
            ProtocolDecoderOutput out) throws Exception {
        if (in.remaining() < INT_SIZE) {
            return false;
        }
        int frameStart = in.position();
        int len = readInt(in);
        if (len < 0) {
            // Fail with a clear message rather than a
            // NegativeArraySizeException from the allocation below.
            throw new IllegalArgumentException(
                    "decoder error! negative name length: " + len);
        }
        // long arithmetic: len + INT_SIZE must not overflow for huge lengths.
        if (in.remaining() < (long) len + INT_SIZE) {
            // Incomplete frame: un-read the length so the next call starts
            // from the beginning of this frame again.
            in.position(frameStart);
            return false;
        }
        byte[] nameBytes = new byte[len];
        in.get(nameBytes);
        // NOTE(review): decodes with the platform default charset; the peer's
        // encoder must have used the same charset.
        String name = new String(nameBytes);
        int age = readInt(in);

        User u = new User();
        u.setName(name);
        u.setAge(age);
        out.write(u);
        return true;
    }

    // Reads one 4-byte big-endian int from the buffer's current position.
    private static int readInt(IoBuffer in) {
        byte[] raw = new byte[INT_SIZE];
        in.get(raw);
        return numberCodec.bytes2Int(raw, INT_SIZE);
    }
}

编码、解码使用的工具类:

package com.fox.mina.base.c2.codec;

/**
 * Converts primitive numbers to and from byte arrays. The byte order is
 * decided by the implementation; {@code byteLength} is the number of bytes to
 * produce or to read from the start of the array.
 */
public interface NumberCodec {

    // ---- number -> bytes ----

    /** @return {@code byteLength} bytes representing {@code value} */
    byte[] short2Bytes(short value, int byteLength);

    /** @return {@code byteLength} bytes representing {@code value} */
    byte[] int2Bytes(int value, int byteLength);

    /** @return {@code byteLength} bytes representing {@code value} */
    byte[] long2Bytes(long value, int byteLength);

    /** @return {@code byteLength} bytes representing {@code value} */
    byte[] float2Bytes(float value, int byteLength);

    /** @return {@code byteLength} bytes representing {@code value} */
    byte[] double2Bytes(double value, int byteLength);

    // ---- bytes -> number ----

    /** @return the value held in the first {@code byteLength} bytes */
    short bytes2Short(byte[] bytes, int byteLength);

    /** @return the value held in the first {@code byteLength} bytes */
    int bytes2Int(byte[] bytes, int byteLength);

    /** @return the value held in the first {@code byteLength} bytes */
    long bytes2Long(byte[] bytes, int byteLength);

    /** @return the value held in the first {@code byteLength} bytes */
    float bytes2Float(byte[] bytes, int byteLength);

    /** @return the value held in the first {@code byteLength} bytes */
    double bytes2Double(byte[] bytes, int byteLength);

    // ---- text ----

    /**
     * @param charset a charset name
     * @return the charset name this codec wants used in its place
     */
    String convertCharset(String charset);
}

  

package com.fox.mina.base.c2.codec;

/**
 * Provides the two stock {@link NumberCodec} implementations: little-endian
 * (least significant byte first) and big-endian (most significant byte
 * first). Both are stateless shared instances.
 */
public class DefaultNumberCodecs {

    private static final NumberCodec littleEndianCodec = new EndianCodec(false);

    private static final NumberCodec bigEndianCodec = new EndianCodec(true);

    /** @return the shared big-endian codec */
    public static NumberCodec getBigEndianNumberCodec() {
        return bigEndianCodec;
    }

    /** @return the shared little-endian codec */
    public static NumberCodec getLittleEndianNumberCodec() {
        return littleEndianCodec;
    }

    /**
     * Single implementation behind both byte orders; the only difference
     * between them is which bit offset a given array index maps to.
     */
    private static final class EndianCodec implements NumberCodec {

        private final boolean bigEndian;

        EndianCodec(boolean bigEndian) {
            this.bigEndian = bigEndian;
        }

        // Bit offset inside the number of the byte stored at the given index.
        private int shiftFor(int index, int byteLength) {
            int byteOffset = bigEndian ? byteLength - 1 - index : index;
            return byteOffset * 8;
        }

        // Assembles the first byteLength bytes into a long. Bytes whose
        // offset lies beyond 64 bits are ignored rather than wrapped around.
        private long readBits(byte[] bytes, int byteLength) {
            long bits = 0L;
            for (int i = 0; i < byteLength; i++) {
                int shift = shiftFor(i, byteLength);
                if (shift < Long.SIZE) {
                    bits |= (bytes[i] & 0xFFL) << shift;
                }
            }
            return bits;
        }

        // Splits a (sign-extended) value into byteLength bytes. Positions
        // beyond the 64th bit are filled with the sign byte instead of
        // letting the shift count wrap around.
        private byte[] writeBits(long bits, int byteLength) {
            byte[] bytes = new byte[byteLength];
            for (int i = 0; i < byteLength; i++) {
                int shift = shiftFor(i, byteLength);
                if (shift >= Long.SIZE) {
                    shift = Long.SIZE - 1;
                }
                bytes[i] = (byte) (bits >> shift);
            }
            return bytes;
        }

        @Override
        public short bytes2Short(byte[] bytes, int byteLength) {
            return (short) readBits(bytes, byteLength);
        }

        @Override
        public int bytes2Int(byte[] bytes, int byteLength) {
            return (int) readBits(bytes, byteLength);
        }

        @Override
        public long bytes2Long(byte[] bytes, int byteLength) {
            return readBits(bytes, byteLength);
        }

        @Override
        public float bytes2Float(byte[] bytes, int byteLength) {
            return Float.intBitsToFloat((int) readBits(bytes, byteLength));
        }

        @Override
        public double bytes2Double(byte[] bytes, int byteLength) {
            return Double.longBitsToDouble(readBits(bytes, byteLength));
        }

        @Override
        public byte[] short2Bytes(short value, int byteLength) {
            return writeBits(value, byteLength);
        }

        @Override
        public byte[] int2Bytes(int value, int byteLength) {
            return writeBits(value, byteLength);
        }

        @Override
        public byte[] long2Bytes(long value, int byteLength) {
            return writeBits(value, byteLength);
        }

        @Override
        public byte[] float2Bytes(float value, int byteLength) {
            // Raw bits: NaN payloads are kept exactly as they are.
            return writeBits(Float.floatToRawIntBits(value), byteLength);
        }

        @Override
        public byte[] double2Bytes(double value, int byteLength) {
            // Raw bits: NaN payloads are kept exactly as they are.
            return writeBits(Double.doubleToRawLongBits(value), byteLength);
        }

        /**
         * Maps the order-agnostic name "UTF-16" to the variant matching this
         * codec's byte order; any other name (including {@code null}) is
         * returned unchanged.
         */
        @Override
        public String convertCharset(String charset) {
            if ("UTF-16".equals(charset)) {
                return bigEndian ? "UTF-16BE" : "UTF-16LE";
            }
            return charset;
        }
    }
}

  

服务端

TCPAcceptor

package com.fox.mina.base.c2;

import java.io.IOException;
import java.net.InetSocketAddress; import org.apache.mina.core.service.IoAcceptor;
import org.apache.mina.core.service.IoHandlerAdapter;
import org.apache.mina.core.session.IoSession;
import org.apache.mina.core.session.IoSessionConfig;
import org.apache.mina.filter.codec.ProtocolCodecFilter;
import org.apache.mina.transport.socket.nio.NioSocketAcceptor; import com.fox.mina.base.c2.codec.DefaultMinaCodecFactory;
import com.fox.mina.base.c2.codec.FDecoder;
import com.fox.mina.base.c2.codec.FEncoder; /**
* @author huangfox
* @date 2013年12月3日 上午11:15:53
*
*/
// Demo server: accepts TCP connections on addr:port, decodes each request into
// a User, fills in a simulated age and writes the User back.
public class TCPAcceptor {
    IoAcceptor acceptor = null;
    String addr = "127.0.0.1";
    int port = 9999;

    public TCPAcceptor() {
    }

    /**
     * Creates the acceptor, installs the codec filter and handler, and binds
     * to {@code addr:port}. If binding fails the stack trace is printed, the
     * acceptor is disposed and the start-up message is not printed.
     */
    public void start() {
        acceptor = new NioSocketAcceptor();

        IoSessionConfig sessionConf = acceptor.getSessionConfig();
        sessionConf.setReadBufferSize(2048);

        // filter chain (codec)
        acceptor.getFilterChain().addLast(
                "codec",
                new ProtocolCodecFilter(new DefaultMinaCodecFactory(
                        new FEncoder(), new FDecoder())));
        // handler
        acceptor.setHandler(new IOHandler());

        try {
            // A retry could be added here.
            acceptor.bind(new InetSocketAddress(addr, port));
        } catch (IOException e) {
            e.printStackTrace();
            // Do not claim the server is up when the port could not be bound.
            acceptor.dispose();
            return;
        }
        System.out.println("server stated ... ");
    }

    // Simulated lookup: derives an age in the range 0..99 from the user name,
    // so the same name always yields the same age. A null name maps to 0.
    private static int lookupAge(String name) {
        int hash = (name == null) ? 0 : name.hashCode();
        // hashCode() may be negative; normalise the remainder into 0..99.
        return ((hash % 100) + 100) % 100;
    }

    // Handles one decoded request per call and answers on the same session.
    private class IOHandler extends IoHandlerAdapter {

        @Override
        public void messageReceived(IoSession session, Object message)
                throws Exception {
            User u = (User) message;
            System.out.println("[server]" + u.toString());
            // simulated business processing
            u.setAge(lookupAge(u.getName()));
            // send the reply to the client
            session.write(u);
        }
    }

    public static void main(String[] args) {
        TCPAcceptor server = new TCPAcceptor();
        server.start();
    }
}

  

客户端

TCPConnector

package com.fox.mina.base.c2;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.InetSocketAddress;
import java.util.Random; import org.apache.mina.core.future.ConnectFuture;
import org.apache.mina.core.future.ReadFuture;
import org.apache.mina.core.future.WriteFuture;
import org.apache.mina.core.service.IoConnector;
import org.apache.mina.core.service.IoHandlerAdapter;
import org.apache.mina.core.session.IoSession;
import org.apache.mina.filter.codec.ProtocolCodecFilter;
import org.apache.mina.transport.socket.nio.NioSocketConnector; import com.fox.mina.base.c2.codec.DefaultMinaCodecFactory;
import com.fox.mina.base.c2.codec.FDecoder;
import com.fox.mina.base.c2.codec.FEncoder; /**
* @author huangfox
* @date 2013年12月3日 上午11:15:46
*
*/
// Demo client using a single session: sendMsg() writes a User and then blocks
// (for at most one second) for the reply, which makes the exchange look
// synchronous to the caller. Not safe for concurrent callers, because they
// would all share the one session.
public class TCPConnector {
    IoConnector connector = null;
    IoSession session = null;
    String ip = "127.0.0.1";
    int port = 9999;

    /**
     * Creates the connector, enables read operations so that
     * {@code session.read()} can be used, installs the codec filter and
     * connects to {@code ip:port}, waiting until the attempt has finished.
     */
    public TCPConnector() {
        connector = new NioSocketConnector();
        connector.getSessionConfig().setUseReadOperation(true);
        connector.getFilterChain().addLast(
                "codec",
                new ProtocolCodecFilter(new DefaultMinaCodecFactory(
                        new FEncoder(), new FDecoder())));
        connector.setHandler(new IOHander());
        ConnectFuture connectF = connector.connect(new InetSocketAddress(ip,
                port));
        connectF.awaitUninterruptibly();
        // NOTE(review): getSession() is expected to throw if the connect
        // attempt failed - confirm against the MINA version in use.
        session = connectF.getSession();
    }

    Random r = new Random();

    /**
     * Sends {@code u} and prints the server's reply, or a diagnostic line if
     * the write failed, the read failed, no reply arrived within one second,
     * or the session was closed first.
     *
     * @param u the request to send
     */
    public void sendMsg(User u) {
        WriteFuture writeF = session.write(u);
        writeF.awaitUninterruptibly();
        if (writeF.getException() != null) {
            System.out.println(writeF.getException().getMessage());
        } else if (writeF.isWritten()) {
            System.out.println("msg was sent!");
            // send, then receive
            ReadFuture readF = session.read();
            boolean completed = readF.awaitUninterruptibly(1000);
            if (!completed) {
                // Without this check a timeout ended in a
                // NullPointerException on getMessage().toString().
                // NOTE(review): a reply arriving later presumably stays
                // queued and would be returned by the next read() - verify.
                System.out.println("read timed out!");
            } else if (readF.getException() != null) {
                System.out.println(readF.getException().getMessage());
            } else if (readF.getMessage() == null) {
                System.out.println("session closed before a reply arrived!");
            } else {
                System.out.println("[client]"+readF.getMessage().toString());
            }
        } else {
            System.out.println("error!");
        }
    }

    /** Disposes the connector. */
    public void close() {
        this.connector.dispose();
    }

    private class IOHander extends IoHandlerAdapter {
    }

    /**
     * Reads names from standard input, one per line, and sends each as a
     * request. Stops at end of input or on a read error, then closes the
     * client.
     */
    public static void main(String[] args) {
        TCPConnector client = new TCPConnector();
        // One reader for the whole run: a new reader per line could discard
        // input the previous one had already buffered.
        BufferedReader r = new BufferedReader(new InputStreamReader(
                System.in));
        try {
            while (true) {
                System.out.println("输入:");
                String msg = r.readLine();
                if (msg == null) {
                    // End of input: stop instead of looping forever.
                    break;
                }
                User u = new User();
                u.setName(msg);
                client.sendMsg(u);
            }
        } catch (IOException e) {
            e.printStackTrace();
        } finally {
            client.close();
        }
    }
}

注意:以上代码在多线程环境下是有问题的!!!  

(最大字体的提示了!)

在TCPConnector中,只有一个session,如果多线程同时对这个session进行读写操作将发生问题(B可能拿到A的响应结果)。

最简单的处理方式就是,加入一个连接池,在sendMsg方法中先获取一个连接,用完以后再归还到连接池。可以理解为“一个线程独占一个连接”。

代码如下:

package com.fox.mina.base.c2;

import java.net.InetSocketAddress;
import java.util.Date;
import java.util.Random;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit; import org.apache.mina.core.future.ConnectFuture;
import org.apache.mina.core.future.ReadFuture;
import org.apache.mina.core.future.WriteFuture;
import org.apache.mina.core.service.IoConnector;
import org.apache.mina.core.service.IoHandlerAdapter;
import org.apache.mina.core.session.IoSession;
import org.apache.mina.filter.codec.ProtocolCodecFilter;
import org.apache.mina.transport.socket.nio.NioSocketConnector; import com.fox.mina.base.c2.codec.DefaultMinaCodecFactory;
import com.fox.mina.base.c2.codec.FDecoder;
import com.fox.mina.base.c2.codec.FEncoder; /**
* @author huangfox
* @date 2013年12月3日 上午11:15:46
*
*/
// Pooled variant of the demo client: sendMsg() borrows a session from a
// fixed-size pool for the duration of one request/response exchange and
// always returns it afterwards, so concurrent callers never share a session.
public class TCPConnector {
    IoConnector connector = null;
    String ip = "127.0.0.1";
    int port = 9999;
    int poolSize = 1;
    LinkedBlockingQueue<IoSession> pool;

    /**
     * Creates the connector, enables read operations, installs the codec
     * filter and opens {@code poolSize} connections to {@code ip:port}, each
     * of which is placed in the pool.
     */
    public TCPConnector() {
        connector = new NioSocketConnector(10);
        connector.getSessionConfig().setUseReadOperation(true);
        connector.getFilterChain().addLast(
                "codec",
                new ProtocolCodecFilter(new DefaultMinaCodecFactory(
                        new FEncoder(), new FDecoder())));
        connector.setHandler(new IOHander());
        // pool
        pool = new LinkedBlockingQueue<IoSession>(poolSize);
        for (int i = 0; i < poolSize; i++) {
            ConnectFuture connectF = connector.connect(new InetSocketAddress(
                    ip, port));
            connectF.awaitUninterruptibly();
            IoSession session = connectF.getSession();
            // The queue's capacity equals poolSize, so offer() cannot fail
            // here and, unlike put(), cannot be interrupted.
            pool.offer(session);
        }
    }

    Random r = new Random();

    /**
     * Sends {@code u} on a pooled session and waits for the reply. The reply
     * itself is discarded; only failures are printed. If the calling thread
     * is interrupted while waiting for a free session, the interrupt flag is
     * restored and nothing is sent.
     *
     * @param u the request to send
     */
    public void sendMsg(User u) {
        IoSession session;
        try {
            session = pool.take();
        } catch (InterruptedException e) {
            // Keep the interruption visible to the caller.
            Thread.currentThread().interrupt();
            return;
        }
        try {
            WriteFuture writeF = session.write(u);
            writeF.awaitUninterruptibly();
            if (writeF.getException() != null) {
                System.out.println(writeF.getException().getMessage());
            } else if (writeF.isWritten()) {
                // send, then receive
                ReadFuture readF = session.read();
                readF.awaitUninterruptibly();
                if (readF.getException() != null) {
                    System.out.println(readF.getException().getMessage());
                }
            } else {
                System.out.println("error!");
            }
        } finally {
            // A slot is free because this thread took one, so offer() always
            // succeeds; a blocking put() could lose the session on interrupt.
            pool.offer(session);
        }
    }

    /** Disposes the connector. */
    public void close() {
        this.connector.dispose();
    }

    private class IOHander extends IoHandlerAdapter {
    }
}

  


扩展:

1.加入连接池(见上文)

2.加入连接、读写超时