Hazelcast Support(支持)

时间:2022-12-09 11:00:53

Hazelcast Support(支持)

Spring 集成提供通道适配器和其他实用程序组件,以与内存数据网格 Hazelcast 进行交互。

您需要将此依赖项包含在项目中:

<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-hazelcast</artifactId>
<version>6.0.0</version>
</dependency>

Hazelcast 组件的 XML 命名空间和架构位置定义包括:

xmlns:int-hazelcast="http://www.springframework.org/schema/integration/hazelcast"
xsi:schemaLocation="http://www.springframework.org/schema/integration/hazelcast
https://www.springframework.org/schema/integration/hazelcast/spring-integration-hazelcast.xsd"

Hazelcast 事件驱动的入站通道适配器

Hazelcast提供分布式数据结构,例如:

  • ​com.hazelcast.map.IMap​
  • ​com.hazelcast.multimap.MultiMap​
  • ​com.hazelcast.collection.IList​
  • ​com.hazelcast.collection.ISet​
  • ​com.hazelcast.collection.IQueue​
  • ​com.hazelcast.topic.ITopic​
  • ​com.hazelcast.replicatedmap.ReplicatedMap​

它还提供事件侦听器,以便侦听对这些数据结构所做的修改。

  • ​com.hazelcast.core.EntryListener<K, V>​
  • ​com.hazelcast.collection.ItemListener​
  • ​com.hazelcast.topic.MessageListener​

Hazelcast 事件驱动的入站通道适配器侦听相关的缓存事件,并将事件消息发送到定义的通道。 它支持XML和JavaConfig驱动的配置。

XML 配置:

<int-hazelcast:inbound-channel-adapter channel="mapChannel"
cache="map"
cache-events="UPDATED, REMOVED"
cache-listening-policy="SINGLE" />

Hazelcast 事件驱动的入站通道适配器需要以下属性:

  • ​channel​​:指定消息发送到的通道;
  • ​cache​​:指定侦听的分布式对象引用。 这是一个强制性属性;
  • ​cache-events​​:指定侦听的缓存事件。 它是一个可选属性,其默认值为 。 其支持的值如下:ADDED
  • 和 支持的缓存事件类型:、、、 和IMapMultiMapADDEDREMOVEDUPDATEDEVICTEDEVICT_ALLCLEAR_ALL;
  • 支持的缓存事件类型:、、、ReplicatedMapADDEDREMOVEDUPDATEDEVICTED;
  • 支持的缓存事件类型 和 : , 。 没有 的缓存事件类型。IListISetIQueueADDEDREMOVEDITopic
  • ​cache-listening-policy​​:将缓存侦听策略指定为 或 。 它是一个可选属性,其默认值为 。 侦听具有相同缓存事件属性的同一缓存对象的每个 Hazelcast 入站通道适配器都可以接收单个事件消息或所有事件消息。 如果是,则侦听具有相同缓存事件属性的同一缓存对象的所有 Hazelcast 入站通道适配器都将收到所有事件消息。 如果是,他们将收到唯一的事件消息。SINGLEALLSINGLEALLSINGLE

一些配置示例:

分布式地图

<int:channel />

<int-hazelcast:inbound-channel-adapter channel="mapChannel"
cache="map"
cache-events="UPDATED, REMOVED" />

<bean factory-bean="instance" factory-method="getMap">
<constructor-arg value="map"/>
</bean>

<bean class="com.hazelcast.core.Hazelcast"
factory-method="newHazelcastInstance">
<constructor-arg>
<bean class="com.hazelcast.config.Config" />
</constructor-arg>
</bean>

分布式多地图

<int-hazelcast:inbound-channel-adapter channel="multiMapChannel"
cache="multiMap"
cache-events="ADDED, REMOVED, CLEAR_ALL" />

<bean factory-bean="instance" factory-method="getMultiMap">
<constructor-arg value="multiMap"/>
</bean>

分布式列表

<int-hazelcast:inbound-channel-adapter  channel="listChannel"
cache="list"
cache-events="ADDED, REMOVED"
cache-listening-policy="ALL" />

