Netty框架介绍并编写WebSocket服务端与客户端

时间:2024-04-12 11:56:00

一、Netty 介绍

Netty 是一个基于 Java NIO(Non-blocking I/O,非阻塞 I/O)的网络通信框架,旨在帮助开发者轻松地编写高性能、高可靠性的网络应用程序。由于其出色的设计和强大的功能,Netty 在企业级应用和开源项目中得到了广泛的应用。

二、特点

2.1 高性能

Netty 使用了异步、事件驱动的编程模型,能够处理大量并发连接而不会导致线程资源的浪费。通过底层的 NIO 技术,Netty 能够实现高性能的网络通信。

2.2 可扩展性

Netty 的设计允许开发者灵活地扩展和定制各种功能。它提供了丰富的插件机制和可扩展的 API,使得开发者能够根据自己的需求定制网络应用程序。

2.3 完善的协议支持

Netty 提供了对多种网络协议的支持,包括 HTTP、WebSocket、TCP、UDP 等。开发者可以轻松地构建各种类型的网络应用程序,如 Web 服务器、实时通信应用等。

2.4 容错性和可靠性

Netty 提供了多种机制来保障网络通信的可靠性和容错性,包括心跳检测、断线重连、流量控制等。这些机制使得网络应用程序能够更加稳定地运行。

2.5 跨平台

Netty可以在多种操作系统平台上运行,包括Windows、Linux、MacOS等。

三、使用场景

3.1 高性能服务器

Netty 可以用于构建高性能的服务器,如 Web 服务器、游戏服务器等。其异步、事件驱动的设计使得服务器能够处理大量并发连接而不会导致性能下降。

3.2 实时通信应用

由于 Netty 对 WebSocket 的支持,它非常适合构建实时通信应用,如聊天应用、即时消息应用等。开发者可以利用 Netty 快速搭建稳定、高性能的实时通信系统。

3.3 分布式系统

Netty 可以作为分布式系统中的通信框架,用于实现节点之间的通信和数据传输。其可靠性和高性能使得它成为分布式系统的理想选择。

四、Netty 核心组件

1、Bootstrap(引导类):

用于启动和配置网络应用程序的配置类。ServerBootstrap 用于服务端,Bootstrap 用于客户端。


2、NioEventLoopGroup(事件循环组):

包含多个事件循环(NioEventLoop)的组件。在服务端,通常会创建两个实例:BossEventLoopGroup 负责处理连接事件,WorkerEventLoopGroup 负责处理读写事件。客户端通常只有一个 NioEventLoopGroup 处理连接和 I/O 任务。


3、NioEventLoop(事件循环):

表示一个不断循环执行事件处理的线程。每个 NioEventLoop 包含一个 Selector 和一个 TaskQueue,用于监听注册在其上的 SocketChannel 上的 I/O 事件,并处理非 I/O 任务。

4、Channel(通道):

是一种双向的连接通道,负责处理客户端与服务端之间的所有网络事件(如数据传输、生命周期事件)。不同协议和阻塞类型的连接对应不同类型的 Channel,常见的包括 NioSocketChannel(客户端 TCP Socket 连接)和 NioServerSocketChannel(服务器端 TCP Socket 连接)。

5、ChannelHandler(通道处理器):

负责处理输入输出数据的逻辑,可以接收入站事件(如数据接收)和出站事件(如数据发送),并执行相应的处理逻辑。

6、ChannelHandlerContext(通道处理器上下文):

包含与 ChannelHandler 相关联的各种信息,如 Channel、EventLoop、ChannelPipeline 等。提供了丰富的方法,便于 ChannelHandler 与其他组件进行交互。

7、ChannelPipeline(通道管道):

是一个双向链表,用于拦截和处理事件的链式结构。负责管理 ChannelHandler 并协调它们的处理顺序。每个 ChannelHandlerContext 关联着一个 ChannelHandler。

8、ChannelFuture(通道未来):

主要用于接收异步 I/O 操作返回的执行结果。提供了丰富的方法,用于检查操作状态、添加监听器以接收通知,并对操作结果进行处理。


