基于Kafka消息驱动最终一致事务(二)

时间:2023-03-09 07:41:58
基于Kafka消息驱动最终一致事务(二)

实现用例分析

上篇基于Kafka消息驱动最终一致事务(一)介绍BASE的理论,接着我们引入一个实例看如何实现BASE,我们会用图7显示的算法实现BASE。

aaarticlea/jpeg;base64,*zu/FOvrm3xSOXLm693kUW4F6+SUDLC2ahE3IckGb41XHaH2bun0jAu2MYwGMYwGMYwGMYwGMYwGMYwGMYwGMYwGMYwGMYwGMYwGMYwGMYwGMYwGMYwGMYwGMYwGMYwGMYwGMYwGMYwGMYwNF2/R49E2b9hVSuMxEueYrCJnpEbjmI65lUuVLqBs03LsoP8DlGLALT7CCZicgfPYdPG8fCCEHTynHQszGTES+SvbJBBBJRE+sbo/blUnmuZ4px0JtI4qy+zcfy1prArJi1HZ7K0G6tdAQYnRgwQal1926J1D07Gee2+e8sFFtx8kKm0Q4n2ISuUsK+yEtKfkK7mmk7hj26T/Rmf+JOWTyb+K5LmJoUKlm0qeZYuuLClaarkIOTT2BKe8c/g1KB0jrgX/GU2hzXlNhPHpeUrsc0quxDITA9jtFrdkhIZiN6tpBB/xFMfTNPifkHknJ8ysLlyrG7vze4uGwTq8BqIDCQpLJciUjEyx5QUayOBeMZSfJOZ59F/lpp3yrJ44+MFKIUkxL5rYQ3uExZF0idR0mOv3Zx3fJfJKrXcdN8ARX5F9VnL2jTVmAGuh6VMb8N6BIiaXXs9YHTpOB6FjKGnyPmE2uNHkuXrkV5IwuvUJUzvOXSFmyqwgHkmQgJkl7IGYLppnIXmV6OLCsPKmfIQ4lnyUOpRRMwQDZBNgaLRMSkvYHb7m7WJnA9Hxnmo+ZeQ3G8WEXwpOu16LQGZqKrkRskLkN+XHdIum0RRrpPrGSXjnP8AknI+QCuxdrCMOsDb4qWRLlLXuENqRpAwJ12zuN5CUen00C5WL9Gq1KbNhSG2S2V1sMQJhf1VwUxJT90Zuyo88NpPlT76LTFnU4ay1S4FJBBQfp71Eem6BKfd6xH06ZzWeX5SnHGhyfOlQrXqjLZ8gSqwx8iBTsqL7iZCB9xnpOpl6ROBd8Z5uvyXy6zUZddcOi1TeJSVQUJ2xN8EQ6S7qyOJiWbhjXpPSdY6ZaeA5HlGUOWB7Jv2eMt2K9dhiAm2FgDFwcJEB3e/b7RjAn8Z5pHmXLduDq8z8508W27cQKq+tV4OrCS+2Cu4EAJnG1kzP251cv5ryTOQtBxF5H6X8mqn9Qk1glAMrtYU/I7FkY3sCB3GBDHp011gPQcZQ1+W8pRq7+V5KvIv4622jYXtlbnqbtVIGaEwxmyY6CECXrA6Zz0vKPKbnKIQV6rUZupiqm8xA7K2pUxjoSNNhs3SR6EtwCOnujp1D0TGMYDGMYDGMYDGMYDGMYDGMYDGMYGm7NyKxzRhc2enbhuuz1jXXb19MrXK+QeU8VsK3WrQDJ0ExgyGZj6f2muWvK559/0dP/MD/qMy2xMTeK2rExM9eaW9ExWbRaYmI6OPi/KfI+UsFXqpqywQk53QcRpEwP8A5n+dkn3vNf8AyKX9J/8A15A+A/8AWHf8uX+uvL1ne/NaX01pXGI5w42Itemq17Zz4qnyfkXlXF6TbqVxAp0FgwZDM/ZrDen78lKfI8zyHB17tMEfLaRbxZvhe0SMPboWuvSPrnbzFIb3GWK0xrJhOz7jjqM/05x+If8A/O1P9p/+6eczak7cWilYtFoifCYxLqK2jc0zaZrNZmPflD8n5R5NxbRVcrVhk41Aog5iYj7JhuOM8o8m5RpKp1qxSEamUwcQMT9sy3H8w/8AgP8Abf7rH8vP+P8A9j/vctinY7uiur+nPCWb9/t67Y/xlZuOnk5RM8lChfunbCN23bpGmu/XrrrnTjIbyXyEOIrwtWhXHRPbGfQY/rl/2ZkrWb2xWOM9IaZtFK5tPCPF2cnzXHcWG62zQp6ioepl+wf/AH5Er57yDk/dxVCFon0fYnpMfbHUf8muR3i3Clyzz5bk5lwQWgQfXuHHrJf5o/Zl0iIiIiI0iPSMreKbc6ca7Rzmf2x5J1m+5GrOis8ojnPmgPh+as6zerp+4Agv9Zc5pbPnNQd2qLkR6wMRr/Rouf6MsuM5jd8aUn+Lrtf73j+SqVfPNrO1yVWVFE6ES9ek/eB9f8uWlTAasWrncDIggKPSYmNYnK95pwyrFEuRWMRYr6Scx/GHpOv/AMvrkxwv/R6P/Lp/1Bz3cjbmlb0jTmcWh5tzeL2pedWIzEuvGMZFVo5CwdahZsriJNKmMGC9JkBko100+zKZ/j7mP/Jr/wCif/qZc+RQdnj7Ndem9ymLDXpGpDIxr/TkbxHi/G06ixs11vszGrTOION0+sDujTSP2ZfattVrM3rqnPCEd2u5a0RS2mMcVe/x9zH/AJNf/RP/ANTH+PuY/wDJr/6J/wDqZb/0Xh//AODX/wDsh/8ATnwuI4UBkypV4EYmZmVB0iP+7nXe2P8Ak57W9/1VH/H3Mf8Ak1/9E/8A1MtH6he/8qP/AMP5H4S/tv6nr/k9cqvCUFc15C2yKhXRSXc7YjAjpHRYaRpHXTWf35fMpau33aUikcpm0eXCHFbbnbvabzziKz58ZMYyC86sWK3il51Y2KcML2mk5Wzq0BmAMSGYmYnT1zE1p3GVn9Qdww0a1Pj7M2uXeaQRyV82SuVqNu6W9y/EDMB6DP7s4OR805DkODuBxdOAuL4+xZuEVmVfG2k+v+SYKKWHvScj+H0jrGuBdcZW+Xv2EeK8a/VhS5nHA9q3yl0Q1iR3wfbbv1MoghnTUZnrmXiDeSt2OWu8j+P5jq6hGy1qxBByvYKSWtYabfxRG4teunpgWLGMYDGMYDGMYEbf8b4fkbXyrijYyYATGHOBbIXO8IaoGCtkDPWNwzkljGAxjGAxjGBzlx1M+QXyRL1uKUaFt3F0WwgMx267eshHXTOjGMBjGMBjGMBjGMBjGMBjGMBjGMBjGMBjGMBlc8+/6On/AJgf9RmWPK559/0dP/MD/qMyux9WnvT3/p29yI8B/wCsO/5cv9deXrPPvDLtSlyjW2milcoIYIukaya50/yZcJ8k4IYmZuL0j7Jmf/ZGU9VS07uYrM8I5Qn6a1Y2+MxHGecpCTCCgZKIItdsTPWdPXTPuVRfOJ5by2hFbWa9cXQBTGm4iWe4tPs6RlryO5tzTTE85jVjwVpeL6pjlE4VD+Yf/Af7b/dY/l5/x/8Asf8Ae4/mH/wH+2/3WP5ef8f/ALH/AHuavs/b5mf7r2+VbWMBSyYc6AESRT9kRGs55dyd9vJX22j1mWF7B+wfQR/oz0Dyh0p4C4ceshAfuMhCf9bPPOOGD5CsM+hNXE/vKMejrEVvfyPV2mbVp5vTuNqDSoIqjGkKCBn7y/in9850YxmOZzMzPVriMRiOhjGM8GFhC7CGV2xuW0ZA49NYKNJwhK66FoVGi1CIBHroIxtjM8YzOMGOOTGMYDGMYDIDzTk/icZ8Vc/nXNQ++Fx+P+n0yfmYGJIp0iOszPpEZS6WvkflJWi606mhBH02hP5cf94vdltisapvb9tI1T7+kJb1pxFK/uv8MfrKweNcV+mcWtZxo9v5jvt3F/D/AN2OmSmMZx3La+51zl3oro0dMYM5+S46nylJtC8EsrOiIYEEQTOkwUaEshKOsfScw5mw6tw96ygtjk13MWWkToQARDOk6x6xld4jyEqvDVuUuWuS5Nlua9eKzaqq/wDeHxEx2N1anuGS6bt5DnDpNo8b4pLkO0e5tVkurnYtWbEgZASZkfkOZ6gcxp6Zz2PC/G7C4WdYxGAYqZU96iJTjJzFMJTRkwIzKdpax1zT/jSqwBVWpW3cgc2AKgAq7y/izAOJkk6FQMEQ6TBzrrGmucQeTcqj+WQeRFHyuR+DD5PaAx3JH+0IdVjtGesxH7owLHY4nj7NJVByt1VMpJa9xRpNchYr3QUT7SCJ9ev1zOnRq0haNYO2LmseyNZnVjS3mXumfWZ9Mgo8qXxVeUcgu9bZSUDeUtmFbWtDZmR7/wAcwCZiOuiYPQes/bkjxPPp5axYXWrPFNVrq52mQuFE2uyUmAaMk5103RO3TT669MCTxjGAxjGAxjGAxjGAxjGAxjGAxjGAxjGAxjGAxjGAxjGAxjGAxjGAxjGAxjGAyueff9HT/wAwP+ozLHlc8+/6On/mB/1GZXY+rT3p7/07e5B+E1a1nlWrsqBwQgigWDBxE71xroWv25cy4ThiiYmjX0n7FBE/0xGVHwH/AKw7/ly/115esp6q0xu8JmOEJ+mrE7fGInjKqjwqOL8volWjbXsC0hCeu0hWe6I+7rGWrMSUsjFhDEmGuwpiJkdek6T9NcyyO5uTfTM84jTlWlIpqxymcqh/MP8A4D/bf7rH8vP+P/2P+9x/MP8A4D/bf7rH8vP+P/2P+9zV9n7fMz/de3ypzydXd4G4Gmugb/8AQKD/AOzPOarYTaS6fRZif+jMTnq7lA5RpZGoMGRKPuKNJzyq9TbSuNqN/GkpGfvj6T++OuPR2ia2p5vPVxMTW/k9XiYmNY6xPpOMi/GOSG/xCSmdWpiFNj66jHSf3x1yUzHas1tNZ6ThrraLRFo6xkyFteN2mEZo5a4qSmZiJZJDGv00iR6ZNYz2t7V5PLVi3N5jZ5Lm61ltc71jekyWWjj01GZGf4vuy08TwPIWaSbVrlbcS8BZALaUaQUbo6lJfTKlzX/WL3/MO/1yz0fhf+j0f+XT/qDmz1NprSk1iIm3Pgy+nrqvaLZnHLi68YxmFsMYzFrVpWTWTAgESRFPpER1mcCB805aKfH/AA1zo+3qM6fRcfin9/pnR4lxf6fxIScaOs/ms19YiY9o/ujKxV7nkvk0MYM/Hid5DP8ACkPQZ/8Am9J/bl/zRux29uu11n4r/pCG18d7bnSPhr/cxjGZ12q7VXcpvptmYXYWajkdIKBMZGdNYnr1yIV4mIoq1rHJXLdeiyu2spsVhgCrTErjVNZZTHTSdZycxgQLfDaU25u1bdqncllgyeklSUjakSamRalgSGoRMe3WPtzf/hbj/wDC3+F+434Px/i9zcPe2aaa7tm3d/3cl8YELynidHkrNhrLFhKrwAu/WUQQqwK9dsM3rIx6TtnYQ6x0nO7i+KrcWlya5EQvsPtFvmJ0OwwnHA7YH2xJdM7MYDGMYDGMYDGMYDGMYDGMYDGMYDGMYDGMYDGMYDGMYDGMYDGMYDGMYDGMYDGMYDK559MfpCY16zYHp/3GZO3U2HVjVWf8ZxabXbYPb1iZ9s6esdMrPK+NG2QLleciPXt94BCPv2xLYjK7GmLRe1ojE8sTMpb2qazWtZnMc8w4PApiOYbrPrXKI/015e8plHxepFkZoc6v5Ea7O1Ayfp102u1yzcZS5CpDIuXZu7tuzVcBI6a6+kzrr0zv1FqXtrreJ4RwxLnYi9K6bVmOPPMO3GM5uRrXLKIXTtTTZuiZZAQzUdJ9uhTH1+uQiMziZx+K0ziOWVZ/mHMa0I+sd7p/9vH8vJjW/GvWezOn/wBzPnJeMLN8FyfOBDpj294RGdv+bBOjpjjfGFg+S4znAl0R7uyIlO374F09M169rsdruRnxxPjlm0bne7mjh4ZjwwuGQHlXjc8muLVWIi4qNNvpDB+z9sfTJbjq1ysiV3LU3GbpmGSEL0HSPboMz9frnTmatppbNZzj8paLVi9cWjm824XlrXBX57gFC5mAsomNJ0j66T/FH0z0KjfqX0Q+oyGBPrp6xP2FH0nOLmavj1qYXyZpW3T2kTBWyI/brE6ZBp8ZqfI3cJzAi2Ou0TEi0/aoo/8AZlr32t3jM9u/5xKNKbm1wj46flMLhnwiERkimIEY1mZ9IiMr40PNVxtC+hkfawev+RU5pvcHztlJfqnLgutH49BgQ0/zv7OP6cnG3XPHcrj8Mz+inct027Z/HEKhyDxsX7NgPwtawx/YRSWel8L/ANHo/wDLp/1Byu1PH/EUzBWOQXZn7JesB/oAtf8ALlqrwiELivt7ECMK2zqOzT27Zj6aZT1G9S8VrTPwp7G1ek2tbHFnjGMzNBlV855jtJHi0l726G+Y+gfwj++ctWU7kPG6Lb7W3uaSLzLc0CgBKNfpoTunT0y2xNIvq3JxEcY4dUt+LzTTSM55+53+Ecb8bjZuHGjLc6j9yx6D/TOs5YswRCRQuEadmBGF7Z1Hbp7dJj6aZnk9y83tNp6y7pWK1isdIMYxnLp//9kAAA==" alt="" />

