
Time: 2022-06-02 07:38:27

I'm new to Scala and Spark. I'm trying to return multiple key-value pairs during the map transformation. My input data is a simple CSV file.


1, 2, 3
4, 5, 6
7, 8, 9

My Scala script looks like the following.


class Key(_i:Integer, _j:Integer) {
 def i = _i
 def j = _j
}
class Val(_x:Double, _y:Double) {
 def x = _x
 def y = _y
}
val arr = "1,2,3".split(",")
for(i <- 0 until arr.length) {
 val x = arr(i).toDouble
 for(j <- 0 until arr.length) {
  val y = arr(j).toDouble
  val k = new Key(i, j)
  val v = new Val(x, y)
  //note that i want to return the tuples, (k, v)
 }
}

I want to be able to use the for loop and data structures above to return multiple tuples (k, v), something similar to the code below.

val file = sc.textFile("/path/to/test.csv")
file.map(line => {
 val arr = line.split(",")
 for(i <- 0 until arr.length) {
  val x = arr(i).toDouble
  for(j <- (i+1) until arr.length) {
   val y = arr(j).toDouble
   val k = new Index(i,j)
   val v = new Val(x,y)
  }
 }
}).collect //reduceByKey is not there, reduce is there, but not what i want

When I copy/paste the code above into the lambda expression (and run on the Scala REPL shell) I get the following error:

error: illegal start of simple expression
val arr = line.split(",")

I also realize that I am still stuck in an imperative/procedural mindset, so please bear with me (I am a newbie at Scala/Spark).


2 Solutions



You are forgetting the braces after the arrow. You can only omit them when the function body is a single expression.


file.map(line => {
    //multiple lines of code here
})
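
To illustrate the rule on a plain Scala collection (a sketch only; the object and value names are invented for this example, and a List stands in for the RDD): a single-expression function literal can sit directly inside the parentheses, while a body with several statements has to be wrapped in braces.

```scala
object LambdaBracesDemo {
  val lines = List("1,2,3", "4,5,6")

  // Single expression: no braces needed after the arrow.
  val fieldCounts: List[Int] = lines.map(line => line.split(",").length)

  // Several statements: wrap the body in braces; the last expression is the result.
  val rowSums: List[Double] = lines.map(line => {
    val arr = line.split(",")
    val nums = arr.map(_.toDouble)
    nums.sum
  })

  def main(args: Array[String]): Unit = {
    println(fieldCounts)
    println(rowSums)
  }
}
```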

Full answer after edits:


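case class Index(i:Integer, j:Integer)
case class Val(x:Double, y:Double)

val data = sc.parallelize(List("1,2,3", "4,5,6", "7,8,9"))
data.flatMap(line => {
 val arr = line.split(",")
 // outer and inner for/yield build a sequence of sequences of (k, v)
 val doubleSeq = for(i <- 0 until arr.length) yield {
  val x = arr(i).toDouble
  for(j <- (i+1) until arr.length) yield {
   val y = arr(j).toDouble
   val k = Index(i,j)
   val v = Val(x,y)
   (k, v)
  }
 }
 // flatten the nested sequence so flatMap receives a flat sequence of pairs
 doubleSeq.flatten
})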

There were actually several problems:


  • I changed your classes to case classes because case classes are serializable out of the box. With ordinary classes you would have to implement Serializable yourself.
  • I changed map to flatMap and also flattened the nested sequence built inside the function, because flatMap alone would still leave you with an inner sequence. Together they give you an RDD[(Index, Val)], on which reduceByKey becomes available through an implicit conversion.
  • I turned your for loop into a for comprehension by adding yield. Your version ended up with type Unit because a for loop without yield returns Unit.
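
The three points above can be checked without a cluster. The sketch below is not the answer's Spark code: it uses a plain List[String] as a stand-in for the RDD, declares the fields as Int rather than Integer for brevity, and the names ForYieldDemo, PlainKey, loopResult, and pairsFor are made up for illustration.

```scala
// Spark-free sketch: a List[String] stands in for the RDD of lines.
object ForYieldDemo {
  case class Index(i: Int, j: Int)
  case class Val(x: Double, y: Double)

  // Hypothetical ordinary class, present only to contrast with the case classes.
  class PlainKey(val i: Int, val j: Int)

  // A for loop without yield runs for its side effects and returns Unit.
  def loopResult(line: String): Unit = {
    val arr = line.split(",")
    for (i <- 0 until arr.length) {
      val x = arr(i).toDouble
    }
  }

  // Nested for/yield produces a sequence of sequences, so flatten it once.
  def pairsFor(line: String): Seq[(Index, Val)] = {
    val arr = line.split(",")
    val nested = for (i <- 0 until arr.length) yield {
      val x = arr(i).toDouble
      for (j <- (i + 1) until arr.length) yield {
        val y = arr(j).toDouble
        (Index(i, j), Val(x, y))
      }
    }
    nested.flatten
  }

  val lines = List("1,2,3", "4,5,6")

  // map keeps one inner sequence per line; flatMap merges them into one list.
  val mapped: List[Seq[(Index, Val)]] = lines.map(pairsFor)
  val flatMapped: List[(Index, Val)] = lines.flatMap(pairsFor)

  def main(args: Array[String]): Unit = {
    println(Index(0, 1).isInstanceOf[java.io.Serializable])
    println(new PlainKey(0, 1).isInstanceOf[java.io.Serializable])
    println(mapped.length)
    println(flatMapped.length)
  }
}
```

With map, each input line contributes one inner sequence; with flatMap, the pairs from all lines end up in a single flat collection, which is the shape a key-value operation such as reduceByKey expects.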



Use RDD.flatMap and yield a list from the for loop:


val file = sc.textFile("/path/to/test.csv")
file.flatMap { line =>
  val arr = line.split(",")
  for {
    i <- 0 until arr.length
    j <- (i + 1) until arr.length
  } yield {
    val x = arr(i).toDouble
    val y = arr(j).toDouble
    val k = new Index(i, j)
    val v = new Val(x, y)
    (k, v)
  }
}
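
This version needs no explicit flatten, because a for comprehension with two generators is rewritten by the compiler into a flatMap over the outer generator and a map over the inner one. A Spark-free sketch of that equivalence follows; the object and method names are made up for illustration, and only the index pairs are generated.

```scala
object DesugarDemo {
  // Index pairs (i, j) with i < j, written as a two-generator for comprehension.
  def withFor(n: Int): Seq[(Int, Int)] =
    for {
      i <- 0 until n
      j <- (i + 1) until n
    } yield (i, j)

  // The same pairs written with explicit flatMap and map calls.
  def withFlatMap(n: Int): Seq[(Int, Int)] =
    (0 until n).flatMap(i => ((i + 1) until n).map(j => (i, j)))

  def main(args: Array[String]): Unit = {
    println(withFor(3))
    println(withFlatMap(3))
  }
}
```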


