分布式事务方法

  • 基于 XA 协议的二阶段提交协议方法(强一致性);
  • 三阶段提交协议方法(强一致性);
  • TCC 协议(强一致性);
  • 基于消息的最终一致性方法(最终一致性)。

基于 XA 协议的二阶段提交方法

XA 协议

  • XA 协议是一个分布式事务协议,规定了事务管理器和资源管理器接口。
  • XA 分布式事务的原理:事务管理器 = 协调者,负责各个本地资源的提交和回滚,资源管理器 = 参与者,通常由数据库实现。

二阶段提交

  • 二阶段提交协议(Two-phase Commit Protocol,2PC)用于 XA 在全局事务中协调多个资源的机制。
  • 执行过程 = 投票(Voting) + 提交(Commit)两个阶段。
  • Voting 投票阶段
    • 协调者(Coordinator,即事务管理器)会向事务的参与者(Cohort,即本地资源管理器)发起执行操作的 CanCommit 请求,并等待参与者的响应。
    • 参与者接收到请求后,会执行请求中的事务操作,将操作信息记录到事务日志中但不提交(数据库没有到达 commit 阶段)。
    • 参与者执行成功,则向协调者发送“Yes”消息,表示同意操作;若不成功,则发送“No”消息,表示终止操作。
  • Commit 提交阶段
    • 当所有的参与者都返回了操作结果(Yes 或 No 消息)后,进入提交阶段。在提交阶段,协调者会根据所有参与者返回的信息向参与者发送 DoCommit(提交)或 DoAbort(取消)指令。
    • 若协调者从参与者那里收到的都是“Yes”消息,则向参与者发送“DoCommit”消息。参与者收到“DoCommit”消息后,完成剩余的操作(比如修改数据库中的数据)并释放资源(整个事务过程中占用的资源),然后向协调者返回“HaveCommitted”消息;
    • 若协调者从参与者收到的消息中包含“No”消息,则向所有参与者发送“DoAbort”消息。此时投票阶段发送“Yes”消息的参与者,则会根据之前执行操作时的事务日志对操作进行回滚,就好像没有执行过请求操作一样,然后所有参与者会向协调者发送“HaveCommitted”消息;
    • 协调者接收到来自所有参与者的“HaveCommitted”消息后,就意味着整个事务结束了。

|600

如上图所示,在协调两个服务 Application 1 和 Application 2 时,业务会先请求事务协调器创建全局事务,同时生成全局事务的唯一标识 xid,然后再在事务协调器里分别注册两个子事务,生成每个子事务对应的 xid。这里说明一下,xid 由 gtrid+bqual+formatID 组成,多个子事务的 gtrid 是相同的,但其他部分必须区分开,防止这些服务在一个数据库下。

有了子事务的 xid,被请求的服务会通过 xid 标识开启 XA 子事务,让 XA 子事务执行业务操作。当事务数据操作都执行完毕后,子事务会执行 Prepare 指令,将子事务标注为 Prepared 状态,然后以同样的方式执行 xid2 事务。

所有子事务执行完毕后,Prepared 状态的 XA 事务会暂存在 MySQL 中,即使业务暂时断开,事务也会存在。这时,业务代码请求事务协调器通知所有申请的子事务全部执行成功。与此同时,TM 会通知 RM1 和 RM2 执行最终的 commit(或调用每个业务封装的提交接口)。

优点与缺点

  • 优点:尽量保证了数据的强一致性,而且实现成本低。
  • 缺点
    • 同步阻塞问题:参与节点属于事务阻塞型的,即其他资源管理器访问同一临界资源会阻塞状态。不适合高并发场景。
    • 单点故障问题:事务管理器是单一节点,发生故障,整个系统都处于停滞状态。若提交阶段故障,则参与协调者都阻塞住。
    • 数据不一致问题:提交阶段发送“DoCommit”请求时,若网络波动导致部分参与者没收到请求导致没有执行,进而数据不一致。