<bean factory-bean="instance" factory-method="getList">
<constructor-arg value="list"/>
</bean>

分布式集

<int-hazelcast:inbound-channel-adapter channel="setChannel" cache="set" />

<bean factory-bean="instance" factory-method="getSet">
<constructor-arg value="set"/>
</bean>

分布式队列

<int-hazelcast:inbound-channel-adapter  channel="queueChannel"
cache="queue"
cache-events="REMOVED"
cache-listening-policy="ALL" />

<bean factory-bean="instance" factory-method="getQueue">
<constructor-arg value="queue"/>
</bean>

分布式主题

<int-hazelcast:inbound-channel-adapter channel="topicChannel" cache="topic" />

<bean factory-bean="instance" factory-method="getTopic">
<constructor-arg value="topic"/>
</bean>

复制的地图

<int-hazelcast:inbound-channel-adapter channel="replicatedMapChannel"
cache="replicatedMap"
cache-events="ADDED, UPDATED, REMOVED"
cache-listening-policy="SINGLE" />

<bean factory-bean="instance" factory-method="getReplicatedMap">
<constructor-arg value="replicatedMap"/>
</bean>

Java 配置示例:

以下示例显示了一个配置。 相同的配置可用于其他分布式数据结构(、、、、 和 ):​​DistributedMap​​​​IMap​​​​MultiMap​​​​ReplicatedMap​​​​IList​​​​ISet​​​​IQueue​​​​ITopic​

@Bean
public PollableChannel distributedMapChannel() {
return new QueueChannel();
}

@Bean
public IMap<Integer, String> distributedMap() {
return hazelcastInstance().getMap("Distributed_Map");
}

@Bean
public HazelcastInstance hazelcastInstance() {
return Hazelcast.newHazelcastInstance();
}

@Bean
public HazelcastEventDrivenMessageProducer hazelcastEventDrivenMessageProducer() {
final HazelcastEventDrivenMessageProducer producer = new HazelcastEventDrivenMessageProducer(distributedMap());
producer.setOutputChannel(distributedMapChannel());
producer.setCacheEventTypes("ADDED,REMOVED,UPDATED,CLEAR_ALL");
producer.setCacheListeningPolicy(CacheListeningPolicyType.SINGLE);

return producer;
}

榛子连续查询入站通道适配器

Hazelcast 连续查询允许侦听对特定地图条目执行的修改。 Hazelcast 连续查询入站通道适配器是一个事件驱动的通道适配器,它根据定义的谓词侦听相关的分布式映射事件。

@Bean
public PollableChannel cqDistributedMapChannel() {
return new QueueChannel();
}

@Bean
public IMap<Integer, String> cqDistributedMap() {
return hazelcastInstance().getMap("CQ_Distributed_Map");
}

@Bean
public HazelcastInstance hazelcastInstance() {
return Hazelcast.newHazelcastInstance();
}

@Bean
public HazelcastContinuousQueryMessageProducer hazelcastContinuousQueryMessageProducer() {
final HazelcastContinuousQueryMessageProducer producer =
new HazelcastContinuousQueryMessageProducer(cqDistributedMap(), "surname=TestSurname");
producer.setOutputChannel(cqDistributedMapChannel());
producer.setCacheEventTypes("UPDATED");
producer.setIncludeValue(false);

return producer;
}

它支持六个属性,如下所示:

  • ​channel​​:指定消息发送到的通道;
  • ​cache​​:指定侦听的分布式映射引用。 命令的;
  • ​cache-events​​:指定侦听的缓存事件。 作为其默认值的可选属性。 支持的值为 、、 和ADDEDADDEDREMOVEDUPDATEDEVICTEDEVICT_ALLCLEAR_ALL;
  • ​predicate​​:指定用于侦听对特定映射条目执行的修改的谓词。 命令的;
  • ​include-value​​:指定在连续查询结果中包含值和旧值。 可选为默认值;true
  • ​cache-listening-policy​​:将缓存侦听策略指定为 或 。 可选,默认值为 。 每个侦听具有相同缓存事件属性的相同缓存对象的 Hazelcast CQ 入站通道适配器都可以接收单个事件消息或所有事件消息。 如果是,则侦听具有相同缓存事件属性的同一缓存对象的所有 Hazelcast CQ 入站通道适配器都将收到所有事件消息。 如果是,他们将收到唯一的事件消息。SINGLEALLSINGLEALLSINGLE