这些组件共同构成了 Netty 的核心框架,为开发者提供了高效、可靠的网络应用开发环境。

五、使用Netty框架编写WebSocket服务端与客户端

背景:

WebSocket服务端实现功能:

简单的接收到的消息打印出来,并将接收到的消息返回给客户端

WebSocket客户端实现功能:

与服务端建立连接后,向服务端发送一条消息。并将收到服务端发送的消息打印出来

5.1、Netty服务端工作架构流程

Netty 服务端的工作架构流程可以大致描述如下:

  1. 创建引导类(Bootstrap):首先,创建一个 ServerBootstrap 实例作为服务端的启动引导类。设置引导类的配置参数,如线程模型、通道类型等。

  2. 设置线程模型:通过 group 方法设置事件循环组,一般使用 NioEventLoopGroup 类型。可以指定一个 Boss 线程组用于处理连接事件,和一个或多个 Worker 线程组用于处理读写事件。

  3. 指定通道类型和处理器:调用 channel 方法指定通道类型,如 NioServerSocketChannel。通过 childHandler 方法设置通道处理器(ChannelInitializer),用于初始化新连接的 Channel,并添加业务逻辑处理器(ChannelHandler)。

  4. 绑定端口:通过 bind 方法绑定监听端口,并调用 sync 方法等待绑定完成。

  5. 处理连接:一旦有客户端连接到服务器端口,Boss 线程就会接收到连接事件,创建新的 SocketChannel 并将其注册到 Worker 线程组的事件循环中。

  6. 事件循环处理Worker 线程会不断循环从注册的 SocketChannel 中读取数据,并将数据交给 ChannelPipeline 中的 ChannelHandler 处理。处理完成后,可以向客户端发送响应数据。

  7. 关闭通道和释放资源:当连接关闭时,可以通过 ChannelFutureListener 监听器来处理关闭事件,关闭通道并释放相关资源。

  8. 优雅关闭:最后,调用 shutdownGracefully 方法优雅地关闭 EventLoopGroup,释放所有的资源。

这是一个简单的流程,实际应用中可能还涉及到更多细节和复杂性,比如异常处理、心跳检测、流量控制等。 Netty 提供了丰富的组件和功能,可以帮助开发者构建高性能、可靠的网络应用程序。

5.2 服务端代码示例

以下是一个简单的使用Netty框架构建WebSocket服务器的示例代码

1、pom.xml文件中添加以下Netty依赖

<dependency>
    <groupId>io.netty</groupId>
    <artifactId>netty-all</artifactId>
    <version>4.1.68.Final</version>
</dependency>

2、WebSocketServerHandler是一个自定义的处理器

WebSocketServerHandler是一个自定义的处理器,用于处理WebSocket相关的事件。在上述示例代码中,我们添加了一个WebSocketServerHandler类来处理WebSocket连接和消息的处理逻辑。

以下是WebSocketServerHandler类的简单示例:

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;




public class WebSocketServerHandler extends SimpleChannelInboundHandler<TextWebSocketFrame> {

    @Override
    protected void channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame msg) throws Exception {
        System.out.println("服务器接受消息中...");

        // 处理接收到的WebSocket消息
        String request = msg.text();


        System.out.println("服务器接受消息中...");
        System.out.println("Received WebSocket message: " + request);

        // 假设这里的业务逻辑是简单地将接收到的消息返回给客户端
        ctx.channel().writeAndFlush(new TextWebSocketFrame("Server received: " + request));
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        // 发生异常时关闭连接
        cause.printStackTrace();
        ctx.close();
    }

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        System.out.println("Client connected: " + ctx.channel().remoteAddress());
        super.channelActive(ctx);
    }
}

1、WebSocketServerHandler继承自Netty的SimpleChannelInboundHandler类,并指定处理的消息类型为TextWebSocketFrame,这表示它将处理来自客户端的文本类型的WebSocket帧。

在channelRead0()方法中,我们处理接收到的WebSocket消息。在这个简单的示例中,我们只是打印接收到的消息,并将其原样返回给客户端。