应用场景

  • MySQL 内部保证 Binlog 和 InnoDB redo 日志的一致性,就是使用的基于 XA 的 2PC 提交。内部会自动将普通事务当做一个 XA 事务来处理:一个数据库事务会被自动的分成 Prepare 和 Commit 两个阶段;Binlog 作为事务协调者,MySQL Server 作为参与者,Binlog 日志和 Redolog 日志中都有 xid,也就是 XA 事务 ID,如 MySQL Binlog 日志查看

三阶段提交方法

三阶段提交简介

  • 三阶段提交协议(Three-phase Commit Protocol,3PC),是对二阶段提交(2PC)的改进。为了避免阻塞时间更长,三阶段提交引入了超时机制和准备阶段
  • 准备阶段:为了减少因等待锁定数据导致的超时情况,提高事务成功率,事务协调器会发送消息确认资源管理器的资源锁定情况,以及所有子事务的数据库锁定数据的情况。
  • 超时阶段:如果协调者或参与者在规定的时间内没有接收到来自其他节点的响应,就会根据当前的状态选择提交或者终止整个事务,从而减少了整个集群的阻塞时间。
  • 3PC 步骤过多,过程比较复杂,整体执行也更加缓慢,所以在分布式生产环境中很少用到它。

三阶段提交

  • CanCommit 阶段
    • 协调者向参与者发送 CanCommit 请求操作,询问参与者是否可以执行事务提交操作,然后等待参与者的响应;参与者收到 CanCommit 请求之后,回复 Yes,表示可以顺利执行事务;否则回复 No。参与者会根据自身情况,比如自身空闲资源是否足以支撑事务、是否会存在故障等,预估自己是否可以执行事务。
    • 不同之处在于,在 2PC 中,在投票阶段,若参与者可以执行事务,会将操作信息记录到事务日志中但不提交,并返回结果给协调者。但在 3PC 中,在 CanCommit 阶段,参与者仅会判断是否可以顺利执行事务,并返回结果。而操作信息记录到事务日志但不提交的操作由第二阶段预提交阶段执行。
  • PreCommit 阶段
    • 协调者收到所有参与者的回应后,进入 PreCommit 阶段。
    • 若参与者回复都是“Yes”,那么协调者就会进入事务的预执行:
      • 协调者向参与者发送 PreCommit 请求,进入预提交阶段。
      • 参与者接收到 PreCommit 请求后执行事务操作,并将 Undo 和 Redo 信息记录到事务日志中。
      • 如果参与者成功执行了事务操作,则返回 ACK 响应,同时开始等待最终指令。
    • 假如存在 “No” 消息或者响应超时,那么协调者会执行中断事务操作:
      • 协调者向所有参与者发送“Abort”消息。
      • 参与者收到“Abort”消息之后,或超时后仍未收到协调者的消息,执行事务的中断操作。
  • DoCommit 阶段
    • 根据 PreCommit 阶段结果进行“执行提交”或者“事务中断”。
    • 执行提交阶段
      • 若收到所有参与者 ACK 响应,则发送 DoCommit 消息,开始执行阶段。
      • 参与者接收到 DoCommit 消息之后,正式提交事务。完成事务提交之后,释放所有锁住的资源,发送 ACK 响应。
      • 协调者接收到所有 ACK 响应之后,完成事务。
    • 事务中断阶段
      • 协调者向所有参与者发送 Abort 请求。
      • 参与者接收到 Abort 消息之后,利用 PreCommit 阶段的 Undo 信息进行回滚,释放锁住的资源,发送 ACK 响应。
      • 协调者接收到所有 ACK 响应之后,执行事务的中断,并结束事务。

优点和缺点

  • 优点:3PC 协议在协调者和参与者均引入了超时机制。即当参与者在预提交阶段向协调者发送 Ack 消息后,如果长时间没有得到协调者的响应,在默认情况下,参与者会自动将超时的事务进行提交,从而减少整个集群的阻塞时间,在一定程度上减少或减弱了 2PC 中出现的同步阻塞问题。
  • 缺点:
    • 仍然存在数据不一致情况。比如在 PreCommit 阶段,部分参与者已经接受到 ACK 消息进入执行阶段,但部分参与者与协调者网络不通,导致接收不到 ACK 消息,此时接收到 ACK 消息的参与者会执行任务,未接收到 ACK 消息且网络不通的参与者无法执行任务,最终导致数据不一致。
    • 因为确认步骤过多,很多业务的互斥排队时间会很长,所以 3PC 的事务失败率要比 2PC 高很多。

