spark 广播变量

时间:2023-03-10 03:04:10
spark 广播变量

Spark广播变量

spark 广播变量

使用广播变量来优化,广播变量的原理是:

在每一个Executor中保存一份全局变量,task在执行的时候需要使用和这一份变量就可以,极大的减少了Executor的内存开销。

Executor中task在执行的时候如果使用到了广播变量,会找Executor里面的BlockManager来获取广播变量。

如果BlockManager中没有这个关闭变量,会从driver端拉取关闭变量。

在Driver端也有一个blockManagerMaster,其他的task执行的时候直接使用blockmanager中的广播变量就可以。

package SparkStreaming;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.broadcast.Broadcast; import java.util.Arrays;
import java.util.List; public class BroadCast {
public static void main(String[] args) {
SparkConf conf = new SparkConf()
.setMaster("local")
.setAppName("BroadCast");
JavaSparkContext sc = new JavaSparkContext(conf);
/*
* 使用广播变量,广播变量的定义必须在driver端,因为sc没有被序列化不能被发送到Executor端
* */
Broadcast<String> blackname = sc.broadcast("dwj3");
List<String> name = Arrays.asList(
"dwj1",
"dwj2",
"dwj3");
//String blackName = "dwj3";
JavaRDD<String> nameRDD = sc.parallelize(name);
JavaRDD<String> namefilter = nameRDD.filter(new Function<String, Boolean>() {
@Override
public Boolean call(String s) throws Exception {
String blacknames = blackname.getValue();
return !blacknames.equals(s);
}
});
List<String> lastname = namefilter.collect();
for(String str:lastname){
System.out.println(str);
}
}
}

注意:在声明广播变量的时候,必须在driver端,因为sc没有被序列化,是不能被发送到Executor端的。