在exceptionCaught()方法中,我们处理发生异常时的情况。在这里,我们简单地打印异常信息,并关闭与客户端的连接。

可以根据实际需求来编写自定义的WebSocketServerHandler,以实现特定的业务逻辑。

2、在 Netty 中,ChannelHandlerContext 是一个上下文对象,代表了 ChannelHandler 和 ChannelPipeline 之间的关联。它包含了当前 ChannelHandler 的上下文信息以及与其相关联的 Channel、EventLoop、ChannelPipeline 等。通过 ChannelHandlerContext,你可以获取当前处理器的相关信息,并与其他处理器、Channel 进行交互。

在你提供的代码中,ChannelHandlerContext 被用于处理接收到的 WebSocket 消息。具体来说,你可以通过 ctx 来执行以下操作:

  • 获取当前 Channel:通过 ctx.channel() 方法获取当前的 Channel 对象,你可以通过它进行消息的发送和连接的管理。
  • 获取关联的 ChannelPipeline:通过 ctx.pipeline() 方法获取当前的 ChannelPipeline 对象,你可以对其进行添加、删除和修改处理器等操作。
  • 获取关联的 EventLoop:通过 ctx.executor() 方法获取当前的 EventExecutor(一般为 EventLoop),你可以将任务提交给它来异步执行。
  • 发送消息:通过 ctx.writeAndFlush() 方法向当前连接的客户端发送消息。

总之,ChannelHandlerContext 提供了与当前处理器相关的一切信息,以便你在处理消息时能够方便地与其他组件进行交互。

3、服务端WebSocketServer代码

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.HttpServerCodec;
import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler;


public class WebSocketServer {

    private final int port = 8080;

    public void run() throws Exception {
        // 创建两个EventLoopGroup,一个用于接收客户端连接,另一个用于处理客户端连接的IO操作
        EventLoopGroup bossGroup = new NioEventLoopGroup();
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        System.out.println("test1");

        try {
            ServerBootstrap b = new ServerBootstrap();
            b.group(bossGroup, workerGroup)
                    .channel(NioServerSocketChannel.class) // 指定使用NIO传输
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        public void initChannel(SocketChannel ch) throws Exception {
                            // 添加HTTP编解码器
                            ch.pipeline().addLast(new HttpServerCodec());
                            // 添加HTTP对象聚合器,将HTTP消息的多个部分合成一条完整的HTTP消息
                            ch.pipeline().addLast(new HttpObjectAggregator(65536));
                            // 添加WebSocket协议处理器,将HTTP协议升级为WebSocket协议
                            ch.pipeline().addLast(new WebSocketServerProtocolHandler("/websocket"));
                            // 添加自定义的WebSocket处理器
                            ch.pipeline().addLast(new WebSocketServerHandler());
                        }
                    })
                    .option(ChannelOption.SO_BACKLOG, 128) // 设置TCP参数
                    .childOption(ChannelOption.SO_KEEPALIVE, true); // 设置TCP参数
            System.out.println("test3");

            // 绑定端口,开始接收进来的连接
            ChannelFuture f = b.bind(port).sync();
            System.out.println("test4");

            // 等待服务器套接字关闭
            f.channel().closeFuture().sync();
        } finally {
            // 优雅地关闭EventLoopGroup,释放所有的资源
            workerGroup.shutdownGracefully();
            bossGroup.shutdownGracefully();
            System.out.println("test2");
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println("test0");
        WebSocketServer server = new WebSocketServer();
        server.run();
    }
}

该示例代码中使用了Netty框架来构建WebSocket服务器。主要步骤包括创建两个EventLoopGroup(用于接收客户端连接和处理客户端连接的IO操作)、配置ServerBootstrap(设置服务器参数,如传输类型、处理器)、设置ChannelInitializer(初始化通道,添加处理器链)以及绑定端口并启动服务器。

5.3 WebSocket 客户端架构流程

