Flink Development: Setting Up a Scala Environment in IDEA

Date: 2023-02-13 19:40:07

Big-data services increasingly favor Scala, and I won't belabor the advantages of its functional style. The most obvious one is concise code. Compare two WordCount implementations:

Java WordCount

    ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

    DataSet<String> text = env.readTextFile("/path/to/file");

    // split up the lines into pairs (2-tuples) containing: (word, 1),
    // then group by tuple field "0" and sum up tuple field "1"
    DataSet<Tuple2<String, Integer>> counts =
        text.flatMap(new Tokenizer())
            .groupBy(0)
            .sum(1);

    counts.writeAsCsv(outputPath, "\n", " ");

    // User-defined function
    public static class Tokenizer implements FlatMapFunction<String, Tuple2<String, Integer>> {
        @Override
        public void flatMap(String value, Collector<Tuple2<String, Integer>> out) {
            // normalize and split the line
            String[] tokens = value.toLowerCase().split("\\W+");
            // emit the pairs
            for (String token : tokens) {
                if (token.length() > 0) {
                    out.collect(new Tuple2<String, Integer>(token, 1));
                }
            }
        }
    }
Scala WordCount

    val env = ExecutionEnvironment.getExecutionEnvironment

    // get input data
    val text = env.readTextFile("/path/to/file")

    val counts = text.flatMap { _.toLowerCase.split("\\s+") filter { _.nonEmpty } }
        .map( (_, 1) )
        .groupBy(0)
        .sum(1)

    counts.writeAsCsv(outputPath, "\n", " ")
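To make concrete what both pipelines compute, here is the same word count on an in-memory list using plain JDK streams, with no Flink involved. This is only an illustrative sketch; the class and method names are my own, not from any Flink API:

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;

public class WordCountSketch {
    // Mirror the Flink pipeline: lowercase, split on non-word
    // characters, drop empty tokens, then group and count.
    static Map<String, Long> wordCount(List<String> lines) {
        return lines.stream()
                .flatMap(line -> Arrays.stream(line.toLowerCase().split("\\W+")))
                .filter(token -> !token.isEmpty())
                .collect(Collectors.groupingBy(Function.identity(), Collectors.counting()));
    }

    public static void main(String[] args) {
        Map<String, Long> counts = wordCount(Arrays.asList("to be or not", "to be"));
        System.out.println(counts.get("to")); // 2
        System.out.println(counts.get("be")); // 2
        System.out.println(counts.get("or")); // 1
    }
}
```

The `groupBy(0).sum(1)` calls in the Flink versions play the role of the `groupingBy`/`counting` collector here, just distributed across the cluster.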
So how do you set up the development environment?
Here are the main points to watch.
  • Version matching
The IDEA and Scala plugin versions are fairly tightly coupled, so check which versions correspond before installing. Choose the Scala SDK according to the Scala version your Flink build uses: Flink here is built against Scala 2.12, so I chose SDK version 2.12.8.
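To keep the Scala version consistent everywhere, it can help to pin it once in the POM and reference it from each artifact. A minimal sketch; the property names are a common convention rather than anything mandated, and the version numbers are illustrative:

```xml
<properties>
    <!-- pin once; every Flink artifact below references these -->
    <scala.binary.version>2.12</scala.binary.version>
    <flink.version>1.9.0</flink.version>
</properties>

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-scala_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
</dependency>
```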
  • Install the Scala plugin
In IDEA, open Settings → Plugins, search for "Scala", install it, and restart the IDE.
  • Set up the Library
After creating the Flink Maven project, right-click the project and open Module Settings; under Libraries choose the required Scala SDK version, select Download, and wait for it to finish.
  • Avoid multi-version SDK conflicts
Confirm the installation succeeded, and delete any other Scala SDK versions that are present.
 
Global Libraries
Under Global Libraries, likewise select the required Scala SDK; otherwise compilation may fail with a Scala compiler error.
  • Maven dependencies
Use the Scala variants of the Flink dependencies. The official examples use the Java dependencies, which cause syntax errors when calling some Flink APIs from Scala.
 
    <version>1.0-SNAPSHOT</version>

    <dependencies>
        <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-streaming-scala -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-scala_2.12</artifactId>
            <version>1.9.0</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-scala -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-scala_2.12</artifactId>
            <version>1.9.0</version>
        </dependency>
    </dependencies>
With that, the code runs fine inside IDEA.
 
 
If Scala source files are not offered when creating new files, you need to import the Scala SDK. One more thing I forgot to mention: make sure a Java 1.8 JDK is installed as well.
 
 
 
    import org.apache.flink.api.scala.ExecutionEnvironment
    // brings in the implicit TypeInformation needed by the Scala API
    import org.apache.flink.api.scala._

    object WordCountBatch {
      def main(args: Array[String]): Unit = {
        val inputPath = "D:\\data\\11.txt"
        // val env = StreamExecutionEnvironment.getExecutionEnvironment
        val env = ExecutionEnvironment.getExecutionEnvironment

        val text = env.readTextFile(inputPath)
        val counts = text.flatMap(_.split("\\W+"))
          .filter(_.nonEmpty)
          .map((_, 1))
          .groupBy(0)
          .sum(1)

        counts.writeAsCsv("D:\\data\\output6").setParallelism(1)
        env.execute("batch wordCount")
      }
    }