Kakfa重连测试

时间:2022-12-14 21:50:38

在Kafak已启动的情况下:

发送端首次连接大概耗时400毫秒。后续消息发送都在1毫秒以下。

接收端首次连接大概耗时400-7000毫秒。后续消息接收都在1毫秒以下。(具体时间与topic中存留的消息量有关)

但在使用Kafka时,会遇到Kafka重启。或者启用应用时Kafak还没有启动的情况,针对于各种情况进行测试。

测试消息发起端

Properties props = new Properties();

properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafakUrl);
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafakUrl);
Producer<String, String> producer = new KafkaProducer<String, String>(properties);
producer.send(new ProducerRecord<String, String>(topic, message));

1.发起端先启动,Kafak后启动

创建producer不会进行连接,直接进入消息发送,耗时大概300-400毫秒。

发送端连接不上,60秒后方法会返回,但不报错。

可通过MAX_BLOCK_MS_CONFIG参数,调整超时时间,单位是毫秒。

properties.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 1000);

 如果需要发送失败的异常,则需要在发送方法后,增加get

producer.send(new ProducerRecord<String, String>(topic, message)).get();

2.发起端保持启动,Kafak启动,或重启

发起端在Kafak启动完成后,会自动进行连接。无需人工干预。

测试消息接收端

Properties props = new Properties();
Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafakUrl);
props.put(ConsumerConfig.GROUP_ID_CONFIG, "0");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
while(true){ ConsumerRecords<String, String> records = consumer.poll(1);
if (!records.isEmpty()) {
for (ConsumerRecord<String, String> record : records) {
System.out.println(record.value());
}
}
}

Kafak接收端会忽略一些早期的消息,有时候会出现前N条丢失的情况,如果需要保证之前的消息都接收,需要增加参数

props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

1.接收端先启动,Kafak后启动

接收端在Kafak启动完成后,会自动进行连接。无需人工干预。

2.接收端保持启动,Kafak启动,或重启

接收端在Kafak启动完成后,会自动进行连接。无需人工干预。