kafka忽略集群Node信息,直接向``地址发送消息

时间:2025-04-24 07:23:35

背景

当kafka单机安装的时候或者集群安装的时候,kafka会先通过获取集群节点。
有时候网络复杂的时候

  • 如内网外部署
  • 地址映射
  • 代理转发等

配置地址可能为一个公网地址181.39.77.53:9092,然而返回的节点为内网地址172.16.31.33:9092,此时由于未开通172.16.31.33:9092网络,导致访问失败。

此时通常有两种解决方案

  • 开通kafka返回节点的网络
  • 修改kafka连接方式,忽略返回的node节点信息,直接访问配置的地址
    这里主要介绍下方案二,直接访问配置的地址

实现原理

  • 利用类加载机制,重复的类只会加载一次。
  • 同名类先加载的类先生效,后加载的类被忽略
  • 优先加载运行jar包内类。
    可以通过在项目内新建同名类(包名也要相同),修改源码覆盖的方式来实现。

实现代码

  • 在项目内新建
  • 通过修改initiateConnect(Node node, long now)实现
  /**
     * Initiate a connection to the given node
     * @param node the node to connect to
     * @param now current time in epoch milliseconds
     */
    private void initiateConnect(Node node, long now) {

        String nodeConnectionId = ();

        try {
            (nodeConnectionId, now, ());
            InetAddress address = (nodeConnectionId);

            // 开启单机版kafka连接
            // 如果单机kafka 直接连接``中配置的节点
            if () {
                KafkaChannel channel = ((Selector) selector).channel("-1");

                // 如果channel为空说明访问的``
                // 不为空则开始访问kafka返回的集群节点
                if(channel != null) {
                    InetSocketAddress remoteAddress = null;
                    try {
                        // 尝试通过反射方式获取`remoteAddress`
                        Field field = ("remoteAddress");
                        (true);
                        remoteAddress = (InetSocketAddress)(channel);
                        ("Initiating connection to node {} using address {}", node, address);
                        (nodeConnectionId,
                                remoteAddress,
                                ,
                                );

                    } catch (NoSuchFieldException | IllegalAccessException e) {
                        // 获取不到则获取远程地址
                        address = ();
                        ("Initiating connection to node {} using address {}", node, address);
                        (nodeConnectionId,
                                new InetSocketAddress(address, ()),
                                ,
                                );
                    }
                    return;
                }
            }
            ("Initiating connection to node {} using address {}", node, address);
            (nodeConnectionId,
                    new InetSocketAddress(address, ()),
                    ,
                    );
        } catch (IOException e) {
            ("Error connecting to node {}", node, e);
            // Attempt failed, we'll try again after the backoff
            (nodeConnectionId, now);
            // Notify metadata updater of the connection failure
            (now, nodeConnectionId, ());
        }
    }