spark中的并行方法调用以及传递方法中使用spark会话

时间:2021-10-15 00:41:10

Let me first inform all of you that I am very new to Spark.

让我先告诉大家我对Spark很新。

I need to process huge number of records in a table and when it is grouped by email it is around 1 million.I need to perform multiple logical calculation based on the data set against individual email and update the database based on the logical calculation

我需要在表格中处理大量记录,当它通过电子邮件分组时大约是100万。我需要根据针对单个电子邮件的数据集执行多个逻辑计算,并根据逻辑计算更新数据库

Roughly my code structure is like

我的代码结构大致相似

Initial Data Load ...

初始数据加载......

import sparkSession.implicits._ var tableData = sparkSession.read.jdbc(, , connectionProperties).select("email").where()

import sparkSession.implicits._ var tableData = sparkSession.read.jdbc(,, connectionProperties).select(“email”)。where()

//Data Frame with Records with grouping on email count greater than one

//带有记录的数据框,其中电子邮件数量大于1

var recordsGroupedBy =tableData.groupBy("email").count().withColumnRenamed("count", "recordcount").filter("recordcount > 1 ").toDF()

var recordsGroupedBy = tableData.groupBy(“email”)。count()。withColumnRenamed(“count”,“recordcount”)。filter(“recordcount> 1”)。toDF()

Now comes the processing after grouping against email using processDataAgainstEmail() method

现在是使用processDataAgainstEmail()方法对电子邮件进行分组后的处理

recordsGroupedBy.collect().foreach(x=>processDataAgainstEmail(x.getAs("email"),sparkSession))

Here I see foreach is not parallelly executed .I need to invoke the method processDataAgainstEmail(,) in parallel. But if I try to parallelize by doing

在这里,我看到foreach没有并行执行。我需要并行调用processDataAgainstEmail(,)方法。但是,如果我尝试通过做并行化

Hi I can get a list by invoking

嗨,我可以通过调用获得一个列表

val emailList =dataFrameWithGroupedByMultipleRecords.select("email").rdd.map(r => r(0).asInstanceOf[String]).collect().toList

val emailList = dataFrameWithGroupedByMultipleRecords.select(“email”)。rdd.map(r => r(0).asInstanceOf [String])。collect()。toList

var rdd = sc.parallelize(emailList )

var rdd = sc.parallelize(emailList)

rdd.foreach(x => processDataAgainstEmail(x.getAs("email"),sparkSession))

rdd.foreach(x => processDataAgainstEmail(x.getAs(“email”),sparkSession))

This is not supported as I can not pass sparkSession when using parallelize .

这不受支持,因为我在使用parallelize时无法传递sparkSession。

Can anybody help me on this as in processDataAgainstEmail(,) multiple operations would be performed related to database insert and update and also spark dataframe and spark sql operations needs to be performed.

任何人都可以帮助我,因为在processDataAgainstEmail(,)中将执行与数据库插入和更新相关的多个操作,并且还需要执行spark数据帧和spark sql操作。

To summerize I need to invoke parallelly processDataAgainstEmail(,) with sparksession

为了总结,我需要使用sparksession并行调用processDataAgainstEmail(,)

In case it is not all possible to pass spark session ,the method won't be able to perform anything on database .I am not sure what would be the alternate way as parallelism on email is must for my scenario.

如果不是所有可能通过spark会话,该方法将无法在数据库上执行任何操作。我不确定什么是替代方式,因为电子邮件的并行性是我的方案必须的。

1 个解决方案

#1


0  

The forEach is the method the list that operates on each element of the list sequentially, so you are acting on it one at a time, and passing that to processDataAgainstEmail method.

forEach是按顺序对列表的每个元素进行操作的列表的方法,因此您一次一个地对其执行操作,并将其传递给processDataAgainstEmail方法。

Once you have gotten the resultant list, you then invoke the sc.parallelize on to parallelize the creation of the dataframe from the list of records you created/manipulated in the previous step. The parallelization, as I can see in the pySpark, is the property of creating of the dataframe, not acting the result of any operation.

获得结果列表后,然后调用sc.parallelize on以从您在上一步中创建/操作的记录列表中并行创建数据帧。正如我在pySpark中看到的那样,并行化是创建数据帧的属性,而不是任何操作的结果。

#1


0  

The forEach is the method the list that operates on each element of the list sequentially, so you are acting on it one at a time, and passing that to processDataAgainstEmail method.

forEach是按顺序对列表的每个元素进行操作的列表的方法,因此您一次一个地对其执行操作,并将其传递给processDataAgainstEmail方法。

Once you have gotten the resultant list, you then invoke the sc.parallelize on to parallelize the creation of the dataframe from the list of records you created/manipulated in the previous step. The parallelization, as I can see in the pySpark, is the property of creating of the dataframe, not acting the result of any operation.

获得结果列表后,然后调用sc.parallelize on以从您在上一步中创建/操作的记录列表中并行创建数据帧。正如我在pySpark中看到的那样,并行化是创建数据帧的属性,而不是任何操作的结果。