10 分钟了解 Pulsar:针对 Kafka 用户的指南

时间:2022-12-11 07:58:41


本文翻译自 StreamNative 博客《Understanding Pulsar in 10 Minutes: A Guide for Kafka Users》,作者 Elliot West,StreamNative 软件工程师。本文由 StreamNative 组织翻译。

译者简介

王中兴,社区昵称 AlphaWang,就职于 eBay 消息中间件。

本文将结合你现有的 Apache Kafka 知识来带你入门 Apache Pulsar[1]。我们将展示 Apache Kafka 的基本概念是如何映射到 Apache Pulsar 中的,这样你就能够通过已有的 Kafka 知识快速上手 Pulsar 中类似的使用场景。

Apache Pulsar 为成熟的 Apache Kafka[2] 生态提供了一个非常有吸引力的替代选择。Pulsar 中的差异化功能及架构可以克服 Kafka 中的种种限制。虽然 Kafka 和 Pulsar 都是高度可扩展以及持久化的分布式事件流平台,但是它们又有许多不同之处。例如,Pulsar 可以提供灵活的订阅、统一的流和消息、以及分离存储等等。

我有幸使用过 Apache Kafka 许多年,将其作为企业级平台提供给数据科学家、工程师和分析师使用。因此,我遇见过多种多样的 Kafka 使用场景,并接触过 Kafka 生态系统的多个方面。当我加入 StreamNative[3] 团队并使用 Pulsar 时,我想看看我的 Kafka 经验在 Pulsar 环境下会是什么样子。本文的阅读前提是你对 Kafka 已经有一定的理解,因为我不会从头开始讲解诸如主题等基本概念。本文不会对比平台的功能,而是专注于从生产者和消费者的角度讲解 Pulsar 如何实现与 Kafka 类似的功能。

注:本文不会讨论 Pulsar 的 Kafka 插件协议(KoP[4]),KoP 将 Pulsar 封装起来,直接使用 Broker 的资源使现有的 Kafka 客户端和 Pulsar 进行交互,使得客户端就好像是在和 Kafka Broker 交互一样。

主题

让我们先从主题开始,因为主题是 Kafka 和 Pulsar 中最基本的概念。你可以把 Pulsar 主题看作是单个 Kafka 主题分区。这里你可能会对性能有疑问,因为多分区是 Kafka 横向扩展能力的原因所在。不过不用担心,Pulsar 在主题的基础上提供了一个类似的概念,即“分区主题[5]”。在分区主题中,一组 Pulsar 主题在逻辑上被分成一组,实现与 Kafka 分区主题类似的功能。之所以存在这种区别,是因为 Pulsar 除了支持 Kafka 所针对的流式处理场景之外,还支持传统消息架构,而非分区主题是传统消息架构中的基本组件。由于我们关注的是 Kafka 使用场景,因此在下文中我们假设主题一词表示 Pulsar 分区主题,或者说 Kafka 主题。

PulsarAdmin admin = ...;
admin.topics().createPartitionedTopic(topic, partitions);

主题保留策略

与 Kafka 相比,Pulsar 在消息生命周期方面拥有更大的灵活性。默认情况下,Pulsar 会永远保留所有未确认消息,并立即删除已确认消息。当然,这两个行为[6]均可调整,可以通过设置消息保留时间来保留已确认消息,通过设置消息过期时间来清除未确认消息。

而 Kafka 的保留策略则不关心消费者活动,消息是被持久化还是被清除完全与消费者是否消费过无关。用户也许不希望在 Pulsar 中复制 Kafka 的这种行为,因为这样一来消息保留策略的灵活性就更小了。不过为了学习的目的,我们还是来看看如何实现这一点。首先,我们来针对被所有消费者都确认过的消息设置保留策略。

PulsarAdmin admin = ...;
admin.topicPolicies().setRetention(
“topic-name”,
new RetentionPolicies(sizeInMins, sizeInMB)
);

然后设置未确认消息的过期时间:

admin.topicPolicies().setMessageTTL(
“topic-name”,
messageTTLInSeconds
);

主题压实

与 Kafka 一样,Pulsar 也支持对主题进行压实(Compaction)。然而其内部实现略有不同,Pulsar 同时维护着未压实和已压实的数据。在压实过程中会产生一个新的已压实 Ledger,而原有的已压实 Ledger 会被丢弃。

10 分钟了解 Pulsar:针对 Kafka 用户的指南

Pulsar 压实主题包含两个 Ledger

正因为同时存储了未压实和已压实数据,消费者可以根据使用场景决定消费任何一种数据,只需简单的配置即可。

10 分钟了解 Pulsar:针对 Kafka 用户的指南

消费者可以选择消费已压实或未压实数据

Consumer<byte[]> compactedTopicConsumer = client.newConsumer()
.topic("some-compacted-topic")
...
.readCompacted(true)
.subscribe();

