Spring Integration 消息

时间:2022-12-07 12:06:57

Spring Integration 消息

Spring 集成是数据的通用容器。 任何对象都可以作为有效负载提供,每个实例都包含包含用户可扩展属性作为键值对的标头。​​Message​​​​Message​

界面​​Message​

以下清单显示了接口的定义:​​Message​

public interface Message<T> {

T getPayload();

MessageHeaders getHeaders();

}

接口是 API 的核心部分。 通过将数据封装在通用包装器中,邮件系统可以在不了解数据类型的情况下传递数据。 随着应用程序发展到支持新类型,或者当类型本身被修改或扩展时,邮件系统不受影响。 另一方面,当消息传递系统中的某些组件确实需要访问有关的信息时,此类元数据通常可以存储到消息头中的元数据并从中检索。​​Message​​​​Message​

邮件头

正如Spring Integration允许any用作a的有效载荷一样,它也支持anytype作为标头值。 实际上,该类实现了,如以下类定义所示:​​Object​​​​Message​​​​Object​​​​MessageHeaders​​​​java.util.Map_ interface​

public final class MessageHeaders implements Map<String, Object>, Serializable {
...
}

即使类实现了,它实际上也是一个只读实现。 映射中的任何尝试 to 值都会导致 an. 这同样适用于。 由于消息可以传递给多个消费者,因此无法修改其结构。 同样,消息的有效负载不能在初始创建之后。 但是,标头值本身(或有效负载对象)的可变性有意留给框架用户决定。​​MessageHeaders​​​​Map​​​​put​​​​UnsupportedOperationException​​​​remove​​​​clear​​​​Map​​​​Object​​​​set​

作为实现,可以通过调用标头的名称来检索标头。 或者,您可以提供预期作为附加参数。 更好的是,当检索预定义的值之一时,可以使用方便的吸气器。 以下示例分别显示了这三个选项:​​Map​​​​get(..)​​​​Class​

Object someValue = message.getHeaders().get("someKey");

CustomerId customerId = message.getHeaders().get("customerId", CustomerId.class);

Long timestamp = message.getHeaders().getTimestamp();

下表描述了预定义的消息标头:

表 1.预定义的消息头

标头名称

标头类型

用法


MessageHeaders.ID



java.util.UUID


此消息实例的标识符。 每次消息发生突变时都会更改。


MessageHeaders.
TIMESTAMP



java.lang.Long


创建消息的时间。 每次消息发生突变时都会更改。


MessageHeaders.
REPLY_CHANNEL



java.lang.Object
(String or
MessageChannel)


在未配置显式输出通道且没有或已用尽时向其发送回复(如果有)的通道。 如果值为 a,则它必须表示 Bean 名称或由​​ROUTING_SLIP​​​​ROUTING_SLIP​​​​String​​​​ChannelRegistry.​


MessageHeaders.
ERROR_CHANNEL



java.lang.Object
(String or
MessageChannel)


将错误发送到的通道。 如果值为 a,则它必须表示 Bean 名称或由​​String​​​​ChannelRegistry.​

许多入站和出站适配器实现还提供或需要某些标头,您可以配置其他用户定义的标头。 这些标头的常量可以在存在此类标头的模块中找到,例如,,等等。​​AmqpHeaders​​​​JmsHeaders​

​MessageHeaderAccessor​​应用程序接口

从 Spring Framework 4.0 和 Spring Integration 4.0 开始,核心消息传递抽象已移至模块,并引入了 API 以提供对消息传递实现的额外抽象。 所有(核心)Spring 集成特定的消息头常量现在都在类中声明。 下表描述了预定义的消息标头:​​spring-messaging​​​​MessageHeaderAccessor​​​​IntegrationMessageHeaderAccessor​

表 2.预定义的消息头

标头名称

标头类型

用法


IntegrationMessageHeaderAccessor.
CORRELATION_ID



java.lang.Object


用于关联两条或多条消息。


IntegrationMessageHeaderAccessor.
SEQUENCE_NUMBER



java.lang.Integer


通常,带有一组带有 abut 的消息的序列号也可用于 ato 对无限的消息组重新排序。​​SEQUENCE_SIZE​​​​<resequencer/>​


IntegrationMessageHeaderAccessor.
SEQUENCE_SIZE



java.lang.Integer


一组相关消息中的消息数。


IntegrationMessageHeaderAccessor.
EXPIRATION_DATE



java.lang.Long


指示消息何时过期。 不直接由框架使用,但可以使用标头扩充器进行设置,并在配置了 an 的 a 中使用。​​<filter/>​​​​UnexpiredMessageSelector​


IntegrationMessageHeaderAccessor.
PRIORITY



java.lang.Integer


消息优先级 — 例如,在 a 中。​​PriorityChannel​


IntegrationMessageHeaderAccessor.
DUPLICATE_MESSAGE



java.lang.Boolean


如果消息被幂等接收方拦截器检测为重复,则为 True。 请参阅幂等接收器企业集成模式。


IntegrationMessageHeaderAccessor.
CLOSEABLE_RESOURCE



java.io.Closeable