Hazelcast 群集监视器入站通道适配器

Hazelcast 群集监视器支持侦听在群集上执行的修改。 Hazelcast 群集监视器入站通道适配器是一个事件驱动的通道适配器,侦听相关的成员资格、分布式对象、迁移、生命周期和客户端事件:

@Bean
public PollableChannel eventChannel() {
return new QueueChannel();
}

@Bean
public HazelcastInstance hazelcastInstance() {
return Hazelcast.newHazelcastInstance();
}

@Bean
public HazelcastClusterMonitorMessageProducer hazelcastClusterMonitorMessageProducer() {
HazelcastClusterMonitorMessageProducer producer = new HazelcastClusterMonitorMessageProducer(hazelcastInstance());
producer.setOutputChannel(eventChannel());
producer.setMonitorEventTypes("DISTRIBUTED_OBJECT");

return producer;
}

它支持以下三个属性:

  • ​channel​​:指定消息发送到的通道;
  • ​hazelcast-instance​​:指定用于侦听群集事件的 Hazelcast 实例引用。 这是一个强制性属性;
  • ​monitor-types​​:指定侦听的监视器类型。 它是一个可选属性,是默认值。 支持的值为 、、、、。MEMBERSHIPMEMBERSHIPDISTRIBUTED_OBJECTMIGRATIONLIFECYCLECLIENT

Hazelcast 分布式 SQL 入站通道适配器

Hazelcast允许在分布式地图上运行分布式查询。 Hazelcast 分布式 SQL 入站通道适配器是一个轮询入站通道适配器。 它运行定义的分布式 sql 命令,并根据迭代类型返回结果。

@Bean
public PollableChannel dsDistributedMapChannel() {
return new QueueChannel();
}

@Bean
public IMap<Integer, String> dsDistributedMap() {
return hazelcastInstance().getMap("DS_Distributed_Map");
}

@Bean
public HazelcastInstance hazelcastInstance() {
return Hazelcast.newHazelcastInstance();
}

@Bean
@InboundChannelAdapter(value = "dsDistributedMapChannel", poller = @Poller(maxMessagesPerPoll = "1"))
public HazelcastDistributedSQLMessageSource hazelcastDistributedSQLMessageSource() {
final HazelcastDistributedSQLMessageSource messageSource =
new HazelcastDistributedSQLMessageSource(dsDistributedMap(),
"name='TestName' AND surname='TestSurname'");
messageSource.setIterationType(DistributedSQLIterationType.ENTRY);

return messageSource;
}

