Hands-On: Syncing MySQL into Doris with Flink CDC
1. Environment
Software | Version |
---|---|
MySQL | 5.7.34 |
Flink | 1.14.5 |
Doris | 1.1.0 |
2. MySQL installation and configuration
- MySQL installation:
Refer to: /mysql/
- Enable the MySQL binlog
Flink CDC subscribes to MySQL's binlog to sync data into Doris in real time, so the binlog feature must be enabled.
- Edit the MySQL configuration file (typically /etc/my.cnf on a systemd-managed install like this one)
vim /etc/my.cnf
- Add the following settings to the file
log-bin=mysql-bin
binlog_format=Row
- Restart MySQL
systemctl restart mysqld
- Check that the binlog is enabled
mysql> show variables like "log_bin";
+---------------+-------+
| Variable_name | Value |
+---------------+-------+
| log_bin       | ON    |
+---------------+-------+
1 row in set (0.00 sec)
mysql>
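Flink CDC also requires row-based binlogs, so it is worth confirming that binlog_format took effect after the restart; the Value column should read ROW:
mysql> show variables like "binlog_format";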
Preparing the database, table, and test data
- Log in to MySQL
mysql -h 127.0.0.1 -P 3306 -uroot
- Create the database and table
create database example_db;
CREATE TABLE `test_cdc` (
  `id` int(11) NOT NULL AUTO_INCREMENT,
  `name` varchar(255) DEFAULT NULL,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=5 DEFAULT CHARSET=utf8mb4;
- Write test data
insert into test_cdc (id,name) values(1,'1');
- Verify the test data
mysql> select * from test_cdc;
+----+------+
| id | name |
+----+------+
|  1 | 1    |
+----+------+
1 row in set (0.00 sec)
mysql>
3. Doris installation and configuration
- Doris installation:
Refer to the official docs: /zh-CN/docs/get-starting/
- Create the Doris database and table
mysql -h 127.0.0.1 -P 9030 -uroot
create database example_db;
CREATE TABLE IF NOT EXISTS example_db.expamle_tbl
(
    `id` LARGEINT NOT NULL COMMENT "user id",
    `name` VARCHAR(50) NOT NULL COMMENT "user nickname"
)
UNIQUE KEY(`id`)
DISTRIBUTED BY HASH(`id`) BUCKETS 1
PROPERTIES (
    "replication_allocation" = "tag.location.default: 1"
);
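Note that the table uses Doris's Unique Key model on `id`; this is what lets the CDC stream apply later updates (and deletes) as upserts on the key rather than appending duplicate rows. A quick sanity check that the table was created as intended:
mysql> SHOW CREATE TABLE example_db.expamle_tbl;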
4. Flink installation and configuration (note: the Scala versions must match)
- Java installation: omitted here
- Scala installation:
1. Download Scala (version 2.12.15 is used here):
mkdir -p /opt
cd /opt/
wget /scala/2.12.15/scala-2.12.15.tgz
tar -xzvf scala-2.12.15.tgz
2. Configure Scala:
vim /etc/profile
export SCALA_HOME=/opt/scala-2.12.15
export PATH=$PATH:$SCALA_HOME/bin
# Make the profile take effect
. /etc/profile
3. Verify the Scala version:
[root@localhost ~]# scala -version
Scala code runner version 2.12.15 -- Copyright 2002-2021, LAMP/EPFL and Lightbend, Inc.
[root@localhost ~]#
- Flink installation:
- Download Flink (the Scala 2.12 build is used here)
cd /opt/
wget /flink/flink-1.14.5/flink-1.14.5-bin-scala_2.12.tgz
tar -xzvf flink-1.14.5-bin-scala_2.12.tgz
Here 1.14.5 is the Flink version and 2.12 is the Scala version.
- Add the Flink CDC dependency jars to Flink's lib directory
cd /opt/flink-1.14.5/lib
# Download the MySQL CDC connector
wget /maven2/com/ververica/flink-sql-connector-mysql-cdc/2.2.1/flink-sql-connector-mysql-cdc-2.2.1.jar
# Download the Doris connector
wget /maven2/org/apache/doris/flink-doris-connector-1.14_2.12/1.1.0/flink-doris-connector-1.14_2.12-1.1.0.jar
Note:
- The flink-cdc version must match the Flink version. See the Flink CDC docs: /flink-cdc-connectors/master/content/#supported-connectors — Flink 1.13.* and 1.14.* map to CDC 2.2.*:
Flink® CDC Version | Flink® Version |
---|---|
1.0.0 | 1.11.* |
1.1.0 | 1.11.* |
1.2.0 | 1.12.* |
1.3.0 | 1.12.* |
1.4.0 | 1.13.* |
2.0.* | 1.13.* |
2.1.* | 1.13.* |
2.2.* | 1.13.*, 1.14.* |
The jars for each Flink CDC release can be downloaded from: /ververica/flink-cdc-connectors/releases
The 2.2.1 release, for example, ships SQL connectors for several sources:
- flink-sql-connector-mongodb-cdc-2.2.1.jar
- flink-sql-connector-mysql-cdc-2.2.1.jar
- flink-sql-connector-oceanbase-cdc-2.2.1.jar
- flink-sql-connector-oracle-cdc-2.2.1.jar
- flink-sql-connector-postgres-cdc-2.2.1.jar
- flink-sql-connector-sqlserver-cdc-2.2.1.jar
- flink-sql-connector-tidb-cdc-2.2.1.jar
- The flink-doris-connector's Scala version must match Flink's Scala version.
flink-doris-connector download address: /maven2/org/apache/doris/flink-doris-connector-1.14_2.12/
Here 1.14 is the supported Flink version and 2.12 the supported Scala version.
- Start Flink
[root@localhost flink-1.14.5]# cd /opt/flink-1.14.5/
[root@localhost flink-1.14.5]# bin/start-cluster.sh
Starting cluster.
Starting standalonesession daemon on host .
Starting taskexecutor daemon on host .
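To double-check that the cluster came up, you can query Flink's REST API (the web UI and REST endpoint listen on port 8081 by default); the response reports registered task managers and available slots:
curl http://127.0.0.1:8081/overview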
5. Writing the Flink CDC job that syncs MySQL data to Doris
- Start the Flink SQL client
./bin/sql-client.sh embedded
SET 'sql-client.execution.result-mode' = 'tableau';
- Set the Flink checkpoint interval. This is mandatory here, because the Doris connector used with Flink CDC commits its transactions when checkpoints complete.
SET 'execution.checkpointing.interval' = '10s';
- Define the Flink MySQL source
CREATE TABLE cdc_mysql_source (
  id INT,
  name VARCHAR,
  PRIMARY KEY (id) NOT ENFORCED
) WITH (
  'connector' = 'mysql-cdc',
  'hostname' = '127.0.0.1',
  'port' = '3306',
  'username' = 'root',
  'password' = '',
  'database-name' = 'example_db',
  'table-name' = 'test_cdc'
);
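Optionally, you can sanity-check the source definition on its own before wiring up the sink; in tableau mode this streams the table's rows (the snapshot first, then changes) straight to the console:
select * from cdc_mysql_source;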
- Define the Flink Doris sink (data output)
CREATE TABLE doris_sink (
  id INT,
  name STRING
) WITH (
  'connector' = 'doris',
  'fenodes' = '127.0.0.1:8030',
  'table.identifier' = 'example_db.expamle_tbl',
  'username' = 'root',
  'password' = '',
  'sink.enable-2pc' = 'false',
  'sink.label-prefix' = 'doris_label'
);
- Define the write-out task; the insert into … statement below syncs cdc_mysql_source into doris_sink in real time
insert into doris_sink select id,name from cdc_mysql_source;
Note: on first start, Flink CDC performs a full sync of the historical data; once the full sync completes, it switches to the incremental sync task.
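The INSERT statement submits a long-running streaming job to the cluster. One way to confirm it is running is the standard Flink CLI, executed from the Flink home directory:
bin/flink list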
- Verify the data in Doris
mysql -h 127.0.0.1 -P 9030 -uroot
mysql> use example_db
Database changed
mysql>
mysql> select * from expamle_tbl;
+------+------+
| id | name |
+------+------+
| 1 | 1 |
+------+------+
1 row in set (0.02 sec)
- Check the Flink logs
2022-09-05 19:46:30,792 INFO [] - load Result {
"TxnId": 1125,
"Label": "doris_label_0_1662378388355",
"TwoPhaseCommit": "false",
"Status": "Success",
"Message": "OK",
"NumberTotalRows": 1,
"NumberLoadedRows": 1,
"NumberFilteredRows": 0,
"NumberUnselectedRows": 0,
"LoadBytes": 8,
"LoadTimeMs": 123,
"BeginTxnTimeMs": 2,
"StreamLoadPutTimeMs": 108,
"ReadDataTimeMs": 0,
"WriteDataTimeMs": 2256,
"CommitAndPublishTimeMs": 60
}
The log shows that Flink wrote the data successfully.
6. Verifying incremental sync
- Incremental sync check: write data into MySQL
insert into test_cdc (id,name) values(2,'2');
- Incremental sync check: query the data in Doris
mysql -h 127.0.0.1 -P 9030 -uroot
mysql> use example_db
Database changed
mysql>
mysql> select * from expamle_tbl;
+------+------+
| id | name |
+------+------+
| 1 | 1 |
| 2 | 2 |
+------+------+
2 rows in set (0.02 sec)
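Inserts are only part of what CDC captures: updates and deletes in the binlog are forwarded too, and because the Doris table uses the Unique Key model they land as upserts/deletes on id (the connector's delete propagation is enabled by default). A quick way to exercise this while the job is still running, then re-query Doris after the next checkpoint flush:
-- in MySQL
update test_cdc set name='22' where id=2;
delete from test_cdc where id=1;
-- then in Doris
select * from expamle_tbl;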
New book promotion
Finally, a plug for my books:
1. 《图解Spark 大数据快速分析实战(异步图书出品)》
2. 《Offer来了:Java面试核心知识点精讲(第2版)(博文视点出品)》
3. 《Offer来了:Java面试核心知识点精讲(原理篇)(博文视点出品)》
4. 《Offer来了:Java面试核心知识点精讲(框架篇)(博文视点出品)》