如果消息与消息处理完成后应关闭的 关联,则存在此标头。 例如,与使用 FTP、SFTP 等的流文件传输相关联。​​Closeable​​​​Session​


IntegrationMessageHeaderAccessor.
DELIVERY_ATTEMPT



java.lang.
AtomicInteger


如果消息驱动的通道适配器支持 a 的配置,则此标头包含当前传递尝试。​​RetryTemplate​


IntegrationMessageHeaderAccessor.
ACKNOWLEDGMENT_CALLBACK



o.s.i.support.
Acknowledgment
Callback


如果入站终端节点支持,则回调以接受、拒绝或重新排队消息。 请参阅延迟确认可轮询消息源​和​​MQTT 手动确认​​。

类中提供了其中一些标头的方便的类型化 getter,如以下示例所示:​​IntegrationMessageHeaderAccessor​

IntegrationMessageHeaderAccessor accessor = new IntegrationMessageHeaderAccessor(message);
int sequenceNumber = accessor.getSequenceNumber();
Object correlationId = accessor.getCorrelationId();
...

下表描述了也出现在 但是 通常不被用户代码使用的标头(也就是说,它们通常由 Spring 集成的内部部分使用 — 为了完整起见,此处包含它们):​​IntegrationMessageHeaderAccessor​

表 3.预定义的消息头

标头名称

标头类型

用法


IntegrationMessageHeaderAccessor.
SEQUENCE_DETAILS



java.util.
List<List<Object>>


需要嵌套相关性时使用的相关数据堆栈(例如,)。​​splitter→…→splitter→…→aggregator→…→aggregator​


IntegrationMessageHeaderAccessor.
ROUTING_SLIP



java.util.
Map<List<Object>, Integer>


请参阅传送名单。

消息 ID 生成

当消息通过应用程序转换时,每次它都会发生突变(例如, 通过转换器)分配一个新的消息 ID。 消息 ID 为 a。 从 Spring Integration 3.0 开始,用于 IS 生成的默认策略比以前的实现更有效。 它使用基于安全随机种子的简单随机数,而不是每次都创建安全随机数。​​UUID​​​​java.util.UUID.randomUUID()​

可以通过声明在应用程序上下文中实现的 Bean 来选择不同的 UUID 生成策略。​​org.springframework.util.IdGenerator​

类装入器中只能使用一个 UUID 生成策略。 这意味着,如果两个或多个应用程序上下文在同一个类装入器中运行,它们将共享相同的策略。 如果其中一个上下文更改了策略,则所有上下文都会使用该上下文。 如果同一类装入器中的两个或多个上下文声明了类型的 Bean,则它们都必须是同一类的实例。 否则,尝试替换自定义策略的上下文将无法初始化。 如果策略相同,但已参数化,则使用要初始化的第一个上下文中的策略。​​org.springframework.util.IdGenerator​

除了默认策略之外,还提供了两个额外的策略。使用以前的机制。 当不需要 UUID 并且简单的递增值就足够时,您可以使用。​​IdGenerators​​​​org.springframework.util.JdkIdGenerator​​​​UUID.randomUUID()​​​​o.s.i.support.IdGenerators.SimpleIncrementingIdGenerator​

只读标头

Theand是只读标头,不能被覆盖。​​MessageHeaders.ID​​​​MessageHeaders.TIMESTAMP​

从版本 4.3.2 开始,提供了 API 来自定义不应从上游复制的标头列表。 默认情况下,只有 theand是只读的。 提供全局属性(请参阅全局属性)是为了自定义框架组件。 当您不想填充一些现成的标头时,这可能很有用,例如JSON 转换器。​​MessageBuilder​​​​readOnlyHeaders(String… readOnlyHeaders)​​​​Message​​​​MessageHeaders.ID​​​​MessageHeaders.TIMESTAMP​​​​spring.integration.readOnly.headers​​​​DefaultMessageBuilderFactory​​​​contentType​​​​ObjectToJsonTransformer​

当您尝试使用 构建新消息时,将忽略此类标头,并向日志发出特定消息。​​MessageBuilder​​​​INFO​

从版本 5.0 开始,消息网关、标头扩充器、内容扩充器和标头筛选器不允许您在使用时配置 andheader 名称,并且它们会抛出。​​MessageHeaders.ID​​​​MessageHeaders.TIMESTAMP​​​​DefaultMessageBuilderFactory​​​​BeanInitializationException​

标头传播

通常,当消息生成终结点(如服务激活器)处理(和修改)消息时,入站标头将传播到出站消息。 一个例外是转换器,当完整的消息返回到框架时。 在这种情况下,用户代码负责整个出站消息。 当转换器仅返回有效负载时,将传播入站标头。 此外,仅当出站消息中尚不存在标头时,才会传播标头,从而允许您根据需要更改标头值。

从版本 4.3.10 开始,您可以配置消息处理程序(修改消息并生成输出)以禁止特定标头的传播。 要配置您不想复制的标头,请在抽象类上调用 theormethods。​​setNotPropagatedHeaders()​​​​addNotPropagatedHeaders()​​​​MessageProducingMessageHandler​

