Kafka基础/1

时间:2024-04-10 15:09:29

Kafka

概念

Kafka 是一个分布式的流媒体平台。

应用:消息系统、日志收集、用户行为追踪、流式处理

特点:高吞吐量、消息持久化、高可靠性、高扩展性

术语

  • broker:Kafka 的服务器,Kafka 当中每一台服务器,我们称其为broker

  • Zookeeper:管理集群(Kafka 内置的 Zookeeper)

  • Topic:点对点(很像阻塞队列),发布订阅模式(Kafka采用的。生产者把消息放到某个位置,然后很多个消费者同时关注这个位置,订阅这个位置,然后读取消息,这个时候这个消息可以被多个消费者同时读到或者先后读到,相当于一个文件夹,存放消息的位置)

  • Partition:分区,是对主题这个位置的一个分区,可以把主题分为n个区域,然后就可以采用多线程的方式同时向这n个分区里面写数据,增强并发能力。每一个它是从前往后按照顺序往队尾里追加数据的,然后按照索引顺序读取数据。

  • offset:这个消息在这个分区内存放的这个索引序列

  • Leader Replica:主副本,当想从分区中读取数据的时候,主副本可以给出数据,处理请求

  • Follower Replica:从副本,只是备份,只是从主副本备份数据,不负责做响应。当主副本挂掉,集群就会从从副本中选一个新的作为主副本

具体使用:

创建主题:
在这里插入图片描述

Kafka 默认端口90922

生产者:
在这里插入图片描述

消费者:
在这里插入图片描述

Spring 整合 Kafka

配置的时候:

  • server:项目中的 server 只有一个
  • 消费者的组
  • 是否自动提交
  • 自动提交的频率

Kafka 小样例:

一、首先定义一个对象,对事件进行封装

package com.conquer.community.entity;

import java.util.HashMap;
import java.util.Map;

public class Event {

    private String topic;  //主题/事件的类型
    private int userId;  //事件触发的人
    private int entityType;  //事件发生在哪个实体(帖子、点赞、评论)
    private int entityId;  //实体的 ID
    private int EntityUserId;   //实体的作者
    private Map<String, Object> data = new HashMap<>();  //把其它的额外的数据全都存到这个 map 中,这样就具有了一定的扩展性

    public int getEntityId() {
        return entityId;
    }

    public Event setEntityId(int entityId) {
        this.entityId = entityId;
        return this;  //有了一些改动,好处:当用了 set 方法之后,当我 set 了 topic 我当然还要 set 其它的属性,当我set了这个属性之后,又返回这个对象,我又可以调用当前对象的其它 set 方法,所以我们写的时候可以 event.setTopic().setUserId.set...(链式编程) 就比较方便
    }

    public String getTopic() {
        return topic;
    }

    public Event setTopic(String topic) {
        this.topic = topic;
        return this;
    }

    public int getUserId() {
        return userId;
    }

    public Event setUserId(int userId) {
        this.userId = userId;
        return this;
    }

    public int getEntityType() {
        return entityType;
    }

    public Event setEntityType(int entityType) {
        this.entityType = entityType;
        return this;
    }

    public int getEntityUserId() {
        return EntityUserId;
    }

    public Event setEntityUserId(int entityUserId) {
        EntityUserId = entityUserId;
        return this;
    }

    public Map<String, Object> getData() {
        return data;
    }

    public Event setData(String key, Object value) {
        this.data.put(key, value);
        return this;   //这样的话一次传一个值,还想传的话继续点就可以
    }
}

Kafka 零拷贝原理

在实际应用中,如果我们需要把磁盘中的某个文件内容发送到远程服务器上,那么它必须要经过这样几个拷贝过程
在这里插入图片描述

1、从磁盘中读取目标文件内容拷贝到内核缓冲区

2、CPU 控制器再把内核缓冲区的数据复制到用户空间的缓冲区中

3、在应用程序中,调用 write() 方法,把用户空间缓冲区中的数据拷贝到内核下的 Socket Buffer 中

4、把在内核模式下的 SocketBuffer 中的数据复制到网卡缓冲区,网卡缓冲区再把数据传输到目标服务器上

这这个过程中我们可以发现,数据从磁盘到最终发送出去,要经历四次拷贝,而这四次拷贝过程中,有两次拷贝是浪费的。分别是:

  • 从内核空间复制到用户空间
  • 从用户空间再次复制到内核空间

除此之外,由于用户空间和内核空间的切换,会带来上下文的一个切换,对 CPU 的一个性能也会造成一些影响。而零拷贝就是把这两次多余的拷贝省略掉,应用程序可以直接把磁盘中的数据从内核直接传送到 Socket,而不再需要经过应用程序所在的用户空间。

**零拷贝通过 DMA 技术把文件内容复制到内核空间的 Read Buffer,接着,把包含数据位置和长度信息的文件描述符加载到 Socket Buffer 中,DMA 引擎直接把数据从内核空间传递给网卡设备。**在这样的一个流程中,数据只经历了两次拷贝就发送到了网卡中,并且减少了两次 CPU 的上下文切换,极大的提高了效率。

结论:

所谓零拷贝,并不是完全没有数据复制,只是相对于用户空间来说,不再需要进行数据拷贝。对于前面说的整个流程来说,零拷贝只是减少了不必要的拷贝次数而已。

