kafka 0.10.2 消息消费者

时间:2023-02-27 16:57:15
package cn.xiaojf.kafka.consumer;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;
import sun.applet.Main; import java.util.Arrays;
import java.util.List;
import java.util.Properties; /**
* 消息消费者
* @author xiaojf 2017/3/22 15:50
*/
public class MsgConsumer {
private static final String GROUP = "MsgConsumer";
private static final List TOPICS = Arrays.asList("my-replicated-topic"); /**
* 自动提交offset
*/
public void autoCommit() {
Properties properties = new Properties();
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getCanonicalName());//key反序列化方式
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getCanonicalName());//value反系列化方式
properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,true);//自动提交
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.59.130:9092,192.168.59.131:9092,192.168.59.132:9092");//指定broker地址,来找到group的coordinator
properties.put(ConsumerConfig.GROUP_ID_CONFIG,this.GROUP);//指定用户组 KafkaConsumer<String,String> consumer = new KafkaConsumer<String, String>(properties);
consumer.subscribe(TOPICS); while (true) {
ConsumerRecords<String, String> records = consumer.poll(100);//100ms 拉取一次数据
for (ConsumerRecord<String, String> record : records) {
System.out.println("topic: "+record.topic() + " key: " + record.key() + " value: " + record.value() + " partition: "+ record.partition());
}
}
} /**
* 手动提交offset
*/
public void consumer() {
Properties properties = new Properties();
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getCanonicalName());//key反序列化方式
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getCanonicalName());//value反系列化方式
properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,false);//手动提交
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.59.130:9092,192.168.59.131:9092,192.168.59.132:9092");//指定broker地址,来找到group的coordinator
properties.put(ConsumerConfig.GROUP_ID_CONFIG,this.GROUP);//指定用户组 KafkaConsumer<String,String> consumer = new KafkaConsumer<String, String>(properties);
consumer.subscribe(TOPICS);//指定topic消费 long i = 0;
while (true) {
ConsumerRecords<String, String> records = consumer.poll(100);//100ms 拉取一次数据
for (ConsumerRecord<String, String> record : records) {
System.out.println("topic: "+record.topic() + " key: " + record.key() + " value: " + record.value() + " partition: "+ record.partition());
i ++;
} if (i >= 100) {
consumer.commitAsync();//手动commit
i = 0;
}
}
} public static void main(String[] args) {
new MsgConsumer().autoCommit();
}
}
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>0.10.2.0</version>
</dependency>

kafka 0.10.2 消息消费者的更多相关文章

  1. kafka 0&period;10&period;2 消息生产者&lpar;producer&rpar;

    package cn.xiaojf.kafka.producer; import org.apache.kafka.clients.producer.*; import org.apache.kafk ...

  2. kafka 0&period;10&period;2 消息生产者

    package cn.xiaojf.kafka.producer; import org.apache.kafka.clients.producer.KafkaProducer; import org ...

  3. kafka 0&period;8&period;2 消息消费者 consumer

    <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/20 ...

  4. Kafka 0&period;10问题点滴

    15.如何消费内部topic: __consumer_offsets 主要是要让它来格式化:GroupMetadataManager.OffsetsMessageFormatter 最后用看了它的源码 ...

  5. Kafka 0&period;10&period;1版本源码 Idea编译

    Kafka 0.10.1版本源码 Idea编译 1.环境准备 Jdk 1.8 Scala 2.11.12:下载scala-2.11.12.msi并配置环境变量 Gradle 5.6.4: 下载Grad ...

  6. Kafka 0&period;10 KafkaConsumer流程简述

    ConsumerConfig.scala 储存Consumer的配置 按照我的理解,0.10的Kafka没有专门的SimpleConsumer,仍然是沿用0.8版本的. 1.从poll开始 消费的规则 ...

  7. Kafka 0&period;10&period;1&period;1 特点

    1.Consumer优化:心跳线程可作为后台线程,提交offset,剥离出poll函数 问题:0.10新设计的consumer是单线程的,提交offset是在poll中.本次的poll调用,提交上次p ...

  8. kafka 0&period;10&period;2 cetos6&period;5 集群部署

    安装 zookeeper http://www.cnblogs.com/xiaojf/p/6572351.html安装 scala http://www.cnblogs.com/xiaojf/p/65 ...

  9. Kafka 0&period;10&period;0

    2.1 Producer API We encourage all new development to use the new Java producer. This client is produ ...

随机推荐

  1. &lpar;C&plus;&plus;&rpar; LNK2019&colon; unresolved external symbol&period;

    Error 33 error LNK2019: unresolved external symbol "\xxx.obj yyy.Native 仔细看看错误信息,后来发现尽然是构造函数的一个 ...

  2. SNMP 原理与实战详解

    原文地址:http://freeloda.blog.51cto.com/2033581/1306743 原创作品,允许转载,转载时请务必以超链接形式标明文章 原始出处 .作者信息和本声明.否则将追究法 ...

  3. Mac后台开发MNMP&lpar;nginx &comma; mysql&comma; php&rpar;标配

    mysql安装: 方法:1.原始方法,下载压缩文件,解压,安装,配置            2.dmp文件安装            3.brew安装 这里使用brew安装:      a.brew ...

  4. 使用Kindle4rss推送自己感兴趣的博文

    微信是个好东西,信息量超大,正能量的东西居多,但信息过载的滋味也很不好受,浏览了一大堆铺天盖地的信息后,关上手机后大脑又重新回到空白.所以还是喜欢用RSS聚合功能,自己去订阅优秀的博客或新闻,当有更新 ...

  5. Java日志管理方法&lpar;转载&rpar;

    原文地址:http://www.cnblogs.com/leocook/p/log_java.html java开发中常见的几种日志管理方案有以下4种: 1. Commons-logging + lo ...

  6. css3 边框记

    css3 边框 border属性在css1中就已经定义了,使用它可以设置元素的边框风格,边框颜色以及边框粗细. border-width:设置元素边框的粗细. border-color:设置元素边框的 ...

  7. uboot之ldr指令

    刚开始接触uboot的时候,就一直对ldr指令很迷惑,因为这个指令有两层用法,一个是加载,一个是伪指令.今天闲着没事就来说一下这两个之间的区别. LDR伪指令的形式是"LDR Rn,=exp ...

  8. mssql sqlserver 三种数据表数据去重方法分享

    摘要: 下文将分享三种不同的数据去重方法数据去重:需根据某一字段来界定,当此字段出现大于一行记录时,我们就界定为此行数据存在重复. 数据去重方法1: 当表中最在最大流水号时候,我们可以通过关联的方式为 ...

  9. get windows auth code

    public static WindowsIdentityInfo GetWindowsIdentityInfo(HttpContext context) { WindowsIdentityInfo ...

  10. Linux C 编程

    主题链接地址:https://www.cnblogs.com/kele-dad/category/1194627.html