Kafka Topic Api

时间:2023-03-09 13:16:47
Kafka Topic Api

Pom:

<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.10</artifactId>
<version>0.10.1.0</version>
</dependency>

JavaCode:

package com;

import kafka.admin.AdminUtils;
import kafka.admin.RackAwareMode;
import kafka.server.ConfigType;
import kafka.utils.ZkUtils;
import org.apache.kafka.common.security.JaasUtils;
import org.junit.Test;
import scala.collection.JavaConversions; import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Properties; /**
*
*
* 测试kafka Topic CRUD
* @author chenwen
*
*/
public class TestKafkaApi { /**
*
* 创建topic
*/
@Test
public void createTopic() {
ZkUtils zkUtils = ZkUtils.apply("172.16.11.224:2181", 30000, 30000, JaasUtils.isZkSecurityEnabled());
// 创建一个单分区单副本名为test的topic
AdminUtils.createTopic(zkUtils, "test",
1, // 分区数
1, // 副本数
new Properties(),
RackAwareMode.Enforced$.MODULE$);
zkUtils.close();
} /**
*
* 删除topic
*
*/
@Test
public void deleteTopic() {
ZkUtils zkUtils = ZkUtils.apply("172.16.11.224:2181", 30000, 30000, JaasUtils.isZkSecurityEnabled());
// 删除topic 'test'
AdminUtils.deleteTopic(zkUtils, "test");
zkUtils.close();
} /**
*
* 查询某个topic的属性
*
*/
@Test
public void query() {
ZkUtils zkUtils = ZkUtils.apply("172.16.11.224:2181", 30000, 30000, JaasUtils.isZkSecurityEnabled());
// 获取topic 'test'的topic属性属性
Properties props = AdminUtils.fetchEntityConfig(zkUtils, ConfigType.Topic(), "__consumer_offsets");
// 查询topic-level属性
Iterator it = props.entrySet().iterator();
while(it.hasNext()){
Map.Entry entry=(Map.Entry)it.next();
Object key = entry.getKey();
Object value = entry.getValue();
System.out.println(key + " = " + value);
}
zkUtils.close();
} /**
*
* 判断某个topic是否存在
*
* @param topicName
*/
@Test
public void topicExists(String topicName) {
ZkUtils zkUtils = ZkUtils.apply("zkHost", 30000, 30000, JaasUtils.isZkSecurityEnabled());
boolean exists = AdminUtils.topicExists(zkUtils,topicName);
} /**
*
* 获取topiclist
*/
@Test
public void getTopicList() {
ZkUtils zkUtils = ZkUtils.apply("zkHost", 30000, 30000, JaasUtils.isZkSecurityEnabled());
List<String> allTopicList = JavaConversions.seqAsJavaList(zkUtils.getAllTopics());
} }