我们首先提出这样一个简单的需求:
现在要分析某网站的访问日志信息,统计来自不同IP的用户访问的次数,从而通过Geo信息来获得来访用户所在国家地区分布状况。这里我拿我网站的日志记录行示例,如下所示:
1
|
121.205.198.92
- - [21/Feb/2014:00:00:07 +0800] "GET /archives/417.html HTTP/1.1" 200 11465 "http://shiyanjun.cn/archives/417.html/" "Mozilla/5.0 (Windows NT 5.1; rv:11.0) Gecko/20100101 Firefox/11.0"
|
2
|
121.205.198.92
- - [21/Feb/2014:00:00:11 +0800] "POST /wp-comments-post.php HTTP/1.1" 302 26 "http://shiyanjun.cn/archives/417.html/" "Mozilla/5.0 (Windows NT 5.1; rv:23.0) Gecko/20100101 Firefox/23.0"
|
3
|
121.205.198.92
- - [21/Feb/2014:00:00:12 +0800] "GET /archives/417.html/ HTTP/1.1" 301 26 "http://shiyanjun.cn/archives/417.html/" "Mozilla/5.0 (Windows NT 5.1; rv:11.0) Gecko/20100101 Firefox/11.0"
|
4
|
121.205.198.92
- - [21/Feb/2014:00:00:12 +0800] "GET /archives/417.html HTTP/1.1" 200 11465 "http://shiyanjun.cn/archives/417.html" "Mozilla/5.0 (Windows NT 5.1; rv:11.0) Gecko/20100101 Firefox/11.0"
|
5
|
121.205.241.229
- - [21/Feb/2014:00:00:13 +0800] "GET /archives/526.html HTTP/1.1" 200 12080 "http://shiyanjun.cn/archives/526.html/" "Mozilla/5.0 (Windows NT 5.1; rv:11.0) Gecko/20100101 Firefox/11.0"
|
6
|
121.205.241.229
- - [21/Feb/2014:00:00:15 +0800] "POST /wp-comments-post.php HTTP/1.1" 302 26 "http://shiyanjun.cn/archives/526.html/" "Mozilla/5.0 (Windows NT 5.1; rv:23.0) Gecko/20100101 Firefox/23.0"
|
Java实现Spark应用程序(Application)
我们实现的统计分析程序,有如下几个功能点:
- 从HDFS读取日志数据文件
- 将每行的第一个字段(IP地址)抽取出来
- 统计每个IP地址出现的次数
- 根据每个IP地址出现的次数进行一个降序排序
- 根据IP地址,调用GeoIP库获取IP所属国家
- 打印输出结果,每行的格式:[国家代码] IP地址 频率
下面,看我们使用Java实现的统计分析应用程序代码,如下所示:
001
|
package org.shirdrn.spark.job;
|
004
|
import java.io.IOException;
|
005
|
import java.util.Arrays;
|
006
|
import java.util.Collections;
|
007
|
import java.util.Comparator;
|
008
|
import java.util.List;
|
009
|
import java.util.regex.Pattern;
|
011
|
import org.apache.commons.logging.Log;
|
012
|
import org.apache.commons.logging.LogFactory;
|
013
|
import org.apache.spark.api.java.JavaPairRDD;
|
014
|
import org.apache.spark.api.java.JavaRDD;
|
015
|
import org.apache.spark.api.java.JavaSparkContext;
|
016
|
import org.apache.spark.api.java.function.FlatMapFunction;
|
017
|
import org.apache.spark.api.java.function.Function2;
|
018
|
import org.apache.spark.api.java.function.PairFunction;
|
019
|
import org.shirdrn.spark.job.maxmind.Country;
|
020
|
import org.shirdrn.spark.job.maxmind.LookupService;
|
022
|
import scala.Serializable;
|
025
|
public class IPAddressStats implements Serializable
{
|
027
|
private static final long serialVersionUID
= 8533489548835413763L;
|
028
|
private static final Log
LOG = LogFactory.getLog(IPAddressStats. class );
|
029
|
private static final Pattern
SPACE = Pattern.compile( "
" );
|
030
|
private transient LookupService
lookupService;
|
031
|
private transient final String
geoIPFile;
|
033
|
public IPAddressStats(String
geoIPFile) {
|
034
|
this .geoIPFile
= geoIPFile;
|
036
|
//
lookupService: get country code from a IP address
|
037
|
File
file = new File( this .geoIPFile);
|
038
|
LOG.info( "GeoIP
file: " +
file.getAbsolutePath());
|
039
|
lookupService
= new AdvancedLookupService(file,
LookupService.GEOIP_MEMORY_CACHE);
|
040
|
} catch (IOException
e) {
|
041
|
throw new RuntimeException(e);
|
045
|
@SuppressWarnings ( "serial" )
|
046
|
public void stat(String[]
args) {
|
047
|
JavaSparkContext
ctx = new JavaSparkContext(args[ 0 ], "IPAddressStats" ,
|
048
|
System.getenv( "SPARK_HOME" ),
JavaSparkContext.jarOfClass(IPAddressStats. class ));
|
049
|
JavaRDD<String>
lines = ctx.textFile(args[ 1 ], 1 );
|
051
|
//
splits and extracts ip address filed
|
052
|
JavaRDD<String>
words = lines.flatMap( new FlatMapFunction<String,
String>() {
|
054
|
public Iterable<String>
call(String s) {
|
055
|
//
121.205.198.92 - - [21/Feb/2014:00:00:07 +0800] "GET /archives/417.html HTTP/1.1" 200 11465 "http://shiyanjun.cn/archives/417.html/" "Mozilla/5.0 (Windows NT 5.1; rv:11.0) Gecko/20100101
Firefox/11.0"
|
057
|
return Arrays.asList(SPACE.split(s)[ 0 ]);
|
062
|
JavaPairRDD<String,
Integer> ones = words.map( new PairFunction<String,
String, Integer>() {
|
064
|
public Tuple2<String,
Integer> call(String s) {
|
065
|
return new Tuple2<String,
Integer>(s, 1 );
|
070
|
JavaPairRDD<String,
Integer> counts = ones.reduceByKey( new Function2<Integer,
Integer, Integer>() {
|
072
|
public Integer
call(Integer i1, Integer i2) {
|
077
|
List<Tuple2<String,
Integer>> output = counts.collect();
|
079
|
//
sort statistics result by value
|
080
|
Collections.sort(output, new Comparator<Tuple2<String,
Integer>>() {
|
082
|
public int compare(Tuple2<String,
Integer> t1, Tuple2<String, Integer> t2) {
|
085
|
} else if (t1._2
> t2._2) {
|
092
|
writeTo(args,
output);
|
096
|
private void writeTo(String[]
args, List<Tuple2<String, Integer>> output) {
|
097
|
for (Tuple2<?,
?> tuple : output) {
|
098
|
Country
country = lookupService.getCountry((String) tuple._1);
|
099
|
LOG.info( "[" +
country.getCode() + "]
" +
tuple._1 + "\t" +
tuple._2);
|
103
|
public static void main(String[]
args) {
|
105
|
if (args.length
< 3 )
{
|
106
|
System.err.println( "Usage:
IPAddressStats <master> <inFile> <GeoIPFile>" );
|
111
|
String
geoIPFile = args[ 2 ];
|
112
|
IPAddressStats
stats = new IPAddressStats(geoIPFile);
|
具体实现逻辑,可以参考代码中的注释。我们使用Maven管理构建Java程序,首先看一下我的pom配置中所依赖的软件包,如下所示:
03
|
< groupId >org.apache.spark</ groupId >
|
04
|
< artifactId >spark-core_2.10</ artifactId >
|
05
|
< version >0.9.0-incubating</ version >
|
08
|
< groupId >log4j</ groupId >
|
09
|
< artifactId >log4j</ artifactId >
|
10
|
< version >1.2.16</ version >
|
13
|
< groupId >dnsjava</ groupId >
|
14
|
< artifactId >dnsjava</ artifactId >
|
15
|
< version >2.1.1</ version >
|
18
|
< groupId >commons-net</ groupId >
|
19
|
< artifactId >commons-net</ artifactId >
|
20
|
< version >3.1</ version >
|
23
|
< groupId >org.apache.hadoop</ groupId >
|
24
|
< artifactId >hadoop-client</ artifactId >
|
25
|
< version >1.2.1</ version >
|
需要说明的是,当我们将程序在Spark集群上运行时,它要求我们的编写的Job能够进行序列化,如果某些字段不需要序列化或者无法序列化,可以直接使用
transient修饰即可,如上面的属性lookupService没有实现序列化接口,使用transient使其不执行序列化,否则的话,可能会出
现类似如下的错误:
01
|
14/03/10
22:34:06 INFO scheduler.DAGScheduler: Failed to run collect at IPAddressStats.java:76
|
02
|
Exception
in thread "main" org.apache.spark.SparkException: Job aborted: Task not
serializable: java.io.NotSerializableException:
org.shirdrn.spark.job.IPAddressStats
|
03
|
at
org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$abortStage$1.apply(DAGScheduler.scala:1028)
|
04
|
at
org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$abortStage$1.apply(DAGScheduler.scala:1026)
|
05
|
at
scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
|
06
|
at
scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
|
07
|
at
org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$abortStage(DAGScheduler.scala:1026)
|
08
|
at
org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitMissingTasks(DAGScheduler.scala:794)
|
09
|
at
org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitStage(DAGScheduler.scala:737)
|
10
|
at
org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$submitStage$4.apply(DAGScheduler.scala:741)
|
11
|
at
org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$submitStage$4.apply(DAGScheduler.scala:740)
|
12
|
at
scala.collection.immutable.List.foreach(List.scala:318)
|
13
|
at
org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitStage(DAGScheduler.scala:740)
|
14
|
at
org.apache.spark.scheduler.DAGScheduler.processEvent(DAGScheduler.scala:569)
|
15
|
at
org.apache.spark.scheduler.DAGScheduler$$anonfun$start$1$$anon$2$$anonfun$receive$1.applyOrElse(DAGScheduler.scala:207)
|
16
|
at
akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
|
17
|
at
akka.actor.ActorCell.invoke(ActorCell.scala:456)
|
18
|
at
akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
|
19
|
at
akka.dispatch.Mailbox.run(Mailbox.scala:219)
|
20
|
at
akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
|
21
|
at
scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
|
22
|
at
scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
|
23
|
at
scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
|
24
|
at
scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
|
在Spark集群上运行Java程序
这里,我使用了Maven管理构建Java程序,实现上述代码以后,使用Maven的maven-assembly-plugin插件,配置内容如下所示:
02
|
< artifactId >maven-assembly-plugin</ artifactId >
|
06
|
< mainClass >org.shirdrn.spark.job.UserAgentStats</ mainClass >
|
10
|
< descriptorRef >jar-with-dependencies</ descriptorRef >
|
13
|
< exclude >*.properties</ exclude >
|
14
|
< exclude >*.xml</ exclude >
|
19
|
< id >make-assembly</ id >
|
20
|
< phase >package</ phase >
|
将相关依赖库文件都打进程序包里面,最后拷贝JAR文件到Linux系统下(不一定非要在Spark集群的Master节点上),保证该节点上Spark
的环境变量配置正确即可看。Spark软件发行包解压缩后,可以看到脚本bin/run-example,我们可以直接修改该脚本,将对应的路径指向我们
实现的Java程序包(修改变量EXAMPLES_DIR以及我们的JAR文件存放位置相关的内容),使用该脚本就可以运行,脚本内容如下所示:
03
|
CYGWIN*)
cygwin= true ;;
|
08
|
#
Figure out where the Scala framework is installed
|
09
|
FWDIR= "$(cd
`dirname $0`/..; pwd)"
|
11
|
#
Export this as SPARK_HOME
|
12
|
export SPARK_HOME= "$FWDIR"
|
14
|
#
Load environment variables from conf/spark-env.sh, if it exists
|
15
|
if [
-e "$FWDIR/conf/spark-env.sh" ]
; then
|
16
|
.
$FWDIR/conf/spark- env .sh
|
20
|
echo "Usage:
run-example <example-class> [<args>]" >&2
|
24
|
#
Figure out the JAR file that our examples were packaged into. This includes a bit of a hack
|
25
|
#
to avoid the -sources and -doc packages that are built by publish-local.
|
26
|
EXAMPLES_DIR= "$FWDIR" /java-examples
|
28
|
if [
-e "$EXAMPLES_DIR" /*.jar
]; then
|
29
|
export SPARK_EXAMPLES_JAR=` ls "$EXAMPLES_DIR" /*.jar`
|
31
|
if [[
-z $SPARK_EXAMPLES_JAR ]]; then
|
32
|
echo "Failed
to find Spark examples assembly in $FWDIR/examples/target" >&2
|
33
|
echo "You
need to build Spark with sbt/sbt assembly before running this program" >&2
|
38
|
#
Since the examples JAR ideally shouldn't include spark-core (that dependency should be
|
39
|
#
"provided"), also add our standard Spark classpath, built using compute-classpath.sh.
|
40
|
CLASSPATH=`$FWDIR/bin/compute-classpath.sh`
|
41
|
CLASSPATH= "$SPARK_EXAMPLES_JAR:$CLASSPATH"
|
44
|
CLASSPATH=`cygpath
-wp $CLASSPATH`
|
45
|
export SPARK_EXAMPLES_JAR=`cygpath
-w $SPARK_EXAMPLES_JAR`
|
49
|
if [
-n "${JAVA_HOME}" ]; then
|
50
|
RUNNER= "${JAVA_HOME}/bin/java"
|
52
|
if [
` command - v java`
]; then
|
55
|
echo "JAVA_HOME
is not set" >&2
|
60
|
#
Set JAVA_OPTS to be able to load native libraries and to set heap size
|
61
|
JAVA_OPTS= "$SPARK_JAVA_OPTS"
|
62
|
JAVA_OPTS= "$JAVA_OPTS
-Djava.library.path=$SPARK_LIBRARY_PATH"
|
63
|
#
Load extra JAVA_OPTS from conf/java-opts, if it exists
|
64
|
if [
-e "$FWDIR/conf/java-opts" ]
; then
|
65
|
JAVA_OPTS= "$JAVA_OPTS
`cat $FWDIR/conf/java-opts`"
|
69
|
if [ "$SPARK_PRINT_LAUNCH_COMMAND" == "1" ]; then
|
70
|
echo -n "Spark
Command: "
|
71
|
echo "$RUNNER" - cp "$CLASSPATH" $JAVA_OPTS "$@"
|
72
|
echo "========================================"
|
76
|
exec "$RUNNER" - cp "$CLASSPATH" $JAVA_OPTS "$@"
|
在Spark上运行我们开发的Java程序,执行如下命令:
1
|
cd /home/shirdrn/cloud/programs/spark-0.9.0-incubating-bin-hadoop1
|
我实现的程序类org.shirdrn.spark.job.IPAddressStats运行需要3个参数:
- Spark集群主节点URL:例如我的是spark://m1:7077
- 输入文件路径:业务相关的,我这里是从HDFS上读取文件hdfs://m1:9000/user/shirdrn/wwwlog20140222.log
- GeoIP库文件:业务相关的,用来计算IP地址所属国家的外部文件
如果程序没有错误,能够正常运行,控制台输出程序运行日志,示例如下所示:
01
|
14/03/10
22:17:24 INFO job.IPAddressStats: GeoIP file:
/home/shirdrn/cloud/programs/spark-0.9.0-incubating-bin-hadoop1/java-examples/GeoIP_DATABASE.dat
|
02
|
SLF4J:
Class path contains multiple SLF4J bindings.
|
03
|
SLF4J:
Found binding in
[jar:file:/home/shirdrn/cloud/programs/spark-0.9.0-incubating-bin-hadoop1/java-examples/spark-0.0.1-SNAPSHOT-jar-with-dependencies.jar!/org/slf4j/impl/StaticLoggerBinder.class]
|
04
|
SLF4J:
Found binding in
[jar:file:/home/shirdrn/cloud/programs/spark-0.9.0-incubating-bin-hadoop1/assembly/target/scala-2.10/spark-assembly_2.10-0.9.0-incubating-hadoop1.0.4.jar!/org/slf4j/impl/StaticLoggerBinder.class]
|
06
|
SLF4J:
Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
|
07
|
14/03/10
22:17:25 INFO slf4j.Slf4jLogger: Slf4jLogger started
|
08
|
14/03/10
22:17:25 INFO Remoting: Starting remoting
|
09
|
14/03/10
22:17:25 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://spark@m1:57379]
|
11
|
14/03/10
22:17:25 INFO spark.SparkEnv: Registering BlockManagerMaster
|
12
|
14/03/10
22:17:25 INFO storage.DiskBlockManager: Created local directory at /tmp/spark-local-20140310221725-c1cb
|
13
|
14/03/10
22:17:25 INFO storage.MemoryStore: MemoryStore started with capacity 143.8 MB.
|
14
|
14/03/10
22:17:25 INFO network.ConnectionManager: Bound socket to port 45189 with id = ConnectionManagerId(m1,45189)
|
15
|
14/03/10
22:17:25 INFO storage.BlockManagerMaster: Trying to register BlockManager
|
16
|
14/03/10
22:17:25 INFO storage.BlockManagerMasterActor$BlockManagerInfo: Registering block manager m1:45189 with 143.8 MB RAM
|
17
|
14/03/10
22:17:25 INFO storage.BlockManagerMaster: Registered BlockManager
|
18
|
14/03/10
22:17:25 INFO spark.HttpServer: Starting HTTP Server
|
19
|
14/03/10
22:17:25 INFO server.Server: jetty-7.x.y-SNAPSHOT
|
20
|
14/03/10
22:17:25 INFO server.AbstractConnector: Started SocketConnector@0.0.0.0:49186
|
22
|
14/03/10
22:17:25 INFO spark.SparkEnv: Registering MapOutputTracker
|
23
|
14/03/10
22:17:25 INFO spark.HttpFileServer: HTTP File server directory is /tmp/spark-56c3e30d-a01b-4752-83d1-af1609ab2370
|
24
|
14/03/10
22:17:25 INFO spark.HttpServer: Starting HTTP Server
|
25
|
14/03/10
22:17:25 INFO server.Server: jetty-7.x.y-SNAPSHOT
|
26
|
14/03/10
22:17:25 INFO server.AbstractConnector: Started SocketConnector@0.0.0.0:52073
|
27
|
14/03/10
22:17:26 INFO server.Server: jetty-7.x.y-SNAPSHOT
|
28
|
14/03/10
22:17:26 INFO handler.ContextHandler: started o.e.j.s.h.ContextHandler{/storage/rdd,null}
|
29
|
14/03/10
22:17:26 INFO handler.ContextHandler: started o.e.j.s.h.ContextHandler{/storage,null}
|
30
|
14/03/10
22:17:26 INFO handler.ContextHandler: started o.e.j.s.h.ContextHandler{/stages/stage,null}
|
31
|
14/03/10
22:17:26 INFO handler.ContextHandler: started o.e.j.s.h.ContextHandler{/stages/pool,null}
|
32
|
14/03/10
22:17:26 INFO handler.ContextHandler: started o.e.j.s.h.ContextHandler{/stages,null}
|
33
|
14/03/10
22:17:26 INFO handler.ContextHandler: started o.e.j.s.h.ContextHandler{/environment,null}
|
34
|
14/03/10
22:17:26 INFO handler.ContextHandler: started o.e.j.s.h.ContextHandler{/executors,null}
|
35
|
14/03/10
22:17:26 INFO handler.ContextHandler: started o.e.j.s.h.ContextHandler{/metrics/json,null}
|
36
|
14/03/10
22:17:26 INFO handler.ContextHandler: started o.e.j.s.h.ContextHandler{/static,null}
|
37
|
14/03/10
22:17:26 INFO handler.ContextHandler: started o.e.j.s.h.ContextHandler{/,null}
|
38
|
14/03/10
22:17:26 INFO server.AbstractConnector: Started SelectChannelConnector@0.0.0.0:4040
|
39
|
14/03/10
22:17:26 INFO ui.SparkUI: Started Spark Web UI at http://m1:4040
|
41
|
14/03/10
22:17:26 INFO client.AppClient$ClientActor: Connecting to masterspark://m1:7077...
|
42
|
14/03/10
22:17:26 INFO storage.MemoryStore: ensureFreeSpace(60341) called with curMem=0, maxMem=150837657
|
43
|
14/03/10
22:17:26 INFO storage.MemoryStore: Block broadcast_0 stored as values to memory (estimated size 58.9 KB, free 143.8 MB)
|
44
|
14/03/10
22:17:26 INFO cluster.SparkDeploySchedulerBackend: Connected to Spark cluster with app ID app-20140310221726-0000
|
45
|
14/03/10
22:17:27 INFO client.AppClient$ClientActor: Executor added:
app-20140310221726-0000/0 on worker-20140310221648-s1-52544 (s1:52544)
with 1 cores
|
46
|
14/03/10
22:17:27 INFO cluster.SparkDeploySchedulerBackend: Granted executor ID
app-20140310221726-0000/0 on hostPort s1:52544 with 1 cores, 512.0 MB
RAM
|
47
|
14/03/10
22:17:27 WARN util.NativeCodeLoader: Unable to load native-hadoop
library for your platform... using builtin-java classes where applicable
|
48
|
14/03/10
22:17:27 WARN snappy.LoadSnappy: Snappy native library not loaded
|
49
|
14/03/10
22:17:27 INFO client.AppClient$ClientActor: Executor updated: app-20140310221726-0000/0 is now RUNNING
|
50
|
14/03/10
22:17:27 INFO mapred.FileInputFormat: Total input paths to process : 1
|
51
|
14/03/10
22:17:27 INFO spark.SparkContext: Starting job: collect at IPAddressStats.java:77
|
52
|
14/03/10
22:17:27 INFO scheduler.DAGScheduler: Registering RDD 4 (reduceByKey at IPAddressStats.java:70)
|
53
|
14/03/10
22:17:27 INFO scheduler.DAGScheduler: Got job 0 (collect at IPAddressStats.java:77) with 1 output partitions (allowLocal=false)
|
54
|
14/03/10
22:17:27 INFO scheduler.DAGScheduler: Final stage: Stage 0 (collect at IPAddressStats.java:77)
|
55
|
14/03/10
22:17:27 INFO scheduler.DAGScheduler: Parents of final stage: List(Stage 1)
|
56
|
14/03/10
22:17:27 INFO scheduler.DAGScheduler: Missing parents: List(Stage 1)
|
57
|
14/03/10
22:17:27 INFO scheduler.DAGScheduler: Submitting Stage 1
(MapPartitionsRDD[4] at reduceByKey at IPAddressStats.java:70), which
has no missing parents
|
58
|
14/03/10
22:17:27 INFO scheduler.DAGScheduler: Submitting 1 missing tasks from
Stage 1 (MapPartitionsRDD[4] at reduceByKey at IPAddressStats.java:70)
|
59
|
14/03/10
22:17:27 INFO scheduler.TaskSchedulerImpl: Adding task set 1.0 with 1 tasks
|
61
|
14/03/10
22:17:28 INFO scheduler.TaskSetManager: Starting task 1.0:0 as TID 0 on executor 0: s1 (PROCESS_LOCAL)
|
62
|
14/03/10
22:17:28 INFO scheduler.TaskSetManager: Serialized task 1.0:0 as 2396 bytes in 5 ms
|
63
|
14/03/10
22:17:29 INFO storage.BlockManagerMasterActor$BlockManagerInfo: Registering block manager s1:47282 with 297.0 MB RAM
|
64
|
14/03/10
22:17:32 INFO scheduler.TaskSetManager: Finished TID 0 in 3376 ms on s1 (progress: 0/1)
|
65
|
14/03/10
22:17:32 INFO scheduler.DAGScheduler: Completed ShuffleMapTask(1, 0)
|
66
|
14/03/10
22:17:32 INFO scheduler.DAGScheduler: Stage 1 (reduceByKey at IPAddressStats.java:70) finished in 4.420 s
|
67
|
14/03/10
22:17:32 INFO scheduler.DAGScheduler: looking for newly runnable stages
|
68
|
14/03/10
22:17:32 INFO scheduler.DAGScheduler: running: Set()
|
69
|
14/03/10
22:17:32 INFO scheduler.DAGScheduler: waiting: Set(Stage 0)
|
70
|
14/03/10
22:17:32 INFO scheduler.DAGScheduler: failed: Set()
|
71
|
14/03/10
22:17:32 INFO scheduler.TaskSchedulerImpl: Remove TaskSet 1.0 from pool
|
72
|
14/03/10
22:17:32 INFO scheduler.DAGScheduler: Missing parents for Stage 0: List()
|
73
|
14/03/10
22:17:32 INFO scheduler.DAGScheduler: Submitting Stage 0
(MapPartitionsRDD[6] at reduceByKey at IPAddressStats.java:70), which is
now runnable
|
74
|
14/03/10
22:17:32 INFO scheduler.DAGScheduler: Submitting 1 missing tasks from
Stage 0 (MapPartitionsRDD[6] at reduceByKey at IPAddressStats.java:70)
|
75
|
14/03/10
22:17:32 INFO scheduler.TaskSchedulerImpl: Adding task set 0.0 with 1 tasks
|
76
|
14/03/10
22:17:32 INFO scheduler.TaskSetManager: Starting task 0.0:0 as TID 1 on executor 0: s1 (PROCESS_LOCAL)
|
77
|
14/03/10
22:17:32 INFO scheduler.TaskSetManager: Serialized task 0.0:0 as 2255 bytes in 1 ms
|
78
|
14/03/10
22:17:32 INFO spark.MapOutputTrackerMasterActor: Asked to send map output locations for shuffle 0 to spark@s1:33534
|
79
|
14/03/10
22:17:32 INFO spark.MapOutputTrackerMaster: Size of output statuses for shuffle 0 is 120 bytes
|
80
|
14/03/10
22:17:32 INFO scheduler.TaskSetManager: Finished TID 1 in 282 ms on s1 (progress: 0/1)
|
81
|
14/03/10
22:17:32 INFO scheduler.DAGScheduler: Completed ResultTask(0, 0)
|
82
|
14/03/10
22:17:32 INFO scheduler.DAGScheduler: Stage 0 (collect at IPAddressStats.java:77) finished in 0.314 s
|
83
|
14/03/10
22:17:32 INFO scheduler.TaskSchedulerImpl: Remove TaskSet 0.0 from pool
|
84
|
14/03/10
22:17:32 INFO spark.SparkContext: Job finished: collect at IPAddressStats.java:77, took 4.870958309 s
|
85
|
14/03/10
22:17:32 INFO job.IPAddressStats: [CN] 58.246.49.218 312
|
86
|
14/03/10
22:17:32 INFO job.IPAddressStats: [KR] 1.234.83.77 300
|
87
|
14/03/10
22:17:32 INFO job.IPAddressStats: [CN] 120.43.11.16 212
|
88
|
14/03/10
22:17:32 INFO job.IPAddressStats: [CN] 110.85.72.254 207
|
89
|
14/03/10
22:17:32 INFO job.IPAddressStats: [CN] 27.150.229.134 185
|
90
|
14/03/10
22:17:32 INFO job.IPAddressStats: [HK] 180.178.52.181 181
|
91
|
14/03/10
22:17:32 INFO job.IPAddressStats: [CN] 120.37.210.212 180
|
92
|
14/03/10
22:17:32 INFO job.IPAddressStats: [CN] 222.77.226.83 176
|
93
|
14/03/10
22:17:32 INFO job.IPAddressStats: [CN] 120.43.11.205 169
|
94
|
14/03/10
22:17:32 INFO job.IPAddressStats: [CN] 120.43.9.19 165
|
我们也可以通过Web控制台来查看当前执行应用程序(Application)的状态信息,通过Master节点的8080端口(如:http://m1:8080/)就能看到集群的应用程序(Application)状态信息。
另外,需要说明的时候,如果在Unix环境下使用Eclipse使用Java开发Spark应用程序,也能够直接通过Eclipse连接Spark集群,并提交开发的应用程序,然后交给集群去处理。
参考链接