手动初始化kafka consumer、producer

时间:2020-12-07 19:46:56

只是简略的记录如何手动初始化Kafka消费者、生产者

1.Consumer相关配置

Properties properties = new Properties();
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, xxx);
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, xxx);
properties.put(ConsumerConfig.GROUP_ID_CONFIG, xxx);
properties.put(ConsumerConfig.CLIENT_ID_CONFIG, xxx);
properties.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, xxx);
properties.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, xxx);
properties.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, xxx);
properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, xxx);
properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, xxx);
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, xxx);
properties.put(SPECIFIC_AVRO_READER_CONFIG, xxx);

2.新建Kafka Consumer实例

consumer = new KafkaConsumer<>(这里使用上面的配置)

3.订阅topic

consumer.subscribe(xxxTopics);

1.Producer相关配置

Properties properties = new Properties();
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, xxx);
properties.put(ProducerConfig.CLIENT_ID_CONFIG, xxx);
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, xxx);
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, xxx);

2.新建生产者实例

producer = new KafkaProducer<>(使用上面的配置);

3.产生内容到kafka

producer.send(new ProducerRecord<>(WhichTopic, Key, Value));