当使用 Netty 构建 WebSocket 客户端时,通常会遵循以下流程:

  1. 创建 EventLoopGroup:EventLoopGroup 是用于处理 I/O 事件的线程池。通常会创建一个 NioEventLoopGroup 对象,它包含了一组 NIO 线程,用于处理连接、读取、写入等操作。

  2. 配置 Bootstrap:Bootstrap 是 Netty 的启动类,用于设置客户端的引导配置。通过 Bootstrap 对象,我们可以设置 EventLoopGroup、Channel 类型以及 ChannelHandler 等属性。

  3. 创建并配置 ChannelInitializer:ChannelInitializer 是一个特殊的 ChannelHandler,用于初始化 Channel 的管道。在 ChannelInitializer 中,我们可以添加一系列的 ChannelHandler,在初始化过程中将它们添加到 ChannelPipeline 中。

  4. 创建 WebSocketClientHandshaker:WebSocketClientHandshaker 是用于进行握手的类。它提供了一些参数,如 URI、WebSocket 版本、自定义扩展等,用于构建握手请求。

  5. 连接到 WebSocket 服务器:通过调用 Bootstrap 的 connect 方法,传入服务器的主机和端口,与 WebSocket 服务器建立连接。连接成功后,会返回一个 ChannelFuture 对象,可以通过该对象获取与服务器的 Channel。

  6. 设置 WebSocketClientHandshaker:从 Channel 中获取 WebSocketClientHandler,并将 WebSocketClientHandshaker 设置到 WebSocketClientHandler 中。这样,WebSocketClientHandler 就可以使用 WebSocketClientHandshaker 进行握手操作。

  7. 发起握手请求:通过调用 WebSocketClientHandshaker 的 handshake 方法,发起握手请求。握手请求会发送到服务器,并等待服务器的响应。

  8. 连接建立成功后,可以发送消息到 WebSocket 服务器:在握手成功后,可以通过 Channel 向 WebSocket 服务器发送消息。消息需要封装为合适的 WebSocketFrame,例如 TextWebSocketFrame 或 BinaryWebSocketFrame。

  9. 处理服务器返回的消息:WebSocketClientHandler 会接收到服务器返回的消息,可以在其中处理和解析服务器发送过来的消息。

  10. 关闭连接:当不再需要与服务器通信时,可以调用 Channel 的 close 方法关闭连接。同时,需要调用 EventLoopGroup 的 shutdownGracefully 方法来优雅地关闭 EventLoopGroup。

以上是 Netty WebSocket 客户端的基本工作架构流程。通过合适的配置和处理逻辑,我们可以构建出强大而高效的 WebSocket 客户端应用程序。

注意:

不要在客户端的 channelActive 方法中发送消息,WebSocketClientProtocolHandler 不支持 Active,使用 channelActive 发送消息没有响应。发送消息需要在客户端启动后,通过异步的方式。小心踩坑

5.4 WebSocket 客户端代码

1、依赖

    <dependencies>
        <!-- Netty dependencies -->
        <dependency>
            <groupId>io.netty</groupId>
            <artifactId>netty-all</artifactId>
            <version>4.1.68.Final</version>
        </dependency>
    </dependencies>

2、代码示例

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.http.DefaultHttpHeaders;
import io.netty.handler.codec.http.HttpClientCodec;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.websocketx.*;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
import io.netty.handler.stream.ChunkedWriteHandler;


import java.net.URI;
import java.util.concurrent.CountDownLatch;

public class WebSocketClient {

    static final CountDownLatch latch = new CountDownLatch(1); // 用于阻塞主线程,直到握手完成

    public static void main(String[] args) throws Exception {
        // 目标 WebSocket 地址
        String url = "ws://127.0.0.1:8080/websocket";
        WebSocketClient client = new WebSocketClient();
        client.test(url);
    }

    public void test(String url) throws Exception {
        Channel dest = dest(url); // 获取目标通道
        latch.await(); // 等待握手完成
        dest.writeAndFlush(new TextWebSocketFrame("我就是要发送的消息2")); // 发送消息
    }

