Kafka (一)

时间:2022-09-01 21:17:10

使用Kafka最新版本0.9

Kafka 配置


1. 安装

首先需要安装Java,推荐安装Java8,不然会出现一些莫名其妙的错误

kafka_2.11-0.9.0.0.tgz

tar -xzf kafka_2.11-0.9.0.0.tgz

为了方便,更改一下目录名

mv kafka_2.11-0.9.0.0.tgz kafka

2.配置Kafka服务端属性

安装的是单节点,集群的配置非常简单,可以看看其他的资料

cd config

vim server.properties

有2个关键属性需要修改

advertised.host.name

advertised.port

这2个属性需要修改成IP地址或者机器名,如果Kafka是部署在云端中,可能云端的虚拟机中有别的虚拟IP,需要在这2个地址配置。

这个测试环境中,是部署在虚拟机中,通过路由相连,仍然需要配置这2个属性,将advertised.host.name设置为虚拟机的IP地址。

如果不配置advertised.host.name,只配置host.name,在另外一台机器上使用Java的Client连接时,会被解析成localhost。

3. 启动Kafka自带的Zookeeper

bin/zookeeper-server-start.sh config/zookeeper.properties

 

4. 启动Kafka

bin/kafka-server-start.sh config/server.properties

Kafka 简单测试


1. 创建Topic

bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test

2. 查看Topic

bin/kafka-topics.sh --list --zookeeper localhost:2181

 

3. 生产消息

新建一个Console

bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test 

输入字符串回车即可

4. 消费消息

新建一个Console

bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning

在生产消息的Console,继续输入字符并回车,这里会即时消费

Kafka Java客户端


1. Maven文件

<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>0.9.0.0</version>
</dependency>

2. Produce代码

        Properties props = new Properties();
props.put("bootstrap.servers", "192.168.1.160:9092");
props.put("acks", "all");
props.put("retries", 0);
props.put("batch.size", 16384);
props.put("linger.ms", 1);
props.put("buffer.memory", 33554432);
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); Producer<String, String> producer = new org.apache.kafka.clients.producer.KafkaProducer(props);
for(int i = 0; i < 100; i++)
producer.send(new ProducerRecord<String, String>("my-topic2", Integer.toString(i), Integer.toString(i))); producer.close();

bootstrap.servers即Kafka broker的地址

3. Consumer代码

      Properties props = new Properties();
props.put("bootstrap.servers", "192.168.1.160:9092");
props.put("group.id", "test");
props.put("enable.auto.commit", "true");
props.put("auto.commit.interval.ms", "1000");
props.put("session.timeout.ms", "30000");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
org.apache.kafka.clients.consumer.KafkaConsumer<String, String> consumer = new org.apache.kafka.clients.consumer.KafkaConsumer<String, String>(props);
//consumer.subscribe(Arrays.asList("foo", "bar"));
consumer.subscribe(Arrays.asList("my-topic2")); while (true) {
ConsumerRecords<String, String> records = consumer.poll(1000);
for (ConsumerRecord<String, String> record : records)
System.out.printf("offset = %d, key = %s, value = %s", record.offset(), record.key(), record.value());
}

Consumer Group


Kafka的Consumer可以加入一个Group,如果一个Group中的Consumer数大于该Topic的Partition数,则某些Consumer不会Fetch到数据,如果Consumer数小于Partition数,则某些Consumer会比别的Consumer消费更多的消息。

如果一个Consumer没有加入任何Group,则这个Consumer会消费该Topic下所有Partition的消息

建议最好的方式是一个Consumer对应一个Partition

在Kafka中会储存每个Consumer消费消息的Offset,Consumer可以选择Auto Commit Offset或者Manual Commit

在进行设计时,如果对Consumer有一定一致性的要求,可以选择Manual Commit,例如Consumer宕机,重启或者新Consumer加入时,可以从该Partition的Offset记录位置开始消费,还有一种情况则是Consumer宕机,不重启并且没有新的Consumer加入,在0.9.0。0版本的Kafka中,会有一个Consumer的协调者来处理Consumer的Reblance,启动Reblance,那么在同一个Group中的其他Consumer也可以消费宕机的Consumer订阅的Partition的消息。但这样设计的前提是Consumer必须被设计成无状态的。

Kafka 传递的消息


Kafka传递的消息是<key,value>,让我想起了Map-Reduce,利用Kafka可以快速构建一个轻量级的MR并行分析

Kafka对Hadoop的支持也非常的好。