TCC 协议

TCC 协议简介

  • TCC(Try-Confirm-Cancel),从流程上来看,它比 2PC 多了一个阶段,也就是将 Prepare 阶段又拆分成了两个阶段:Try 阶段和 Confirm 阶段。TCC 可以不使用 XA,只使用普通事务就能实现分布式事务。

TCC 协议执行

  • Try 阶段
    • 业务代码会预留业务所需的全部资源,比如提前扣除库存、冻结账户 100 元等,减少子事务锁定的数据量。后续可无锁进行。
  • Confirm 阶段
    • 业务确认所需的资源都拿到后,子事务会并行执行这些业务。可以不做任何锁互斥,也无需检查,直接执行 Try 阶段准备的所有资源就行。
  • Cancel 阶段
    • 若子事务在 Try 阶段或 Confirm 阶段多次重试后仍旧失败,启动执行 Cancel 阶段的代码,并释放 Try 预留的资源,同时回滚 Confirm 期间的内容。
  • 需要注意的是 Confirm 阶段和 Cancel 阶段的操作都需要满足幂等性,从而支持重试。

优点和缺点

  • 优点
    • 并发能力高,且无长期资源锁定;
    • 数据一致性相对来说较好;
    • 适用于订单类业务,以及对中间状态有约束的业务。
  • 缺点
    • 只适合短事务,不适合多阶段的事务;
    • 不适合多层嵌套的服务;
    • 相关事务逻辑要求幂等;
    • 代码入侵,实现分布式事务回滚,开发量较大,需要代码提供每个阶段的具体操作;
    • 存在执行过程被打断时,容易丢失数据的情况。

应用场景

  • 在业务中引入 TCC 一般是依赖单独的 TCC 事务框架,可以选择自研或者应用开源组件。TCC 框架扮演了资源管理器的角色,常用的 TCC 开源组件有 Tcc-transaction、ByteTCC、Spring-cloud-rest-tcc 等。
  • Seata,可以选择 TCC 事务模式,也支持了 AT 模式及 Saga 模式。

最终一致性(消息中间件)

将需要分布式处理的事务通过消息或者日志的方式异步执行,消息或日志可以存到本地文件、数据库或消息队列中,再通过业务规则进行失败重试。基于分布式消息的最终一致性方案的事务处理,引入了一个消息中间件(如 MQ),用于在多个应用之间进行消息传递。若都运行成功,则事务一致性。若其中一个环节运行失败,则回滚或者重试或者补偿机制。

事务消息需要消息队列提供相应的功能才能实现,Kafka 和 RocketMQ 都提供了事务相关功能。

|600

对于一个订单系统来说,在消息队列开启事务 向消息队列发送“半消息” 消息发送成功后,执行本地事务,创建订单记录,提交订单库的数据库事务 根据订单数据库事务的执行结果,决定消息队列的事务消息是提交还是回滚 若提交,则消息会被购物车系统接收到并进行处理。其中,“半消息”包含的内容是完整的消息内容,和普通消息的唯一区别在于,在事务提交之前,对于消费者来说是不可见的。

若提交事务消息失败了,应该怎么办?Kafka 的方案是抛出异常让用户自行处理,可以在业务代码中反复重试提交,直到提交成功,或者删除之前创建的订单进行补偿。RocketMQ 增加了事务反查机制来解决失败问题。具体来说,Broker 会定期去 Producer 上反查这个事务对应的本地事务状态,根据反查结果决定是否提交或者回滚事务。为了支撑 RocketMQ 的事务反查机制,业务代码中需要实现一个反查本地事务状态的借口,告知 RocketMQ 本地事务是成功还是失败。例如,订单系统例子中,可以根据订单 ID 去查询订单库的订单是否存在,存在则成功,反之则失败,若未知则稍后再次查询。关于 RocketMQ 的示例,可以参考官方文档:Transactional Message Sending