它需要一个轮询器并支持四个属性:

  • ​channel​​:指定消息发送到的通道。 这是一个强制性属性;
  • ​cache​​:指定要查询的分布式引用。 它是必需属性;IMap
  • ​iteration-type​​:指定结果类型。 分布式 SQL 可以在 、 或 上运行。 它是一个可选属性,是默认属性。 支持的值为 ,和EntrySetKeySetLocalKeySetValuesVALUEENTRY, `KEYLOCAL_KEYVALUE;
  • ​distributed-sql​​:指定 sql 语句的 where 子句。 这是一个必需属性。

榛子出站通道适配器

Hazelcast 出站通道适配器侦听其定义的通道,并将传入消息写入相关的分布式缓存。 它需要 之一 或 用于分布式对象定义。 支持的分布式对象包括:、 和 。​​cache​​​​cache-expression​​​​HazelcastHeaders.CACHE_NAME​​​​IMap​​​​MultiMap​​​​ReplicatedMap​​​​IList​​​​ISet​​​​IQueue​​​​ITopic​

@Bean
public MessageChannel distributedMapChannel() {
return new DirectChannel();
}

@Bean
public IMap<Integer, String> distributedMap() {
return hzInstance().getMap("Distributed_Map");
}

@Bean
public HazelcastInstance hzInstance() {
return Hazelcast.newHazelcastInstance();
}

@Bean
@ServiceActivator(inputChannel = "distributedMapChannel")
public HazelcastCacheWritingMessageHandler hazelcastCacheWritingMessageHandler() {
HazelcastCacheWritingMessageHandler handler = new HazelcastCacheWritingMessageHandler();
handler.setDistributedObject(distributedMap());
handler.setKeyExpression(new SpelExpressionParser().parseExpression("payload.id"));
handler.setExtractPayload(true);
return handler;
}

它需要以下属性:

  • ​channel​​:指定消息发送到的通道;
  • ​cache​​:指定分布式对象引用。 自选;
  • ​cache-expression​​:通过 Spring 表达式语言 (SpEL) 指定分布式对象。 自选;
  • ​key-expression​​:通过 Spring 表达式语言 (SpEL) 指定键值对的键。 可选且仅对 和分布式数据结构是必需的。IMapMultiMapReplicatedMap
  • ​extract-payload​​:指定是发送整条消息还是仅发送有效负载。 默认属性为可选属性。 如果为 true,则只有有效负载将写入分布式对象。 否则,将通过转换消息头和有效负载来写入整个消息。true

通过在标头中设置分布式对象名称,可以通过同一通道将消息写入不同的分布式对象。 如果未定义 或 属性,则必须在请求中设置标头。​​cache​​​​cache-expression​​​​HazelcastHeaders.CACHE_NAME​​​​Message​

榛子领袖选举

如果需要领导者选举(例如,对于只有一个节点应该接收消息的高可用性消息使用者),可以使用基于 Hazelcast 的:​​LeaderInitiator​

@Bean
public HazelcastInstance hazelcastInstance() {
return Hazelcast.newHazelcastInstance();
}

@Bean
public LeaderInitiator initiator() {
return new LeaderInitiator(hazelcastInstance());
}

当一个节点被选为领导者时,它将向所有应用程序侦听器发送一个。​​OnGrantedEvent​

榛子消息存储

对于分布式消息传递状态管理,例如对于持久或跟踪消息组,提供了以下实现:​​QueueChannel​​​​Aggregator​​​​HazelcastMessageStore​

@Bean
public HazelcastInstance hazelcastInstance() {
return Hazelcast.newHazelcastInstance();
}

@Bean
public MessageGroupStore messageStore() {
return new HazelcastMessageStore(hazelcastInstance());
}

默认情况下,用于将消息和组存储为键/值。 任何自定义都可以提供给 .​​SPRING_INTEGRATION_MESSAGE_STORE​​​​IMap​​​​IMap​​​​HazelcastMessageStore​

榛子广播元数据存储

可以使用后备 Hazelcast 来实现 。 默认地图是使用可自定义的名称创建的。​​ListenableMetadataStore​​​​IMap​​​​SPRING_INTEGRATION_METADATA_STORE​

@Bean
public HazelcastInstance hazelcastInstance() {
return Hazelcast.newHazelcastInstance();
}

@Bean
public MetadataStore metadataStore() {
return new HazelcastMetadataStore(hazelcastInstance());
}

该实现允许您注册自己的侦听器类型以通过 侦听事件。​​HazelcastMetadataStore​​​​ListenableMetadataStore​​​​MetadataStoreListener​​​​addListener(MetadataStoreListener callback)​

榛子锁注册表

可以使用后备 Hazelcast 分布式支持来实现 a:​​LockRegistry​​​​ILock​

@Bean
public HazelcastInstance hazelcastInstance() {
return Hazelcast.newHazelcastInstance();
}

@Bean
public LockRegistry lockRegistry() {
return new HazelcastLockRegistry(hazelcastInstance());
}

当与共享(例如 存储管理),可用于跨多个应用程序实例提供此功能,以便一次只有一个实例可以操作组。​​MessageGroupStore​​​​Aggregator​​​​HazelcastLockRegistry​

对于所有分布式操作,必须在 上启用 CP 子系统。​​HazelcastInstance​