sparkstreaming中kafka的offset提交

时间:2024-03-13 16:00:16

就kafka而言,offset提交方式有两种,自动和手动。

将enable.auto.commit设置为true,即可自动提交

props.put("enable.auto.commit", "true");

props.put("auto.commit.interval.ms", "1000");

或者采用commitAsync来自动提交。


sparkstreaming消费kafka数据,提交方式也是分为自动和手动两种。貌似是一样的,但细节上有所不同。

kafka首次启动的时候,一般会出现下面的情况。因为有数据挤压,所以会有很多在queued状态的batch。

如果数据量特别大,可能会出问题,因此参数spark.streaming.kafka.maxRatePerPartition就尤为重要,他设置sparkstreaming没秒每个分区最大的消费数量,使得挤压的数据可以慢慢消费。

sparkstreaming中kafka的offset提交

此时如果参数enable.auto.commit设置为false,并且代码端也不手动提交,通过日志你会发现每个batch可以正常消费,但是服务器上查看kafka消费情况,却是保持没变,是不是很诡异。当你把任务重启之后,会发下再次从最初的位置开始消费,也就是上次运行完全没有任何效果。

因此可以看出sparkstreaming在消费kafka的时候,自己内部保存了一组offset。它只在第一次消费的时候从kafka取offset,然后会一直按照自己内部存储这个offset来消费数据,但是不会把这个数据提交给任何地方(kafka或zookeeper)。因此,当任务重启后,还是会从最初的地方开始消费,因为上次任务的消费没提交,kafka内部的offset没更新。

所以,除非你的streaming程序永远不停,否则最好手动提交offset。