kafka负载均衡相关资料收集(一)

时间:2022-09-03 08:39:29

key为null时Kafka会将消息发送给哪个分区?

当你编写kafka Producer时, 会生成KeyedMessage对象。

1
KeyedMessage<K, V> keyedMessage = new KeyedMessage<>(topicName, key, message)

这里的key值可以为空,在这种情况下, kafka会将这个消息发送到哪个分区上呢?依据Kafka官方的文档, 默认的分区类会随机挑选一个分区:

The third property "partitioner.class" defines what class to use to determine which Partition in the Topic the message is to be sent to. This is optional, but for any non-trivial implementation you are going to want to implement a partitioning scheme. More about the implementation of this class later. If you include a value for the key but haven't defined a partitioner.class Kafka will use the default partitioner. If the key is null, then the Producer will assign the message to a random Partition.

但是这句话相当的误导人。

从字面上来讲,这句话没有问题, 但是这里的随机是指在参数"topic.metadata.refresh.ms"刷新后随机选择一个, 这个时间段内总是使用唯一的分区。 默认情况下每十分钟才可能重新选择一个新的分区。 但是相信大部分的程序员和我一样, 都理解成每个消息都会随机选择一个分区。
可以查看相关的代码:

key为null时Kafka会将消息发送给哪个分区?

当你编写kafka Producer时, 会生成KeyedMessage对象。

1
KeyedMessage<K, V> keyedMessage = new KeyedMessage<>(topicName, key, message)

这里的key值可以为空,在这种情况下, kafka会将这个消息发送到哪个分区上呢?依据Kafka官方的文档, 默认的分区类会随机挑选一个分区:

The third property "partitioner.class" defines what class to use to determine which Partition in the Topic the message is to be sent to. This is optional, but for any non-trivial implementation you are going to want to implement a partitioning scheme. More about the implementation of this class later. If you include a value for the key but haven't defined a partitioner.class Kafka will use the default partitioner. If the key is null, then the Producer will assign the message to a random Partition.

但是这句话相当的误导人。

从字面上来讲,这句话没有问题, 但是这里的随机是指在参数"topic.metadata.refresh.ms"刷新后随机选择一个, 这个时间段内总是使用唯一的分区。 默认情况下每十分钟才可能重新选择一个新的分区。 但是相信大部分的程序员和我一样, 都理解成每个消息都会随机选择一个分区。
可以查看相关的代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
private def getPartition(topic: String, key: Any, topicPartitionList: Seq[PartitionAndLeader]): Int = {
val numPartitions = topicPartitionList.size
if(numPartitions <= 0)
throw new UnknownTopicOrPartitionException("Topic " + topic + " doesn't exist")
val partition =
if(key == null) {
// If the key is null, we don't really need a partitioner
// So we look up in the send partition cache for the topic to decide the target partition
val id = sendPartitionPerTopicCache.get(topic)
id match {
case Some(partitionId) =>
// directly return the partitionId without checking availability of the leader,
// since we want to postpone the failure until the send operation anyways
partitionId
case None =>
val availablePartitions = topicPartitionList.filter(_.leaderBrokerIdOpt.isDefined)
if (availablePartitions.isEmpty)
throw new LeaderNotAvailableException("No leader for any partition in topic " + topic)
val index = Utils.abs(Random.nextInt) % availablePartitions.size
val partitionId = availablePartitions(index).partitionId
sendPartitionPerTopicCache.put(topic, partitionId)
partitionId
}
} else
partitioner.partition(key, numPartitions)
if(partition < 0 || partition >= numPartitions)
throw new UnknownTopicOrPartitionException("Invalid partition id: " + partition + " for topic " + topic +
"; Valid values are in the inclusive range of [0, " + (numPartitions-1) + "]")
trace("Assigning message of topic %s and key %s to a selected partition %d".format(topic, if (key == null) "[none]" else key.toString, partition))
partition
}

如果key为null, 它会从sendPartitionPerTopicCache查选缓存的分区, 如果没有,随机选择一个分区,否则就用缓存的分区。

LinkedIn工程师Guozhang Wang在邮件列表中解释了这一问题,
最初kafka是按照大部分用户理解的那样每次都随机选择一个分区, 后来改成了定期选择一个分区, 这是为了减少服务器段socket的数量。不过这的确很误导用户,据称0.8.2版本后又改回了每次随机选取。但是我查看0.8.2的代码还没看到改动。

所以,如果有可能,还是为KeyedMessage设置一个key值吧。

LinkedIn工程师Guozhang Wang在邮件列表中解释了这一问题,如果key为null, 它会从sendPartitionPerTopicCache查选缓存的分区, 如果没有,随机选择一个分区,否则就用缓存的分区。

最初kafka是按照大部分用户理解的那样每次都随机选择一个分区, 后来改成了定期选择一个分区, 这是为了减少服务器段socket的数量。不过这的确很误导用户,据称0.8.2版本后又改回了每次随机选取。但是我查看0.8.2的代码还没看到改动。