实现零拷贝:
  • 在 Linux 中,零拷贝技术依赖于低层的 sendfile() 方法实现

  • 在 Java 中,FileChannal.transferTo() 方法的底层实现就是 sendfile() 方法

  • mmap 文件映射机制:原理 —— 将磁盘文件映射到内存,用户通过修改内存就能修改磁盘文件。使用这种方式可以获取很大的 I/O 提升,省去了用户空间到内核空间复制的开销。

ISR 机制

ISR 机制是 Kafka 保障数据可靠性的关键概念之一。从三个方面来说:

1、ISR 的概念

ISR 代表着一组与 Topic 分区 Leader 保持同步的 Follower 分区的副本

2、ISR 的工作原理

ISR 列表中的副本会定期向 Leader 同步数据,确保数据的一致性。只有 ISR 列表中的副本才会参与数据的同步操作,如果某个副本无法及时同步,那么它将会从 ISR 列表中移除。

3、ISR 的重要性

ISR 机制对于数据的可靠性非常重要,当某个副本无法及时同步或者发生故障的时候,Kafka 会从 ISR 列表中去选择另外一个副本作为 Leader,确保高可用性和数据的一致性。

总的来说,Kafka 的 ISR 机制是确保数据可靠性和一致性的关键概念,它通过维护与 Leader 同步的副本列表去确保数据的及时同步和可靠性。

Kafka 快在哪里

Kafka 是一个分布式流处理组件,它设计之初,就是用在一些高吞吐量的数据流应用和数据管道上,所以在早期很多互联网企业对于一些高吞吐量需求的场景清一色采用了 Kafka,在早期很多场景中,大部分用的是 ActiveMQ,发现性能是在满足不了需求,就直接切到 Kafka 上。

有几个比较好的设计:

1、数据的分片存储:Kafka 采用了 partition 的物理存储机制,把一个 topic 中的消息,分成多个数据分片,类似于分库分表逻辑

2、消息的持久化存储方案中,Kafka 的消息都是顺序追加的方式存储到磁盘上的。利用了磁盘的数据写入,减少了磁盘寻道的时间;再结合批量刷盘的操作,节省了磁盘的 IO 次数。

3、Kafka 使用操作系统的零拷贝机制来优化数据传输。这意味着再将消息从磁盘发送到网络接口的时候,可以减少数据的拷贝次数

4、Kafka 整体架构的可扩展性很强。它可以再不停机的情况下进行水平扩容,我们可以动态地添加更多的 Kafka Broker 以及分区来处理更高的负载

5、Kafka 为每个分区维护了一套简洁高效的索引。使得即使在非常大的数据量下,也能快速定位和检索消息。

这些特性共同工作使得 Kafka 在消息传输方面非常高效,特别适合需要处理大量数据流的场景。比如说日志搜集、事件源、实时分析、监控系统等

Kafka 中的 Partition 分区副本的 Leader 选举算法

Kafka 首先会选择一个具有最新数据的副本作为新的 Leader,也就是 ISR 集合的副本。其中 ISR 集合指的是与 Leader 同步的副本集合,也就是说,它们的数据同步状态与 Leader 最接近,并且它们与 Leader 副本的网络通信的延迟是最小的。

如果 ISR 集合没有可用副本,Kafka 会从所有副本中去选择一个具有最新数据的副本作为新的 Leader。在这种情况下选举出来的 Leader 由于和原来老的节点的数据存在较大的延迟,所以可能会造成一部分的数据丢失的情况。

所以 Kafka 设计者把这个功能开关的选择交给了开发人员,如果愿意接受这种情况,可以通过 unclean.leader.election.enable 这样一个参数来设置,开启之后虽然会造成数据丢失,但是至少可以保证依然能够对外提供服务,保证可用性。

MQ 如何保证消费顺序性

这个问题我想从两个方面来回答,一个是 Kafka 为什么会存在无序消费,第二个是 Kafka 如何保证有序消费

Kafka 为什么会存在无序消费

在 Kafka 的整个架构里面,用到了 Partition 的分区机制 实现消息的物理存储。也就是说在同一个 topic 里面可以维护多个 Partition 来去实现消息的一个分片。那么生产者在发送消息的时候会根据消息的 key 来进行取模,来决定把当前的消息存储到哪一个 Partition 里面。

而且消息是按照先后有序的去存储在 Partition 里面。假设一个 topic 里面有三个 Partition,而消息正好被路由到三个独立的 Partition 里面,然后消费端有三个消费者去通过 balance 的一个机制去分别指派了对应的消费分区,因为消费者是完全独立的网络节点,所以可能会出现消息的消费顺序不是按照发送顺序来实现的,从而导致消息的消息乱序的一个问题。

解决方法

自定义消息分区的一个路由算法,然后把指定的 key 都发送到同一个 Partition 里面,然后我们去指定一个消费者专门去消费某一个分区的数据,这样的话就保证了消息的顺序消费了。

另外在有些设计方案里面,在消费端会采用异步线程的方式来消费数据,以提高消息的处理效率。在这样的情况下,因为每个线程的消息处理效率是不同的。所以即便是采用单个分区的存储和消费也可能会出现无序访问的一个问题。对这样的问题的解决方法就是在消息消费端采用一个阻塞队列,把获取到的消息,先保存到阻塞队列里面,然后采用异步线程从阻塞队列里面去获取消息来进行消费。

参考 by B站 跟着Mic 学架构