Spark集群 + Akka + Kafka + Scala 开发(4) : 开发一个Kafka + Spark的应用

时间:2022-09-24 10:55:08

[comment]: # Spark集群 + Akka + Kafka + Scala 开发(4) : 开发一个Kafka + Spark的应用

前言

Spark集群 + Akka + Kafka + Scala 开发(1) : 配置开发环境中,我们已经部署好了一个Spark的开发环境。

Spark集群 + Akka + Kafka + Scala 开发(2) : 开发一个Spark应用中,我们已经写好了一个Spark的应用。

本文的目标是写一个基于kafka的scala工程,在一个spark standalone的集群环境中运行。

项目结构和文件说明

说明

这个工程包含了两个应用。

一个Consumer应用:CusomerApp - 实现了通过Spark的Stream+Kafka的技术来实现处理消息的功能。

一个Producer应用:ProducerApp - 实现了向Kafka集群发消息的功能。

文件结构

KafkaSampleApp   # 项目目录
|-- build.bat # build文件
|-- src
|-- main
|-- scala
|-- ConsumerApp.scala # Consumer应用
|-- ProducerApp.scala # Producer应用

构建工程目录

可以运行:

mkdir KafkaSampleApp
mkdir -p /KafkaSampleApp/src/main/scala

代码

build.sbt

name := "kafka-sample-app"

version := "1.0"

scalaVersion := "2.11.8"

scalacOptions += "-feature"
scalacOptions += "-deprecation"
scalacOptions += "-language:postfixOps" libraryDependencies ++= Seq(
"org.apache.spark" %% "spark-core" % "2.0.0",
"org.apache.spark" %% "spark-streaming" % "2.0.0",
"org.apache.spark" %% "spark-streaming-kafka-0-8" % "2.0.0",
"org.apache.kafka" %% "kafka" % "0.8.2.1"
)

CusomerApp.scala

这个例子中使用了Spark自带的Stream+Kafka结合的技术,有个限制的绑定了kafka的8.x版本。

我个人建议只用Kafka的技术,写一个Consomer,或者使用其自带的Consumer,来接受消息。

然后再使用Spark的技术。

这样可以跳过对kafak版本的限制。

import java.util.Properties
import _root_.kafka.serializer.StringDecoder import org.apache.spark.streaming._
import org.apache.spark.streaming.StreamingContext._
import org.apache.spark.streaming.kafka._
import org.apache.spark.SparkConf object ConsumerApp {
def main(args: Array[String]) {
val brokers = "localhost:9092"
val topics = "test-topic" // Create context with 10 second batch interval
val sparkConf = new SparkConf().setAppName("DirectKafkaWordCount")
val ssc = new StreamingContext(sparkConf, Seconds(10)) // Create direct kafka stream with brokers and topics
val topicsSet = topics.split(",").toSet
val kafkaParams = Map[String, String]("bootstrap.servers" -> brokers)
val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
ssc, kafkaParams, topicsSet) // Get the lines, split them into words, count the words and print
val lines = messages.map(_._2)
val words = lines.flatMap(_.split(" "))
val wordCounts = words.map(x => (x, 1L)).reduceByKey(_ + _)
println("============== Start ==============")
wordCounts.print
println("============== End ==============") // Start the computation
ssc.start()
ssc.awaitTermination()
}
}

ProducerApp.scala

import java.util.Arrays
import java.util.List
import java.util.Properties
import org.apache.kafka.clients.producer._ object ProducerApp {
def main(args: Array[String]): Unit = { val props = new Properties()
// Must-have properties
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer") // Optional properties
props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "none")
props.put(ProducerConfig.SEND_BUFFER_CONFIG, (1024*100).toString)
props.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, (100).toString)
props.put(ProducerConfig.METADATA_MAX_AGE_CONFIG, (5*60*1000L).toString)
//props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, (60*1000l).toString)
props.put(ProducerConfig.ACKS_CONFIG, (0).toString)
//props.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, (1500).toString)
props.put(ProducerConfig.RETRIES_CONFIG, (3).toString)
props.put(ProducerConfig.LINGER_MS_CONFIG, (1000).toString)
props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, (32 * 1024 * 1024L).toString)
props.put(ProducerConfig.BATCH_SIZE_CONFIG, (200).toString)
props.put(ProducerConfig.CLIENT_ID_CONFIG, "kafka-app-producer") val producer = new KafkaProducer[String, String](props) // Thread hook to close produer
Runtime.getRuntime.addShutdownHook(new Thread() {
override def run() {
producer.close()
}
}) // send 10 messages
var i = 0
for( i <- (1 to 10)) {
val data = new ProducerRecord[String, String]("test-topic", "test-key", s"test-message $i")
producer.send(data)
} // Reduce package lost
Thread.sleep(1000)
producer.close()
}
}

