在分布式系统和实时数据处理不断发展的背景下,高效、可扩展和可靠的消息中间件的需求变得日益重要。Apache Pulsar作为一个强大的解决方案应运而生,提供了许多针对现代数据密集型应用的功能。在本文中,我们将探索Pulsar的关键特性、使用场景和优势。
Apache Pulsar是什么?
Apache Pulsar是一个分布式消息和事件流平台,旨在解决实时处理海量数据的挑战。由Yahoo开发,并随后作为Apache软件基金会的一部分开源,Pulsar拥有独特的架构,将消息存储和消息处理分开,实现了出色的可扩展性和可靠性。
Apache Pulsar的关键特性:
-
多租户性能: Pulsar支持多租户性能,允许组织在保持不同租户之间隔离的同时共享集群。这一功能对于云环境和拥有多样化数据处理需求的大型企业至关重要。
-
水平扩展性: Pulsar的架构天然具有可扩展性,支持无缝扩展集群以满足不断增长的工作负载。它采用分布式架构,可以动态添加或删除代理节点,而不会影响系统的可用性。
-
地理复制: Pulsar支持地理复制,实现数据在多个地理区域的冗余。这确保了高可用性和灾难恢复能力,对于需要严格SLA的关键应用至关重要。
-
统一的消息和流: 与传统的消息系统不同,Pulsar无缝集成了消息和事件流的能力。它支持传统的发布-订阅消息模式和现代事件流范式,满足各种使用情况。
-
持久化消息存储: Pulsar提供由Apache BookKeeper支持的持久化消息存储,确保消息即使在节点故障或系统崩溃时也能安全持久化。
-
分层存储: Pulsar提供分层存储,允许组织根据其访问模式和保留策略将消息存储在不同的存储层中。这一功能通过将较旧或不经常访问的数据转移到更便宜的存储解决方案来优化存储成本。
Apache Pulsar的使用场景:
-
实时分析: Pulsar非常适用于实时分析应用,其中需要处理来自各种来源的流数据,并在接近实时的时间内进行分析。它使组织能够从流数据流中获取有价值的见解,支持欺诈检测、异常检测和预测性维护等用例。
-
微服务通信: Pulsar作为微服务架构的有效通信层,实现了分布式服务之间的无缝交互。它提供了可靠的消息传递和事件驱动通信,支持可伸缩和具有韧性的微服务部署。
-
物联网数据接入: 随着物联网设备生成海量数据的增加,Pulsar为物联网应用提供了一个理想的平台,用于实时接收、处理和分析物联网数据流。它可以处理物联网应用的高吞吐量和低延迟要求,帮助组织利用物联网数据进行可行动的见解和决策。
-
日志聚合和监控: Pulsar可用于日志聚合和监控,将分布式系统和应用程序的日志集中到一个*存储库中。它提供了实时日志处理能力,使组织能够实时监控系统健康状况、检测异常并主动解决问题。
Apache Pulsar的优势:
-
可扩展性: Pulsar的可扩展架构使其能够轻松处理海量工作负载,使其适用于小规模部署和大规模企业应用。
-
可靠性: Pulsar确保可靠的消息传递和容错,这得益于其分布式和韧性的架构。
-
灵活性: Pulsar支持广泛的消息模式和使用情况,为各种应用需求提供了灵活性。
-
性能: Pulsar提供高吞吐量和低延迟,非常适用于实时数据处理和事件驱动应用。
-
社区和生态系统: 作为Apache软件基金会的一部分,Pulsar受益于充满活力的社区和丰富的集成和扩展生态系统。
总而言之,Apache Pulsar作为现代数据密集型应用的强大消息中间件解决方案崛起。凭借其可扩展的架构、可靠的消息传递和灵活的特性,Pulsar使组织能够构建强大的实时数据处理管道,推动创新和价值创造。
1、引用Maven依赖
<dependency>
<groupId></groupId>
<artifactId>pulsar-client</artifactId>
<version>2.11.0</version>
</dependency>
2、YAML配置
test:
pulsar:
url: pulsar://127.0.0.1:6650
topics:
test: persistent://public/test/test
3、pulsar配置类
/**
* @author Lucas
* date 2024/3/6 10:56
* description pulsar 配置文件,主要声明了无事务client 和有事务client
*/
@Configuration
@RefreshScope
public class PulsarConfig {
@Value("${}")
private String serviceUrl;
@Bean("PulsarClient")
public PulsarClient getPulsarClient() {
try {
PulsarClient client = ()
.serviceUrl(serviceUrl)
.build();
return client;
} catch (PulsarClientException e) {
(e);
throw new RuntimeException("初始化Pulsar Client失败");
}
}
@Bean("PulsarTransactionClient")
public PulsarClient getPulsarTransactionClient() {
try {
PulsarClient client = ()
.serviceUrl(serviceUrl)
.enableTransaction(true)
.build();
return client;
} catch (PulsarClientException e) {
(e);
throw new RuntimeException("初始化Pulsar Client失败");
}
}
}
4、读取topic配置
/**
* @author Lucas
* date 2024/3/6 10:48
* description pulsar topic配置
*/
@Configuration
@ConfigurationProperties(prefix = "test")
public class SystemConfig {
@Getter
private final Pulsar pulsar = new Pulsar();
public static class Pulsar {
@Getter
private final Topics topics = new Topics();
}
@Data
public static class Topics {
private String test;
}
}
5、pulsar公用方法
/**
* @author Lucas
* date 2024/3/6 10:55
* description pulsar 公共方法
*/
@Component
public class PulsarCommon {
@Qualifier("PulsarClient")
@Resource
private PulsarClient client;
@Qualifier("PulsarTransactionClient")
@Resource
private PulsarClient transactionClient;
/**
* 创建一个生产者
* @param topic topic name
* @param schema schema方式
* @param <T> 泛型
* @return Producer生产者
*/
public <T> Producer<T> createProducer(String topic, Schema<T> schema) {
try {
return (schema)
.topic(topic)
.batchingMaxPublishDelay(10, )
.sendTimeout(10, )
.blockIfQueueFull(true)
.create();
} catch (PulsarClientException e) {
throw new RuntimeException("初始化Pulsar Producer失败: " + topic, e);
}
}
/**
* @param topic topic name
* @param subscription sub name
* @param messageListener MessageListener的自定义实现类
* @param schema schema消费方式
* @param <T> 泛型
* @return Consumer消费者
*/
public <T> Consumer<T> createConsumer(String topic, String subscription,
MessageListener<T> messageListener, Schema<T> schema) {
try {
return (schema)
.topic(topic)
.subscriptionName(subscription)
.ackTimeout(10, )
.subscriptionType()
.messageListener(messageListener)
.subscribe();
} catch (PulsarClientException e) {
throw new RuntimeException("初始化Pulsar Consumer失败: " + topic, e);
}
}
/**
*
* @param topic topic name
* @param subscription sub name
* @param messageListener MessageListener的自定义实现类
* @param schema schema消费方式
* @param <T> 泛型
* @return Consumer消费者
*/
public <T> Consumer<T> createConsumerDelay(String topic, String subscription,
MessageListener<T> messageListener, Schema<T> schema,
Integer ackRedeliveryTime, TimeUnit ackRedeliveryTimeUnit,
Integer negativeAckRedeliveryTime, TimeUnit negativeAckRedeliveryTimeUnit) {
try {
return (schema)
.topic(topic)
.subscriptionName(subscription)
.ackTimeout(ackRedeliveryTime,ackRedeliveryTimeUnit)
.negativeAckRedeliveryDelay(negativeAckRedeliveryTime,negativeAckRedeliveryTimeUnit)
.subscriptionType()
.messageListener(messageListener)
.subscribe();
} catch (PulsarClientException e) {
throw new RuntimeException("初始化Pulsar Consumer失败: " + topic, e);
}
}
}
6、生产者配置
/**
* @author Lucas
* date 2024/2/22 18:40
* description 生产者
*/
@Component
public class PulsarProducerInit {
@Resource
private PulsarCommon pulsarCommon;
@Resource
private SystemConfig systemConfig;
@Bean(name = "testProducer")
public Producer<String> testProducer() {
return (().getTopics().getTest(), ()/*StringSchema.utf8()*/);
}
}
7、消费者配置
/**
* @author Lucas
* date 2024/2/22 18:45
* description 消费者
*/
@Component
public class PulsarConsumerInit {
@Resource
private PulsarCommon pulsarCommon;
@Resource
private SystemConfig systemConfig;
@Resource
private TestListener testListener;
@Bean(name = "testConsumer")
public Consumer<String> testConsumer(){
return (().getTopics().getTest(),
"test",
testListener, ()/*StringSchema.utf8()*/,10,,20,);
}
}
8、生产者生产消息
public class test {
@Resource(name = "testProducer")
private Producer<String> testProducer;
/**
* 发送pulsar消息
*/
public void test(){
String id = "CS123456789";
("测试发送Pulsar异步通知,id:{}", id);
try {
().value(id).send();
} catch (PulsarClientException e) {
("测试发送Pulsar异步通知,ID:{},请重试:{}",id, e);
}
}
/**
* 发送pulsar延时消息
*/
public void testDelay(){
String id = "CS123456789";
Date pulsarTime = new Date();
("测试发送Pulsar异步延迟通知,id:{}", id);
try {
long timeMillis = ();
long time = ();
().value(id).deliverAfter(time - timeMillis, ).send();
} catch (PulsarClientException e) {
("测试发送Pulsar异步延迟通知,ID:{},请重试:{}",id, e);
}
}
}
9、消费者消费消息
/**
* @author Lucas
* date 2024/2/22 18:36
* description 测试消费者
*/
@Slf4j
@Component
public class TestListener implements MessageListener<String> {
@Override
public void received(Consumer<String> consumer, Message<String> msg) {
("测试,交易订单ID:{}", ());
try {
(())
.doOnSuccess(success -> {
(msg);
}).doOnError(error -> {
("测试,发生错误", error);
(msg);
}).subscribe(logs -> {
(());
("测试,订单号:{}", ());
});
} catch (Exception e) {
("测试,发生错误", e);
(());
}
}
}