netty客户端连接后无限发送数据,连接不上时无限重试,断线重连

时间:2024-03-18 12:25:28

在之前的netty文章里,刚开始学,利用netty实现websocket写了一个聊天程序。

纯netty实现http,websocket协议,头像上传,搭建实时聊天室,群聊,私聊,文字,图片消息

本文的需求已经在文章的标题体现了。那接下来一一实现吧!首先从连接开始!

 如果先启动客户端client,客户端连接不上服务端server,客户端就会无限重试

 关键代码,添加一个连接监听器:(一会完整的代码我会粘贴在下面)

netty客户端连接后无限发送数据,连接不上时无限重试,断线重连

 

class ConnectionListener implements ChannelFutureListener {
    @Override
    public void operationComplete(ChannelFuture channelFuture) throws Exception {
        if (!channelFuture.isSuccess()) {
            final EventLoop loop = channelFuture.channel().eventLoop();
            loop.schedule(new Runnable() {
                @Override
                public void run() {
                    System.err.println("服务端链接不上,开始重连操作...");
                    try {
						NettyClient.connect(10007, "127.0.0.1");
					} catch (Exception e) {
						e.printStackTrace();
					}
                }
            }, 3, TimeUnit.SECONDS);
        } else {
            System.err.println("服务端链接成功...");
        }
    }
}

需要注意的是,添加了监听器之后,吧下面这个优雅关机的代码注释掉!因为我们要实现的就是不关机,无限重试,如果没有注释这个,我们的监听器将会不起效,这个优雅关机的条就是,没有连接时关机,那就违背了我们要做到的没有连接时无限重试了!

这样实现了没有连接上服务器端时无限重试,下面是没连接上时的控制台输出: 

netty客户端连接后无限发送数据,连接不上时无限重试,断线重连

代码里我们设置的是3秒重试一次!

下面解决第二个问题:如果因为运行期间抛出异常导致,服务端客户端连接断开,我的客户端还是要无限重试连接!

解释一下为什么这样做,这里我服务端只是一个模拟,我真实的业务场景是要用客户端通讯硬件设备,所以不关心服务端,只要异常就让客户端无限重试!

解决办法:在重写客户端的handler中,重写这个方法!

@Override
	public void channelInactive(ChannelHandlerContext ctx) throws Exception {
		ctx.channel().eventLoop().schedule(new Runnable() {
            @Override
            public void run() {
                System.err.println("服务端链接不上,开始重连操作...");
                try {
					NettyClient.connect(10007, "127.0.0.1");
				} catch (Exception e) {
					e.printStackTrace();
				}
            }
        }, 3, TimeUnit.SECONDS);
	}

这样会在因为种种原因断开链接后使客户端无限重试!

下面解决第三个问题:当客户端和服务端简历连接后无限发送数据!

解释一下应用场景,我客户端开启后,需要在和服务端建立链接后无限拉取服务端的数据,就是要实时获取了,并且定期存储数据库!

刚开始直接这样在重写的channelActive方法中加了个while(true):

这样做也是完全体现了我的无知和青涩.....还会引发粘包问题...(这里是错的所以一带而过了,下面说正确怎么做)

这个方法是在建立连接后执行,貌似没有问题,但是,客户端的读方法是在这个方法之后执行的,这样就知道怎么改了。

我需要发送数据,和接收数据是并行执行,而不是串行。并行怎么实现呢?当然是线程了!

于是这样写channelActive方法,直接在里面开启线程,无限发送消息!

@Override
	public void channelActive(ChannelHandlerContext ctx) throws Exception {
		new Thread(new Runnable() {
			@Override
			public void run() {
				while(true) {
					ByteBuf bf = Unpooled.copiedBuffer(("你好服务端:"+Math.random()).getBytes());
					ctx.writeAndFlush(bf);
					try {
						Thread.sleep(500);
					} catch (InterruptedException e) {
						e.printStackTrace();
					}
				}
			}
		}).start();
	}

 下面看控制台的打印

netty客户端连接后无限发送数据,连接不上时无限重试,断线重连

netty客户端连接后无限发送数据,连接不上时无限重试,断线重连

下面是完整的客户端和服务端的代码:

我直接分在了两个类里面,方便大家粘贴了以后就可以执行:

服务端:

package com.ning.nett;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.FixedLengthFrameDecoder;