构建工程

进入目录KafkaSampleApp。运行:

sbt package

第一次运行时间会比较长。

测试应用

启动Kafka服务

# Start zookeeper server
gnome-terminal -x sh -c '$KAFKA_HOME/bin/zookeeper-server-start.sh $KAFKA_HOME/config/zookeeper.properties; bash' # Wait zookeeper server is started.
sleep 8s # Start kafka server
gnome-terminal -x sh -c '$KAFKA_HOME/bin/kafka-server-start.sh $KAFKA_HOME/config/server.properties; bash' # Wait kafka server is started.
sleep 5s

注:使用Ctrl+C可以中断服务。

  • 创建一个topic
# Create a topic
$KAFKA_HOME/bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test-topic # List topics
$KAFKA_HOME/bin/kafka-topics.sh --list --zookeeper localhost:2181

启动Spark服务

  • 启动spark集群master server
$SPARK_HOME/sbin/start-master.sh

master服务,默认会使用7077这个端口。可以通过其日志文件查看实际的端口号。

  • 启动spark集群slave server
$SPARK_HOME/sbin/start-slave.sh spark://$(hostname):7077

启动Consumer应用

新起一个终端,来运行:

$SPARK_HOME/bin/spark-submit --packages org.apache.spark:spark-streaming-kafka-0-8_2.11:2.0.0 --master spark://$(hostname):7077 --class ConsumerApp target/scala-2.11/kafka-sample-app_2.11-1.0.jar

注:如果定义的topic没有create,第一次运行会失败,再运行一次就好了。

如果出现java.lang.NoClassDefFoundError错误,

请参照Spark集群 + Akka + Kafka + Scala 开发(1) : 配置开发环境

确保kafka的包在Spark中设置好了。

启动Producer应用