首先介绍使用技术栈

JDK:1.8

Spring:spring-boot,spring-data-jpa

数据库:Mysql

消息服务器:Kafka

数据表

用户库user创建用户表user,更新应用表updates_applied

CREATE TABLE `user` (
`id` INT(11) NOT NULL AUTO_INCREMENT,
`name` VARCHAR(50) NOT NULL,
`amt_sold` INT(11) NOT NULL DEFAULT '',
`amt_bought` INT(11) NOT NULL DEFAULT '',
PRIMARY KEY (`id`)
); CREATE TABLE `updates_applied` (
`trans_id` INT(11) NOT NULL,
`balance` VARCHAR(50) NOT NULL,
`user_id` INT(11) NOT NULL
);

交易库transaction创建交易库表transaction

CREATE TABLE `transaction` (
`xid` INT(11) NOT NULL AUTO_INCREMENT,
`seller_id` INT(11) NOT NULL,
`buyer_id` INT(11) NOT NULL,
`amount` INT(11) NOT NULL,
PRIMARY KEY (`xid`)
);

配置两个数据源

使用JavaConfig方式。其它damain类,repository类,service类请看源码github地址:https://github.com/birdstudiocn/spring-sample/tree/master/Message-Driven-Sample

package cn.birdstudio.user.domain;

