从0.7升级到0.8.1.1之后,产生到嵌入kafka队列的错误

时间:2021-07-04 20:21:23

I haven't been able to find anything that directly handled the problem I'm facing, so I'm posting here. I have JUnit/JBehave tests that spin up an embedded ZooKeeper server, embedded Kafka server, and kafka producers and consumers.

我找不到任何直接解决我所面临问题的方法,所以我在这里发布。我有JUnit/JBehave测试,它们派生出嵌入式ZooKeeper服务器、嵌入式Kafka服务器、Kafka生产者和消费者。

After upgrading kafka from 0.7 to 0.8.1.1, I'm encountering the following types of errors:

在将kafka从0.7升级到0.8.1.1之后,我遇到了以下类型的错误:

ERROR [kafka-request-handler-5] state.change.logger - Error on broker 1 while processing LeaderAndIsr request correlationId 7 received from controller 1 epoch 1 for partition [topicName,8]
java.lang.NullPointerException: null
at kafka.log.Log.<init>(Log.scala:60) ~[kafka_2.10-0.8.1.1.jar:na]
at kafka.log.LogManager.createLog(LogManager.scala:265) ~[kafka_2.10-0.8.1.1.jar:na]
at kafka.cluster.Partition.getOrCreateReplica(Partition.scala:90) ~[kafka_2.10-0.8.1.1.jar:na]
at kafka.cluster.Partition$$anonfun$makeLeader$2.apply(Partition.scala:175) ~[kafka_2.10-0.8.1.1.jar:na]
at kafka.cluster.Partition$$anonfun$makeLeader$2.apply(Partition.scala:175) ~[kafka_2.10-0.8.1.1.jar:na]
at scala.collection.immutable.Set$Set1.foreach(Set.scala:74) ~[scala-library-2.10.4.jar:na]
at kafka.cluster.Partition.makeLeader(Partition.scala:175) ~[kafka_2.10-0.8.1.1.jar:na]
at kafka.server.ReplicaManager$$anonfun$makeLeaders$5.apply(ReplicaManager.scala:305) ~[kafka_2.10-0.8.1.1.jar:na]
at kafka.server.ReplicaManager$$anonfun$makeLeaders$5.apply(ReplicaManager.scala:304) ~[kafka_2.10-0.8.1.1.jar:na]
at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98) ~[scala-library-2.10.4.jar:na]
at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98) ~[scala-library-2.10.4.jar:na]
at scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:226) ~[scala-library-2.10.4.jar:na]
at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:39) ~[scala-library-2.10.4.jar:na]
at scala.collection.mutable.HashMap.foreach(HashMap.scala:98) ~[scala-library-2.10.4.jar:na]
at kafka.server.ReplicaManager.makeLeaders(ReplicaManager.scala:304) [kafka_2.10-0.8.1.1.jar:na]
at kafka.server.ReplicaManager.becomeLeaderOrFollower(ReplicaManager.scala:258) [kafka_2.10-0.8.1.1.jar:na]
at kafka.server.KafkaApis.handleLeaderAndIsrRequest(KafkaApis.scala:217) [kafka_2.10-0.8.1.1.jar:na]
at kafka.server.KafkaApis.handle(KafkaApis.scala:189) [kafka_2.10-0.8.1.1.jar:na]
at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:42) [kafka_2.10-0.8.1.1.jar:na]
at java.lang.Thread.run(Thread.java:745) [na:1.8.0_25]

and