public class TimeServer {
	public void bind(int port) throws Exception{
		NioEventLoopGroup bossGroup = new NioEventLoopGroup();
		NioEventLoopGroup workerGroup = new NioEventLoopGroup();
		try {
			ServerBootstrap b = new ServerBootstrap();
			b.group(bossGroup, workerGroup)
			.channel(NioServerSocketChannel.class)
			.option(ChannelOption.SO_BACKLOG, 1024)
			.childHandler(new ChildChannelHandler());
			ChannelFuture f = b.bind(port).sync();
			f.channel().closeFuture().sync();
		} finally {
			bossGroup.shutdownGracefully();
			workerGroup.shutdownGracefully();
		}
	}
	private class ChildChannelHandler extends ChannelInitializer<SocketChannel>{

		@Override
		protected void initChannel(SocketChannel sc) throws Exception {
			sc.pipeline().addLast(new TimeServerHandler());
		}
		
	}
	public static void main(String[] args) throws Exception {
		new TimeServer().bind(10007);
	}
}
class TimeServerHandler extends ChannelInboundHandlerAdapter{
	@Override
	public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
		ByteBuf bb = (ByteBuf) msg;
		byte[] b = new byte[bb.readableBytes()];
		bb.readBytes(b);
		System.out.println("收到客户端数据:"+new String(b));
		ByteBuf bf = Unpooled.copiedBuffer(("你好客户端:"+Math.random()).getBytes());
		ctx.writeAndFlush(bf);
	}
}

客户端

package com.ning.nett;


import java.util.concurrent.TimeUnit;

import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoop;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
public class NettyClient {
	public static void connect(int port,String host) throws Exception{
		NioEventLoopGroup group = new NioEventLoopGroup();
		try {
			Bootstrap b = new Bootstrap();
			b.group(group).channel(NioSocketChannel.class)
			.option(ChannelOption.SO_KEEPALIVE, true)
			.handler(new ChannelInitializer<SocketChannel>() {

				@Override
				protected void initChannel(SocketChannel ch) throws Exception {
					ch.pipeline().addLast(new MessageHandler());
				}
			});
			ChannelFuture f = b.connect(host, port);
			f.addListener(new ConnectionListener());
			f.channel().closeFuture().sync();
		} finally {
			//group.shutdownGracefully();
		}
	}
	public static void main(String[] args){
		try {
			connect(10007, "127.0.0.1");
		} catch (Exception e) {
			e.printStackTrace();
		}
	}
}
class MessageHandler extends ChannelInboundHandlerAdapter{
	@Override
	public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
		System.out.println("关闭客户端连接");
		cause.printStackTrace();
	}
	
	@Override
	public void channelInactive(ChannelHandlerContext ctx) throws Exception {
		ctx.channel().eventLoop().schedule(new Runnable() {
            @Override
            public void run() {
                System.err.println("服务端链接不上,开始重连操作...");
                try {
					NettyClient.connect(10007, "127.0.0.1");
				} catch (Exception e) {
					e.printStackTrace();
				}
            }
        }, 3, TimeUnit.SECONDS);
	}
	
	@Override
	public void channelActive(ChannelHandlerContext ctx) throws Exception {
		new Thread(new Runnable() {
			@Override
			public void run() {
				while(true) {
					ByteBuf bf = Unpooled.copiedBuffer(("你好服务端:"+Math.random()).getBytes());
					ctx.writeAndFlush(bf);
					try {
						Thread.sleep(500);
					} catch (InterruptedException e) {
						e.printStackTrace();
					}
				}
			}
		}).start();
	}
	
	@Override
	public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
		ByteBuf bb = (ByteBuf) msg;
		byte[] b = new byte[bb.readableBytes()];
		bb.readBytes(b);
		System.out.println("收到服务器端数据:"+new String(b));
	}
}
class ConnectionListener implements ChannelFutureListener {
    @Override
    public void operationComplete(ChannelFuture channelFuture) throws Exception {
        if (!channelFuture.isSuccess()) {
            final EventLoop loop = channelFuture.channel().eventLoop();
            loop.schedule(new Runnable() {
                @Override
                public void run() {
                    System.err.println("服务端链接不上,开始重连操作...");
                    try {
						NettyClient.connect(10007, "127.0.0.1");
					} catch (Exception e) {
						e.printStackTrace();
					}
                }
            }, 3, TimeUnit.SECONDS);
        } else {
            System.err.println("服务端链接成功...");
        }
    }
}

 还有一点要说的是:

这里开启线程只是测试阶段这样做,在项目里,springboot项目是用线程池去执行这个线程的,

在springBoot开启这个客户端当然也不是用main方法启动,需要配置springboot的启动时初始化这个客户端。

(这里也有坑的下面的文章我会解释)