Java Nio Socket通讯

时间:2023-03-10 02:58:47
Java Nio Socket通讯

Server端:

#############服务器端连接请求处理###############
public class MultiplexerServer implements Runnable { /**多路复用器,SocketChannel注册到Selector.Selector轮询监听Channel网络事件*/
private Selector selector; private ServerSocketChannel serverSocketChannel; private boolean stop; public MultiplexerServer(int port) {
try {
selector = Selector.open();
serverSocketChannel = ServerSocketChannel.open();
serverSocketChannel.configureBlocking(false);
serverSocketChannel.socket().bind(new InetSocketAddress(port), 1024);
serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
System.out.println("[" + getClass().getSimpleName() + "] start in port [" + port + "]");
} catch (Exception e) {
e.printStackTrace();
System.exit(1);
}
} @Override
public void run() {
while (!stop) {
try {
selector.select(1000);
Set<SelectionKey> selectionKeys = selector.selectedKeys();
Iterator<SelectionKey> iterator = selectionKeys.iterator();
while (iterator.hasNext()) {
SelectionKey selectionKey = iterator.next();
iterator.remove();
try {
/**处理请求连接*/
handle(selectionKey);
} catch (Exception e) {
if (selectionKey != null) {
selectionKey.cancel();
if (selectionKey.channel() != null) {
selectionKey.channel().close();
}
}
}
}
} catch (IOException e) {
e.printStackTrace();
} }
try {
/**Selector关闭后,注册的Channel及Pipe等资源都会自动关闭(不需要重复关闭资源)*/
if (selector != null) {
selector.close();
}
} catch (Exception e) {
e.printStackTrace();
}
} public void stop(boolean stop) {
this.stop = stop;
} private void handle(SelectionKey key) throws IOException {
if (key.isValid()) {
if (key.isAcceptable()) { // 新连接
ServerSocketChannel ssc = (ServerSocketChannel) key.channel();
SocketChannel sc = ssc.accept();
sc.configureBlocking(false);
sc.register(selector, SelectionKey.OP_READ);
}
if (key.isReadable()) {
SocketChannel sc = (SocketChannel) key.channel();
ByteBuffer buffer = ByteBuffer.allocate(1024);
int readBytes = sc.read(buffer);
if (readBytes > 0) {
buffer.flip();
byte[] bytes = new byte[buffer.remaining()];
buffer.get(bytes);
System.out.println(new String(bytes, "UTF-8"));
/**响应客户端请求*/
response(sc, new Respone("server received", 0));
} else if (readBytes == 0) {
;
} else { // 客户端关闭连接
key.cancel();
sc.close();
}
}
}
} private void response(SocketChannel sc, Respone respone) throws IOException{
if (respone != null && sc != null && sc.isConnected()) {
String json = respone.toString();
byte[] bytes = json.getBytes();
ByteBuffer buffer = ByteBuffer.allocate(bytes.length);
buffer.put(bytes);
buffer.flip();
sc.write(buffer);
}
}
} ###############服务器端响应################
/**
 * Server response: a text payload plus a numeric status code.
 * {@link #toString()} renders it as a JSON object.
 */
public class Respone {

    private String data;

    private int status;

    public Respone() {
    }

    /**
     * @param data   response payload; may be null
     * @param status status code
     */
    public Respone(String data, int status) {
        this.data = data;
        this.status = status;
    }

    public String getData() {
        return data;
    }

    public void setData(String data) {
        this.data = data;
    }

    public int getStatus() {
        return status;
    }

    public void setStatus(int status) {
        this.status = status;
    }

    /**
     * Renders this response as a JSON object with the keys {@code data} and
     * {@code status}. The payload is emitted as a quoted, escaped JSON string
     * (or the literal {@code null}) so the result is always valid JSON.
     *
     * @return the JSON text
     */
    @Override
    public String toString() {
        return "{\"data\":" + quote(data) + ",\"status\":" + status + "}";
    }

    /** Returns the value as a JSON string literal, or {@code null} for a null value. */
    private static String quote(String value) {
        if (value == null) {
            return "null";
        }
        StringBuilder sb = new StringBuilder(value.length() + 2);
        sb.append('"');
        for (int i = 0; i < value.length(); i++) {
            char c = value.charAt(i);
            switch (c) {
                case '"':
                    sb.append("\\\"");
                    break;
                case '\\':
                    sb.append("\\\\");
                    break;
                case '\n':
                    sb.append("\\n");
                    break;
                case '\r':
                    sb.append("\\r");
                    break;
                case '\t':
                    sb.append("\\t");
                    break;
                default:
                    if (c < 0x20) {
                        // Remaining control characters must be \\u-escaped in JSON.
                        sb.append(String.format("\\u%04x", (int) c));
                    } else {
                        sb.append(c);
                    }
            }
        }
        sb.append('"');
        return sb.toString();
    }
}

// ############# Server entry point ##############
/**
 * Server entry point: runs a {@code MultiplexerServer} on a dedicated thread.
 */
public class NioServer {

    /**
     * Starts the server on its own worker thread and returns immediately.
     *
     * @param port TCP port to listen on
     */
    public static void start(int port) {
        java.util.concurrent.ExecutorService executor = Executors.newSingleThreadExecutor();
        // execute() rather than submit(): an unexpected runtime exception in the
        // event loop reaches the thread's uncaught-exception handler instead of
        // being captured in a Future nobody reads.
        executor.execute(new MultiplexerServer(port));
        // No further tasks are accepted; the running server task is unaffected
        // and the worker thread ends once the server loop returns.
        executor.shutdown();
    }

