Change the default storage level of a mapWithState DStream from memory-only to another level

Time: 2021-08-23 00:19:12

I have written a Spark Streaming application using Kafka and the mapWithState function. I have attached a snapshot of my application's storage levels.

As you can see, the Kafka stream is serialized in both memory and disk, but I can't find a way to change the default persistence of the mapWithState internal streams. This is the piece of code I am using:

val messages = KafkaUtils.createDirectStream[String, String, (String, String)](ssc,
    kafkaParams,
    fromOffsets,
    (r: org.apache.kafka.clients.consumer.ConsumerRecord[String, String]) => (r.topic(), r.value()))
  .persist(StorageLevel.MEMORY_AND_DISK_SER)
....
val mapped1 = messages.map(x => (x._2.hashCode().toString(), x)).mapWithState(stateSpec1)

In my application the states can become huge, so I need to persist the internal states in memory and on disk. I would appreciate any help on this.

1 Answer

#1

mapWithState is a distributed in-memory state store. It saves your state inside an internal structure called OpenHashMapBasedStateMap. What you're currently persisting is the KafkaRDD created by KafkaUtils.createDirectStream, not the state. If you're not iterating over that same input twice, there's no need to persist it.
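
To make that distinction concrete, here is a deliberately simplified, pure-Scala model of what a per-key state store does across micro-batches. It is not Spark's API and not the real OpenHashMapBasedStateMap (which also records update times for timeouts and keeps other bookkeeping); the names `StateStoreSketch` and `PartitionState` are hypothetical. Each input batch is traversed once, and only the key-to-state map survives from one batch to the next.

```scala
import scala.collection.mutable

// Hypothetical, simplified model of ONE partition of a mapWithState-style store.
// Not Spark code: no timeouts, no versioning, no checkpointing.
object StateStoreSketch {
  final class PartitionState {
    // The long-lived part: key -> running state, kept across batches.
    private val states = mutable.HashMap.empty[String, Long]

    // Consume one micro-batch exactly once and emit (key, newState) records,
    // roughly what a mapping function given to StateSpec.function might return.
    def applyBatch(batch: Seq[(String, Long)]): Seq[(String, Long)] =
      batch.map { case (key, value) =>
        val updated = states.getOrElse(key, 0L) + value
        states.update(key, updated)
        (key, updated)
      }

    def get(key: String): Option[Long] = states.get(key)
    def size: Int = states.size
  }
}
```

After two batches, `Seq("a" -> 1L, "b" -> 2L)` followed by `Seq("a" -> 3L)`, the state for `"a"` is `4`, even though neither input batch was kept around.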

Remember that even if your internal state is huge, it should be evenly distributed inside your cluster. This means that you're not putting all your eggs in one basket, but spreading it throughout the cluster. If your state grows, you can always scale out your cluster with an additional node.
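
The even spreading of keyed state can be illustrated with a small hash-partitioning sketch. It mirrors the rule Spark's `HashPartitioner` applies (null keys go to partition 0, otherwise a non-negative `hashCode` modulo the partition count). The object and key names are hypothetical, and the sketch assumes keys whose `hashCode` values are reasonably well spread; heavily skewed keys would still pile state into a few partitions.

```scala
// Hypothetical sketch of hash partitioning for keyed state (not Spark code).
object StatePartitioningSketch {
  // Same rule as HashPartitioner: null -> 0, else non-negative hashCode mod n.
  def partitionFor(key: Any, numPartitions: Int): Int = {
    require(numPartitions > 0, "numPartitions must be positive")
    if (key == null) 0
    else {
      val raw = key.hashCode % numPartitions
      if (raw < 0) raw + numPartitions else raw
    }
  }

  // How many keys (and hence how much state) each partition would hold.
  def keysPerPartition(keys: Seq[Any], numPartitions: Int): Map[Int, Int] =
    keys.groupBy(k => partitionFor(k, numPartitions)).map { case (p, ks) => p -> ks.size }
}
```

In Spark itself the corresponding setting lives on the `StateSpec`, for example `StateSpec.function(mappingFunc).numPartitions(n)`, where `mappingFunc` and `n` are placeholders. More partitions spread over more executors means smaller state maps per executor, provided the keys are not skewed.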