Pulsar 也有类似于 Kafka 的消息删除语义。只需发布一个带有空值的墓碑消息,在下一个压实周期中就会删除对应的 key 值消息。除了基于 key 的删除,Kafka 还支持针对压实主题指定保留时间。例如,你可以指定无期限保留,始终保持所有 key 的最新消息,提供类似于键值存储的存储语义。或者,你也可以指定一个保留期,这样消息就会以 TTL 的方式过期。

Pulsar 在这方面的灵活性则稍差一些。要想从已压实 Ledger 中删除消息,就只能通过显式的基于 key 的删除,否则 Pulsar 默认保留所有 key 的最新消息。Pulsar 的压实主题也可以设置保留策略,但这只适用于未压实 Ledger,即未压实的消息。鉴于这一限制,在考虑 key 基数时必须格外小心。如果需要指定已压实 Ledger 的消息 TTL,那么就有必要通过手动流程来标记和删除消息。

Schema

在 Kafka 生态中,可以通过 Confluent Schema 注册表以及生产者/消费者 SerDes 来实现对消息的 Schema 验证。应用程序开发人员可以对客户端进行配置,让客户端与注册表服务集成并能够发现和发布其 Schema。通过适当的配置,客户端可以对发布的消息进行验证。

Pulsar 也有类似的功能,即服务器端 Schema 策略[7]。但请注意,“服务器端”这个提法意味着 Schema 注册表功能是与 Pulsar Broker 集成的,而不是解耦的第三方服务。如此一来,这种集成会直接体现在 Pulsar 客户端 API 中,因此不需要额外配置 Schema 注册表端点(Endpoint)、SerDes 以及主题命名策略(Subject Name Strategy)。Pulsar 支持与 Kafka 相同的 Schema 类型(Avro、Protobuf 等)以及演化保证。

Producer<User> producer = client.newProducer(JSONSchema.of(User.class))
.topic(topic)
.create();
User user = new User("Tom", 28);
producer.send(user);

生产者

多个生产者可以同时连接到 Kafka 主题并向其发送消息。Pulsar 也提供这种主题访问语义,使用默认的 ​​Shared​​ 生产者访问模式[8]即可。因此,可以预期对于大多数使用场景,我们不需要额外的操作即可达到同样的效果。请注意,虽然单个 Kafka 生产者实例可以写入多个主题,但 Pulsar 的习惯做法是每个主题一个生产者实例。

分区

分区是生产者在主题的各个分区之间分发消息的过程。在 Kafka 中,我们可以通过在每个消息上设置一个 key 并为生产客户端提供一个分区器[9]来控制消息分发。Pulsar 采用了非常类似的方法,但其术语略有不同。在 Pulsar 中,我们也可以通过设置消息的 key 来指导分区,不过 Pulsar 的消息 key 必须是字符串类型,而 Kafka 则支持 ​​byte[]​​ 类型。另外,可以通过配置 Producer 客户端的 MessageRoutingMode[10] 来进一步控制分区。Pulsar 的默认路由模式是 ​​RoundRobinPartition​​,使用消息 key 的哈希值来将消息分配到分区,而 round-robin 模式可以分配没有 key 的消息。比较方便的一点是,这些分区语义与 Kafka 的默认分区器相同,因此可以预期对于大多数使用场景,我们不需要额外的操作即可达到同样的效果。此外,对于自定义行为,Pulsar 的路由模式提供了与 Kafka 分区器相同的灵活性。

副本

与 Kafka 一样,Apache Pulsar 可以为接收到的消息保存多个副本,实际上 Pulsar 在这方面还提供了一些有趣的灵活性。与 Kafka 不同的是,Pulsar 的消息存储与 Broker 分离,即不受存储位置的限制。在实践中这意味着我们可以独立地设置消息副本的数量和存储节点的数量。Kafka 分区副本的消息只能写入特定的节点,而 Pulsar 的消息则分布在一组 Bookie(又称“Ensemble”)上。Jack Vanlightly 在他的博客[11]中生动地描述了这一点:“Kafka 主题就像一根根 Toblerone 巧克力棒……而 Pulsar 主题就像气体,可以*膨胀充满可用的空间”。请看下面的例子,在 5 个节点的 Pulsar 和 Kafka 集群中保存消息 ​​M1-M3​​,副本数量为 3。

10 分钟了解 Pulsar:针对 Kafka 用户的指南

本地消息持久化与分离消息持久化

讲到副本,就不得不提副本故障,这是 Kafka 的痛点之一。你可能会想如果失去一个 Bookie 会怎么样?对集群有什么影响?可以确定的是,这种情况下集群需要拷贝副本不足的数据,但与 Kafka 不同,Pulsar 的影响后果更小。重要的是我们可以继续写入新数据,并保持所需的持久性;这是因为 Pulsar 的分区存储是高度分布式的,而 Kafka 则需要完整地同步分区数据才行。

消息确认

