netty(二) 创建一个netty服务端和客户端

时间:2021-01-08 21:32:54

服务端

NettyServer

package com.zw.netty.config;

import com.zw.netty.channel.ServerInitializer;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler; /**
* @author liuzw
* @email liuzw1@hua-cloud.com.cn
* @date 2018/12/20 20:39
*/
public class NettyServer { private final int port; public NettyServer(Integer port){
this.port = port;
} public void startServer(){
EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup);
b.channel(NioServerSocketChannel.class);
b.option(ChannelOption.TCP_NODELAY, true);
b.childHandler(new ServerInitializer());
b.handler(new LoggingHandler(LogLevel.INFO));
b.childOption(ChannelOption.AUTO_READ, true); // 服务器绑定端口监听
ChannelFuture f = b.bind(port).sync(); // 监听服务器关闭监听
f.channel().closeFuture().sync(); }catch (InterruptedException e){ }finally {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
} public static void main(String [] args){
new NettyServer(8081).startServer();
}
}

ChannelInitializer

package com.zw.netty.channel;

import com.zw.netty.handler.GpsHandler;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.*;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
import io.netty.util.CharsetUtil; /**
* @author liuzw
* @email liuzw1@hua-cloud.com.cn
* @date 2018/12/20 20:55
*/
public class ServerInitializer extends ChannelInitializer<SocketChannel> { @Override
protected void initChannel(SocketChannel ch) throws Exception { ChannelPipeline pipeline = ch.pipeline(); // 以("\n")为结尾分割的 解码器
// pipeline.addLast("lenth",new FixedLengthFrameDecoder(7));
// pipeline.addLast("framer", new DelimiterBasedFrameDecoder(8192, Delimiters.lineDelimiter()));
// pipeline.addLast("framer",new DelimiterBasedFrameDecoder(8192, Unpooled.wrappedBuffer(new byte[] { '#' })));
// pipeline.addLast(new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4, 0, 4));
// pipeline.addLast(new LengthFieldPrepender(4)); // 字符串解码 和 编码
pipeline.addLast("decoder", new StringDecoder());
pipeline.addLast("encoder", new StringEncoder()); // 自己的逻辑Handler
pipeline.addLast("handler", new GpsHandler());
}
}

ChannelHandler

package com.zw.netty.handler;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler; import java.net.InetAddress;
import java.nio.charset.Charset;
import java.time.LocalDateTime; /**
* @author liuzw
* @email liuzw1@hua-cloud.com.cn
* @date 2018/12/20 21:03
*/
public class GpsHandler extends SimpleChannelInboundHandler<String> { @Override
protected void channelRead0(ChannelHandlerContext channelHandlerContext, String s) throws Exception {
System.out.println("RamoteAddress : " + channelHandlerContext.channel().remoteAddress() + " active !");
System.out.println(s);
// channelHandlerContext.channel().writeAndFlush("from server;" + LocalDateTime.now()); } /**
* 覆盖 channelActive 方法 在channel被启用的时候触发 (在建立连接的时候)
*
* */
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
System.out.println("RamoteAddress : " + ctx.channel().remoteAddress() + " active !"); } @Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { // (4)
// 当出现异常就关闭连接
ctx.close();
}
}

客户端

ClientServer

package com.zw.netty;

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel; /**
* @author liuzw
* @email liuzw1@hua-cloud.com.cn
* @date 2018/12/21 9:49
*/
public class ClientServer { public static void main(String[] args) {
EventLoopGroup eventLoopGroup = new NioEventLoopGroup();
Bootstrap bootstrap = new Bootstrap(); bootstrap.group(eventLoopGroup).channel(NioSocketChannel.class).handler(new MyClientInitializer()); try {
ChannelFuture channelFuture = bootstrap.connect("localhost", 9005).sync();
channelFuture.channel().closeFuture().sync();
} catch (InterruptedException e) {
e.printStackTrace();
}finally {
eventLoopGroup.shutdownGracefully();
}
}
}

ChannelInitializer

package com.zw.netty;

import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.FixedLengthFrameDecoder;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
import io.netty.handler.codec.LengthFieldPrepender;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
import io.netty.util.CharsetUtil; /**
* @author liuzw
* @email liuzw1@hua-cloud.com.cn
* @date 2018/12/21 9:50
*/
public class MyClientInitializer extends ChannelInitializer<SocketChannel> { @Override
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
// pipeline.addLast(new FixedLengthFrameDecoder(7));
// pipeline.addLast(new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4, 0, 4));
// pipeline.addLast(new LengthFieldPrepender(4));
pipeline.addLast(new StringDecoder(CharsetUtil.UTF_8));
pipeline.addLast(new StringEncoder(CharsetUtil.UTF_8));
pipeline.addLast(new MyClientHandler());
} }

ChannelHandler

package com.zw.netty;

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler; import java.time.LocalDateTime; /**
* @author liuzw
* @email liuzw1@hua-cloud.com.cn
* @date 2018/12/21 9:52
*/
public class MyClientHandler extends SimpleChannelInboundHandler<String> { @Override
protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {
System.out.println(ctx.channel().remoteAddress());
System.out.println("client output:" + msg);
ctx.writeAndFlush("from client;" + LocalDateTime.now()); } @Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
super.exceptionCaught(ctx, cause);
cause.printStackTrace();
ctx.channel().close();
} @Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
System.out.println("----------------------");
// ctx.writeAndFlush("test#");
ctx.writeAndFlush("test1 ");
// ctx.writeAndFlush("test11111111111");
} }