概要:
Oracle Stream Analytics(OSA)是企业级大数据流实时分析计算平台。它可以通过使用复杂的关联模式,扩充和机器学习算法来自动处理和分析大规模实时信息。流式传输的大数据可以源自IoT传感器,Web管道,日志文件,销售点设备,ATM机,社交媒体,事务数据库,NoSQL数据库或任何其他数据源。
OSA为业务用户提供了动态创建和实施即时洞察解决方案。它允许用户通过实时图表,地图,可视化视图来实时浏览实时数据,并以图形方式构建流传输管道,而无需进行任何手工编码。 OSA使用与Oracle连续查询引擎集成的Apache Kafka和Apache Spark Streaming在可伸缩且高度可用的集群大数据环境中执行。
OSA广泛应用于以下场景,以解决现代企业中的关键实时用例:
- 金融服务:实时反欺诈,实时风控,实时营销。
- 交通运输:公共交通,车辆调度,集装箱追踪。
- 运营商:精准营销,网络管理,“万物互联”。
- 零售:个性化推荐,动态定价,货架管理。
- 制造业:智能库存,品质管控,预测性维护。
- 公共安全:平安城市。
本文我们将介绍如何使用Oracle Stream Analytics实现实时数据采集,实时数据处理,实时数据可视化以及实时数据同步到大数据平台的整个过程。
Oracle DB→Oracle GoldenGate→Kafka→OSA REST→HBase
具体场景及工具如下:
测试环境说明如下:
整体步骤大致如下:
- 源端数据同步及KAFKA准备
- OSA安装及HBASE配置
- OSA配置
详细步骤如下:
源端数据同步及KAFKA准备
1.在源端数据库Enable GoldenGate并创建测试表
2.配置源端Golden Gate实时数据捕获
3.部署并配置GoldenGate for BigData
- OGG for BigData无需安装,只需解压即可使用
- 编辑配置文件:kafka.props,custom_kafka_producer.properties
4. 拉起extract, pump, replicat进程
Oracle stream analytics安装及Hbase配置
5. OSA安装
OSA下载链接如下:
Oracle Stream Analytics Downloads
目前最新版本为19.1,按照官方安装文档进行安装。
6.HBase的安装及测试表创建
安装并创建同步测试表:
Oracle Stream Analytics配置
下面将详细介绍OSA配置过程:
7.登录OSA
osaadmin为OSA预制用户
8.进入OSA界面,选择【Catalog】选项
9.选择创建新连接选项,点击【Create New Item】
10.创捷连接选项,输入连接名称,连接类型选择【kafka】
并在kafka bootstrap里输入broker地址,测试连接成功
11.创建新的流数据,点击创建【Create Stream】
类型选择【kafka】选择之前创建的连接,输入Topic名称,选择数据格式【JSON】。
topic名称为OGG for bigdata配置文件里指定的Topic
12.预先定义好JSON数据格式的文件如下:
下面定义可以从kafka消费信息中获取,使用bin/kafka-console-consumer.sh
13.OSA支持的数据格式有【CSV】,【JSON】,【AVRO】
这次使用的为上述预定义好的JSON格式
14.创建Custom Jar,选择创建【Custom Jar】
输入名称,选择类型为【Custom Jar】,在Jar URL上指定预先创建好的java程序
需要使用Custom Jar获取流数据并且进行Base64加密处理,Hbase上使用Rest进行操作时,仅支持Base64加密过的数据的增删改查。
注意: Custom Jar所使用的Java程序可以只做针对Event的Base64加密处理
REST可以在OSA上添加REST类型的Stage执行写入到Hbase
OSA目前版本尚未支持流数据base64加密输出
本文使用Java示例程序执行Base64加密及REST操作写入到Hbase
15.创建Pipeline,选择创建【Pipeline】
指定名称,选择上述已创建好的Stream,并且保存。
16.添加已创建的Custom Stage到Pipeline上
右击Pipeline上的Stream标识,选择【Custom Stage from Custom Jar】
17. 指定【Custom Stage】名称,点击保存
18.在Custom Stage的配置选项,我们选择之前已经上传的Custom Jar对象
19.在【Input Mapping】选项,需要把Stream内容和Custom Stage进行关联
并且点击【Publish】发布
注意:如果不选择Publish则无法把OSA输出进行写入操作到Hbase
JOB_ID=after_JOB_ID、JOB_TITILE=after_JOB_TITLE、MIN_SALARY= after_ MIN_SALARY 、MAX_SALARY= after_ MAX_SALARY
20.【Publish】的Pipeline将被锁定,如需更改配置点击【Unpublish】进行更改
Publish的Pipeline如下:
在下方可以看到实时流数据的监听
21.至此为止,OSA端的配置已完成。最后一步为测试与演示。
测试与演示
1.源端数据库插入一条数据并提交
2.OGG源端捕获确认(Extract和Pump)
3.OGG目标端Kafka投递确认
4.Kafka端消费信息确认
5.Kafka端显示以下纪录已经消费,OP_TYPE代表操作类型,
OP_TYPE=I ,I表示这是一条INSERT插入操作。
6.查看OSA Stream监听状态
点击【kafka-stream】图标,可以看到该条记录已经被Kafka Stream正确的监听
7.查看OSA输出结果
点击【restapi】图标,可以看到OSA输出结果
8.使用HBase写入确认
从HBase的HR_JOBS表中,可以看到源端插入的数据已经写入到该表中
写在最后
Oracle Stream Analytics 还具有以下产品特性:
1.提供友好的图形交互页面,集成可视化并可以通过Java语言进行扩展
2. 提供丰富的内建流模式库并可通过Java进行扩展
3. 与位置和地理空间功能集成
4. 预测分析与机器学习集成
5. 可对接Druid Superset
Oracle Stream Analytics支持Continuous Query Language (CQL) ,构建于分布式分内存计算网格框架之上,使得查询处理的性能可得到线性增长。学习成本低,使得业务人员和开发人员可以更大限度的关注业务而非应用技术架构。是您快速构建企业级大数据流实时分析计算平台不二之选。