RocketMQ 反查机制的实现方式:为了防止提交或回滚的超时或者失败,采取回查的补偿机制,回查次数默认 15 次,仍出错则回滚。消息对消费者不可见的实现方式为,将其消息的主题 topic 修改为 RMQ_SYS_TRANS_HALF_TOPIC,和队列 id 修改为 0,原先的主题和队列 id 也做为消息的属性,如果事务提交或者回滚会将其消息的队列改为原先的队列。RocketMQ 开启任务,从 half topic 中获取消息,调用其中的生产者的监听进行回查是否提交回滚。

RocketMQ 反查机制的好处:反查的实现不依赖于消息的发送方,这样即使订单服务节点挂了,也可以通过其他订单服务的节点进行反查,从而确保事务的完整性。

RocketMQ 事务的源码解析查看:25 | RocketMQ与Kafka中如何实现事务?-消息队列高手课-极客时间

Kafka 分布式事务的好处:确保在一个事务中发送的多条消息,要么都成功,要么都失败。其中,多条消息不一定要在同一个主题和分区中,可以是发往多个主题和分区的消息。也可以在执行中加入本地事务来实现和 RocketMQ 中事务类似效果,但是 Kafka 是没有事务反查机制的。

Kafka 分布式事务机制的场景:更多的是配合 Kafka 的幂等机制来实现流式处理场景的 Exactly Once。在流计算中,用 Kafka 作为数据源,并且将计算结果保存到 Kafka 这种场景下,数据从 Kafka 的某个主题中消费,在计算集群中计算,再把计算结果保存在 Kafka 的其他主题中。这样的过程中,保证每条消息都被恰好计算一次,确保计算结果正确。这里面有一个很重要的限制条件,就是数据必须来自 Kafka 并且计算结果都必须保存到 Kafka 中,才可以享受到 Kafka 的 Excactly Once 机制。这里的 Excactly Once 不同于日常消息生产消费结果的 Excactly Once。

Kafka 分布式事务的流程:参考 Kafka 实现分布式事务

当然,基于消息队列还有其他的实现方式。如现在订单库中创建订单,然后使用 Canal 或者 Maxwell 来检测订单库的 binlog,当有订单创建完成后,将数据写入到 MQ 中进行异步消费。

本地事务表

利用各系统的本地事务来实现分布式事务。将业务系统的执行和消息均放入本地消息表中,确保本地业务执行和消息表的操作在一个事务中。通过定时任务或其他机制重试读取本地消息表,调用远程应用操作,以实现最终的数据一致性。

例如:订单服务 + 库存服务的分布式事务。

  • 创建订单:用户在电商平台下单,订单服务接收到请求后,开启一个本地事务。在事务中,首先将订单信息插入到订单表中,同时生成一条消息记录插入到本地消息表中,消息内容可以是订单相关的关键信息,如订单编号、商品 ID、购买数量等,消息状态设置为 “未发送”。假设订单表和本地消息表的插入操作都成功,事务提交。
  • 发送消息:订单服务有一个定时任务或者消息发送线程,会定期扫描本地消息表中状态为 “未发送” 的消息。对于每条消息,将其发送到消息队列(如 RabbitMQ、Kafka 等)。发送成功后,将本地消息表中该消息的状态更新为 “已发送”;如果发送失败,可根据情况进行重试,或者记录错误信息并设置状态为 “处理失败”,等待后续的补偿机制处理。
  • 消费消息与扣减库存:库存服务从消息队列中接收消息,接收到消息后先进行幂等性检查,例如根据订单编号判断该消息是否已经处理过。如果是重复消息,则直接返回成功;如果是新消息,则开启本地事务,根据消息中的商品 ID 和购买数量,在库存表中扣减相应的库存数量。如果库存扣减成功,将本地事务提交,并返回处理成功的响应给订单服务;如果库存不足或者其他原因导致库存扣减失败,本地事务回滚,并返回处理失败的响应。
  • 更新消息状态:订单服务收到库存服务的响应后,如果是处理成功的响应,则将本地消息表中对应的消息状态更新为 “处理成功”;如果是处理失败的响应,则可以根据具体业务逻辑决定是否进行重试,或者将消息状态设置为 “处理失败”,并记录相关的错误信息。同时,可以设置一个最大重试次数,当重试次数超过该阈值时,不再重试,而是将问题记录下来,以便人工介入处理。