还可以通过将属性设置为逗号分隔的标头列表来全局禁止特定消息标头的传播。​​readOnlyHeaders​​​​META-INF/spring.integration.properties​

从版本 5.0 开始,上的实现应用简单的模式 (,​​,*xxx​​ 或) 以允许过滤具有通用后缀或前缀的标头。 参见PatternMatchUtilsJavadoc了解更多信息。 当其中一个模式为(星号)时,不会传播任何标头。 忽略所有其他模式。 在这种情况下,服务激活器的行为方式与转换器相同,并且必须从服务方法中提供任何所需的标头。 该选项在Java DSL中可用 它也可用于组件的XML配置作为属性。​​setNotPropagatedHeaders()​​​​AbstractMessageProducingHandler​​​​xxx*​​​xxx​​​xxx*yyy​​​​*​​​​Message​​​​notPropagatedHeaders()​​​​ConsumerEndpointSpec​​​​<service-activator>​​​​not-propagated-headers​

标头传播抑制不适用于不修改消息的终结点,例如网桥​和路由器。

消息实现

接口的基本实现是,它提供了两个构造函数,如以下清单所示:​​Message​​​​GenericMessage<T>​

new GenericMessage<T>(T payload);

new GenericMessage<T>(T payload, Map<String, Object> headers)

创建 ais 时,会生成一个随机的唯一 ID。 接受 aof 标头的构造函数将提供的标头复制到新创建的标头。​​Message​​​​Map​​​​Message​

还有一个方便的实现设计用于传达错误条件。 此实现将对象作为其有效负载,如以下示例所示:​​Message​​​​Throwable​

ErrorMessage message = new ErrorMessage(someThrowable);

Throwable t = message.getPayload();

请注意,此实现利用了基类参数化的事实。 因此,如两个示例所示,检索有效负载时不需要强制转换。​​GenericMessage​​​​Message​​​​Object​

帮助程序类​​MessageBuilder​

您可能会注意到,该接口为其有效负载和标头定义了检索方法,但没有提供资源库。 这样做的原因是 a 在初始创建后无法修改。 因此,当将实例发送给多个使用者时(例如, 通过发布-订阅通道),如果其中一个使用者需要使用不同的有效负载类型发送回复,则必须创建一个新的。 因此,其他使用者不受这些更改的影响。 请记住,多个使用者可以访问相同的有效负载实例或标头值,并且此类实例本身是否不可变是留给您的决定。 换句话说,契约类似于不可修改的契约,地图进一步说明了这一点。 即使 theclass 实现了,任何在实例上调用操作(或“删除”或“清除”)的尝试都会导致 an。​​Message​​​​Message​​​​Message​​​​Message​​​​Message​​​​Collection​​​​MessageHeaders​​​​MessageHeaders​​​​java.util.Map​​​​put​​​​MessageHeaders​​​​UnsupportedOperationException​

Spring Integration 不需要将 Map 的创建和填充传递到 GenericMessage 构造函数中,而是提供了一种更方便的构造 Messages: 的方法。 提供了两种工厂方法,用于从具有有效负载的现有实例或创建实例。 从现有构建时,其标头和有效负载将复制到新文件,如以下示例所示:​​MessageBuilder​​​​MessageBuilder​​​​Message​​​​Message​​​​Object​​​​Message​​​​Message​​​​Message​

Message<String> message1 = MessageBuilder.withPayload("test")
.setHeader("foo", "bar")
.build();

Message<String> message2 = MessageBuilder.fromMessage(message1).build();

assertEquals("test", message2.getPayload());
assertEquals("bar", message2.getHeaders().get("foo"));

如果您需要使用新的有效负载创建 a,但仍想从现有负载复制标头,则可以使用“copy”方法之一,如以下示例所示:​​Message​​​​Message​

Message<String> message3 = MessageBuilder.withPayload("test3")
.copyHeaders(message1.getHeaders())
.build();

Message<String> message4 = MessageBuilder.withPayload("test4")
.setHeader("foo", 123)
.copyHeadersIfAbsent(message1.getHeaders())
.build();

assertEquals("bar", message3.getHeaders().get("foo"));
assertEquals(123, message4.getHeaders().get("foo"));

请注意,该方法不会覆盖现有值。 此外,在前面的示例中,您可以看到如何使用 设置任何用户定义的标头。 最后,有可用于预定义标头的方法以及用于设置任何标头的非破坏性方法(也为预定义标头名称定义常量)。​​copyHeadersIfAbsent​​​​setHeader​​​​set​​​​MessageHeaders​

您还可以用于设置消息的优先级,如以下示例所示:​​MessageBuilder​

Message<Integer> importantMessage = MessageBuilder.withPayload(99)
.setPriority(5)
.build();

assertEquals(5, importantMessage.getHeaders().getPriority());

Message<Integer> lessImportantMessage = MessageBuilder.fromMessage(importantMessage)
.setHeaderIfAbsent(IntegrationMessageHeaderAccessor.PRIORITY, 2)
.build();

assertEquals(2, lessImportantMessage.getHeaders().getPriority());

只有在使用 a(如下一章所述)时才考虑标头。 它被定义为 a。​​priority​​​​PriorityChannel​​​​java.lang.Integer​