Sending custom Java objects to Kafka

Posted: 2022-08-26 16:06:44

I wrote a small project that defines a custom Java object and sends it to Kafka.

Installing and setting up Kafka is not covered here; unpacking the distribution and doing a little configuration is enough.

Let's get straight to the point.

1. Define a custom Java object and make it serializable (getters and setters omitted)

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

public class Document implements Serializable {

    private static final long serialVersionUID = 1L;

    private String title;
    private String content;
    private String id;
    private String date;
    private long updatetime;

    // Serialize this object into a byte array with standard Java serialization
    public byte[] toBytes() {
        ByteArrayOutputStream bo = new ByteArrayOutputStream();
        try (ObjectOutputStream oos = new ObjectOutputStream(bo)) {
            oos.writeObject(this);
            oos.flush();
        } catch (IOException e) {
            e.printStackTrace();
        }
        return bo.toByteArray();
    }

    // Deserialize a byte array back into a Document; static, since it does
    // not depend on the receiving instance
    public static Document toDocument(byte[] bytes) {
        Document document = null;
        try (ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(bytes))) {
            document = (Document) ois.readObject();
        } catch (IOException | ClassNotFoundException ex) {
            ex.printStackTrace();
        }
        return document;
    }
}
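A quick round-trip check of the two helpers (a minimal sketch; the setters and getters are among the omitted accessors):

Document doc = new Document();
doc.setTitle("hello");
byte[] bytes = doc.toBytes();               // Java-serialized form
Document copy = Document.toDocument(bytes); // copy.getTitle() is "hello" again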

2. Custom Encoder

import java.util.Map;

import org.apache.kafka.common.serialization.Serializer;

import com.thunisoft.data.domain.Document;

public class DocumentEncoder implements Serializer<Document> {

    @Override
    public void configure(Map<String, ?> map, boolean isKey) {
        // no configuration needed
    }

    @Override
    public byte[] serialize(String topic, Document document) {
        return document.toBytes();
    }

    @Override
    public void close() {
        // nothing to release
    }
}
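The Serializer above covers the producer side of the new client API. Its mirror image, a Deserializer for the new consumer API, can be sketched the same way (this class is not part of the original project; the old-API Decoder in section 4 is what the high-level consumer below actually uses):

import java.util.Map;

import org.apache.kafka.common.serialization.Deserializer;

import com.thunisoft.data.domain.Document;

public class DocumentDeserializer implements Deserializer<Document> {

    @Override
    public void configure(Map<String, ?> map, boolean isKey) {
        // no configuration needed
    }

    @Override
    public Document deserialize(String topic, byte[] bytes) {
        // tombstone records arrive as null
        return bytes == null ? null : Document.toDocument(bytes);
    }

    @Override
    public void close() {
        // nothing to release
    }
}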

3. The producer

import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

import com.thunisoft.data.domain.Document;
import com.thunisoft.data.fy.api.kafka.domain.DocumentEncoder;
import com.thunisoft.data.fy.constant.Constants;

public class DocumentProducer {

    private static Properties props;
    private static KafkaProducer<String, Document> producer;

    static {
        props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, Constants.KAFKA_METDATA_BROKERS);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, DocumentEncoder.class.getName());
        // optional custom partitioner, see section 6
        // props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, DocumentPartitioner.class.getName());

        producer = new KafkaProducer<String, Document>(props);
    }

    public void produce(Document document) {
        producer.send(new ProducerRecord<String, Document>(Constants.TOPIC, document));
    }

    public void close() {
        producer.close(); // flushes any buffered records
    }
}
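send() is asynchronous, so the fire-and-forget call above never reports broker errors. A callback variant makes failures visible (a sketch that could be added to DocumentProducer; using the document id as the record key is an assumption, and getId() is among the omitted accessors):

// needs: org.apache.kafka.clients.producer.Callback and RecordMetadata
public void produceWithCallback(Document document) {
    producer.send(new ProducerRecord<String, Document>(Constants.TOPIC, document.getId(), document),
            new Callback() {
                @Override
                public void onCompletion(RecordMetadata metadata, Exception e) {
                    if (e != null) {
                        e.printStackTrace(); // delivery failed
                    }
                }
            });
}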

4. Custom Decoder (for the old high-level consumer)

import kafka.serializer.Decoder;

import com.thunisoft.data.domain.Document;

public class DocumentDecoder implements Decoder<Document> {

    // old-client (kafka.serializer) counterpart of the Encoder above
    @Override
    public Document fromBytes(byte[] bytes) {
        return Document.toDocument(bytes);
    }
}

5. The consumer

import java.util.Properties;

import kafka.consumer.ConsumerConfig;
import kafka.javaapi.consumer.ConsumerConnector;

import com.thunisoft.data.fy.constant.Constants;

public class DocumentConsumer {

    private static Properties props;
    private static ConsumerConnector consumer;

    static {
        props = new Properties();
        // ZooKeeper connection string
        props.put("zookeeper.connect", Constants.ZOOKEEPER_CONNECT);

        // consumer group id
        props.put("group.id", Constants.KAFKA_GROUP_ID);

        // maximum time the client waits to establish a ZooKeeper connection
        props.put("zookeeper.connection.timeout.ms", Constants.ZOOKEEPER_CONNECTION_TIMEOUT);
        // ZooKeeper session timeout; note the constraint
        // rebalance.max.retries * rebalance.backoff.ms > zookeeper.session.timeout.ms
        props.put("zookeeper.session.timeout.ms", Constants.ZOOKEEPER_SESSION_TIMEOUT);
        props.put("zookeeper.sync.time.ms", "200");
        props.put("auto.commit.interval.ms", "1000");
        // start from the earliest offset when no committed offset exists
        props.put("auto.offset.reset", "smallest");

        props.put("rebalance.max.retries", "5");
        props.put("rebalance.backoff.ms", "1200");
        // no decoder is configured here: the key/value decoders are passed
        // to createMessageStreams() explicitly (see the test code below)
    }

    public ConsumerConnector getConsumer() {
        if (consumer == null) {
            consumer = kafka.consumer.Consumer.createJavaConsumerConnector(new ConsumerConfig(props));
        }
        return consumer;
    }

}
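Note that ConsumerConnector belongs to the old Scala client, which was deprecated in Kafka 0.9 and later removed. On a current client the same consumer can be written against org.apache.kafka.clients.consumer.KafkaConsumer, paired with the DocumentDeserializer sketched after section 2 (again an assumption, not part of the original project):

import java.util.Arrays;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

import com.thunisoft.data.domain.Document;
import com.thunisoft.data.fy.constant.Constants;

public class NewApiDocumentConsumer {

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, Constants.KAFKA_METDATA_BROKERS);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, Constants.KAFKA_GROUP_ID);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, DocumentDeserializer.class.getName());

        KafkaConsumer<String, Document> consumer = new KafkaConsumer<String, Document>(props);
        consumer.subscribe(Arrays.asList(Constants.TOPIC));
        while (true) {
            ConsumerRecords<String, Document> records = consumer.poll(1000);
            for (ConsumerRecord<String, Document> record : records) {
                System.out.println(record.value());
            }
        }
    }
}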


6. Optionally, a custom partitioner

import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;

import java.util.Map;

public class DocumentPartitioner implements Partitioner {

    @Override
    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
        // trivial rule: send everything to partition 0
        return 0;
    }

    @Override
    public void close() {
        // nothing to release
    }

    @Override
    public void configure(Map<String, ?> map) {
        // no configuration needed
    }
}
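Returning 0 pins every record to partition 0, which is only useful as a skeleton. A more realistic rule (a sketch, assuming records are sent with a non-null string key such as the document id) spreads keys across the topic's partitions:

@Override
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
    int numPartitions = cluster.partitionCountForTopic(topic);
    if (key == null) {
        return 0; // keyless records all land on partition 0
    }
    // mask the sign bit so the modulo is never negative
    return (key.hashCode() & 0x7fffffff) % numPartitions;
}

To activate it, uncomment the PARTITIONER_CLASS_CONFIG line in the producer from section 3.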


7. Test code

Producer side:

public static void main(String[] args) {
    DocumentProducer producer = new DocumentProducer();
    Document document = new Document();
    document.setTitle("test");
    document.setContent("this is a Producer test");
    producer.produce(document);
    producer.close(); // flush the async send before the JVM exits
}

Consumer side:

public static void main(String[] args) {
    // assumed wiring: the connector from section 5 and the same topic the producer uses
    ConsumerConnector consumer = new DocumentConsumer().getConsumer();
    String topic = Constants.TOPIC;

    Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
    topicCountMap.put(topic, 1);

    StringDecoder keyDecoder = new StringDecoder(new VerifiableProperties());
    DocumentDecoder valueDecoder = new DocumentDecoder();

    Map<String, List<KafkaStream<String, Document>>> consumerMap =
            consumer.createMessageStreams(topicCountMap, keyDecoder, valueDecoder);
    KafkaStream<String, Document> stream = consumerMap.get(topic).get(0);
    ConsumerIterator<String, Document> it = stream.iterator();
    while (it.hasNext()) {
        Document document = it.next().message();
        System.out.println(document.toString());
    }
}