1.pom
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.10</artifactId>
<version>0.10.2.1</version>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>0.10.2.1</version>
</dependency>
2.KafkaTopicBean
public class KafkaTopicBean {
private String topicName; // topic 名称
private Integer partition; // partition 分区数量
private Integer replication; // replication 副本数量
private String descrbe;
public String getTopicName() {
return topicName;
}
public void setTopicName(String topicName) {
this.topicName = topicName;
}
public Integer getPartition() {
return partition;
}
public void setPartition(Integer partition) {
this.partition = partition;
}
public Integer getReplication() {
return replication;
}
public void setReplication(Integer replication) {
this.replication = replication;
}
public String getDescrbe() {
return descrbe;
}
public void setDescrbe(String descrbe) {
this.descrbe = descrbe;
}
@Override
public String toString() {
return "KafkaTopicBean [topicName=" + topicName + ", partition=" + partition
+ ", replication=" + replication + ", descrbe=" + descrbe +"]";
}
}
3.KafkaUtil
import java.util.Properties;
import org.apache.kafka.common.security.JaasUtils;
import kafka.admin.AdminUtils;
import kafka.admin.RackAwareMode;
import kafka.utils.ZkUtils;
public class KafkaUtil {
public static void createKafaTopic(String ZkStr,KafkaTopicBean topic) {
ZkUtils zkUtils = ZkUtils.
apply(ZkStr, 30000, 30000,JaasUtils.isZkSecurityEnabled());
AdminUtils.createTopic(zkUtils, topic.getTopicName(), topic.getPartition(),
topic.getReplication(), new Properties(), new RackAwareMode.Enforced$());
zkUtils.close();
}
public static void deleteKafaTopic(String ZkStr,KafkaTopicBean topic) {
ZkUtils zkUtils = ZkUtils.
apply(ZkStr, 30000, 30000,JaasUtils.isZkSecurityEnabled());
AdminUtils.deleteTopic(zkUtils, topic.getTopicName());
zkUtils.close();
}
}
4.调用方式
public static void main(String[] args) {
//zookeeper地址:端口号
String ZkStr = "912.168.0.1:2181";
//topic对象
KafkaTopicBean topic = new KafkaTopicBean();
topic.setTopicName("testTopic"); //topic名称
topic.setPartition(1); //分区数量设置为1
topic.setReplication(1); //副本数量设置为1
//创建topic
KafkaUtil.createKafaTopic(ZkStr,topic);
//删除topic
KafkaUtil.deleteKafaTopic(ZkStr,topic);
}