Debezium发布历史136-如果没有存货,则创建表格( 序列主键, symbol varchar(10) unique, 价格浮动8, 更新时间戳 ); 插入库存(符号、价格、上一次更新)值(‘aapl’,500.0,现在)冲突时无所作为; 插入库存(符号,价格,最新)值(“IBM”,50.0,现在在冲突中无所作为; 插入库存(符号、价格、上一次更新)值,在冲突时不做任何事; 插入库存(符号、价格、上一次更新)值(“谷歌”,100.0,现在)冲突时无所作为; 插入库存(符号、价格、上一次更新)值(‘fb’,200.0,现在)冲突时无所作为; 插入库存(符号、价格、上一次更新)值(‘amzn’,100.0,现在)冲突时无所作为; 插入库存(符号、价格、上一次更新)值(‘tsla’,500.0,现在)冲突时无所作为; 插入库存(符号、价格、上一次更新)值(“nflx”,500.0,现在)冲突时无所作为; 插入库存(符号、价格、上一次更新)值(‘ttr’,50.0,现在)冲突时无所作为; 插入库存(

时间:2024-02-18 10:51:43

"关于冲突无所作为"条款被用来避免重复条目的出现。
重新启动应用时的表。
Java代码 以随机值更新价格和时间戳。更新并不是完全随机的,应用程序使用一个非常简单的算法生成更新,非常类似于股票价格的波动。在现实的场景中,应用程序将从某些外部来源获得价格。

生产者被包装成最小的文件, 码头文件 ,并连接到后格来格:

从马文出发:3.8-JDK-11小建筑
收到。/1.xml/选择系统/库存/
收到。/特别资源中心。/选择方案/库存/战略资源中心
工作量/选择方案/库存
运行MVN清洁安装-Dskip测试
来自阿祖尔/祖卢-开放jdk:最新11
复制–来自=建造者/被占领巴勒斯坦领土/库存/目标/卡夫卡–样本-库存–*。罐/罐
CMD [“java”, “-jar”, “/stocks.jar”]
卡夫卡连接器、德贝兹和卡夫卡连接器
在我们深入研究卡夫卡连接、德贝兹和卡夫卡连接器的配置之前,让我们看看它们之间的关系。

卡夫卡连接是构建连接器以在卡夫卡和其他系统之间移动数据的框架。它支持2类连接器:

源连接器-从源系统读取数据并将其写入卡夫卡

插入连接器-读取卡夫卡的数据并将其写入接收系统

Debezum是卡夫卡连接的源连接器,可以监视和捕获数据库中的行级更改。它是什么意思?每当在数据库中插入、更新或删除一行时,Debezum将捕获更改并将其作为事件写入卡夫卡。

在技术层面上,德贝兹是在卡夫卡连接框架内运行的卡夫卡连接器。这反映在 德贝兹容器图像 ,该包卡夫卡连接与德贝兹连接器预先安装。

卡夫卡连接器也是卡夫卡连接器。它是一个接收器连接器,可以读取卡夫卡的数据并将其写入QISSTDB。我们添加了QESTDB卡夫卡连接器到德贝兹容器图像,我们得到了一个卡夫卡连接图像,两者都有德贝兹和QSTDB卡夫卡连接器安装!

这是我们用来制作图像的码头文件:

( 码头文件连接 )

来自乌本图:最新的建筑师
工作场所/选择项目
运行程序-获得更新和应用-获得安装-Y旋涡解压缩JQ
RUN curl -s https://api.github.com/repos/questdb/kafka-questdb-connector/releases/latest | jq -r ‘.assets[]|select(.content_type == “application/zip”)|.browser_download_url’|wget -qi -
运行解压缩卡夫卡-奎斯特-连接*-绑定。

从德贝兹/连接点:1.9.6.最后
COPY --from=builder /opt/kafka-questdb-connector/*.jar /kafka/connect/questdb-connector/
码头文件下载了最新发布的QESTDB卡夫卡连接器,解压缩将其复制到Debezum容器图像。生成的图像同时安装了德贝兹和奎斯特卜卡夫卡连接器:

在这里插入图片描述

加装卡夫卡连接器层

整个卡夫卡连接器与源连接器和接收器连接器完成:
在这里插入图片描述

源和汇连接器如何与卡夫卡集群和数据库一起工作

德贝兹接头
我们已经知道,Debezum是一个卡夫卡连接连接器,可以监视和捕获数据库中的行级更改。我们也有一个码头图像,有德贝兹和奎斯特卡夫卡连接器安装。然而,此时两个连接器都不运行。我们需要配置并启动它们。这是通过向卡夫卡连接RESTAPI发送一个邮件请求的CAR命令来完成的。

curl -X POST -H “Content-Type: application/json” -d '{“name”:“debezium_source”,“config”:{“tasks.max”:1,“database.hostname”:“postgres”,“database.port”:5432,“database.user”:“postgres”,“database.password”:“postgres”,“connector.class”:“io.debezium.connector.postgresql.PostgresConnector”,“database.dbname”:“postgres”,“database.server.name”:“dbserver1”}} ’ localhost:8083/connectors
请求体包含德贝兹连接器的配置,让我们分解它:

{
“name”: “debezium_source”,
“config”: {
“tasks.max”: 1,
“database.hostname”: “postgres”,
“database.port”: 5432,
“database.user”: “postgres”,
“database.password”: “postgres”,
“connector.class”: “io.debezium.connector.postgresql.PostgresConnector”,
“database.dbname”: “postgres”,
“database.server.name”: “dbserver1”
}
}
它听后GREGSQL数据库中的更改,并使用上述配置向卡夫卡发布。主题名称默认为..

. 在我们的例子中,它是dbserver1.public.stock .为什么?因为数据库服务器名是dbserver1 ,架构是public 我们只有一张桌子stock .

所以在我们发出请求后,德贝佐姆将开始倾听stock 把它们放在桌上发表到dbserver1.public.stock 专题。

卡夫卡连接器
在这一点上,我们有一个后格勒克表stock 充满了随机股价和卡夫卡主题dbserver1.public.stock 包含了更改。下一个步骤是将Qustdb卡夫卡连接器配置为从dbserver1.public.stock 主题并将数据写到查询数据库。

让我们更深入地研究一下 启动卡夫卡连接槽 :

{
“name”: “questdb-connect”,
“config”: {
“topics”: “dbserver1.public.stock”,
“table”: “stock”,
“connector.class”: “io.questdb.kafka.QuestDBSinkConnector”,
“tasks.max”: “1”,
“key.converter”: “org.apache.kafka.connect.storage.StringConverter”,
“value.converter”: “org.apache.kafka.connect.json.JsonConverter”,
“host”: “questdb”,
“transforms”: “unwrap”,
“transforms.unwrap.type”: “io.debezium.transforms.ExtractNewRecordState”,
“include.key”: “false”,
“symbols”: “symbol”,
“timestamp.field.name”: “last_update”
}
}
这里需要注意的是:

table 和topics :"卡夫卡"连接器将创建带有名称的"卡夫卡"表stock 把数据写在dbserver1.public.stock 它的主题。

host :卡夫卡连接器将连接到运行在questdb 招待。这是查询容器的名称。

connector.class:卡夫卡连接器类名称。这告诉卡夫卡连接使用卡夫卡连接器。

value.converter :德贝兹连接器以JSON格式生成数据。这就是为什么我们需要配置查询数据库连接器来使用JSON转换器来读取数据:org.apache.kafka.connect.json.JsonConverter .

symbols *股票符号被翻译成 查询符号类型 ,用于低基数的字串值(例如。…

timestamp.field.name当前位置:由于QESTDB对时间戳和基于时间戳的分区有很大的支持,我们可以指定指定的时间戳列。

transforms :打开场用途io.debezium.transforms.ExtractNewRecordState 类型只提取新数据,而不提取Debezns发布的元数据。换句话说,这是一个过滤器payload.after 关于卡夫卡主题的德贝兹数据的一部分。看到 文件 更多的细节。

…ExtractNewRecordState 转换可能是配置中最不直观的部分。让我们更仔细地看看它:简而言之,对于后面的SQL表中的每一个变化,Debezum都会向卡夫卡主题发送一个JSON消息,比如:

{
“schema”: “This JSON key contains Debezium message schema. It’s not very relevant for this sample. Omitted for brevity.”,
“payload”: {
“before”: null,
“after”: {
“id”: 8,
“symbol”: “NFLX”,
“price”: 1544.3357414199545,
“last_update”: 1666172978269856
}
},
“source”: {
“version”: “1.9.6.Final”,
“connector”: “postgresql”,
“name”: “dbserver1”,
“ts_ms”: 1666172978272,
“snapshot”: “false”,
“db”: “postgres”,
“sequence”: “[“87397208”,“87397208”]”,
“schema”: “public”,
“table”: “stock”,
“txId”: 402087,
“lsn”: 87397208,
“xmin”: null
},
“op”: “u”,
“ts_ms”: 1666172978637,
“transaction”: null
}
如果你被这条信息的尺寸吓到了,不要害怕。大多数字段是元数据,它们与此示例无关。看 德贝兹文件 想知道更多细节。重要的是,我们不能将整个JSON消息推到查询数据库,而且我们也不希望查询数据库中的所有元数据。我们需要取出payload.after 邮件的一部分,然后将其推到查询。这正是ExtractNewRecordState 转换后会:它将大消息转换为一个较小的信息,只包含payload.after 一部分信息。因此,这条信息看起来就像这样:

{
“id”: 8,
“symbol”: “NFLX”,
“price”: 1544.3357414199545,
“last_update”: 1666172978269856
}
这是我们可以推敲的信息。卡夫卡连接器将读取此消息并将其写入到卡夫达表。如果QESTB卡夫卡表不存在,则该连接器也将创建该表。查询表将具有与JSON消息相同的模式–其中每个JSON字段将是查询表中的一列。

问题b和格拉法纳
一旦数据写入了查询表,我们就可以更容易地处理时间序列数据。由于QESTDB是兼容后GREQLY协议的,所以我们可以使用格拉法纳上的后GRESQL数据源来可视化数据。预先配置的仪表板使用以下查询:

SELECT
$__time(timestamp),
min(price) as low,
max(price) as high,
first(price) as open,
last(price) as close
FROM
stock
WHERE
KaTeX parse error: Expected group after '_' at position 1: _̲_timeFilter(tim…Symbol’
SAMPLE BY $Interval ALIGN TO CALENDAR;
我们已经创建了一个系统,它可以连续地追踪和存储多个股票的最新价格。然后这些价格作为事件通过德贝齐姆输入卡夫卡,这抓住了每一个价格变化。卡夫卡连接器读取卡夫卡的这些事件,并将每一个变化作为卡夫卡的一个新行存储,使我们能够保留一个完整的股票价格历史。这段历史可以通过使用工具(例如格拉法纳)进行分析和可视化,如蜡烛图所示。

下一步
这个示例项目是将数据从关系数据库流到优化的时间序列数据库的基本参考体系结构。对于使用后GERGSQL的现有项目,可以对Debezum进行配置,以启动将数据流到查询db,并利用时间序列查询和分区的优势。对于同时存储原始历史数据的数据库,采用Debezum可能需要一些架构更改。然而,这是有益的,因为这是一个在事务数据库和分析性时序数据库之间改进性能和建立服务边界的机会。

这个引用架构也可以扩展到配置卡夫卡连接到流到其他数据仓库进行长期存储。在检查数据后,测试数据也可以配置为为更长期的存储,甚至可以对数据进行向下采样。 分离分区以节省空间 .

把这个给我 抽样应用 尝试加入 休闲社区 如果你有任何问题的话。