import javax.sql.DataSource;

import org.springframework.boot.autoconfigure.jdbc.DataSourceProperties;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.jpa.repository.config.EnableJpaRepositories;
import org.springframework.orm.jpa.JpaTransactionManager;
import org.springframework.orm.jpa.LocalContainerEntityManagerFactoryBean;
import org.springframework.orm.jpa.vendor.HibernateJpaVendorAdapter;
import org.springframework.transaction.PlatformTransactionManager; @Configuration
@EnableJpaRepositories(basePackageClasses = User.class, entityManagerFactoryRef = "userEntityManagerFactory", transactionManagerRef = "userTransactionManager")
class UserDataSourceConfiguration {
@Bean @ConfigurationProperties("app.datasource.user")
DataSourceProperties userDataSourceProperties() {
return new DataSourceProperties();
} @Bean @ConfigurationProperties("app.datasource.user")
DataSource userDataSource() {
return userDataSourceProperties().initializeDataSourceBuilder().build();
} @Bean
LocalContainerEntityManagerFactoryBean userEntityManagerFactory() {
HibernateJpaVendorAdapter vendorAdapter = new HibernateJpaVendorAdapter();
vendorAdapter.setGenerateDdl(false);
LocalContainerEntityManagerFactoryBean factory = new LocalContainerEntityManagerFactoryBean();
factory.setJpaVendorAdapter(vendorAdapter);
factory.setPackagesToScan(User.class.getPackage().getName());
factory.setDataSource(userDataSource());
factory.setPersistenceUnitName("user");
return factory;
} @Bean
PlatformTransactionManager userTransactionManager() {
JpaTransactionManager txManager = new JpaTransactionManager();
txManager.setEntityManagerFactory(userEntityManagerFactory().getObject());
return txManager;
}
}