    /**
     * @param args optional single argument: the port to listen on (default 8181)
     */
    public static void main(String[] args) {
        int port = 8181;
        if (args != null && args.length == 1) {
            port = Integer.parseInt(args[0]);
        }
        NioServer.start(port);
    }
}

  Client端:

/**
 * Single-connection NIO client: connects to the server, sends one greeting,
 * prints the first reply and then stops.
 */
public class NioClientConnection implements Runnable {

    /** Charset used both to encode the request and to decode the reply. */
    private static final java.nio.charset.Charset UTF_8 = java.nio.charset.StandardCharsets.UTF_8;

    private String host;

    private int port;

    private Selector selector;

    private SocketChannel socketChannel;

    /** Set once the reply has arrived or the connection is lost; ends the event loop. */
    private boolean stop;

    /**
     * Opens the selector and a non-blocking socket channel (not yet connected).
     * Any failure is printed and terminates the JVM with exit status 1.
     *
     * @param host server host name or address
     * @param port server TCP port
     */
    public NioClientConnection(String host, int port) {
        this.host = host;
        this.port = port;
        try {
            selector = Selector.open();
            socketChannel = SocketChannel.open();
            socketChannel.configureBlocking(false);
        } catch (Exception e) {
            e.printStackTrace();
            System.exit(1);
        }
    }

    /**
     * Connects, then polls the selector (waking at least once per second)
     * until the reply has been received or the connection is lost, and
     * finally releases the channel and the selector.
     */
    @Override
    public void run() {
        try {
            connect();
        } catch (Exception e) {
            e.printStackTrace();
            System.exit(1);
        }
        while (!stop) {
            try {
                selector.select(1000);
                Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();
                while (iterator.hasNext()) {
                    SelectionKey selectionKey = iterator.next();
                    // The selector never clears the selected set itself; without
                    // this a stale key would be handled again on later polls.
                    iterator.remove();
                    try {
                        doResponse(selectionKey);
                    } catch (Exception e) {
                        e.printStackTrace();
                        // This client owns a single channel: once it has failed
                        // there is nothing left to wait for.
                        stop = true;
                        selectionKey.cancel();
                        selectionKey.channel().close();
                    }
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
        // Closing a selector only deregisters its channels, so close the
        // socket channel explicitly first.
        try {
            if (socketChannel != null) {
                socketChannel.close();
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
        try {
            if (selector != null) {
                selector.close();
            }
        } catch (Exception e) {
            e.printStackTrace();
            System.exit(1);
        }
    }

    /**
     * Starts the connection. If it completes immediately the greeting is sent
     * right away; otherwise completion is awaited through OP_CONNECT.
     */
    private void connect() throws IOException {
        if (socketChannel.connect(new InetSocketAddress(host, port))) {
            socketChannel.register(selector, SelectionKey.OP_READ);
            send(socketChannel);
        } else {
            socketChannel.register(selector, SelectionKey.OP_CONNECT);
        }
    }

    /**
     * Sends the greeting, encoded as UTF-8, to the server.
     *
     * @param sc connected channel to write to
     * @throws IOException if the write fails
     */
    private void send(SocketChannel sc) throws IOException {
        ByteBuffer buffer = ByteBuffer.wrap("Hello, Server".getBytes(UTF_8));
        // A non-blocking write may accept only part of the buffer.
        while (buffer.hasRemaining()) {
            sc.write(buffer);
        }
        System.out.println("send msg sucess");
    }

    /**
     * Handles one ready key: finishes a pending connect or reads the reply.
     *
     * @param key the ready selection key
     * @throws IOException if connecting, sending or reading fails
     */
    private void doResponse(SelectionKey key) throws IOException {
        if (key.isValid()) {
            SocketChannel sc = (SocketChannel) key.channel();
            if (key.isConnectable()) {
                if (sc.finishConnect()) {
                    sc.register(selector, SelectionKey.OP_READ);
                    send(sc);
                }
            }
            if (key.isReadable()) {
                ByteBuffer buffer = ByteBuffer.allocate(1024);
                int readBytes = sc.read(buffer);
                if (readBytes > 0) {
                    buffer.flip();
                    byte[] bytes = new byte[buffer.remaining()];
                    buffer.get(bytes);
                    System.out.println(new String(bytes, UTF_8));
                    // Reply received: end the event loop, which closes the connection.
                    stop = true;
                } else if (readBytes < 0) { // the server closed the connection
                    key.cancel();
                    sc.close();
                    stop = true;
                }
                // readBytes == 0: nothing available right now, nothing to do.
            }
        }
    }
}

/**
 * Client entry point: runs a {@code NioClientConnection} on a dedicated thread.
 */
public class NioClient {

    /**
     * Starts the client on its own worker thread and returns immediately.
     *
     * @param host server host name or address
     * @param port server TCP port
     */
    public static void start(String host, int port) {
        java.util.concurrent.ExecutorService executor = Executors.newSingleThreadExecutor();
        executor.execute(new NioClientConnection(host, port));
        // No further tasks are accepted; the worker thread ends once the
        // client has finished, letting the JVM exit.
        executor.shutdown();
    }

    /**
     * @param args optional pair of arguments: host and port
     *             (defaults 127.0.0.1 and 8181)
     */
    public static void main(String[] args) {
        String host = "127.0.0.1";
        int port = 8181;
        if (args != null && args.length == 2) {
            host = args[0];
            port = Integer.parseInt(args[1]);
        }
        NioClient.start(host, port);
    }
}