当前技术博客产生背景:
当前问题产生已经在2年前发生,当前博客只是为了记录当时的情况。
业务需求:SparkStreaming实时传输数据需要实时与MySql中数据进行比对,所以需要每分钟更新MySql数据数据,实现广播变量,将MySql数据更新后,实现动态变量广播。(之前方案将MySql的连接查询静态方法写在Executer端调用,如果每一个Executer的SparkStreaming的Filter数据过滤时候都在过滤时候调用MySql查询连接,当分布式计算一次调用一批数据量到达十万,那么当前Filter 就会查询十万次数据库,会造成数据库压力,连接创建,数据查询,并且比对,会造成数据实时比对延迟,同时会造成数据存储延迟。)
故而,使用动态广播变量,动态广播变量可以实现每分钟广播一次,广播在Driver端,实现分布式集群共享。
下列代码为示例
package ;
import ;
import ;
import ;
import ;
import ;
import ;
import .JDBCUtils3;
import .JJGA_MessageUtils;
import ;
import ;
import ;
import ;
import ;
import ;
import ;
import ;
import ;
import ;
import ;
import ;
import ;
import ;
import ;
import ;
import ;
import ;
import ;
import .;
import .;
import .;
import scala.Tuple2;
import ;
import ;
import ;
import ;
import ;
import .*;
public class IM_RealTime_Depoly {
//Mysql 每分钟动态广播变量数据变量
private static volatile Broadcast<ArrayList<IMDataDTO>> broadcastList = null;
static HashMap<String, String> map = null;
static {
map = new HashMap<>();
//设置静态参数
("66001", "66001");
("66002", "66001");
("66003", "66001");
}
public static void main(String[] args) throws IOException {
//设置hadoop 文件备份为1,Hadoop 系统默认3份 减少数据同步延迟性
//无法写入;我的环境中有3个datanode,备份数量设置的是3。在写操作时,它会在pipeline中写3个机器
Configuration hdfs = new Configuration();
("","1");
("","NEVER");
("","true");
//初始化spark
SparkConf conf = new SparkConf().setAppName("ZXSK_RealTime_Depoly");
("", "false");
("", "");
//设置本地模式线程,最少2+
("local[5]");
JavaSparkContext sc = new JavaSparkContext(conf);
//初始化sparkStreaming对象 设置调用时间60秒 获取一批数据
JavaStreamingContext ssc = new JavaStreamingContext(sc, (60));
("/sparkStreaming_consume_kafka/deploy/RealTime_Depoly");
//初始化广播变量
broadcastList = (readDeployUser_ZXSK());
//配置kafka参数(节点、消费者组、topic)
String brokers = "IP:9092,IP:9092,IP:9092";//指定kafka地址
String groupId = "kafka-02"; //指定消费者组id
String topics = "topics"; //指定topic
Set<String> topicsSet = new HashSet<>(((",")));
Map<String, Object> kafkaParams = new HashMap<>();
(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers);
(ConsumerConfig.GROUP_ID_CONFIG, groupId);
(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, );
(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, );
//此处可以参考 偏移量 反压我的相关博客,可以使用 限制消费速率等配置
//每次程序启动获取最新的消费者偏移量|【此处可以使用 Redis 记录数据偏移量,详情见我偏移量博客】
("", "latest");
//关闭消费之偏移量自动提交
("", false);
//链接kafka,获得DStream对象
JavaInputDStream<ConsumerRecord<String, String>> messages =
(ssc, (), ConsumerStrategies.<String, String>Subscribe(topicsSet, kafkaParams));
//判断数据Key是否是 IM 开头的
JavaDStream<ConsumerRecord<String, String>> FirstData = (new Function<ConsumerRecord<String, String>, Boolean>() {
@Override
public Boolean call(ConsumerRecord<String, String> v1) throws Exception {
return ().startsWith("IM");
}
});
//讲获得的kafka数据转换为K,V格式对象
JavaPairDStream<String, String> lines = (new PairFunction<ConsumerRecord<String, String>, String, String>() {
@Override
public Tuple2<String, String> call(ConsumerRecord<String, String> record) throws Exception {
return new Tuple2<>((), ());
}
});
//Spark 数据转化为Map 对象 key Value 数据结构
//时间戳,IM,位置区码,基站号,基站经度,基站纬度
JavaPairDStream<String, IMDataDTO> MyJavaPairDS = (new PairFunction<Tuple2<String, String>, String, IMDataDTO>() {
@Override
public Tuple2<String, IMDataDTO> call(Tuple2<String, String> tuple2) throws Exception {
String[] split = tuple2._2.split(",", -1);
IMDataDTO IMPerson = new IMDataDTO();
//获取redis 中存储得对应的字典表
String Redis_IM_Device = ("IM_Device_" + split[11], 5);
//采集时间戳 1
(split[0] + "000");
//设置设备地址 2
StringBuffer addr = new StringBuffer();
if (null != Redis_IM_Device) {
(Redis_IM_Device.split("\\^", -1)[3]).append(Redis_IM_Device.split("\\^", -1)[4]);
}
(());
//IM 3
(split[6]);
//编号 4
(split[11]);
//设置主ID 更具 当前设备ID 获取到 当前属于什么ID
((split[11]));
//经度 5
(split[12]);
//基站纬度 6
(split[13]);
//设置数据 7
("数据类型");
//根据当前主ID进行分组
return new Tuple2<String, IMDataDTO>((), IMPerson);
}
});
//Spark得过滤算子 Filter
JavaPairDStream<String, IMDataDTO> filter = (new Function<Tuple2<String, IMDataDTO>, Boolean>() {
@Override
public Boolean call(Tuple2<String, IMDataDTO> x) throws Exception {
//筛选人员IM | Map
Map<String, IMDataDTO> map = new HashMap<>();
//获取Driver端得广播变量数据 获取数据库查询到List集合【可以理解为Mysql查询得当前数据为重点数据,需要在这一批次查询过滤比对出这些数据】
ArrayList<IMDataDTO> DeployUser = ();
String[] split = null;
for (IMDataDTO dto : DeployUser) {
//管理中的IM不为空则按照逗号分隔,因为可能有IM填写多个的情况
if ((())) {
split = ().split(",", -1);
for (String IM : split) {
//存储Key 数据对象
(IM, dto);
}
}
}
//筛选IM 如果不是重点,则不要
if (!(x._2.getPersonIM())) {
return false;
}
//上面重点规则
HashMap<String, String> device = new HashMap<>();
JSONArray device_rule = ((((x._2.getPersonIM()).getRule()).get("device").toString()));
for (int i = 0; i < device_rule.size(); i++) {
JSONObject jsonObject = device_rule.getJSONObject(i);
if (null == jsonObject) {
continue;
}
String source = ("source");
if ("不是我要的数据类型".equals(source)) {
continue;
}
String device_number = ("device_number");
(device_number, device_number);
}
//筛选设备和规则,不是重点SHEB,同样不用入库,不产生报警
if (!(x._2.getPid())) {
return false;
}
//设置人员
x._2.setPerson_number((x._2.getPersonIM()).getPerson_number());
//设置控描述
x._2.setDescribe((x._2.getPersonIM()).getDescribe());
//设置控ID
x._2.setDepID((x._2.getPersonIM()).getDeployID());
//设置布员名称
x._2.setPersonName((x._2.getPersonIM()).getPersonName());
//是否是布控人的IM不是就返回false
return true;
}
});
//对上述IM相同数据进行 分组,GroupByKey 合并到一起
JavaPairDStream<String, Iterable<IMDataDTO>> IMJavaPairDS = ();
(new VoidFunction<JavaPairRDD<String, Iterable<IMDataDTO>>>() {
@Override
public void call(JavaPairRDD<String, Iterable<IMDataDTO>> v1) throws Exception {
//此处判断是否为广播变量, 对存储在Driver 端的广播变量进行数据修改。
if (broadcastList != null) {
//不为空时,数据广播出去
(true);
//此处查询Mysql 数据 广播到全局,避免只在Executer 端,产生数据延迟
broadcastList = ().broadcast(readDeployUser_ZXSK(), ());
}
//次数使用数据合并,使用一个分区进行遍历分许,减少数据库开销连接,如果数据量巨大,可以按需求增加数量
(1).foreachPartition(new VoidFunction<Iterator<Tuple2<String, Iterable<IMDataDTO>>>>() {
@Override
public void call(Iterator<Tuple2<String, Iterable<IMDataDTO>>> v2) throws Exception {
String time_stmp = (new Date(), "yyyy-MM-dd");
DepolyTrackDTO dtrack = new DepolyTrackDTO();
String Deploy_keys = null;
List<String> ids=null;
String person_number = null;
while (()) {
//数据转化为数组,然后遍历循环
list = (()._2.iterator());
for (IMDataDTO dto : list) {
String mess = "姓名:" + ()+"**************对应业务说明***************";
String sql = "insert into pub_result(deploy_id,create_time,message) values
("(()) + "," + ())";
Connection conn = null;
PreparedStatement psvm = null;
try {
conn = ();
//设置手动事务
(false);
psvm = (sql);
();
//成功则提交
();
} catch (Exception e) {
();
try {
//异常则回滚
();
} catch (SQLException e1) {
();
}
} finally {
(psvm, conn);
}
}
}
}
}
});
}
});
();
try {
();
} catch (InterruptedException e) {
();
}
}
/**
* 查询Mysql 数据库,实现广播变量
* @return
*/
public static ArrayList<imDataDTO> readDeployUser_ZXSK(){
String sql = "select id,im,`describe from pub_dep_new where im !=''";
Connection conn = null;
PreparedStatement psvm = null;
ResultSet rs = null;
ArrayList<imDataDTO> rst = new ArrayList<imDataDTO>();
try {
conn = ();
psvm = (sql);
rs = ();
String deploy_name= "";
String deploy_rule= "";
String im = "";
imDataDTO person =null;
while (()) {
person = new imDataDTO();
deploy_id = ("id");
im = ("im");
//人ID
(deploy_id);
//被管im
(im);
(person);
}
} catch (Exception e) {
();
}
finally {
(rs,psvm,conn);
}
return rst;
}
}