TransactionDataSourceConfiguration

package cn.birdstudio.transaction.domain;

import javax.sql.DataSource;

import org.springframework.boot.autoconfigure.jdbc.DataSourceProperties;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.jpa.repository.config.EnableJpaRepositories;
import org.springframework.orm.jpa.JpaTransactionManager;
import org.springframework.orm.jpa.LocalContainerEntityManagerFactoryBean;
import org.springframework.orm.jpa.vendor.HibernateJpaVendorAdapter;
import org.springframework.transaction.PlatformTransactionManager; @Configuration
@EnableJpaRepositories(basePackageClasses = Transaction.class, entityManagerFactoryRef = "transactionEntityManagerFactory", transactionManagerRef = "transactionManager")
class TransactionDataSourceConfiguration {
@Bean
@ConfigurationProperties("app.datasource.transaction")
DataSourceProperties transactionDataSourceProperties() {
return new DataSourceProperties();
} @Bean
@ConfigurationProperties("app.datasource.transaction")
DataSource transactionDataSource() {
return transactionDataSourceProperties().initializeDataSourceBuilder().build();
} @Bean
LocalContainerEntityManagerFactoryBean transactionEntityManagerFactory() {
HibernateJpaVendorAdapter vendorAdapter = new HibernateJpaVendorAdapter();
vendorAdapter.setGenerateDdl(false);
LocalContainerEntityManagerFactoryBean factory = new LocalContainerEntityManagerFactoryBean();
factory.setJpaVendorAdapter(vendorAdapter);
factory.setPackagesToScan(Transaction.class.getPackage().getName());
factory.setDataSource(transactionDataSource());
factory.setPersistenceUnitName("transaction");
return factory;
} @Bean
PlatformTransactionManager transactionManager() {
JpaTransactionManager txManager = new JpaTransactionManager();
txManager.setEntityManagerFactory(transactionEntityManagerFactory().getObject());
return txManager;
}
}