WARN  [threadName] k.c.ConsumerFetcherManager$LeaderFinderThread - [threadName], Failed to add leader for partitions [topicName,9],[topicName,3],[topicName,0],[topicName,8],[topicName,5],[topicName,1],[topicName,6],[topicName,2],[topicName,7],[topicName,4]; will retry
kafka.common.NotLeaderForPartitionException: null
at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method) ~[na:1.8.0_25]
at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62) ~[na:1.8.0_25]
at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45) ~[na:1.8.0_25]
at java.lang.reflect.Constructor.newInstance(Constructor.java:408) ~[na:1.8.0_25]
at java.lang.Class.newInstance(Class.java:438) ~[na:1.8.0_25]
at kafka.common.ErrorMapping$.exceptionFor(ErrorMapping.scala:73) ~[kafka_2.10-0.8.1.1.jar:na]
at kafka.consumer.SimpleConsumer.earliestOrLatestOffset(SimpleConsumer.scala:160) ~[kafka_2.10-0.8.1.1.jar:na]
at kafka.consumer.ConsumerFetcherThread.handleOffsetOutOfRange(ConsumerFetcherThread.scala:60) ~[kafka_2.10-0.8.1.1.jar:na]
at kafka.server.AbstractFetcherThread$$anonfun$addPartitions$2.apply(AbstractFetcherThread.scala:179) ~[kafka_2.10-0.8.1.1.jar:na]
at kafka.server.AbstractFetcherThread$$anonfun$addPartitions$2.apply(AbstractFetcherThread.scala:174) ~[kafka_2.10-0.8.1.1.jar:na]
at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772) ~[scala-library-2.10.4.jar:na]
at scala.collection.immutable.HashMap$HashMap1.foreach(HashMap.scala:224) ~[scala-library-2.10.4.jar:na]
at scala.collection.immutable.HashMap$HashTrieMap.foreach(HashMap.scala:403) ~[scala-library-2.10.4.jar:na]
at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771) ~[scala-library-2.10.4.jar:na]
at kafka.server.AbstractFetcherThread.addPartitions(AbstractFetcherThread.scala:174) ~[kafka_2.10-0.8.1.1.jar:na]
at kafka.server.AbstractFetcherManager$$anonfun$addFetcherForPartitions$2.apply(AbstractFetcherManager.scala:86) ~[kafka_2.10-0.8.1.1.jar:na]
at kafka.server.AbstractFetcherManager$$anonfun$addFetcherForPartitions$2.apply(AbstractFetcherManager.scala:76) ~[kafka_2.10-0.8.1.1.jar:na]
at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772) ~[scala-library-2.10.4.jar:na]
at scala.collection.immutable.Map$Map1.foreach(Map.scala:109) ~[scala-library-2.10.4.jar:na]
at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771) ~[scala-library-2.10.4.jar:na]
at kafka.server.AbstractFetcherManager.addFetcherForPartitions(AbstractFetcherManager.scala:76) ~[kafka_2.10-0.8.1.1.jar:na]
at kafka.consumer.ConsumerFetcherManager$LeaderFinderThread.doWork(ConsumerFetcherManager.scala:95) ~[kafka_2.10-0.8.1.1.jar:na]
at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:51) [kafka_2.10-0.8.1.1.jar:na]

and

02/03 10:26:34.655 WARN  [kafka-request-handler-7] kafka.server.KafkaApis - [KafkaApi-1] Offset request with correlation id 0 from client clientName on partition [topicName,5] failed due to Leader not local for partition [topicName,5] on broker 1

1 个解决方案

#1


1  

It turns out this has to do with the Time parameter in the new KafkaServer constructor.

事实证明,这与新的KafkaServer构造函数中的时间参数有关。

I was passing in a null param for the kafka.utils.Time object:

我为《卡夫卡》传递了一个零参数。时间对象:

private KafkaServer server = new KafkaServer(config, null);

Instead, you need to create an implementation of the kafka.utils.Time interface, and pass in a new instance of that:

相反,您需要创建kafko .utils的实现。时间接口,并通过一个新的实例:

private KafkaServer server = new KafkaServer(config, new SystemTime());

private static class SystemTime implements Time {

    @Override
    public long milliseconds() {
        return System.currentTimeMillis();
    }

    @Override
    public long nanoseconds() {
        return System.nanoTime();
    }

    @Override
    public void sleep(long arg0) {
        try {
            Thread.sleep(arg0);
        } catch (InterruptedException e) {
            log.error("Kafka systemtime interrupted",e);
        }
    }

}

#1


1  

It turns out this has to do with the Time parameter in the new KafkaServer constructor.

事实证明,这与新的KafkaServer构造函数中的时间参数有关。

I was passing in a null param for the kafka.utils.Time object:

我为《卡夫卡》传递了一个零参数。时间对象:

private KafkaServer server = new KafkaServer(config, null);

Instead, you need to create an implementation of the kafka.utils.Time interface, and pass in a new instance of that:

相反,您需要创建kafko .utils的实现。时间接口,并通过一个新的实例:

private KafkaServer server = new KafkaServer(config, new SystemTime());

private static class SystemTime implements Time {

    @Override
    public long milliseconds() {
        return System.currentTimeMillis();
    }

    @Override
    public long nanoseconds() {
        return System.nanoTime();
    }

    @Override
    public void sleep(long arg0) {
        try {
            Thread.sleep(arg0);
        } catch (InterruptedException e) {
            log.error("Kafka systemtime interrupted",e);
        }
    }

}