Spark(3) - External Data Source

Date: 2023-03-08 16:19:25

Introduction

Spark provides a unified runtime for big data. HDFS, Hadoop's filesystem, is the most commonly used storage platform for Spark, as it provides cost-effective storage for unstructured and semi-structured data on commodity hardware. Spark is not limited to HDFS, though, and can work with any Hadoop-supported storage.

Hadoop-supported storage means storage that can work with Hadoop's InputFormat and OutputFormat interfaces. InputFormat is responsible for creating InputSplits from input data and dividing each split further into records. OutputFormat is responsible for writing to storage.
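
To make these interfaces concrete, here is a minimal spark-shell sketch (the HDFS paths are illustrative placeholders); sc.textFile and saveAsTextFile are convenience wrappers around the same machinery, but the InputFormat can also be named explicitly:

import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat

// TextInputFormat creates InputSplits from the input files and turns each split into (byte offset, line) records
val file = sc.newAPIHadoopFile("hdfs://localhost:9000/user/hduser/words", classOf[TextInputFormat], classOf[LongWritable], classOf[Text])
val lines = file.map(_._2.toString)

// writing goes through an OutputFormat; saveAsTextFile uses TextOutputFormat under the hood
lines.saveAsTextFile("hdfs://localhost:9000/user/hduser/words-copy")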

Loading data from the local filesystem

Though the local filesystem is not a good fit for storing big data, due to disk size limitations and its lack of a distributed nature, technically you can load data into a distributed system from the local filesystem. The file or directory you are accessing then has to be available on each node.

1. create the words directory

mkdir words

2. get into the words directory

cd words

3. create the sh.txt file

echo "to be or not to be" > sh.txt

4. start the spark shell

spark-shell

5. load the words directory as RDD

scala> val words = sc.textFile("file:///home/hduser/words")

6. count the number of lines

scala> words.count

7. divide the line (or lines) into multiple words

scala> val wordsFlatMap = words.flatMap(_.split("\\W+"))

8. convert word to (word,1)

scala> val wordsMap = wordsFlatMap.map( w => (w,1))

9. add the number of occurrences for each word

scala> val wordCount = wordsMap.reduceByKey( (a,b) => (a+b))

10. print the RDD

scala> wordCount.collect.foreach(println)

11. doing all in one step

scala> sc.textFile("file:///home/hduser/ words"). flatMap(_.split("\\W+")).map( w => (w,1)). reduceByKey( (a,b) => (a+b)).foreach(println)

Loading data from HDFS

HDFS is the most widely used big data storage system. One of the reasons for its wide adoption is schema-on-read: HDFS does not put any restrictions on data when it is being written. Any and all kinds of data are welcome and can be stored in a raw format. This makes it an ideal storage platform for raw unstructured and semi-structured data.

1. create the words directory

mkdir words

2. get into the words directory

cd words

3. create the sh.txt file and upload the words directory to HDFS

echo "to be or not to be" > sh.txt

cd ..
hdfs dfs -put words /user/hduser/words

4. start the spark shell

spark-shell

5. load the words directory as RDD

scala> val words = sc.textFile("hdfs://localhost:9000/user/hduser/words")

6. count the number of lines

scala> words.count

7. divide the line (or lines) into multiple words

scala> val wordsFlatMap = words.flatMap(_.split("\\W+"))

8. convert word to (word,1)

scala> val wordsMap = wordsFlatMap.map( w => (w,1))

9. add the number of occurrences for each word

scala> val wordCount = wordsMap.reduceByKey( (a,b) => (a+b))

10. print the RDD

scala> wordCount.collect.foreach(println)

11. doing all in one step

scala> sc.textFile("file:///home/hduser/ words"). flatMap(_.split("\\W+")).map( w => (w,1)). reduceByKey( (a,b) => (a+b)).foreach(println)

Loading data from HDFS using a custom InputFormat

Sometimes you need to load data in a specific format and TextInputFormat is not a good fit for that. Spark provides two methods for this purpose:

1. sparkContext.hadoopFile: This supports the old MapReduce API
2. sparkContext.newAPIHadoopFile: This supports the new MapReduce API

These two methods support all of Hadoop's built-in InputFormat implementations as well as any custom InputFormat, as the sketch below shows.
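
As a quick side-by-side sketch (the HDFS path matches the currency recipe below), the same tab-separated file can be read through either method; note that the old-API and new-API KeyValueTextInputFormat live in different packages:

import org.apache.hadoop.io.Text

// old MapReduce API (org.apache.hadoop.mapred)
val oldApi = sc.hadoopFile("hdfs://localhost:9000/user/hduser/currency", classOf[org.apache.hadoop.mapred.KeyValueTextInputFormat], classOf[Text], classOf[Text])

// new MapReduce API (org.apache.hadoop.mapreduce)
val newApi = sc.newAPIHadoopFile("hdfs://localhost:9000/user/hduser/currency", classOf[org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat], classOf[Text], classOf[Text])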

1. create the currency directory

mkdir currency

2. get into the currency directory

cd currency

3. create the na.txt file (separate the country name from the currency with a tab, since KeyValueTextInputFormat splits each line into a key and a value at the first tab) and upload the currency folder to HDFS

vi na.txt

United States of America US Dollar
Canada Canadian Dollar
Mexico Peso

cd ..
hdfs dfs -put currency /user/hduser/currency

4. start the spark shell and import the required classes

spark-shell

scala> import org.apache.hadoop.io.Text
scala> import org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat

5. load the currency directory as an RDD and convert it from tuples of (Text, Text) to tuples of (String, String)

val currencyFile = sc.newAPIHadoopFile("hdfs://localhost:9000/user/hduser/currency", classOf[KeyValueTextInputFormat], classOf[Text], classOf[Text])

val currencyRDD = currencyFile.map(t => (t._1.toString, t._2.toString))

6. count the number of elements in the RDD

scala> currencyRDD.count

7. print the RDD

scala> currencyRDD.collect.foreach(println)
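
KeyValueTextInputFormat splits each line into a key and a value at the first tab by default. If the input uses a different delimiter, the separator can be passed through a Hadoop Configuration (a minimal sketch, assuming the new MapReduce API and comma-separated input):

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.io.Text
import org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat

// use a comma instead of the default tab as the key/value separator
val conf = new Configuration(sc.hadoopConfiguration)
conf.set("mapreduce.input.keyvaluelinerecordreader.key.value.separator", ",")

val csvCurrency = sc.newAPIHadoopFile("hdfs://localhost:9000/user/hduser/currency", classOf[KeyValueTextInputFormat], classOf[Text], classOf[Text], conf)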

Loading data from Amazon S3

Amazon Simple Storage Service (S3) provides developers and IT teams with a secure, durable, and scalable storage platform. The biggest advantage of Amazon S3 is that there is no up-front IT investment, and companies can add capacity (just by clicking a button) as they need it.

Though Amazon S3 can be used with any compute platform, it integrates really well with Amazon's cloud services such as Amazon Elastic Compute Cloud (EC2) and Amazon Elastic Block Store (EBS). For this reason, companies that use Amazon Web Services (AWS) are likely to have a significant amount of data already stored on Amazon S3.

1. go to http://aws.amazon.com and log in with username and password

2. navigate to Storage & Content Delivery | S3 | Create Bucket

3. enter the bucket name - for example, com.infoobjects.wordcount

4. select Region, click on Create

5. click on Create Folder and enter words as the folder name

6. create sh.txt file on the local system

echo "to be or not to be" > sh.txt

7. navigate to Words | Upload | Add Files and choose sh.txt from the dialog box

8. click on Start Upload

9. select sh.txt and click on Properties

10. set the AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY environment variables

11. open the spark shell and load the words directory from S3 into the words RDD

scala> val words = sc.textFile("s3n://com.infoobjects.wordcount/words")

Loading data from Apache Cassandra

Apache Cassandra is a NoSQL database with a masterless ring cluster architecture. While HDFS is a good fit for streaming data access, it does not work well with random access. For example, HDFS works well when your average file size is 100 MB and you want to read the whole file, but if you frequently access the nth line in a file, or some other part of it as a record, HDFS would be too slow.

Relational databases have traditionally provided a solution for this, offering low-latency random access, but they do not work well with big data. NoSQL databases such as Cassandra fill the gap by providing relational-database-like access, but in a distributed architecture on commodity servers.

1. create a keyspace named people in Cassandra using the CQL shell

cqlsh> CREATE KEYSPACE people WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 1};

2. create a column family (from CQL 3.0 onwards, it can also be called a table) named person

cqlsh> create columnfamily person(id int primary key, first_name varchar, last_name varchar);

3. insert a few records in the column family

cqlsh> insert into person(id,first_name,last_name) values(1,'Barack','Obama');
cqlsh> insert into person(id,first_name,last_name) values(2,'Joe','Smith');

4. add the Cassandra connector dependency to build.sbt

libraryDependencies += "com.datastax.spark" %% "spark-cassandra-connector" % "1.2.0"

5. alternatively, add the Cassandra connector dependency to Maven

<dependency>
  <groupId>com.datastax.spark</groupId>
  <artifactId>spark-cassandra-connector_2.10</artifactId>
  <version>1.2.0</version>
</dependency>

6. start the spark shell; the spark.cassandra.connection.host property has to be in place before the SparkContext is created, so pass it on the command line (to make the connector classes available in the shell, also add the connector JAR with --jars, as shown in the uber JAR recipe at the end of this section)

spark-shell --conf spark.cassandra.connection.host=localhost

7. check that the property has been picked up; note that calling sc.getConf.set on a running shell has no effect, because getConf returns a copy of the configuration

scala> sc.getConf.get("spark.cassandra.connection.host")

8. import Cassandra-specific libraries

scala> import com.datastax.spark.connector._

9. load the person column family as an RDD

scala> val personRDD = sc.cassandraTable("people", "person")

10. count the number of lines

scala> personRDD.count

11. print the RDD

scala> personRDD.collect.foreach(println)

12. retrieve the first row

scala> val firstRow = personRDD.first

13. get the column names

scala> firstRow.columnNames

14. access Cassandra through Spark SQL

scala> val cc = new org.apache.spark.sql.cassandra.CassandraSQLContext(sc)

15. load the person data as SchemaRDD

scala> val p = cc.sql("select * from people.person")

16. print the person data

scala> p.collect.foreach(println)
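
The same table can also be read from a standalone application, where spark.cassandra.connection.host is set on the SparkConf before the SparkContext is created (a minimal sketch; the object and application names are hypothetical):

import com.datastax.spark.connector._
import org.apache.spark.{SparkConf, SparkContext}

// hypothetical standalone application that reads the people.person table
object CassandraReader {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setAppName("CassandraReader")
      .set("spark.cassandra.connection.host", "localhost") // must be set before the context is created
    val sc = new SparkContext(conf)
    val personRDD = sc.cassandraTable("people", "person") // provided by the connector import
    personRDD.collect.foreach(println)
    sc.stop()
  }
}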

Creating uber JARs with the sbt-assembly plugin

1. create the uber directory

mkdir uber

2. get into the uber directory

cd uber

3. open the SBT prompt

sbt

4. give the project the name sc-uber, save the session, and exit

> set name := "sc-uber"
> session save
> exit

5. add the spark-cassandra-connector dependency and a merge strategy (to resolve duplicate META-INF entries coming from different dependency JARs) to build.sbt

vi build.sbt

name := "sc-uber"

libraryDependencies += "com.datastax.spark" % "spark-cassandra-connector" % "1.1.0"

assemblyMergeStrategy in assembly := {
case PathList("META-INF", xs @ _*) => (xs map {_.toLowerCase}) match {
case ("manifest.mf" :: Nil) | ("index.list" :: Nil) | ("dependencies" :: Nil) => MergeStrategy.discard
case _ => MergeStrategy.discard
}
case _ => MergeStrategy.first
}

6. create plugins.sbt in the project folder

vi plugins.sbt

addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.12.0")

7. build the uber JAR

sbt assembly

The uber JAR is now created in target/scala-2.10/sc-uber-assembly-0.1-SNAPSHOT.jar.

8. move the JAR to a convenient location and rename it

mkdir -p thirdparty
mv target/scala-2.10/sc-uber-assembly-0.1-SNAPSHOT.jar thirdparty/sc-uber.jar

9. load the spark shell with the uber JAR

spark-shell --jars thirdparty/sc-uber.jar

10. the --jars option works the same way with spark-submit when submitting an application to a cluster; the application JAR and main class follow as usual (see the example after this step)

spark-submit --jars thirdparty/sc-uber.jar
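
For example, a full invocation might look like this (a hypothetical command; the application JAR myapp.jar and main class com.example.WordCount are placeholders):

spark-submit --class com.example.WordCount --jars thirdparty/sc-uber.jar myapp.jar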