所以,如果有可能,还是为KeyedMessage设置一个key值吧。

from:http://colobu.com/2015/01/22/which-kafka-partition-will-keyedMessages-be-sent-to-if-key-is-null/

kafka负载均衡相关资料收集(一)的更多相关文章

  1. kafka负载均衡相关资料收集(三)

    apache kafka系列之Producer处理逻辑 下文是转载的,原文链接地址:点这儿 [转] Kafka ProducerKafka Producer处理逻辑kafka生产者处理逻辑apache ...

  2. kafka负载均衡相关资料收集(二)

    [转]关于kafka producer 分区策略的思考 from:http://blog.csdn.net/ouyang111222/article/details/51086037 今天跑了一个简单 ...

  3. AssetBundle机制相关资料收集

    原地址:http://www.cnblogs.com/realtimepixels/p/3652075.html AssetBundle机制相关资料收集 最近网友通过网站搜索Unity3D在手机及其他 ...

  4. FastAdmin 导出 Excel 相关资料收集 (2018-08-14)

    FastAdmin 导出 Excel 相关资料收集 导出 Excel 文件时身份证号变成科学计数法怎么办? https://forum.fastadmin.net/thread/1346 姊妹篇 Fa ...

  5. FastAdmin 导入 Excel 相关资料收集 (2018-08-14)

    FastAdmin 导入 Excel 相关资料收集 新版本一键CRUD后自带导入功能,但是默认被禁用,如何启动 https://forum.fastadmin.net/thread/540 Excel ...

  6. iOS10以及xCode8相关资料收集

    兼容iOS 10 资料整理笔记 源文:http://www.jianshu.com/p/0cc7aad638d9 1.Notification(通知) 自从Notification被引入之后,苹果就不 ...

  7. nginx 负载均衡相关知识

    Nginx ("engine x") 是一个高性能的 HTTP 和 反向代理 服务器,也是一个 IMAP/POP3/SMTP 代理服务器. Nginx 是由 Igor Sysoev ...

  8. F5 负载均衡 相关资源

    F5负载均衡之检查命令的说明http://net.zdnet.com.cn/network_security_zone/2010/0505/1730942.shtml F5培训http://wenku ...

  9. IIS负载均衡相关

    1. IIS负载均衡 (比较简单的例子,能看到效果) 2.nginx+iis实现负载均衡 3.Windows平台分布式架构实践 - 负载均衡 4.Net分布式系统:Keepalived+LVS+Ngi ...

随机推荐

  1. Qt 动画框架

    最近一个项目中的需要做动画效果,很自然就想起来用qt animation framework .这个框架真的很强大.支持多个动画组合,线性动画,并行动画等.在此总结一下使用该框架一些需要注意的地方: ...

  2. VBS创建数据表

    '创建数据表'参数:strDBPath 字符串型 数据库路径'参数:strTableName 字符串型 需要创建的数据表的名称'参数:strColumnName 字符串型 初始化的字段名称,其实可以算 ...

  3. Net预编译 真的好用与否

    公司手机网站上千个 ASP.NET开发一套程序只是配置不一样,所有站点呈现的内容就不一样了, 以前的程序是ASP的,现在ASP程序猿少之又少了,所以公司要求转.NET,新开发也用NET.所有现在上千个 ...

  4. jquery mobile -role

    jquery mobile -role - cc_jony - 博客园 jquery mobile -role   data-page 页面 data-header 页面的头部 data-conten ...

  5. window&period;close&lpar;&rpar;方法对谷歌和火狐浏览器无效

    在近期的项目中,遇到了一个问题,就是用户到新浪支付进行操作,操作成功后,指定到一个网页,需求是点击确定,关闭该网页.需求出来以后认为这种就是小菜一碟,直接用 window.close()方法就可以实现 ...

  6. 用DOS命令来运行Java代码

    用DOS命令来运行Java代码.. ----------------- Demo.java public class Demo { public static void main(String[] a ...

  7. R语言︱文件读入、读出一些方法罗列(批量xlsx文件、数据库、文本txt、文件夹)

    笔者寄语:小规模的读取数据的方法较为简单并且多样,但是,批量读取目前看到有以下几种方法:xlsx包.RODBC包.批量转化成csv后读入. R语言中还有一些其他较为普遍的读入,比如代码包,R文件,工作 ...

  8. 亲密接触Redis-第二天(Redis Sentinel)

    简介 经过上次轻松搭建了一个Redis的环境并用Java代码调通后,这次我们要来看看Redis的一些坑以及Redis2.8以后带来的一个新的特性即支持高可用特性功能的Sentinel(哨兵). Red ...

  9. babel 插件编写

    一.开始 工具链接: 每一个节点都有如下所示的接口(Interface): interface Node { type: string; } 字符串形式的 type 字段表示节点的类型(如: &quo ...

  10. Docker第二章:docker基础1--镜像,容器&amp&semi;仓库

    镜像介绍及操作:http://www.haveneed.cn/article-detials/115 容器介绍及操作:http://www.haveneed.cn/article-detials/11 ...