对账

把方案扩展一下,岂止事务有状态,系统中的各种数据对象都有状态,或者说都有各自完整的生命周期,同时数据与数据之间存在着关联关系。我们可以很好地利用这种完整的生命周期和数据之间的关联关系,来实现系统的一致性,这就是“对账”。

在前面,我们把注意力都放在了“过程”中,而在“对账”的思路中,将把注意力转移到“结果”中。什么意思呢?

在前面的方案中,无论最终一致性,还是 TCC、事务状态表,都是为了保证“过程的原子性”,也就是多个系统操作(或系统调用),要么全部成功,要么全部失败。

但所有的“过程”都必然产生“结果”,过程是我们所说的“事务”,结果就是业务数据。一个过程如果部分执行成功、部分执行失败,则意味着结果是不完整的。从结果也可以反推出过程出了问题,从而对数据进行修补,这就是“对账”的思路!

下面举几个对账的例子。

案例 1:电商网站的订单履约系统。一张订单从“已支付”,到“下发给仓库”,到“出仓完成”。假定从“已支付”到“下发给仓库”最多用 1 个小时;从“下发给仓库”到“出仓完成”最多用 8 个小时。意味着只要发现 1 个订单的状态过了 1 个小时之后还处于“已支付”状态,就认为订单下发没有成功,需要重新下发,也就是“重试”。同样,只要发现订单过了 8 个小时还未出仓,这时可能会发出报警,仓库的作业系统是否出了问题……诸如此类。

这个案例跟事务的状态很类似:一旦发现系统中的某个数据对象过了一个限定时间生命周期仍然没有走完,仍然处在某个中间状态,就说明系统不一致了,要进行某种补偿操作(比如重试或报警)。

更复杂一点:订单有状态,库存系统的库存也有状态,优惠系统的优惠券也有状态,根据业务规则,这些状态之间进行比对,就能发现系统某个地方不一致,做相应的补偿。

案例 2:微博的关注关系。需要存两张表,一张是关注表,一张是粉丝表,这两张表各自都是分库分表的。假设 A 关注了 B,需要先以 A 为主键进行分库,存入关注表;再以 B 为主键进行分库,存入粉丝表。也就是说,一次业务操作,要向两个数据库中写入两条数据,如何保证原子性?

案例 3:电商的订单系统也是分库分表的。订单通常有两个常用的查询维度,一个是买家,一个是卖家。如果按买家分库,按卖家查询就不好做;如果按卖家分库,按买家查询就不好做。这种通常会把订单数据冗余一份,按买家进行分库分表存一份,按卖家再分库分表存一份。和案例 2 存在同样的问题:一个订单要向两个数据库中写入两条数据,如何保证原子性?

如果把案例 2、案例 3 的问题看作为一个分布式事务的话,可以用最终一致性、TCC、事务状态表去实现,但这些方法都太重,一个简单的方法是“对账”。

因为两个库的数据是冗余的,可以先保证一个库的数据是准确的,以该库为基准校对另外一个库。

对账又分为全量对账和增量对账:

  1. 全量对账。比如每天晚上运作一个定时任务,比对两个数据库。

  2. 增量对账。可以是一个定时任务,基于数据库的更新时间;也可以基于消息中间件,每一次业务操作都抛出一个消息到消息中间件,然后由一个消费者消费这条消息,对两个数据库中的数据进行比对(当然,消息可能丢失,无法百分之百地保证,还是需要全量对账来兜底)。

总之,对账的关键是要找出“数据背后的数学规律”。有些规律比较直接,谁都能看出来,比如案例 2、案例 3 的冗余数据库;有些规律隐含一些,比如案例 1 的订单履约的状态。找到了规律就可以基于规律进行数据的比对,发现问题,然后补偿。

