Service Broker应用(1):简介、同server不同DB间的数据传输

时间:2021-08-30 16:07:25

简介:SQL Server Service Broker,以下简称SSB,是一种完全基于MSSQL数据库的数据处理技术,为短时间内处理大量数据提供了一种可靠、稳定、高效的解决方案。一次同步的数据最大可达2G,采用二进制传输,多线程处理数据。可以理解为数据库中的消息中间件。

  根据负载类型分,SSB有Windows负载类型和证书类型,由于证书类型不支持跨集群的数据传输,故不讨论,用Windows负载类型。后续脚本都是Windows负载类型的脚本。

  SQL Server版本在2008及以上。

应用场景

  • 外部数据进入DB(如XML、TXT等文档中的数据,文档中的数据需用程序读取)
  • DB之间的数据同步:包括同server不同DB,不同server之间,集群之间的数据同步

优点:安全、稳定可靠、解耦合、多线程、处理数据很快、对网络的实时连接要求不高

缺点:跨集群的解决方案很复杂,实施起来不易;查找数据处理失败的异常原因比较困难;

实施方案--同server不同DB:新建数据库的步骤省略,跨server的实施方案见下一个篇

消息发送方配置脚本:

--第一步:用户数据库开启service broker服务
USE master
GO
select is_broker_enabled,service_broker_guid,* from sys.databases where name IN ('QEC_Interface')--is_broker_enabled是1表示开启,0反之
ALTER DATABASE QEC_Interface SET NEW_BROKER WITH ROLLBACK IMMEDIATE --遇到无法执行下面的语句时,先执行这一条语句:回滚事务 ALTER DATABASE QEC_Interface SET ENABLE_BROKER
ALTER DATABASE QEC_Interface SET TRUSTWORTHY ON GO --第二步:发送方用户数据库上创建消息类型:消息类型的名称要与接收方的名称完全相同(区分大小写)
use QEC_Interface
go
create message type [//qciscm05/QEC_Interface/NoneCustomsSend]
validation = well_formed_xml
--For Receiving MessageType
create message type [//qciscm05/QEC_Interface/NoneCustomsReceived]
validation = well_formed_xml
--第三步:用户数据库上创建约定:约定的名称要与接收方的名称完全相同(区分大小写)
create contract [//qciscm05/QEC_Interface/NoneCustomsContract]
( [//qciscm05/QEC_Interface/NoneCustomsSend] SENT BY INITIATOR,
[//qciscm05/QEC_Interface/NoneCustomsReceived] SENT BY TARGET
)
--第四步:用户数据库上创建队列
create queue InterfaceNoneCustomsQueue
with status=on
--第五步:用户数据库上创建服务
create service [//qciscm05/QEC_Interface/NoneCustomsService]
on queue InterfaceNoneCustomsQueue
([//qciscm05/QEC_Interface/NoneCustomsContract]) -----------------------------------------------------
--目标队列绑定SP,自动处理反馈消息
alter queue InterfaceNoneCustomsQueue
with status = on,
activation(
status = ON,
procedure_name= [dbo].[SP_SSB_HandleReceiveNoneCustoms],
max_queue_readers=50,
execute as self
); ---=手动发送消息 exec SP_SSB_SendNoneCustoms 'TEHP1301996QN','<XML>
<RESPONSECONTENT>
<RESPONSE_MESID>C4EE2C5C758A4656800B6617BAC5B541</RESPONSE_MESID>
<APPL_TYPE>A</APPL_TYPE>
<DECLARE_TYPE>GJN</DECLARE_TYPE>
<STEP_ID />
<SEQ_NO>TEHP1301996QN</SEQ_NO>
<JOB_NO>GJAtestOK</JOB_NO>
<RESULT_CODE>OK01</RESULT_CODE>
<RESULT_INFO />
</RESPONSECONTENT>
</XML>' --==基本查询操作
select conversation_handle,state_desc,* from sys.conversation_endpoints--查看当前数据库中开启的会话
select conversation_handle,cast(message_body as xml),* from InterfaceNoneCustomsQueue--查看队列中的消息
select transmission_status,cast(message_body as xml),* from sys.transmission_queue --查看当期数据库中待传送的消息

发送方脚本

消息接受方配置脚本:

--第一步:用户数据库开启service broker服务
use master
go
select is_broker_enabled,service_broker_guid,* from sys.databases where name IN ('QEC')
ALTER DATABASE QEC SET NEW_BROKER WITH ROLLBACK IMMEDIATE --遇到无法执行下面的语句时,先执行这一条语句 ALTER DATABASE QEC set ENABLE_BROKER
ALTER DATABASE QEC SET TRUSTWORTHY ON go --第二步:发送方用户数据库上创建消息类型:消息类型的名称要与发送方的名称完全相同(区分大小写)
use QEC
go
create message type [//qciscm05/QEC_Interface/NoneCustomsSend]
validation = well_formed_xml
--For Receiving MessageType
create message type [//qciscm05/QEC_Interface/NoneCustomsReceived]
validation = well_formed_xml
--第三步:用户数据库上创建约定:约定的名称要月发送方的名称完全相同(区分大小写)
create contract [//qciscm05/QEC_Interface/NoneCustomsContract]
( [//qciscm05/QEC_Interface/NoneCustomsSend] SENT BY INITIATOR,
[//qciscm05/QEC_Interface/NoneCustomsReceived] SENT BY TARGET
)
--第四步:用户数据库上创建队列
create queue QECNoneCustomsQueue
with status=on
--第五步:用户数据库上创建服务
create service [//qciscm05/QEC/NoneCustomsService]
on queue QECNoneCustomsQueue
([//qciscm05/QEC_Interface/NoneCustomsContract]) --==队列绑定SP,自动处理队列中接收到的消息
alter queue QECNoneCustomsQueue
with status = on, --on设置队列为启用,off相反
activation(
status = on, --on设置绑定的SP为激活状态,off相反
procedure_name=SP_SSB_HandleMessageForNoneCustoms, --指定要绑定的SP
max_queue_readers=50, --设置并发数:SP同时处理消息的进程数,多线程(最大值32767)
execute as self --指定SP以当前用户身份执行
);
--==
select conversation_handle,conversation_id,state_desc,* from sys.conversation_endpoints--查看当前数据库中开启的会话
select conversation_handle,cast(message_body as xml),* from QECNoneCustomsQueue--查看队列中的消息
select transmission_status,cast(message_body as xml),* from sys.transmission_queue --查看当期数据库中待传送的消息

接收方脚本