kafka.common.FailedToSendMessageException: Failed to send messages after 3 tries. 最无语的配置

时间:2021-01-04 14:35:02

注意: 本文不谈废话,低级问题请自行检查。

我使用Java版本的Kafka Producer生产数据,但是抛出了这个异常。百思不得其解,明明防火墙配置,ZooKeeper,Kafka配置都是没问题的啊。

困扰了我一天,最终发现这样一个问题:  kafka.common.FailedToSendMessageException: Failed to send messages after 3 tries.

Kafka的server.properties文件中IP不能写主机名,必须写IP地址而不能写映射后的主机名.

kafka.common.FailedToSendMessageException: Failed to send messages after 3 tries. 最无语的配置

如果你在这写的是hostname,例如bigdata:

kafka.common.FailedToSendMessageException: Failed to send messages after 3 tries. 最无语的配置

kafka.common.FailedToSendMessageException: Failed to send messages after 3 tries. 最无语的配置

跑一个Producer程序,你就会喜提Exception:

kafka.common.FailedToSendMessageException: Failed to send messages after 3 tries. 最无语的配置

但是如果改成IP地址:

kafka.common.FailedToSendMessageException: Failed to send messages after 3 tries. 最无语的配置

程序就能正常运行:

kafka.common.FailedToSendMessageException: Failed to send messages after 3 tries. 最无语的配置

我用的版本是Kafka 0.8 ,果然版本低还是BUG太多。浪费了不少时间。

我的生产者代码:

import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig; import java.util.Date;
import java.util.Properties;
import java.util.Random; public class TestProducer {
public static void main(String[] args) {
long events = 10;
Random rnd = new Random(); Properties props = new Properties();
props.put("metadata.broker.list", "192.168.29.132:9092");
props.put("serializer.class", "kafka.serializer.StringEncoder");
props.put("partitioner.class", "SimplePartitioner");
props.put("request.required.acks", "1"); ProducerConfig config = new ProducerConfig(props); Producer<String, String> producer = new Producer<String, String>(config); for (long nEvents = 0; nEvents < events; nEvents++) {
long runtime = new Date().getTime();
String ip = "192.168.2." + rnd.nextInt(255);
String msg = runtime + ",www.example.com," + ip;
KeyedMessage<String, String> data = new KeyedMessage<String, String>("advClickStreamTopic", ip, msg);
producer.send(data);
}
producer.close();
}
}
import kafka.producer.Partitioner;
import kafka.utils.VerifiableProperties; public class SimplePartitioner implements Partitioner {
public SimplePartitioner (VerifiableProperties props) { } public int partition(Object key, int a_numPartitions) {
int partition = 0;
String stringKey = (String) key;
int offset = stringKey.lastIndexOf('.');
if (offset > 0) {
partition = Integer.parseInt( stringKey.substring(offset+1)) % a_numPartitions;
}
return partition;
} }

我在JIRA上并没找到相关BUG。....  只能说有点坑

版本信息:

     <dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-reflect</artifactId>
<version>2.10.0</version>
</dependency>
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
<version>2.9.2-RC3</version>
</dependency> <dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.9.2</artifactId>
<version>0.8.1.1</version>
<scope>compile</scope>
<exclusions>
<exclusion>
<artifactId>jmxri</artifactId>
<groupId>com.sun.jmx</groupId>
</exclusion>
<exclusion>
<artifactId>jms</artifactId>
<groupId>javax.jms</groupId>
</exclusion>
<exclusion>
<artifactId>jmxtools</artifactId>
<groupId>com.sun.jdmk</groupId>
</exclusion>
</exclusions>
</dependency>