妥协方案:弱一致性+基于状态的补偿

可以发现:

  • “最终一致性”是一种异步的方法,数据有一定延迟;
  • TCC 是一种同步方法,但 TCC 需要两个阶段,性能损耗较大;
  • 事务状态表也是一种同步方法,但每次要记事务流水,要更新事务状态,很烦琐,性能也有损耗;
  • “对账”也是一个事后过程。

如果需要一个同步的方案,既要让系统之间保持一致性,又要有很高的性能,支持高并发,应该怎么处理呢?

如图所示,电商网站的下单至少需要两个操作:创建订单和扣库存。订单系统有订单的数据库和服务,库存系统有库存的数据库和服务。先创建订单,后扣库存,可能会创建订单成功,扣库存失败;反过来,先扣库存,后创建订单,可能会扣库存成功,创建订单失败。如何保证创建订单+扣库存两个操作的原子性,同时还要能抵抗线上的高并发流量?

400|400

如果用最终一致性方案,因为是异步操作,如果库存扣减不及时会导致超卖,因此最终一致性的方案不可行;如果用 TCC 方案,则意味着一个用户请求要调用两次(Try 和 Confirm)订单服务、两次(Try 和 Confirm)库存服务,性能又达不到要求。如果用事务状态表,要写事务状态,也存在性能问题。

既要满足高并发,又要达到一致性,鱼和熊掌不能兼得。可以利用业务的特性,采用一种弱一致的方案。

对于该需求,有一个关键特性:对于电商的购物来讲,允许少卖,但不能超卖。比如有 100 件东西,卖给 99 个人,有 1 件没有卖出去,这是可以接受的;但如果卖给了 101 个人,其中 1 个人拿不到货,平台违约,这就不能接受。而该处就利用了这个特性,具体做法如下。

方案 1:先扣库存,后创建订单。

如所示,有三种情况:

  1. 扣库存成功,提交订单成功,返回成功。
  2. 扣库存成功,提交订单失败,返回失败,调用方重试(此处可能会多扣库存)。
  3. 扣库存失败,不再提交订单,返回失败,调用方重试(此处可能会多扣库存)。

事务一致性-4.png|400

方案 2:先创建订单,后扣库存。

如表所示,也有三种情况:

  1. 提交订单成功,扣库存成功,返回成功。
  2. 提交订单成功,扣库存失败,返回失败,调用方重试(此处可能会多扣库存)。
  3. 提交订单失败,不再扣库存,调用方重试。

事务一致性-5.png|400

无论方案 1,还是方案 2,只要最终保证库存可以多扣,不能少扣即可。

但是,库存多扣了,数据不一致,怎么补偿呢?

库存每扣一次,都会生成一条流水记录。这条记录的初始状态是“占用”,等订单支付成功后,会把状态改成“释放”。

对于那些过了很长时间一直是占用,而不释放的库存,要么是因为前面多扣造成的,要么是因为用户下了单但没有支付。

通过比对,得到库存系统的“占用又没有释放的库存流水”与订单系统的未支付的订单,就可以回收这些库存,同时把对应的订单取消。类似 12306 网站,过一定时间不支付,订单会取消,将库存释放。

妥协方案 :重试+回滚+报警+人工修复

上文介绍了基于订单的状态+库存流水的状态做补偿(或者说叫对账)。如果业务很复杂,状态的维护也很复杂,就可以采用下面这种更加妥协而简单的方法。

按方案 1,先扣库存,后创建订单。不做状态补偿,为库存系统提供一个回滚接口。创建订单如果失败了,先重试。如果重试还不成功,则回滚库存的扣减。如回滚也失败,则发报警,进行人工干预修复。

总之,根据业务逻辑,通过三次重试或回滚的方法,最大限度地保证一致。实在不一致,就发报警,让人工干预。只要日志流水记录得完整,人工肯定可以修复!通常只要业务逻辑本身没问题,重试、回滚之后还失败的概率会比较低,所以这种办法虽然丑陋,但很实用。