Kafka 生产者发送消息时可能会被阻塞,直到收到最低数量的 ACK,这样可以确信所生产的消息已被成功持久化到一定数量的副本中。Pulsar 也是如此,但也有一些差异。在 Kafka 中,生产者需要告诉 Broker 它需要多少个 ACK。因此,同一主题的不同生产者可能会向 Broker 请求不同数量的 ACK。在 Pulsar 中,ACK 由负责持久化消息的 Bookie 请求,而不是 Broker。此外,ACK 的数量也不由生产者指定。相反,它在主题级别上指定,直到达到所需的 ACK 数量前,任何同步生产者都会被阻塞。

PulsarAdmin admin = ...;
admin.topicPolicies().setPersistence(topic, new PersistencePolicies(
nodes,
replicas,
acks,
0.0
));

最小 ISR(In-Sync Replica)

在 Pulsar 中要想设置与 Kafka 等价的 ACK 值比较复杂,因为 Kafka 的 ​​all​​​ ACK 设置、集群范围内的 ​​min.insync.replicas​​​ 设置、以及 Kafka 主题的副本数量会相互影响。生产者的 ​​all ACKs​​​ 设置引入了不确定性;实际的 ACK 数量取决于当时的 ISR 数量。Kafka 生产者实际得到的 ACK 数量将介于 ​​min.insync.replicas​​ 的值和请求的 ACK 数量之间。

消费者

订阅类型决定着主题消费者的行为。订阅独立于主题,多个消费者可同时应用不同的消费语义。Pulsar 消息可以被单条确认,也可以不按顺序确认,以支持不同的消息场景。

消费者组

我们可以通过使用特定的 Pulsar 订阅类型来实现类似 Kafka 消费者组的功能。具体来说,我们可以创建订阅类型为灾备(Failover)[12]的持久化(Durable)[13]订阅。持久性可以确保确认被持久化(很像 Kafka 的偏移量管理);Pulsar 中的订阅默认是持久化的。灾备订阅类型确保只有一个消费者实例可以从分区中读取数据,消费者发生故障时,分区将会分配给另一个消费者实例。Pulsar 的默认订阅类型并不是灾备,因此必须明确指定。订阅一旦被创建,就必须由逻辑组中的所有消费者实例共享,分区将在这些实例之间进行分配。

消费顺序

在 Pulsar 中,灾备[14]订阅类型确保任何时间都只有一个消费者可以从分区中读取数据,因此可以在分区范围内保证有序,这一点与 Kafka 一样。

偏移量与提交

我将重点介绍 Kafka 中基于 Broker 的偏移量管理,因为这是最常见的使用场景。然而,Pulsar 消费者与 Kafka 一样也可以根据需要选择手动管理偏移量。一个关键区别是 Pulsar 还支持单条的、无序的消费者消息确认,这一点与传统消息队列一样。这与 Kafka 的偏移量形成鲜明对比,Kafka 只标记被消费的最后一条消息,这表示之前的所有消息都已被消费,也就是累积消息确认。

10 分钟了解 Pulsar:针对 Kafka 用户的指南

Pulsar 的单条消息确认与累积消息确认

幸运的是,Pulsar 的消费者 API 中包含了与 Kafka 等价的操作,也支持累积确认[15]当前消息及其之前的所有消息,而且你可以选择同步或异步地执行此操作。最终,其语义等同于 Kafka 的自动提交,因为 Pulsar 的 ACK 会定期刷新到 Broker 并被持久化。

try (PulsarClient client = PulsarClient.builder()
.serviceUrl("pulsar://localhost:6650")
.build());
Consumer<DomainObject> consumer = client.newConsumer(schema)
.subscriptionName("subscription")
.subscriptionType(SubscriptionType.Failover)
.topic(topic)
.subscribe()) {

while(true) {
Message<DomainObject> message = consumer.receive();
// Process message
consumer.acknowledgeCumulative(message);
}
}

总结

在结束本文之前,让我们总结一下上文所讲的 Pulsar 和 Kafka 概念之间的映射关系。请注意,每个 Kafka 概念通常都会转化为 Pulsar 概念里的一个特定配置。出现这种情况是因为 Pulsar 提供了高度灵活的原语,可以据此创建更广泛的消息架构,而不局限于 Kafka 所提供的流式使用场景。重要的是,Pulsar 可以根据需要复制 Kafka 的行为,这一点不足为奇,因为 Pulsar 甚至支持与 Kafka 兼容的协议处理器[16]

Kafka

Pulsar

主题

分区主题[17](持久化)

主题分区

主题[18](持久化)

主题保留策略

主题保留策略 + 消息过期策略[19]

分区副本

主题持久化策略:副本[20]

压实主题

主题压实[21]

生产者

生产者(accessMode = Shared[22]

生产者 ACK

主题持久化策略:ACK[23]

消费者组

订阅[24](灾备[25]类型的持久化[26]订阅)

提交

累积消息确认[27]

Schema 注册表

服务器端 Schema 策略[28]