kafka入门2:java 创建及删除 topic

时间:2023-03-09 06:27:18
kafka入门2:java 创建及删除 topic

1.pom

  <dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka_2.10</artifactId>
    <version>0.10.2.1</version>
  </dependency>
  <dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>0.10.2.1</version>
  </dependency>

2.KafkaTopicBean

/**
 * Mutable bean holding the settings of a Kafka topic: its name, partition
 * count, replica count and an extra text field.
 */
public class KafkaTopicBean {

    /** Topic name. */
    private String topicName;

    /** Number of partitions. */
    private Integer partition;

    /** Number of replicas. */
    private Integer replication;

    /** Presumably a description of the topic; the spelling is part of the accessor names. */
    private String descrbe;

    /** @return the topic name */
    public String getTopicName() {
        return topicName;
    }

    /** @param topicName the topic name */
    public void setTopicName(String topicName) {
        this.topicName = topicName;
    }

    /** @return the number of partitions */
    public Integer getPartition() {
        return partition;
    }

    /** @param partition the number of partitions */
    public void setPartition(Integer partition) {
        this.partition = partition;
    }

    /** @return the number of replicas */
    public Integer getReplication() {
        return replication;
    }

    /** @param replication the number of replicas */
    public void setReplication(Integer replication) {
        this.replication = replication;
    }

    /** @return the value of the {@code descrbe} field */
    public String getDescrbe() {
        return descrbe;
    }

    /** @param descrbe the value to store in the {@code descrbe} field */
    public void setDescrbe(String descrbe) {
        this.descrbe = descrbe;
    }

    /**
     * Renders all four fields as
     * {@code KafkaTopicBean [topicName=..., partition=..., replication=..., descrbe=...]}.
     */
    @Override
    public String toString() {
        StringBuilder text = new StringBuilder("KafkaTopicBean [");
        text.append("topicName=").append(topicName);
        text.append(", partition=").append(partition);
        text.append(", replication=").append(replication);
        text.append(", descrbe=").append(descrbe);
        return text.append("]").toString();
    }
}

3.KafkaUtil

import java.util.Properties;
import org.apache.kafka.common.security.JaasUtils;
import kafka.admin.AdminUtils;
import kafka.admin.RackAwareMode;
import kafka.utils.ZkUtils;

/**
 * Static helpers that create and delete Kafka topics through ZooKeeper using
 * {@code kafka.admin.AdminUtils}.
 */
public class KafkaUtil {

    /** Value passed to {@code ZkUtils.apply} as both the session and the connection timeout. */
    private static final int ZK_TIMEOUT_MS = 30000;

    /**
     * Creates a topic with the name, partition count and replica count held by
     * {@code topic}, using an empty topic configuration.
     *
     * @param ZkStr ZooKeeper connect string, e.g. {@code host:port}
     * @param topic supplies the topic name, partition count and replica count
     */
    public static void createKafaTopic(String ZkStr, KafkaTopicBean topic) {
        ZkUtils zkUtils = connect(ZkStr);
        try {
            // Enforced$.MODULE$ is the Scala singleton instance; constructing a new
            // Enforced$ from Java would create a second copy of the singleton.
            AdminUtils.createTopic(zkUtils, topic.getTopicName(), topic.getPartition(),
                    topic.getReplication(), new Properties(), RackAwareMode.Enforced$.MODULE$);
        } finally {
            // Release the ZooKeeper connection even when createTopic throws.
            zkUtils.close();
        }
    }

    /**
     * Deletes the topic named by {@code topic.getTopicName()}.
     *
     * @param ZkStr ZooKeeper connect string, e.g. {@code host:port}
     * @param topic supplies the name of the topic to delete
     */
    public static void deleteKafaTopic(String ZkStr, KafkaTopicBean topic) {
        ZkUtils zkUtils = connect(ZkStr);
        try {
            AdminUtils.deleteTopic(zkUtils, topic.getTopicName());
        } finally {
            // Release the ZooKeeper connection even when deleteTopic throws.
            zkUtils.close();
        }
    }

    /** Opens a ZkUtils handle for the given connect string with the shared timeouts. */
    private static ZkUtils connect(String zkConnect) {
        return ZkUtils.apply(zkConnect, ZK_TIMEOUT_MS, ZK_TIMEOUT_MS,
                JaasUtils.isZkSecurityEnabled());
    }
}

4.调用方式

    /**
     * Demo entry point: builds a topic definition named "testTopic" with one
     * partition and one replica, creates it via KafkaUtil, then deletes it again.
     *
     * @param args command-line arguments (unused)
     */
    public static void main(String[] args) {

        // ZooKeeper address in host:port form (first octet corrected from the invalid 912)
        String ZkStr = "192.168.0.1:2181";

        // Topic definition
        KafkaTopicBean topic = new KafkaTopicBean();
        topic.setTopicName("testTopic"); // topic name
        topic.setPartition(1);           // number of partitions set to 1
        topic.setReplication(1);         // number of replicas set to 1

        // Create the topic
        KafkaUtil.createKafaTopic(ZkStr, topic);

        // Delete the topic
        KafkaUtil.deleteKafaTopic(ZkStr, topic);
    }