方案选择

可以从算法一致性同步异步同步阻塞问题单点故障问题实现难度并发度系统性能等角度来考虑这三种方案。

扩展补充

  • AT 模式(Automatic Transaction Mode):这是一种对业务无侵入的分布式事务解决方案。用户只需关注自己的“业务 SQL”,而框架会自动生成事务的二阶段提交和回滚操作。
  • Saga 模式:Saga 模式适用于长事务场景,其中包含一系列补偿性事务。每个正向操作都有一个对应的补偿操作,如果某个操作失败,则执行前面所有已成功操作的补偿操作来回滚整个事务。
  • 市面上有很多优秀的中间件,如 DTM、Seata 对分布式事务协调做了很多优化。比如 Seata 支持 AT 模式、TCC 模式、Sega 模式、XA 模式。实际使用的时候,可以基于开源中间件来使用。
  • 2PC 或 TCC 在工业界落地代价很大,不适合互联网场景,所以只有少部分的强一致性业务场景(如金融支付领域)会基于这两种方案实现。大厂一般不会进行锁很严重的分布式事务,尽力修改事务的粒度,分区,分片,分块。
  • 当然还有其他的分布式事务模式,需要的时候可以再做了解。

代码示例

基于 XA 协议的二阶段提交方法

// 库存 + 钱包 + 订单 三个数据库进行分布式事务
package main
import (
   "database/sql"
   "fmt"
   _ "github.com/go-sql-driver/mysql"
   "strconv"
   "time"
)
func main() {
   // 库存的连接
   stockDb, err := sql.Open("mysql", "root:paswd@tcp(127.0.0.1:3306)/shop_product_stock")
   if err != nil {
      panic(err.Error())
   }
   defer stockDb.Close()
   //订单的连接
   orderDb, err := sql.Open("mysql", "root:paswd@tcp(127.0.0.1:3307)/shop_order")
   if err != nil {
      panic(err.Error())
   }
   defer orderDb.Close()
   //钱包的连接
   moneyDb, err := sql.Open("mysql", "root:paswd@tcp(127.0.0.1:3308)/user_money_bag")
   if err != nil {
      panic(err.Error())
   }
   defer moneyDb.Close()
   
   // 生成xid(如果在同一个数据库,子事务不能使用相同xid)
   xid := strconv.FormatInt(time.Now().UnixMilli(), 10)
   //如果后续执行过程有报错,那么回滚所有子事务
   defer func() {
      if err := recover(); err != nil {
         stockDb.Exec("XA ROLLBACK ?", xid)
         orderDb.Exec("XA ROLLBACK ?", xid)
         moneyDb.Exec("XA ROLLBACK ?", xid)
      }
   }()
 
   // 第一阶段 Prepare
   // 库存 子事务启动
   if _, err = stockDb.Exec("XA START ?", xid); err != nil {
      panic(err.Error())
   }
   //扣除库存,这里省略了数据行锁操作
   if _, err = stockDb.Exec("update product_stock set stock=stock-1 where id =1"); err != nil {
      panic(err.Error())
   }
   //事务执行结束
   if _, err = stockDb.Exec("XA END ?", xid); err != nil {
      panic(err.Error())
   }
   //设置库存任务为Prepared状态
   if _, err = stockDb.Exec("XA PREPARE ?", xid); err != nil {
      panic(err.Error())
   }
 
   // 订单 子事务启动
   if _, err = orderDb.Exec("XA START ?", xid); err != nil {
      panic(err.Error())
   }
   //创建订单
   if _, err = orderDb.Exec("insert shop_order(id,pid,xx) value (1,2,3)"); err != nil {
      panic(err.Error())
   }
   //事务执行结束
   if _, err = orderDb.Exec("XA END ?", xid); err != nil {
      panic(err.Error())
   }
   //设置任务为Prepared状态
   if _, err = orderDb.Exec("XA PREPARE ?", xid); err != nil {
      panic(err.Error())
   }
   
   // 钱包 子事务启动
   if _, err = moneyDb.Exec("XA START ?", xid); err != nil {
      panic(err.Error())
   }
   //扣减用户账户现金,这里省略了数据行锁操作
   if _, err = moneyDb.Exec("update user_money_bag set money=money-1 where id =9527"); err != nil {
      panic(err.Error())
   }
   //事务执行结束
   if _, err = moneyDb.Exec("XA END ?", xid); err != nil {
      panic(err.Error())
   }
   //设置任务为Prepared状态
   if _, err = moneyDb.Exec("XA PREPARE ?", xid); err != nil {
      panic(err.Error())
   }
   // 在这时,如果链接断开、Prepared状态的XA事务仍旧在MySQL存在
   // 任意一个链接调用XA RECOVER都能够看到这三个没有最终提交的事务
   
   // --------
   // 第二阶段 运行到这里没有任何问题
   // 那么执行 commit
   // --------
   if _, err = stockDb.Exec("XA COMMIT ?", xid); err != nil {
      panic(err.Error())
   }
   if _, err = orderDb.Exec("XA COMMIT ?", xid); err != nil {
      panic(err.Error())
   }
   if _, err = moneyDb.Exec("XA COMMIT ?", xid); err != nil {
      panic(err.Error())
   }
   //到这里全部流程完毕
}

