Spark Shell Examples

Time: 2023-03-09 08:35:12

Spark Shell

Example 1 - Process Data from List:

scala> val pairs = sc.parallelize( List(
("This", 2),
("is", 3),
("Spark", 5),
("is", 3)
) )
...
scala> pairs.collect().foreach(println)
(This,2)
(is,3)
(Spark,5)
(is,3)
// Reduce Pairs by Keys:
scala> val pair1 = pairs.reduceByKey((x,y) => x+y, 4)
...
scala> pair1.collect.foreach(println)
(Spark,5)
(is,6)
(This,2)
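To see what `reduceByKey` computes, here is a minimal local sketch that uses plain Scala collections instead of an RDD. The helper `reduceByKeyLocal` is hypothetical and not part of Spark. It groups the pairs by key and combines each key's values with the given function. Spark does the same within each partition and then merges the partial results across partitions, which is why the function should be associative and commutative. The second argument `4` in the shell example is the number of partitions of the resulting RDD.

```scala
object ReduceByKeySketch {
  // Hypothetical local stand-in for RDD.reduceByKey: group the pairs by key,
  // then combine each key's values with `f`.
  def reduceByKeyLocal[K, V](pairs: Seq[(K, V)])(f: (V, V) => V): Map[K, V] =
    pairs.groupBy(_._1).map { case (k, kvs) => k -> kvs.map(_._2).reduce(f) }
}
```

Called with the same four pairs as above and `_ + _`, it returns `Map("This" -> 2, "is" -> 6, "Spark" -> 5)`, matching the shell output apart from ordering.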
// Decrease values by 1:
scala> val pair2 = pairs.mapValues( x=>x-1 )
scala> pair2.collect.foreach(println)
(This,1)
(is,2)
(Spark,4)
(is,2)
// Group Values by Keys:
scala> pairs.groupByKey.collect().foreach(println)
(Spark,CompactBuffer(5))
(is,CompactBuffer(3, 3))
(This,CompactBuffer(2))
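`groupByKey` keeps every value of a key (shown above as a `CompactBuffer`), whereas `reduceByKey` folds them into one value. The local sketch below uses plain Scala collections and hypothetical helper names, and shows that summing the grouped values gives the same totals as `reduceByKey(_ + _)`. The Spark programming guide recommends `reduceByKey` or `aggregateByKey` when the goal is aggregation, because `groupByKey` shuffles all values across the network before anything is combined.

```scala
object GroupByKeySketch {
  // Hypothetical local stand-in for RDD.groupByKey: collect all values per key.
  // Locally they keep their input order; Spark gives no ordering guarantee.
  def groupByKeyLocal[K, V](pairs: Seq[(K, V)]): Map[K, Seq[V]] =
    pairs.groupBy(_._1).map { case (k, kvs) => k -> kvs.map(_._2) }

  // Summing the grouped values yields the same totals as reduceByKey(_ + _),
  // but every value of a key must be materialized first.
  def sumGrouped[K](pairs: Seq[(K, Int)]): Map[K, Int] =
    groupByKeyLocal(pairs).map { case (k, vs) => k -> vs.sum }
}
```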

Example 2 - Process Data from Local Text File

// Create an RDD from a local text file:
scala> val textFile = sc.textFile("file:///home/PATH_TO_SPARK_HOME/README.md")

RDD transformations and actions can now be applied to textFile.

// This will display the number of lines in this textFile:
scala> textFile.count()
// or simply:
scala> textFile.count
// Note: the parentheses may be omitted when a method takes no arguments
// This will display the first line:
scala> textFile.first
// Filter lines containing "Spark":
scala> val linesWithSpark = textFile.filter (
line => line.contains("Spark")
)
// or simply:
scala> val linesWithSpark = textFile.filter(_.contains ("Spark"))
// Note: the underscore "_" is a placeholder for the lambda's parameter, i.e. each element (line) of textFile
// Collect the content of linesWithSpark:
scala> linesWithSpark.collect ()
// Print each line of linesWithSpark (collect first, so the output appears in the driver's console rather than on the executors):
scala> linesWithSpark.collect.foreach (println)
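The placeholder note above can be checked locally with plain Scala collections. In this sketch the sample lines are hypothetical (not taken from README.md); it shows that the explicit lambda and the `_` form select the same elements.

```scala
object PlaceholderSketch {
  // Hypothetical sample lines standing in for textFile.
  val lines: Seq[String] = Seq("Apache Spark", "is a", "Spark engine")

  // Explicit lambda parameter.
  val explicit: Seq[String] = lines.filter(line => line.contains("Spark"))

  // Placeholder syntax: `_` stands for the single lambda parameter,
  // so this expands to the same function as above.
  val placeholder: Seq[String] = lines.filter(_.contains("Spark"))
}
```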
// Map each line to its number of terms:
scala> val numOfTermsPerLine = textFile.map ( line => line.split(" ").size )
// or simply:
scala> val numOfTermsPerLine = textFile.map ( _.split(" ").size )
// Aggregate numOfTermsPerLine to the maximum number of terms in a line:
scala> numOfTermsPerLine.reduce ( (a, b) => if (a > b) a else b )
// or use java.lang.Math.max (the import is optional, since java.lang is imported by default):
scala> import java.lang.Math
scala> numOfTermsPerLine.reduce ( (a, b) => Math.max(a, b) )
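Below is a local sketch of the same term counting on a plain `Seq[String]`, with hypothetical helper names. It also exposes a quirk of `split(" ")`: an empty line still counts as one (empty) term, and two consecutive spaces produce an empty term between them. Filtering with `.filter(_.nonEmpty)` after the split is one way to avoid this if it matters. Like its Scala counterpart, `reduce` fails on an empty input.

```scala
object TermsPerLineSketch {
  // Number of terms per line, split on single spaces as in the shell example.
  def termsPerLine(lines: Seq[String]): Seq[Int] = lines.map(_.split(" ").size)

  // Maximum via reduce, mirroring numOfTermsPerLine.reduce(...).
  def maxTerms(lines: Seq[String]): Int =
    termsPerLine(lines).reduce((a, b) => math.max(a, b))
}
```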
// Convert RDD textFile into a flat (1-D) RDD of terms:
scala> val terms = textFile.flatMap ( _.split(" ") )
// Convert RDD textFile into a 2-D RDD: one array of terms per line:
scala> val terms_ = textFile.map ( _.split(" ") )
// Calculate the vocabulary size (number of distinct terms) in textFile:
scala> terms.distinct().count()
// or simply:
scala> terms.distinct.count
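The difference between `flatMap` and `map` above, and what `distinct.count` measures, can be seen on a small local collection. This is a sketch on hypothetical sample lines using plain Scala collections, not Spark.

```scala
object VocabularySketch {
  // Hypothetical sample lines.
  val lines: Seq[String] = Seq("to be or", "not to be")

  // flatMap: one flat sequence of terms (1-D), like `terms`.
  val terms: Seq[String] = lines.flatMap(_.split(" "))

  // map: one array of terms per line (2-D), like `terms_`.
  val termsPerLine: Seq[Array[String]] = lines.map(_.split(" "))

  // Vocabulary size: the number of distinct terms.
  val vocabularySize: Int = terms.distinct.size
}
```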
// Find the longest line together with its length in textFile:
scala> val lineLengthPair = textFile.map (
line => (line, line.length) )
scala> val lineWithMaxLength = lineLengthPair.reduce (
(pair1, pair2) => if (pair1._2 >= pair2._2) pair1 else pair2 )
// alternatively, in a concise way:
scala> val lineWithMaxLength = textFile.map (
line => (line, line.length) ).reduce (
(pair1, pair2) => if (pair1._2 >= pair2._2) pair1 else pair2 )
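Here is the same pair-and-reduce pattern as a local sketch (the object name is hypothetical). Locally, `>=` keeps the earlier line on a tie. On an RDD, partial results from the partitions are merged in no fixed order, so which of several equally long lines wins is not guaranteed. Spark's `RDD.max` with an explicit `Ordering`, e.g. `lineLengthPair.max()(Ordering.by(_._2))`, is another way to write this step.

```scala
object LongestLineSketch {
  // Pair each line with its length, then keep the pair with the larger length.
  def longestLine(lines: Seq[String]): (String, Int) =
    lines.map(line => (line, line.length))
      .reduce((p1, p2) => if (p1._2 >= p2._2) p1 else p2)
}
```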
// Find all lines containing "Spark" along with their line numbers (starting at 0)
// and print them in the format <line_no>: <line_content>
scala> val lineIndexPair = textFile.zipWithIndex()
scala> val lineIndexPairWithSpark = lineIndexPair.filter (
_._1.contains("Spark"))
scala> lineIndexPairWithSpark.collect.foreach (
pair => println ( s"${pair._2}: ${pair._1}" ) )
// alternatively, in a concise way:
scala> textFile.zipWithIndex().filter (
_._1.contains("Spark")).collect.foreach (
pair => println(s"${pair._2}: ${pair._1}") )
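`zipWithIndex` pairs each element with its 0-based position as `(element, index)`, which is why the filter tests `_._1` and the output prints `_._2` first. The local sketch below (hypothetical object name and sample lines) reproduces the output format. On an RDD the index is a `Long`, and computing it triggers an extra Spark job when the RDD has more than one partition.

```scala
object LineNumberSketch {
  // zipWithIndex yields (line, index); keep lines containing "Spark"
  // and format them as "<line_no>: <line_content>".
  def sparkLines(lines: Seq[String]): Seq[String] =
    lines.zipWithIndex
      .filter(_._1.contains("Spark"))
      .map { case (line, idx) => s"$idx: $line" }
}
```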

Example 3 - Process Data from Local CSV file

Download the CSV file with:

wget --content-disposition https://webcms3.cse.unsw.edu.au/files/cc5bb4af124130f899cddad80af071f1ad478c3c8eb7440433291459bb603ff1/attachment

Define a mapping from field names to (0-based) column indices in the CSV file:

scala> val aucid = 0
scala> val bid = 1
scala> val bidtime = 2
scala> val bidder = 3
scala> val bidderrate = 4
scala> val openbid = 5
scala> val price = 6
scala> val itemtype = 7
scala> val dtl = 8
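// Create an RDD from the CSV file, splitting each line into an array of fields (2-D):
scala> val auctionRDD = sc.textFile("file:///home/PATH-TO-CSV-FILE/auction.csv").map ( _.split(",") )
// Note: the multi-line chains below start continuation lines with "."; enter them in the shell's :paste mode
// (finish with Ctrl-D), otherwise the REPL evaluates the first line on its own.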
// Count the number of distinct item types in the auction data:
scala> auctionRDD.map ( _(itemtype) ).distinct.count // itemtype = 7 selects the 8th column (indices are 0-based)
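The column has to be selected before `distinct` is applied. Writing `_(itemtype).distinct` inside the `map` would deduplicate the characters of each item-type string instead. The local sketch below makes this visible on three hypothetical rows that follow the column order defined above. The rows and the object name are assumptions for illustration, not data from auction.csv.

```scala
object ItemTypeSketch {
  // Column index of itemtype, matching the mapping defined above.
  val itemtype = 7

  // Hypothetical sample rows in the column order
  // aucid,bid,bidtime,bidder,bidderrate,openbid,price,itemtype,dtl
  val rows: Seq[Array[String]] = Seq(
    "1,10.0,0.5,alice,3,1.0,50.0,xbox,3",
    "1,12.0,0.9,bob,7,1.0,50.0,xbox,3",
    "2,99.0,1.2,carol,1,5.0,150.0,palm,7"
  ).map(_.split(","))

  // Select the column, then deduplicate across rows and count.
  val distinctItemTypes: Int = rows.map(_(itemtype)).distinct.size

  // What `_(itemtype).distinct` inside map computes: the distinct
  // characters of each row's item-type string.
  val perRowDistinctChars: Seq[String] = rows.map(_(itemtype).distinct)
}
```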
// Count the number of bids per item type:
scala> auctionRDD.map ( line => ( line(itemtype), 1 ) )
.reduceByKey ( _ + _ , 4)
.collect
.foreach ( pair => println (pair._1 + "," + pair._2) )
// Find the maximum number of bids received by any single auction:
scala> auctionRDD.map ( line => ( line(aucid), 1 ) )
.reduceByKey ( _ + _ , 4)
.reduce ( (pair1, pair2) => if ( pair1._2 >= pair2._2 ) pair1 else pair2 )
._2
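The same count-then-reduce pipeline can be sketched locally. Each bid row becomes `(aucid, 1)`, the ones are summed per auction, and the pair with the largest count is kept, of which only the count (`._2`) is returned. The object and method names are hypothetical, and grouping with `groupBy` stands in for `reduceByKey`.

```scala
object BidCountSketch {
  val aucid = 0 // index of the auction id column, as defined above

  // Pair each bid row with 1 and sum per auction id (mirrors map + reduceByKey(_ + _)).
  def bidsPerAuction(rows: Seq[Array[String]]): Map[String, Int] =
    rows.map(r => (r(aucid), 1))
      .groupBy(_._1)
      .map { case (id, ones) => id -> ones.map(_._2).sum }

  // Keep the pair with the larger count and return only the count (mirrors reduce(...)._2).
  def maxBids(rows: Seq[Array[String]]): Int =
    bidsPerAuction(rows).toSeq
      .reduce((p1, p2) => if (p1._2 >= p2._2) p1 else p2)
      ._2
}
```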
// Find the 5 auctions with the most bids:
scala> auctionRDD.map ( line => (line(aucid), 1) )
.reduceByKey ( _ + _ , 4)
.map ( _.swap )
.sortByKey (false)
.map ( _.swap )
.take (5)
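The `swap` / `sortByKey(false)` / `swap` / `take(5)` chain sorts the `(aucid, count)` pairs by descending count and keeps the first five. A local sketch of that result is below (hypothetical names). On an RDD, `sortBy(_._2, ascending = false).take(5)` or `top(5)(Ordering.by(_._2))` should give the same top pairs without the double swap, although the order among tied counts may differ.

```scala
object TopAuctionsSketch {
  // Sort (aucid, count) pairs by descending count and keep the first n.
  def topN(counts: Seq[(String, Int)], n: Int): Seq[(String, Int)] =
    counts.sortBy { case (_, c) => -c }.take(n)
}
```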

Example 4 - Word Count on HDFS Text File

Download the data file and put it into HDFS with:

wget --content-disposition https://webcms3.cse.unsw.edu.au/files/33c7707c8b646a686e33af7e2f2fc006b53ff8c13d8317976bd262d8c6daae66/attachment
hdfs dfs -mkdir -p Input
hdfs dfs -put pg100.txt Input/
// Create an RDD from HDFS:
scala> val pg100RDD = sc.textFile ("hdfs://HOST-NAME:PORT/user/USER-NAME/Input/pg100.txt")
// Word count (enter in :paste mode, since the continuation lines start with "."):
scala> pg100RDD.flatMap ( _.split(" ") )
.map ( term => (term, 1) )
.reduceByKey ( _ + _ , 3)
.saveAsTextFile ( "OUTPUT-PATH" )
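The word count pipeline (`flatMap`, then `map` to `(term, 1)`, then sum per term) can be sketched on a local collection, as below. The object name and sample lines are hypothetical, and `groupBy` stands in for `reduceByKey`. On the cluster, `saveAsTextFile` writes a directory containing one `part-NNNNN` file per partition (three here, from `reduceByKey(..., 3)`), each line being a `(term,count)` tuple. The job fails if the output directory already exists.

```scala
object WordCountSketch {
  // Split lines into terms, pair each term with 1, then sum the ones per term.
  def wordCount(lines: Seq[String]): Map[String, Int] =
    lines.flatMap(_.split(" "))
      .map(term => (term, 1))
      .groupBy(_._1)
      .map { case (term, ones) => term -> ones.map(_._2).sum }
}
```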

Example N - Spark GraphX Programming

# Download graph data tiny-graph.txt
$ wget --content-disposition https://webcms3.cse.unsw.edu.au/files/ae6f45a3d64c0b35a3bd4d0c2740cc673f000dc60ec17d0e882faf6c20f74509/attachment
// Import the relevant GraphX classes:
scala> import org.apache.spark.graphx._
// Load graph data as RDD:
scala> val tinyGraphRDD = sc.textFile ("file:///home/PATH-TO-GRAPH-DATA/tiny-graph.txt")
// Convert each raw line <index, srcVertex, destVertex, weight>
// into a GraphX Edge(srcId, dstId, attr), skipping the index field:
scala> val edges = tinyGraphRDD.map ( _.split(" ") ).map ( line =>
Edge ( line(1).toLong,
line(2).toLong,
line(3).toDouble
)
)
// Create a graph:
scala> val graph = Graph.fromEdges[Double, Double] (edges, 0.0)
// With the graph created, print its triplets
// (each edge together with the attributes of its source and destination vertices):
scala> graph.triplets.collect.foreach ( println )
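The parsing step above can be tested without Spark. The sketch below assumes, as the comment in the transcript states, that each line of tiny-graph.txt holds four space-separated fields `<index> <srcVertex> <destVertex> <weight>`. `SimpleEdge` is a hypothetical stand-in for GraphX's `Edge` and is not the real class. Because `Graph.fromEdges` gives every vertex the default value `0.0`, all vertex attributes in the printed triplets are `0.0`.

```scala
object EdgeParseSketch {
  // Hypothetical stand-in for GraphX's Edge(srcId, dstId, attr); not the real class.
  final case class SimpleEdge(srcId: Long, dstId: Long, attr: Double)

  // Parse "<index> <srcVertex> <destVertex> <weight>", ignoring the index field.
  def parseEdge(line: String): SimpleEdge = {
    val f = line.split(" ")
    SimpleEdge(f(1).toLong, f(2).toLong, f(3).toDouble)
  }
}
```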
