1、spark streaming原理
1.6.3版本streaming-programming-guide官网地址
网友的翻译1
更丰富的函数接口可查阅pyspark.streaming.api
Spark Streaming上下游如下图所示
Spark Streaming核心是通过DStream来实现相关的功能,DStream代表数据流,代表由多个rdds组成的序列。
Discretized Streams (DStreams)
DStream是Spark Streaming的基本抽象,由一个连续的rdd序列组,每一个rdd包含一段时间间隔的数据。如下图所示:
对DStream的操作,其实会转换成对一系列rdd的操作,一个时间间隔接收完一批数据,然后处理这个rdd,然后下一个时间间隔重复这个过程,依次延续下去。如下图所示。
2、初步体验Spark Streaming的用法。
下面实现的是统计对数据服务器TCP socket监听时接收到的文本内容中的字数。
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
sc=SparkContext()
ssc=StreamingContext(sc,1)#设置批与批之间的间隔为1秒钟。
lines=ssc.socketTextStream('localhost',9999)#从本地服务器的9999端面监听数据,不同源具体的DStream也不一样。这里监听TCP socket,因而用socketTextStream()函数。
words=lines.flatMap(lambda line:line.split(' '))#将接收到的文本内容拆分成单词
pairs=words.map(lambda word:(word,1))#利用map和reduceByKey来计数
wordcount=pairs.reduceByKey(lambda x,y:x+y)
wordcount.pprint()#打印每行输入的前10字字符
ssc.start()#启动监听
ssc.awaitTermination()#一直监听等待,直至被中断
以上程序执行后就会监听相应端口信息,由于没有任何输入,此时只会打印出时间,而无内容。
下面利用netcat来模拟数据服务器。
首先安装netcat,sudo apt install netcat
.
然后启动netcat并指定端口,nc -lk 9999
此时屏幕会等待输入。键盘输入相应内容后,比如
kk lk iii si
按回车。此时程序就会监听到输入的内容,经过分析后打印在另一终端。
结果如下:
------------------------------------------- Time: 2018-02-14 00:31:52 -------------------------------------------
('kk', 1)
('', 1) ('lk', 1)
('iii', 1)
('si', 1)
3、spark streaming进一步练习
#程序完成的功能是接收一批一批的数据,然后统计每批数据中包含的单词即及数目,并打印出来。
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils
sc = SparkContext(appName="PythonStreamingDirectKafkaWordCount")
ssc = StreamingContext(sc, 2)
brokers = '3.3.3.3:9092'#kafka生产时配置的kafka地址即可
topic=['water_kafka_test']#kafka生产时配置的topic,注意要用列表形式
kvs = KafkaUtils.createDirectStream(ssc, topic, {"metadata.broker.list": brokers})
lines = kvs.map(lambda x: x[1])
counts = lines.flatMap(lambda line: line.split("\t")) \
.map(lambda word: (word, 1)) \
.reduceByKey(lambda a, b: a+b)
counts.pprint()
ssc.start()
ssc.awaitTermination()
结果大致如下:
-------------------------------------------
Time: 2018-02-28 10:54:22
-------------------------------------------
('cps.xxx.cn', 2)
('11642.62', 1)
('m3', 2)
('Totalxxxx', 2)
('1608220001', 2)
('WCPS.BJUT.P1000.P1101.P1212.W12120001', 2)
('20180212\n', 2)
('2017-10-23T04:07:37', 1)
('11642.52', 1)
('2017-10-23T04:15:00', 1)
-------------------------------------------
Time: 2018-02-28 10:54:24
-------------------------------------------
('cps.xxx.cn', 4)
('11642.77', 1)
('m3', 4)
('11642.97', 1)
('Totalxxxx', 4)
('1608220001', 4)
('2017-10-23T04:22:37', 1)
('2017-10-23T04:19:37', 1)
('11642.87', 1)
('11642.72', 1)
...
从相临的两个时间看,再次输出结果的时间间隔是2s这与我们在pyspark.streaming程序中设定的时间间隔是一致的。
我们在kafka生产程序中设定的每行信息的发送时间间隔是0.5s,这样在消费端2s的时间间隔内可以接收4行信息。这从(‘cps.xxx.cn’, 4)中也可以体现出来。
由于消费程序处理出结果花费的时间在0.3-0.4s,因而在2s的时间间隔内完全可以将所有的消息处理完,而不会产生信息丢失。如果监控程序处理逻辑花费的时间比较长,那么应该将sparkstreaming的信息流批与批之间的时间间隔加长,以免程序处理不过来,产生信息丢失。
4、DStreams上的转换操作
Transformation | Meaning |
---|---|
map(func) | Return a new DStream by passing each |
flatMap(func) | Similar to map, but each input item can be mapped to 0 or more output items. |
filter(func) | Return a new DStream by selecting only the records of the source DStream on which func returns true. |
repartition(numPartitions) | Changes the level of parallelism in this DStream by creating more or fewer partitions. |
union(otherStream) | Return a new DStream that contains the union of the elements in the source DStream and otherDStream. |
count() | Return a new DStream of single-element RDDs by counting the number of elements in each RDD of the source DStream. |
reduce(func) | Return a new DStream of single-element RDDs by aggregating the elements in each RDD of the source DStream using a function func (which takes two arguments and returns one). The function should be associative so that it can be computed in parallel. |
countByValue() | When called on a DStream of elements of type K, return a new DStream of (K, Long) pairs where the value of each key is its frequency in each RDD of the source DStream. |
reduceByKey(func, [numTasks]) | When called on a DStream of (K, V) pairs, return a new DStream of (K, V) pairs where the values for each key are aggregated using the given reduce function. Note: By default, this uses Spark’s default number of parallel tasks (2 for local mode, and in cluster mode the number is determined by the config property spark.default.parallelism) to do the grouping. You can pass an optional numTasks argument to set a different number of tasks. |
join(otherStream, [numTasks]) | When called on two DStreams of (K, V) and (K, W) pairs, return a new DStream of (K, (V, W)) pairs with all pairs of elements for each key. |
cogroup(otherStream, [numTasks]) | When called on a DStream of (K, V) and (K, W) pairs, return a new DStream of (K, Seq[V], Seq[W]) tuples. |
transform(func) | Return a new DStream by applying a RDD-to-RDD function to every RDD of the source DStream. This can be used to do arbitrary RDD operations on the DStream. |
updateStateByKey(func) | Return a new “state” DStream where the state for each key is updated by applying the given function on the previous state of the key and the new values for the key. This can be used to maintain arbitrary state data for each key. |
5、DStreams上的输出操作
DStreams上的输出操作跟rdd上的动作操作有点像,它使得DStreams的数据可以push到外部数据库和文件系统。
Output Operation | Meaning |
---|---|
print() | Prints the first ten elements of every batch of data in a DStream on the driver node running the streaming application. This is useful for development and debugging.Python API This is called pprint() in the Python API. |
saveAsTextFiles(prefix, [suffix]) | Save this DStream’s contents as text files. The file name at each batch interval is generated based on prefix and suffix: “prefix-TIME_IN_MS[.suffix]”. |
saveAsObjectFiles(prefix, [suffix]) | Save this DStream’s contents as SequenceFiles of serialized Java objects. The file name at each batch interval is generated based on prefix and suffix: “prefix-TIME_IN_MS[.suffix]”.Python API This is not available in the Python API. |
saveAsHadoopFiles(prefix, [suffix]) | Save this DStream’s contents as Hadoop files. The file name at each batch interval is generated based on prefix and suffix: “prefix-TIME_IN_MS[.suffix]”.Python API This is not available in the Python API. |
foreachRDD(func) | The most generic output operator that applies a function, func, to each RDD generated from the stream. This function should push the data in each RDD to an external system, such as saving the RDD to files, or writing it over the network to a database. Note that the function func is executed in the driver process running the streaming application, and will usually have RDD actions in it that will force the computation of the streaming RDDs. |
利用foreachRDD为DStream中的每个rdd设计一个处理逻辑
因为DStream的操作是比较有限的,而通过foreachRDD转换成了对每个rdd的操作,这就可以利用rdd的大量操作。而完成复杂的功能设计。
下面是官网的描述:
DStream中的foreachRDD是一个非常强大函数,它允许你把数据发送给外部系统。因为输出操作实际上是允许外部系统消费转换后的数据,它们触发的实际操作是DStream转换。
下面是一个错误示范:功能是通过创建连接将数据保存的外部系统。
def sendRecord(rdd):
connection = createNewConnection() # executed at the driver
rdd.foreach(lambda record: connection.send(record))
connection.close()
dstream.foreachRDD(sendRecord)
上面为什么不正确呢?因为这需要先序列化连接对象,然后将它从driver发送到worker中。这样的连接对象在机器之间不能传送。它可能表现为序列化错误(连接对象不可序列化)或者初始化错误(连接对象应该 在worker中初始化)等等。正确的解决办法是在worker中创建连接对象。
如下所示
def sendRecord(record):
connection = createNewConnection()
connection.send(record)
connection.close()
dstream.foreachRDD(lambda rdd: rdd.foreach(sendRecord))
然而上面的代码又会造成另外一个常见的错误-为每一个记录创建了一个连接对象。通常,创建一个连接对象有资源和时间的开支。因此,为每个记录创建和销毁连接对象会导致非常高的开支,明显的减少系统的整体吞吐量。
一个更好的解决办法是利用rdd.foreachPartition方法。 为RDD的partition创建一个连接对象,用这个连接对象发送partition中的所有记录。如下所示:
def sendPartition(iter):
connection = createNewConnection()
for record in iter:
connection.send(record)
connection.close()
dstream.foreachRDD(lambda rdd: rdd.foreachPartition(sendPartition))
对于连接部分,还可以进一步优化,就是创建连接池。
可以通过在多个RDD或者批数据间重用连接对象做更进一步的优化。开发者可以保有一个静态的连接对象池,重复使用池中的对象将多批次的RDD推送到外部系统,以进一步节省开支。
如下所示:
def sendPartition(iter):
# ConnectionPool is a static, lazily initialized pool of connections
connection = ConnectionPool.getConnection()
for record in iter:
connection.send(record)
# return to the pool for future reuse
ConnectionPool.returnConnection(connection)
dstream.foreachRDD(lambda rdd: rdd.foreachPartition(sendPartition))
需要注意的是,池中的连接对象应该根据需要延迟创建,并且在空闲一段时间后自动超时。这样就获取了最有效的方式发生数据到外部系统。
其它需要注意的地方:
(1)输出操作通过惰性执行的方式操作DStreams,正如RDD action通过惰性执行的方式操作RDD。具体地看,RDD actions和DStreams输出操作接收数据的处理。因此,如果你的应用程序没有任何输出操作或者用于输出操作dstream.foreachRDD(),但是没有任何RDD action操作在dstream.foreachRDD()里面,那么什么也不会执行。系统仅仅会接收输入,然后丢弃它们。
(2)默认情况下,DStreams输出操作是分时执行的,它们按照应用程序的定义顺序按序执行。
下面是一些补充
1:写到外部数据源,表面上看是spark去写,实际上就是jvm去操作。jvm写数据库,spark streaming就可以写数据库。jvm如果能写到Hbase或者Redius中,Spark也能。
2:spark streaming中我们使用Dstream.foreachRDD(),来把Dstream中的数据发送到外部的文件系统中,外部文件系统主要是数据库,Hbase,Redius,数据库比较少量的数据,Redius,中等规模的分布式数据。Hbase则是超大规模的数据。实际生产环境下如果你数据真的非常大,一般首先放在kafka中。然后再通过kafka给Hbase。
将数据结果通过socket写到web Server上
3:Dstream是在Driver上执行,是RDD的一种封装,实际上在Cluster上执行的是RDD,而Dstream是RDD的一种封装所以不是在Worker的Excutor上执行的,一个在Driver中,一个Excutor要用到连接器,那么要求Connector必须要序列化,但是数据库的句柄不能在socket中序列化,连接上数据库,这个数据库的句柄一般没听说过可以序列化或者反序列化。连接数据库一般在什么地方执行,在什么地方链接数据库。而不是在这里链接过数据库之后,弄一个序列化传到另外一台机器jvm中,让他反序列化继续链接,因为RDD在foreach的时候,在Executor中执行,它要这个connector的话,connection必须随着我们的任务广播到我们集群的Executor中。一广播就需要序列化和反序列化。所以connector不是可序列化的。再者connection是和网络通信有关,你怎么序列化。
4:链接上数据库的时候,如果每条数据都建立一个新的connection,内存很快就会耗光。
5:partition肯定是在一台机器上的。一台机器上可能有很多partition,我们有必要复用这个连接器connection。
6:rdd.foreachPartition()可以构建神奇的功能,就是它可以让底层的数据变化,它不会导致整个Dstream的执行,必须由foreach等action触发。spark基于rdd编程的时候,rdd数据是不变的。这是rdd的规则,但是如果你擅长使用foreachPartition的话,底层的数据可以改变,去进行计算。因为spark 的rdd是分布式函数编程。有时候底层的数据改变是一种业务的需要。foreachPartition是通过操作一个具体的数据结构来实现的。正常认为的rdd中是一条一条的数据,这一条条的数据从rdd的角度看,不变就不变。但一条条记录里面的内容是可以变得。即spark可以运行在动态数据源上。或者数组的内容不变,指向的索引可以变。
7:Dstream.foreachRDD();和transform一样,可以直接基于RDD编程。这里主要是写入了数据库中。
下面利用foreachRDD完成一个处理逻辑
6、Dstream窗口操作
Spark Streaming的Window Operation可以理解为定时的进行一定时间段内的数据的处理。如下图所示:
1. 红色的矩形就是一个窗口,窗口hold的是一段时间内的数据流。
2.这里面每一个time都是时间单元,在官方的例子中,每隔window size是3 time unit, 而且每隔2个单位时间,窗口会slide一次。
所以基于窗口的操作,需要指定2个参数:
window length - The duration of the window (3 in the figure)
slide interval - The interval at which the window-based operation is performed (2 in the figure).
第一个时间称为窗口长度,第二个时间称为滑动长度
下面的函数可以帮助实现一些丰富的时间窗口操作。
groupByKeyAndWindow(windowDuration, slideDuration, numPartitions=None)[source]
mapValues(f)
Return a new DStream by applying groupByKey over a sliding window. Similar to DStream.groupByKey(), but applies it over a sliding window.
Parameters:
windowDuration – width of the window; must be a multiple of this DStream’s batching interval
slideDuration – sliding interval of the window (i.e., the interval after which the new DStream will generate RDDs); must be a multiple of this DStream’s batching interval
numPartitions – Number of partitions of each RDD in the new DStream.
7、共享变量
共享变量很好的资料
一般情况下,当一个传递给Spark操作(例如map和reduce)的函数在远程节点上面运行时,Spark操作实际上操作的是这个函数所用变量的一个独立副本。这些变量被复制到每台机器上,并且这些变量在远程机器上 的所有更新都不会传递回驱动程序。通常跨任务的读写变量是低效的,但是,Spark还是为两种常见的使用模式提供了两种有限的共享变量:广播变量(broadcast variable)和累加器(accumulator)
RDD缓存策略
Spark最为强大的功能之一便是能够把数据缓存在集群的内存里。这通过调用RDD的cache函数来实现: rddFromTextFile.cache
调用一个RDD的cache函数将会告诉Spark将这个RDD缓存在内存中。在RDD首次调用一个执行操作时,这个操作对应的计算会立即执行,数据会从数据源里读出并保存到内存。因此,首次调用cache函数所需要的时间会部分取决于Spark从输入源读取数据所需要的时间。但是,当下一次访问该数据集的时候,数据可以直接从内存中读出从而减少低效的I/O操作,加快计算。多数情况下,这会取得数倍的速度提升。
Spark的另一个核心功能是能创建两种特殊类型的变量:广播变量和累加器。
广播变量(broadcast variable)
广播变量用来把变量在所有节点的内存之间进行共享,这样的方式尤其是在分布式集群中进行并行计算提供了很大的便利,如果数据集很大,需要分布式存储到各个DataNode上,根据“计算向数据靠近”的原则,将每一个DataNode上都要使用的变量(类似全局变量)进行广播,而不是在每一个DataNode上产生一个副本,比如利用sc.broadcast将聚类中心设置为一个只读变量,并广播给每一个集群中的机器进行共享相同的聚类中心。
广播变量(broadcast variables)允许程序开发人员在每个机器上缓存一个只读的变量,而不是为机器上的每个任务都生成一个副本。通过这种方式,就可以非常高效地给每个节点(机器)提供一个大的输入数据集的副本。Spark的“动作”操作会跨越多个阶段(stage),对于每个阶段内的所有任务所需要的公共数据,Spark都会自动进行广播。通过广播方式进行传播的变量,会经过序列化,然后在被任务使用时再进行反序列化。这就意味着,显式地创建广播变量只有在下面的情形中是有用的:当跨越多个阶段的那些任务需要相同的数据,或者当以反序列化方式对数据进行缓存是非常重要的。
可以通过调用SparkContext.broadcast(v)来从一个普通变量v中创建一个广播变量。这个广播变量就是对普通变量v的一个包装器,通过调用value方法就可以获得这个广播变量的值,具体代码如下:
from pyspark import SparkContext
from pyspark import SparkConf
factor =2
sc = SparkContext()
brodacastvalue = sc.broadcast(factor)
list = [1, 2, 3, 4, 5]
listRdd = sc.parallelize(list)
listmap = listRdd.map(lambda s: s * brodacastvalue.value)
print listmap.collect()
这个广播变量被创建以后,那么在集群中的任何函数中,都应该使用广播变量brodacastvalue 的值,而不是使用factor 的值,这样就不会把factor 重复分发到这些节点上。此外,一旦广播变量创建后,普通变量factor 的值就不能再发生修改,从而确保所有节点都获得这个广播变量的相同的值。
广播可以将变量发送到闭包中,被闭包使用。但是,广播还有一个作用是同步较大数据。比如你有一个IP库,可能有几G,在map操作中,依赖这个ip库。那么,可以通过广播将这个ip库传到闭包中,被并行的任务应用。广播通过两个方面提高数据共享效率:
1,集群中每个节点(物理机器)只有一个副本,默认的闭包是每个任务一个副本;
2,广播传输是通过BT下载模式实现的,也就是P2P下载,在集群多的情况下,可以极大的提高数据传输速率。广播变量修改后,不会反馈到其他节点。
它由运行SparkContext的驱动程序创建后发送给会参与计算的节点。对那些需要让各工作节点高效地访问相同数据的应用场景,比如机器学习,这非常有用。Spark下创建广播变量只需在SparkContext上调用一个方法即可:
broadcastAList = sc.broadcast(list(["a", "b", "c", "d", "e"]))
16/06/26 21:04:50 INFO MemoryStore: Block broadcast_0 stored as values in memory (estimated size 296.0 B, free 296.0 B)
16/06/26 21:04:50 INFO MemoryStore: Block broadcast_0_piece0 stored as bytes in memory (estimated size 110.0 B, free 406.0 B)
16/06/26 21:04:50 INFO BlockManagerInfo: Added broadcast_0_piece0 in memory on localhost:36878 (size: 110.0 B, free: 517.4 MB)
16/06/26 21:04:50 INFO SparkContext: Created broadcast 0 from broadcast at PythonRDD.scala:430
#终端的输出表明,广播变量存储在内存中,占用的空间大概是110字节,仍余下517MB可用空间。
广播变量也可以被非驱动程序所在的节点(即工作节点)访问,访问的方法是调用该变量的value方法:
sc.parallelize(list(["1", "2", "3"])).map(lambda x: broadcastAList.value).collect()
#上述一行代码会从{"1", "2", "3"}这个集合(一个Scala List)里,新建一个带有三条记录的RDD。map函数里的代码会返回一个新的List对象。这个对象里的记录由之前创建的那个broadcastAList里的记录与新建的RDD里的三条记录分别拼接而成。
注意,上述代码使用了collect函数。这个函数是一个Spark执行函数,它将整个RDD以Scala(Python或Java)集合的形式返回驱动程序。collect函数一般仅在的确需要将整个结果集返回驱动程序并进行后续处理时才有必要调用。如果在一个非常大的数据集上调用该函数,可能耗尽驱动程序的可用内存,进而导致程序崩溃。高负荷的处理应尽可能地在整个集群上进行,从而避免驱动程序成为系统瓶颈。然而在不少情况下,将结果收集到驱动程序的确是有必要的。很多机器学习算法的迭代过程便属于这类情况。
***没有action的话,广播并不会发出去!
***使用broadcast广播黑名单到每个Executor中!
累加器(accumulator)也是一种被广播到工作节点的变量。累加器与广播变量的关键不同,是后者只能读取而前者却可累加。但支持的累加操作有一定的限制。具体来说,这种累加必须是一种有关联的操作,即它得能保证在全局范围内累加起来的值能被正确地并行计算以及返回驱动程序。每一个工作节点只能访问和操作其自己本地的累加器,全局累加器则只允许驱动程序访问。累加器同样可以在Spark代码中通过value访问。
累加器是仅仅被相关操作累加的变量,因此可以在并行中被有效地支持。它可以被用来实现计数器和总和。Spark原生地只支持数字类型的累加器,编程者可以添加新类型的支持。如果创建累加器时指定了名字,可以在Spark的UI界面看到。这有利于理解每个执行阶段的进程。(对于Python还不支持)
累加器通过对一个初始化了的变量v调用SparkContext.accumulator(v)来创建。在集群上运行的任务可以通过add或者”+=”方法在累加器上进行累加操作。但是,它们不能读取它的值。只有驱动程序能够读取它的值,通过累加器的value方法。
补充:
除了上面提到的两种外,还有一个闭包的概念,这里补充下
闭包与广播变量对比
有两种方式将数据从driver节点发送到worker节点:通过 闭包 和通过 广播变量 。闭包是随着task的组装和分发自动进行的,而广播变量则是需要程序猿手动操作的,具体地可以通过如下方式操作广播变量(假设 sc 为 SparkContext 类型的对象, bc 为 Broadcast 类型的对象):
可通过 sc.broadcast(xxx) 创建广播变量。
可在各计算节点中(闭包代码中)通过 bc.value 来引用广播的数据。
bc.unpersist() 可将各executor中缓存的广播变量删除,后续再使用时数据将被重新发送。
bc.destroy() 可将广播变量的数据和元数据一同销毁,销毁之后就不能再使用了。
任务闭包包含了任务所需要的代码和数据,如果一个executor数量小于RDD partition的数量,那么每个executor就会得到多个同样的任务闭包,这通常是低效的。而广播变量则只会将数据发送到每个executor一次,并且可以在多个计算操作*享该广播变量,而且广播变量使用了类似于p2p形式的非常高效的广播算法,大大提高了效率。另外,广播变量由spark存储管理模块进行管理,并以MEMORY_AND_DISK级别进行持久化存储。
什么时候用闭包自动分发数据?情况有几种:
数据比较小的时候。
数据已在driver程序中可用。典型用例是常量或者配置参数。
什么时候用广播变量分发数据?
情况有几种:
数据比较大的时候(实际上,spark支持非常大的广播变量,甚至广播变量中的元素数超过java/scala中Array的最大长度限制(2G,约21.5亿)都是可以的)。
数据是某种分布式计算结果。典型用例是训练模型等中间计算结果。
当数据或者变量很小的时候,我们可以在Spark程序中直接使用它们,而无需使用广播变量。
对于大的广播变量,序列化优化可以大大提高网络传输效率,参见本文序列化优化部分。
8、kafka结合spark 错误整理
1、
问题:
java.lang.NoSuchMethodError: scala.Predef$.ArrowAssoc(Ljava/lang/Object;)Ljava/lang/Object;
原因:
其实这个错误就是编译的程序的scala版本和提交时的版本不一致导致的.
For the Scala API, Spark 1.6.3 uses Scala 2.10. You will need to use a compatible Scala version (2.10.x).
spark1.6.3(当前版本)还不支持2.11.x , 其中很多的插件还不兼容
spark就默认是用了 bin/下自带的scala版本
解决方法(已验证):
通过外部将相应包提交上去。即在spark-submit时加入参数--jars spark-streaming-kafka-assembly_2.10-1.6.3.jar
。由于只写了相对路径,因而在submit时要先进入spark-streaming-kafka-assembly_2.10-1.6.3.jar所在的目录,否则会上传不了这个包。