基于 RocketMQ 的分布式事务

public class CreateOrderService {
 
  @Inject
  private OrderDao orderDao; // 注入订单表的 DAO
  @Inject
  private ExecutorService executorService; // 注入一个 ExecutorService
 
  private TransactionMQProducer producer;
 
  // 初始化 transactionListener 和 producer
  @Init
  public void init() throws MQClientException {
    TransactionListener transactionListener = createTransactionListener();
    producer = new TransactionMQProducer("myGroup");
    producer.setExecutorService(executorService);
    producer.setTransactionListener(transactionListener);
    producer.start();
  }
 
  // 创建订单服务的请求入口
  @PUT
  @RequestMapping(...)
  public boolean createOrder(@RequestBody CreateOrderRequest request) {
    // 根据创建订单请求创建一条消息
    Message msg = createMessage(request);
    // 发送事务消息
    SendResult sendResult = producer.sendMessageInTransaction(msg, request);
    // 返回:事务是否成功
    return sendResult.getSendStatus() == SendStatus.SEND_OK;
  }
 
  private TransactionListener createTransactionListener() {
    return new TransactionListener() {
      @Override
      public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
        CreateOrderRequest request = (CreateOrderRequest ) arg;
        try {
          // 执行本地事务创建订单
          orderDao.createOrderInDB(request);
          // 如果没抛异常说明执行成功,提交事务消息
          return LocalTransactionState.COMMIT_MESSAGE;
        } catch (Throwable t) {
          // 失败则直接回滚事务消息
          return LocalTransactionState.ROLLBACK_MESSAGE;
        }
      }
      // 反查本地事务
      @Override
      public LocalTransactionState checkLocalTransaction(MessageExt msg) {、
        // 从消息中获得订单 ID
        String orderId = msg.getUserProperty("orderId");
 
        // 去数据库中查询订单号是否存在,如果存在则提交事务;
        // 如果不存在,可能是本地事务失败了,也可能是本地事务还在执行,所以返回 UNKNOW
        //(PS:这里 RocketMQ 有个拼写错误:UNKNOW)
        return orderDao.isOrderIdExistsInDB(orderId)?
                LocalTransactionState.COMMIT_MESSAGE: LocalTransactionState.UNKNOW;
      }
    };
  }
 
    //....
}

扩展问题

对于 2PC 或者 3PC 来说,若在提交执行阶段数据库挂了,那么事务怎么办?

  • 当进行确认的时候,数据库会将信息写入到日志中,确保在服务恢复的时候能继续运行。

基于 MQ 的最终一致性,如果前几个模块运行完成,最后一个模块失败了,那么前几个模块如何回滚或恢复?

  • 可以类似于购物的时候,订单下单成功,付款也完成了,后面货物不够了的时候,自动发起退款的情况。
  • 即,引入重试+补偿的机制来解决失败模块的问题。

参考链接