Kafka (一)的更多相关文章

  1. Spark踩坑记——Spark Streaming&plus;Kafka

    [TOC] 前言 在WeTest舆情项目中,需要对每天千万级的游戏评论信息进行词频统计,在生产者一端,我们将数据按照每天的拉取时间存入了Kafka当中,而在消费者一端,我们利用了spark strea ...

  2. 消息队列 Kafka 的基本知识及 &period;NET Core 客户端

    前言 最新项目中要用到消息队列来做消息的传输,之所以选着 Kafka 是因为要配合其他 java 项目中,所以就对 Kafka 了解了一下,也算是做个笔记吧. 本篇不谈论 Kafka 和其他的一些消息 ...

  3. kafka学习笔记:知识点整理

    一.为什么需要消息系统 1.解耦: 允许你独立的扩展或修改两边的处理过程,只要确保它们遵守同样的接口约束. 2.冗余: 消息队列把数据进行持久化直到它们已经被完全处理,通过这一方式规避了数据丢失风险. ...

  4. &period;net windows Kafka 安装与使用入门&lpar;入门笔记&rpar;

    完整解决方案请参考: Setting Up and Running Apache Kafka on Windows OS   在环境搭建过程中遇到两个问题,在这里先列出来,以方便查询: 1. \Jav ...

  5. kafka配置与使用实例

    kafka作为消息队列,在与netty.多线程配合使用时,可以达到高效的消息队列

  6. kafka源码分析之一server启动分析

    0. 关键概念 关键概念 Concepts Function Topic 用于划分Message的逻辑概念,一个Topic可以分布在多个Broker上. Partition 是Kafka中横向扩展和一 ...

  7. Kafka副本管理—— 为何去掉replica&period;lag&period;max&period;messages参数

    今天查看Kafka 0.10.0的官方文档,发现了这样一句话:Configuration parameter replica.lag.max.messages was removed. Partiti ...

  8. Kafka:主要参数详解(转)

    原文地址:http://kafka.apache.org/documentation.html ############################# System ############### ...

  9. kafka

    2016-11-13  20:48:43 简单说明什么是kafka? Apache kafka是消息中间件的一种,我发现很多人不知道消息中间件是什么,在开始学习之前,我这边就先简单的解释一下什么是消息 ...

  10. Spark Streaming&plus;Kafka

    Spark Streaming+Kafka 前言 在WeTest舆情项目中,需要对每天千万级的游戏评论信息进行词频统计,在生产者一端,我们将数据按照每天的拉取时间存入了Kafka当中,而在消费者一端, ...

随机推荐

  1. maven基础知识

    1.maven基础知识 1.1maven坐标 maven坐标通常用冒号作为分割符来书写,像这样的格式:groupId:artifactId:packaging:version.项目包含了junit3. ...

  2. 【转】SSM框架——详细整合教程(Spring&plus;SpringMVC&plus;MyBatis)

    原文地址:http://blog.csdn.net/zhshulin/article/details/37956105 使用SSM(Spring.SpringMVC和Mybatis)已经有三个多月了, ...

  3. jquery&period;lazyload用法

    lazyload是jquery的插件,作为延迟加载图片,减压服务器压力. 如何使用: 先把 <script src="jquery.js" type="text/j ...

  4. MarkDown中锚点的使用

    在文档中创建锚点: <A NAME="ROP_ON_ARM">Davi L, Dmitrienko A, Sadeghi A R, et al. [Return-ori ...

  5. android 仿小米icon处理,加阴影和边框

    本人自己在做一个launcher,所以须要处理icon,加阴影和边框等.这仅仅是一种处理方法,其它的处理方法类似. 源码: https://github.com/com314159/LauncherI ...

  6. 【Git】 自动化Maven项目构建脚本(二)

    这次脚本增加了构建选择,可以按需构建了. #!/bin/bash #----------------------------------------------- # FileName: auto-b ...

  7. 安装了 R2 Integration Servic 之后,SQL Server 2008 Management Studio报错

    问题产生 IM数据库服务器未安装Integration Servic,影响备份.在安装了安装了 SQL Server 2008 R2 Integration Servic 之后,SQL Server ...

  8. 跟bWAPP学WEB安全&lpar;PHP代码&rpar;--XPath注入

    XML/Xpath注入 看了下,A2里面是认证与会话管理的破坏或称之为绕过,没有特别要写的,很多就是小问题,可能会将这类问题放在最后写一下.一篇博客,这里还是更多的着重在能够获取信息或者服务器权限的漏 ...

  9. so easy&comma; too happy

    一.预估与实际 PSP2.1 Personal Software Process Stages 预估耗时(分钟) 实际耗时(分钟) Planning 计划 • Estimate • 估计这个任务需要多 ...

  10. Paw —— 比Postman更舒服的API利器

    特点: 颜值高本地应用,流畅有收藏夹,管理请求可使用环境变量.比如用来一键切换开发环境请求和线上环境请求.即不同环境的同个接口只有host不一样,其它都是一样的,所以就把host抽离出来弄成一个环境变 ...