Apache Kafka系列(三) Java API使用

时间:2022-08-07 16:47:29

Apache Kafka系列(一) 起步

Apache Kafka系列(二) 命令行工具(CLI)

Apache Kafka系列(三) Java API使用

摘要:

  Apache Kafka Java Client API

一、基本概念

  Kafka集成了Producer/Consumer连接Broker的客户端工具,但是在消息处理方面,这两者主要用于服务端(Broker)的简单操作,如:

    1.创建Topic

    2.罗列出已存在的Topic

    3.对已有Topic的Produce/Consume测试

  跟其他的消息系统一样,Kafka提供了多种不用语言实现的客户端API,如:Java,Python,Ruby,Go等。这些API极大的方便用户使用Kafka集群,本文将展示这些API的使用

二、前提

  • 在本地虚拟机中安装了Kafka 0.11.0版本,可以参照前一篇文章:  Apache Kafka系列(一) 起步
  • 本地安装有JDK1.8
  • IDEA编译器
  • Maven3

三、项目结构

  Maven pom.xml如下:

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation
="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>

<groupId>com.randy</groupId>
<artifactId>kafka_api_demo</artifactId>
<version>1.0-SNAPSHOT</version>
<name>Maven</name>

<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<maven.compiler.source>1.8</maven.compiler.source>
<maven.compiler.target>1.8</maven.compiler.target>
</properties>

<dependencies>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>0.11.0.0</version>
</dependency>
</dependencies>
</project>

 

四、源码

  4.1 Producer的源码    

package com.randy;

import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;


/**
* Author : RandySun
* Date : 2017-08-13 16:23
* Comment :
*/
public class ProducerDemo {

public static void main(String[] args){
Properties properties
= new Properties();
properties.put(
"bootstrap.servers", "192.168.1.110:9092");
properties.put(
"acks", "all");
properties.put(
"retries", 0);
properties.put(
"batch.size", 16384);
properties.put(
"linger.ms", 1);
properties.put(
"buffer.memory", 33554432);
properties.put(
"key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
properties.put(
"value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
Producer
<String, String> producer = null;
try {
producer
= new KafkaProducer<String, String>(properties);
for (int i = 0; i < 100; i++) {
String msg
= "Message " + i;
producer.send(
new ProducerRecord<String, String>("HelloWorld", msg));
System.out.println(
"Sent:" + msg);
}
}
catch (Exception e) {
e.printStackTrace();

}
finally {
producer.close();
}

}
}

  可以使用KafkaProducer类的实例来创建一个Producer,KafkaProducer类的参数是一系列属性值,下面分析一下所使用到的重要的属性:

  • bootstrap.servers
properties.put("bootstrap.servers", "192.168.1.110:9092");

   bootstrap.servers是Kafka集群的IP地址,如果Broker数量超过1个,则使用逗号分隔,如"192.168.1.110:9092,192.168.1.110:9092"。其中,192.168.1.110是我的其中一台虚拟机的

           IP地址,9092是所监听的端口

  • key.serializer   &  value.serializer
        properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
properties.put(
"value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

     序列化类型。 Kafka消息是以键值对的形式发送到Kafka集群的,其中Key是可选的,Value可以是任意类型。但是在Message被发送到Kafka集群之前,Producer需要把不同类型的消

   息序列化为二进制类型。本例是发送文本消息到Kafka集群,所以使用的是StringSerializer。

  • 发送Message到Kafka集群
   for (int i = 0; i < 100; i++) {
String msg
= "Message " + i;
producer.send(
new ProducerRecord<String, String>("HelloWorld", msg));
System.out.println(
"Sent:" + msg);
}

   上述代码会发送100个消息到HelloWorld这个Topic

  4.2 Consumer的源码

package com.randy;

import java.util.Arrays;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

/**
* Author : RandySun
* Date : 2017-08-13 17:06
* Comment :
*/
public class ConsumerDemo {

public static void main(String[] args){
Properties properties
= new Properties();
properties.put(
"bootstrap.servers", "192.168.1.110:9092");
properties.put(
"group.id", "group-1");
properties.put(
"enable.auto.commit", "true");
properties.put(
"auto.commit.interval.ms", "1000");
properties.put(
"auto.offset.reset", "earliest");
properties.put(
"session.timeout.ms", "30000");
properties.put(
"key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
properties.put(
"value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

KafkaConsumer
<String, String> kafkaConsumer = new KafkaConsumer<>(properties);
kafkaConsumer.subscribe(Arrays.asList(
"HelloWorld"));
while (true) {
ConsumerRecords
<String, String> records = kafkaConsumer.poll(100);
for (ConsumerRecord<String, String> record : records) {
System.out.printf(
"offset = %d, value = %s", record.offset(), record.value());
System.out.println();
}
}

}
}

  可以使用KafkaConsumer类的实例来创建一个Consumer,KafkaConsumer类的参数是一系列属性值,下面分析一下所使用到的重要的属性:

  • bootstrap.servers

    和Producer一样,是指向Kafka集群的IP地址,以逗号分隔。

  • group.id

     Consumer分组ID

  • key.deserializer and value.deserializer

     发序列化。Consumer把来自Kafka集群的二进制消息反序列化为指定的类型。因本例中的Producer使用的是String类型,所以调用StringDeserializer来反序列化

  Consumer订阅了Topic为HelloWorld的消息,Consumer调用poll方法来轮循Kafka集群的消息,其中的参数100是超时时间(Consumer等待直到Kafka集群中没有消息为止): 

        kafkaConsumer.subscribe(Arrays.asList("HelloWorld"));
while (true) {
ConsumerRecords
<String, String> records = kafkaConsumer.poll(100);
for (ConsumerRecord<String, String> record : records) {
System.out.printf(
"offset = %d, value = %s", record.offset(), record.value());
System.out.println();
}
}

五、总结

  本文展示了如何创建一个Producer并生成String类型的消息,Consumer消费这些消息。这些都是基于Apache Kafka 0.11.0 Java API。