配置Kafka消息服务

生产者配置类KafkaProducerConfig.java,配置KafkaTransactionManager必须设置producerFactory.setTransactionIdPrefix("trans");

Configuration
public class KafkaProducerConfig {
@Bean
public ProducerFactory<String, Map<String, Object>> producerFactory() {
DefaultKafkaProducerFactory<String, Map<String, Object>> producerFactory = new DefaultKafkaProducerFactory<>(
producerConfigs());
producerFactory.setTransactionIdPrefix("trans");
return producerFactory;
} @Bean
public Map<String, Object> producerConfigs() {
Map<String, Object> props = new HashMap<>();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "172.16.1.168:9092");
props.put(ProducerConfig.RETRIES_CONFIG, 2);
props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
props.put(ProducerConfig.LINGER_MS_CONFIG, 1);
props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class); return props;
} @Bean
public KafkaTemplate<String, Map<String, Object>> kafkaTemplate() {
return new KafkaTemplate<>(producerFactory());
}
}

消费者配置类KafkaConsumerConfig.java,配置KafkaTransactionManager

@Configuration
@EnableKafka
public class KafkaConsumerConfig {
@Bean
public KafkaListenerContainerFactory<?> kafkaListenerContainerFactory(
ProducerFactory<String, Map<String, Object>> producerFactory) {
ConcurrentKafkaListenerContainerFactory<String, TransactionMessage> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
//factory.setMessageConverter(new StringJsonMessageConverter());
//factory.setConcurrency(3);
factory.getContainerProperties().setPollTimeout(3000);
factory.getContainerProperties().setTransactionManager(new KafkaTransactionManager<>(producerFactory));
return factory;
} @Bean
public ConsumerFactory<String, TransactionMessage> consumerFactory() {
JsonDeserializer<TransactionMessage> jd = new JsonDeserializer<>(TransactionMessage.class);
return new DefaultKafkaConsumerFactory<>(consumerConfigs(), new StringDeserializer(), jd);
} @Bean
public Map<String, Object> consumerConfigs() {
Map<String, Object> propsMap = new HashMap<>();
propsMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "172.16.1.168:9092");
propsMap.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
propsMap.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "100");
propsMap.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "15000");
propsMap.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
propsMap.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
propsMap.put(ConsumerConfig.GROUP_ID_CONFIG, "group1");
propsMap.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
return propsMap;
}
}

