什么是分布式事务
创建订单与从购物车删除已下单商品两个操作,因为从购物车删除已下单商品这个步骤,并不是用户下单支付这个主要流程中必需的步骤,使用消息队列来异步清理购物车是更加合理的设计。
如果本地的订单没有创建成功,但是发出去的删除购物车中商品的消息已经被消费处理了,这就是一个分布式事务。
创建订单和发送消息这两个步骤要么都操作成功,要么都操作失败,不允许一个成功而另一个失败的情况出现,这就是消息队列分布式事务需要解决的问题了。
关于分布式事务更详细的内容,参考分布式事务。
分布式事务有什么局限性
对于分布式系统来说,严格的实现 ACID 这四个特性几乎是不可能的,或者说实现的代价太大,大到我们无法接受。
分布式事务就是要在分布式系统中的实现事务。在分布式系统中,在保证可用性和不严重牺牲性能的前提下,光是要实现数据的一致性就已经非常困难了,所以出现了很多“残血版”的一致性,比如顺序一致性、最终一致性等等。
消息队列是怎么实现分布式事务的?
- 订单系统在消息队列上开启一个事务
- 订单系统给消息服务器发送一个“半消息”。半消息和普通消息的唯一区别是,在事务提交之前,对于消费者来说,这个消息是不可见的。
- 订单系统就可以执行本地事务了。如在订单库中创建一条订单记录,并提交订单库的数据库事务。
- 根据本地事务的执行结果决定提交或者回滚事务消息
订单创建成功,那就提交事务消息,购物车系统就可以消费到这条消息继续后续的流程
订单创建失败,那就回滚事务消息,购物车系统就不会收到这条消息。
半消息存在什么问题,怎么解决的
存在的问题是,本地事务执行成功,但是分布式事务却提交失败了,那么也会导致数据不一致的问题。
kafka 解决方案:直接抛出异常,让用户自行处理。我们可以在业务代码中反复重试提交,直到提交成功,或者删除之
RocketMQ 解决方案:增加了事务反查的机制来解决事务消息提交失败的问题。如果 Producer 也就是订单系统,在提交或者回滚事务消息时发生网络异常,RocketMQ 的 Broker 没有收到提交或者回滚的请求,Broker 会定期去 Producer 上反查这个事务对应的本地事务的状态,然后根据反查结果决定提交或者回滚这个事务。
kafka 的事务实现流程
- 开启事务的时候,生产者会给协调者发一个请求来开启事务,协调者在事务日志中记录下事务 ID。
- 生产者在发送消息前,还要给协调者发送请求,告知发送的消息属于哪个主题和分区,这个信息也会被协调者记录在事务日志。
- 生产者就可以像发送普通消息一样来发送事务消息
- 消息发送完成后,生产者给协调者发送提交或回滚事务的请求,由协调者来开始两阶段提交,完成事务。
- 第一阶段:协调者会告诉生产者事务已经提交了,把事务的状态设置为“预提交(Prepare Commit)”,并写入事务日志。到这里,实际上事务已经成功了,无论接下来发生什么情况,事务最终都会被提交。
- 第二阶段:协调者在事务相关的所有分区 Leader 中,都会写一条“事务结束(CompleteCommit)”的特殊消息,当 Kafka 的消费者,也就是客户端,读到这个事务结束的特殊消息之后,它就可以把之前暂时过滤的那些未提交的事务消息,放行给业务代码进行消费了。
- 最后,协调者记录最后一条事务日志,标识这个事务已经结束了。
补充说明 1: 特殊情况下,事务已经提交成功,但还是读取不到数据,那是因为当前提交成功只是一阶段提交成功,事务协调器会继续向各个分区发送特殊信息,此操作会无限重试,直至成功。
补充说明 2: 不同分区 Leader 可能无法全部同时收到特殊信息,此时有的可以访问,有的无法访问,属于正常现象。毕竟 Kafka 事务只能保证最终一致性,不能保证强一致性。
实现代码
设置事务型 Producer 的方法也很简单,满足两个要求即可:
- 和幂等性 Producer 一样,开启
enable.idempotence = true。 - 设置 Producer 端参数
transactional.id。最好为其设置一个有意义的名字。
producer.initTransactions();
try {
producer.beginTransaction();
producer.send(record1);
producer.send(record2);
producer.commitTransaction();
} catch (KafkaException e) {
producer.abortTransaction();
}在 Consumer 端,读取事务型 Producer 发送的消息也是需要一些变更的。修改起来也很简单,设置 isolation.level 参数的值即可。当前这个参数有两个取值:
read_uncommitted:这是默认值,表明 Consumer 能够读取到 Kafka 写入的任何消息,不论事务型 Producer 提交事务还是终止事务,其写入的消息都可以读取。很显然,如果你用了事务型 Producer,那么对应的 Consumer 就不要使用这个值。read_committed:表明 Consumer 只会读取事务型 Producer 成功提交事务写入的消息。当然了,它也能看到非事务型 Producer 写入的所有消息。
应用场景
更多的是配合 Kafka 的幂等机制来实现流式处理场景的 Exactly Once。在流计算中,用 Kafka 作为数据源,并且将计算结果保存到 Kafka 这种场景下,数据从 Kafka 的某个主题中消费,在计算集群中计算,再把计算结果保存在 Kafka 的其他主题中。这样的过程中,保证每条消息都被恰好计算一次,确保计算结果正确。这里面有一个很重要的限制条件,就是数据必须来自 Kafka 并且计算结果都必须保存到 Kafka 中,才可以享受到 Kafka 的 Excactly Once 机制。这里的 Excactly Once 不同于日常消息生产消费结果的 Excactly Once。