    public Channel dest(String url) throws Exception {
        final URI webSocketURL = new URI(url); // 目标 WebSocket 地址

        EventLoopGroup group = new NioEventLoopGroup(); // 创建事件循环组
        Bootstrap boot = new Bootstrap(); // 创建引导程序
        boot.option(ChannelOption.SO_KEEPALIVE, true)
                .option(ChannelOption.TCP_NODELAY, true)
                .group(group)
                .handler(new LoggingHandler(LogLevel.INFO)) // 添加日志处理器,用于打印日志信息
                .channel(NioSocketChannel.class)
                .handler(new ChannelInitializer<SocketChannel>() {
                    protected void initChannel(SocketChannel sc) throws Exception {
                        ChannelPipeline pipeline = sc.pipeline();
                        pipeline.addLast(new HttpClientCodec()); // HTTP 客户端编解码器,用于处理 HTTP 请求和响应
                        pipeline.addLast(new ChunkedWriteHandler()); // 支持大数据流写入
                        pipeline.addLast(new HttpObjectAggregator(64 * 1024)); // 聚合 HTTP 消息,将多个消息合并成一个完整的 FullHttpRequest 或 FullHttpResponse
                        // WebSocket 客户端协议处理器,用于处理 WebSocket 握手和帧的编解码
                        pipeline.addLast(new WebSocketClientProtocolHandler(
                                WebSocketClientHandshakerFactory.newHandshaker(webSocketURL, WebSocketVersion.V13, null, false, new DefaultHttpHeaders())));
                        pipeline.addLast(new WebSocketClientHandler());

                    }
                });

        ChannelFuture cf = boot.connect(webSocketURL.getHost(), webSocketURL.getPort()).sync(); // 连接到目标 WebSocket 服务器

        return cf.channel(); // 返回通道
    }

    public static void send(Channel channel) {
        final String textMsg = "握手完成后发送的消息"; // 要发送的消息内容

        if (channel != null && channel.isActive()) {
            TextWebSocketFrame frame = new TextWebSocketFrame(textMsg); // 创建 WebSocket 文本帧
            channel.writeAndFlush(frame).addListener((ChannelFutureListener) channelFuture -> {
                if (channelFuture.isDone() && channelFuture.isSuccess()) {
                    System.out.println("     ================= 发送成功.");
                } else {
                    channelFuture.channel().close();
                    System.out.println("     ================= 发送失败. cause = " + channelFuture.cause());
                    channelFuture.cause().printStackTrace();
                }
            });
        } else {
            System.out.println("消息发送失败! textMsg = " + textMsg);
        }
    }


    // WebSocket客户端处理器
    public static class WebSocketClientHandler extends SimpleChannelInboundHandler<TextWebSocketFrame> {


        // 当从服务器接收到消息时调用
        @Override
        protected void channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame msg) {
            System.out.println(" 客户端收到消息======== " + msg.text());

        }

        @Override
        public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
            if (WebSocketClientProtocolHandler.ClientHandshakeStateEvent.HANDSHAKE_COMPLETE.equals(evt)) {
                System.out.println(ctx.channel().id().asShortText() + " 握手完成!");
                latch.countDown(); // 计数减一,握手完成
                send(ctx.channel()); // 发送消息
            }
            super.userEventTriggered(ctx, evt);
        }


        // 当通道不活动时调用
        @Override
        public void channelInactive(ChannelHandlerContext ctx) {
            System.out.println("channelInactive");
        }

    }
}

final CountDownLatch latch = new CountDownLatch(1); // 用于阻塞主线程,直到握手完成。

这行代码创建了一个 CountDownLatch 对象,该对象用于在主线程中阻塞,直到计数器减少到零。在这里,计数器的初始值为 1,表示需要等待一个事件发生。主线程在 latch.await() 被调用时会被阻塞,直到另一个线程调用了 latch.countDown() 来减少计数器的值,使得计数器为零,主线程才会继续执行。在这个代码中,主线程需要等待 WebSocket 握手完成后才能继续执行,因此使用 CountDownLatch 来实现等待握手完成的功能。

上述示例代码仅用于演示 Netty 框架构建 WebSocket 客户端的基本流程,实际应用中可能还需要考虑异常处理、心跳机制等更复杂的逻辑。

六、相关内容:jmeter压测WebSocket协议

jmeter压测websocket协议-CSDN博客