Kafka消息监听接口实现UserServiceImpl。@KafkaListener(groupId = "group1", topics = "transaction")注释监听事件接口,@Transactional("userTransactionManager")注释数据库事务。事件接口被调用KafkaTransactionManager事务开始,然后JpaTransactionManager事务开始,如果事务提交则调用producer.sendOffsetsToTransaction(),最后KafkaTransactionManager事务提交。如果JpaTransactionManager事务有异常则不调用producer.sendOffsetsToTransaction()。如果JpaTransactionManager事务提交后KafkaTransactionManager事务有异常也不调用producer.sendOffsetsToTransaction()。int processed = updatesAppliedRepository.find(trans_id, id, type.toString())语句来判断是否已经更新了User。producer.sendOffsetsToTransaction()作用与删除队列消息相当。

@Component("userService")
public class UserServiceImpl implements UserService {
private static final Logger logger = LoggerFactory.getLogger(UserServiceImpl.class);
private final UserRepository userRepository;
@Resource
private UpdatesAppliedRepository updatesAppliedRepository; public UserServiceImpl(UserRepository userRepository) {
this.userRepository = userRepository;
} private void sold(TransactionMessage msg) {
Type type = msg.getType();
int id = msg.getId();
int amount = msg.getAmount();
int trans_id = msg.getXid();
int processed = updatesAppliedRepository.find(trans_id, id, type.toString());
if (processed == 0) {
switch (type) {
case SELLER:
userRepository.updateAmtSold(id, amount);
break;
case BUYER:
userRepository.updateAmtBought(id, amount);
break;
}
//throwException();
UpdatesApplied updatesApplied = new UpdatesApplied();
updatesApplied.setTrans_id(trans_id);
updatesApplied.setUser_id(id);
updatesApplied.setBalance(type.toString());
updatesAppliedRepository.save(updatesApplied);
}
} @Override
@Transactional("userTransactionManager")
@KafkaListener(groupId = "group1", topics = "transaction")
//@KafkaListener(groupId = "group1", topicPartitions = @TopicPartition(topic = "", partitionOffsets = @PartitionOffset(partition = "0", initialOffset = "5")))
public void receivekafka(TransactionMessage msg) {
logger.info("receive kafka message {}", msg);
sold(msg);
} private void throwException() {
throw new RuntimeException("throw exception in test");
}
}

参考资料

1,http://queue.acm.org/detail.cfm?id=1394128

2,Spring Data JPA - Multiple datasources exam

3,JMS

4,https://*.com/questions/42230797/spring-cloud-stream-kafka-eventual-consistency-does-kafka-auto-retry-unackno

5,http://www.kennybastani.com/2016/04/event-sourcing-microservices-spring-cloud.html

6,使用Spring Cloud和Reactor在微服务中实现Event Sourcing

7,Spring Kafka Tutorial – Getting Started with the Spring for Apache Kafka

8,碧桂园旺生活平台解决分布式事务方案之tcc开源框架 https://github.com/yu199195/happylifeplat-tcc