java -classpath ./target/scala-2.11/kafka-sample-app_2.11-1.0.jar:$KAFKA_HOME/libs/* ProducerApp
# or
# $KAFKA_HOME/bin/kafka-run-class.sh -classpath ./target/scala-2.11/kafka-sample-app_2.11-1.0.jar:$KAFKA_HOME/libs/* ProducerApp

然后:看看Consumer应用是否收到了消息。

总结

建议写一个Kafka的Consumer,然后调用Spark功能,而不是使用Spark的Stream+Kafka的编程方式。

好处是可以使用最新版本的Kafka。

Kafka的包中带有一个Sample代码,可以从中学习一些编写程序的方法。

参照

Spark集群 + Akka + Kafka + Scala 开发(4) : 开发一个Kafka + Spark的应用的更多相关文章

  1. Spark集群 &plus; Akka &plus; Kafka &plus; Scala 开发&lpar;3&rpar; &colon; 开发一个Akka &plus; Spark的应用

    前言 在Spark集群 + Akka + Kafka + Scala 开发(1) : 配置开发环境中,我们已经部署好了一个Spark的开发环境. 在Spark集群 + Akka + Kafka + S ...

  2. Spark集群 &plus; Akka &plus; Kafka &plus; Scala 开发&lpar;2&rpar; &colon; 开发一个Spark应用

    前言 在Spark集群 + Akka + Kafka + Scala 开发(1) : 配置开发环境,我们已经部署好了一个Spark的开发环境. 本文的目标是写一个Spark应用,并可以在集群中测试. ...

  3. Spark集群 &plus; Akka &plus; Kafka &plus; Scala 开发&lpar;1&rpar; &colon; 配置开发环境

    目标 配置一个spark standalone集群 + akka + kafka + scala的开发环境. 创建一个基于spark的scala工程,并在spark standalone的集群环境中运 ...

  4. Spark入门:第2节 Spark集群安装:1 - 3;第3节 Spark HA高可用部署:1 - 2

    三. Spark集群安装 3.1 下载spark安装包 下载地址spark官网:http://spark.apache.org/downloads.html 这里我们使用 spark-2.1.3-bi ...

  5. 大数据技术之&lowbar;19&lowbar;Spark学习&lowbar;01&lowbar;Spark 基础解析 &plus; Spark 概述 &plus; Spark 集群安装 &plus; 执行 Spark 程序

    第1章 Spark 概述1.1 什么是 Spark1.2 Spark 特点1.3 Spark 的用户和用途第2章 Spark 集群安装2.1 集群角色2.2 机器准备2.3 下载 Spark 安装包2 ...

  6. hadoop&plus;spark集群搭建入门

    忽略元数据末尾 回到原数据开始处 Hadoop+spark集群搭建 说明: 本文档主要讲述hadoop+spark的集群搭建,linux环境是centos,本文档集群搭建使用两个节点作为集群环境:一个 ...

  7. CentOS7 安装spark集群

    Spark版本 1.6.0 Scala版本 2.11.7 Zookeeper版本 3.4.7 配置虚拟机 3台虚拟机,sm,sd1,sd2 1. 关闭防火墙 systemctl stop firewa ...

  8. spark集群搭建

    文中的所有操作都是在之前的文章scala的安装及使用文章基础上建立的,重复操作已经简写: 配置中使用了master01.slave01.slave02.slave03: 一.虚拟机中操作(启动网卡)s ...

  9. Spark学习笔记5:Spark集群架构

    Spark的一大好处就是可以通过增加机器数量并使用集群模式运行,来扩展计算能力.Spark可以在各种各样的集群管理器(Hadoop YARN , Apache Mesos , 还有Spark自带的独立 ...

随机推荐

  1. c&plus;&plus;之函数重载&lpar;函数匹配&rpar;

    Case void f(); void f(int); void f(int, int); void f(double, double = 3.14); 匹配原则: 1)其形参数量与本次调用提供的实参 ...

  2. mysql 直接从date 文件夹备份表,还原数据库之后提示 table doesn&grave;t exist的原因和解决方法

    补充:正常情况下,建议数据库备份最好用工具进行备份,通过拷贝数据库表进行数据迁移,不同的环境会出现各种不同的意外问题. 背景:今天在整理一个网站的时候,操作系统由于系统自动更新导致一直出现系统蓝屏死机 ...

  3. Linux iptables 应用控制访问SSH服务

    Title:Linux iptables 应用控制访问SSH服务  --2012-02-23 17:51 今天用到了以前从来没有用到过的,iptables控制访问,只允许外部访问SSH服务(22号端口 ...

  4. 一、Cocos2dx在visualStudio或者vc&plus;&plus;中环境搭建(入门篇)

    本文由qinning199原创,转载请注明:http://www.cocos2dx.net/?p=106 0.概述 Cocos2dx-win32的项目能够被向导生成 向导支持vs2008,vs2010 ...

  5. ThoughtWorks 2017技术雷达

    前言: ThoughtWorks人酷爱技术.我们对技术进行构建.研究. 测试.开源.记述,并始终致力于对其进行改进-以求造福 大众.我们的使命是支持卓越软件并掀起IT革命.我们创建 并分享Though ...

  6. 理解 JavaScript 中的 this

    前言 理解this是我们要深入理解 JavaScript 中必不可少的一个步骤,同时只有理解了 this,你才能更加清晰地写出与自己预期一致的 JavaScript 代码. 本文是这系列的第三篇,往期 ...

  7. 【Windows】windows核心编程整理(上)

    小续 这是我11年看<windows核心编程>时所作的一些笔记,现整理出来共享给大家 windows核心编程整理(上) windows核心编程整理(下) 线程的基础知识 进程是不活泼的,进 ...

  8. 【转】两款 Web 前端性能测试工具

    前段时间接手了一个 web 前端性能优化的任务,一时间不知道从什么地方入手,查了不少资料,发现其实还是蛮简单的,简单来说说. 一.前端性能测试是什么? 前端性能测试对象主要包括: HTML.CSS.J ...

  9. JetBrains产品永久破解

    吃水不忘挖井人,本博客转自:https://www.cnblogs.com/jyiqing/p/7699649.html 目的: 本人使用idea和webstorm进行开发,无奈正版实在是太贵了,只能 ...

  10. 应用程序 调用 webservice

    首先用VS创建一个WebService服务工程,并且完成基本功能,本人完成的是html转pdf功能. 然后,新建一个Windows应用程序. 添加WebService到Windows项目中,如图 然后 ...