举个栗子
假如你正在开发一个电商系统, 该系统采用微服务架构, 订单、库存、支付服务分别部署, 各自独立数据库。
下单时需要创建订单、扣减商品库存、扣减用户资金、发送短信, 你会如何编写代码?

1 2 3 4 5 6 7 8 9 10 11 12 13
   |  OrderModel::create($order);
 
  $this->inventoryService->decrement($orderItems);
 
  $this->walletService->decrement($wallet_id, $amount);
 
  $this->smsService->send($notication);
  return "订单创建成功!";
 
  | 
 
让我们假设每行代码都可能出现异常, 阁下该如何应对?
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28
   | try {     Db::beginTransaction();        OrderModel::create($order);        $inventoryDecrement = $this->inventoryService->decrement($orderItem);
      $walletDecrement = $this->walletService->decrement($walltId, $amount);        $this->smsService->send($notication);        Db::commit();        return "订单创建成功!"; } catch (\Throwable $throwable) {        if ($inventoryDecrement) {     		$this->inventoryService->increment($orderItem);     }        if ($walletDecrement) {     		$this->walletService->increment($walltId, $amount);     }        Db::rollBack();        return "订单创建失败!"; }
  | 
 
假如在事务Commit 时, 恰巧Pod 重启了, 阁下又该如何应对?
假如在回滚资金时, 恰巧出现了请求超时, 阁下叒该如何应对?
假如短信发送没有补偿接口, 阁下又双叒叕该如何应对?
SAGA
在上述的例子中, 其实已经有了初步的分布式事务的概念。
如库存扣减与资金扣减失败后, 手动进行补偿的思想, 就是SAGA 型事务!


我们用Saga 重写一下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24
   |  $gid = $this->saga->init();
 
  $this->saga->add(     $orderService . '/create',     $orderService . '/delete',     ['order_info' => $order], )->add(     $inventoryService . '/decrement',		# 分支事务     $inventoryService . '/increment',     ['items' => $orderItem], )->add(     $walletService . '/decrement',     $walletService . '/increment',     ['wallet_id' => $walletId, 'amount' => $amount], )->add(     $smsService . '/send',     '',     ['notication' => $notication], );
 
  return $this->saga->submit(); 
 
  | 
 
本地事务封装为远程调用, 由DTM 回调触发!
需要注意, 在Saga 事务Submit 时, 返回的响应仅仅是DTM 接收请求并落库的结果, 而非分支事务的响应结果! 且DTM 在判定分支事务是否成功时, 是根据特定的StatusCode, 有且仅有409 代表失败!
由于分布式事务涉及分布式协作,某些参与者可能出现暂时不可用或者返回500等异常情况是在所难免的。这些暂时不可用和500,与业务上的失败有非常大的区别。
- SUCCESS: 状态码 200 StatusOK
 
- FAILURE: 状态码 409 StatusConflict
 
- ONGOING: 状态码 425 StatusTooEarly , 表示未完成,还在正常进行中,此时DTM 服务器需要采用固定间隔重试,而不是指数退避算法重试
 
- 其他: 表示临时错误,采用指数退避算法重试,避免出现大量重试,导致负载过高
 
DTM 在SAGA 型事务中扮演的角色, 其实仅仅是一个数据库, 外加请求重试。。。

假如在事务提交前, 恰巧Pod 重启了, 阁下叒该如何应对?
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
   | try {     Db::beginTransaction();        OrderModel::create($order);        $inventoryDecrement = $this->inventoryService->decrement($orderItem);
      $walletDecrement = $this->walletService->decrement($walletId, $amount);        $this->smsService->send($notication);        Db::commit();        return "订单创建成功!"; } catch (\Throwable $throwable) {
  | 
 
由于 Saga 事务不保证隔离性, 在极端情况下可能由于脏写无法完成回滚操作, 比如举一个极端的例子, 分布式事务内先给用户A充值, 再给用户B扣减余额。如果给A用户充值成功, 在B扣减成功之前, A用户把余额消费掉了, 这时事务发生回滚, 则无法进行补偿了。这就是缺乏隔离性造成的典型的问题, 实践中一般的应对方法是:
- 业务流程设计时遵循“宁可长款, 不可短款”的原则, 长款意思是客户少了钱机构多了钱, 以机构信誉可以给客户退款, 反之则是短款, 少的钱可能追不回来了。所以在业务流程设计上一定是先扣款。
 
总结
Saga 仅能最低限度保证事务性, 一般用在业务时间较长、业务重要性较低或者非同步响应的场景。
幂等
假如在回滚资金时, 恰巧出现了请求超时, 阁下叒该如何应对?
1 2 3 4
   | if ($walletDecrement) {      		$this->walletService->increment($walletId, $amount); }
  | 
 
重试吗? 若超时的请求实际上已经成功了, 会出现用户资金多次增加;
不重试吗? 超时的请求没有成功的话, 资金会凭空丢失, 用户直接”C 语言”投诉;
定义
聊聊幂等
任意多次执行所产生的影响均与一次执行的影响相同。

1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
   | function makeZero(int $num): int {     return $num * 0; }
 
  function setTrue(): bool {     return $this->foo = true; }
 
  function increment(int $amount): int {     return $this->amount += $amount; }
   | 
 
场景举例, 博客 / 微博系统点赞:
1
   | UPDATE post SET likes = likes + 1 WHERE post_id = 1981273;
   | 
 
上述设计为反面案例。正常情况下应该设计为:
1 2 3 4 5 6 7 8
   | # 幂等 START TRANSACTION;
      INSERT INTO post_likes (user_id, post_id) VALUES (10086, 1981273); # 唯一索引          UPDATE post SET likes = likes + 1 WHERE post_id = 1981273;
  COMMIT;
   | 
 
那我们来重新设计一下资产服务:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27
   | class WalletService  {     public function increment(int $walletId, string $orderNo, int $amount): bool   	{     		try {     				Db::beginTransaction();                             WalletLog::create(['order_no' => $orderNo, 'amount' => $amount, 'wallet_id' => $walletId]);
                         	Wallet::find($walletId)->increment($amount);                        	Db::commit();                    		return true; 				} catch (UniqueIndexException $exception) {             Db::rollback();                   			return true;	 				} catch (\Throwable $throwable) {             Db::rollback();                        return false;         }     } }
   | 
 
流水表可以反推出任意时间点的用户余额, 流水重于余额。
总结
通过在事务里, 向具有唯一索引的流水表插入数据, 确保操作数值幂等, 是常用的幂等策略。
还有其他的实现方式, 比如请求中携带request_id进行系统级幂等、DTM 使用gid全局事务ID 进行幂等…
TCC
在上面SAGA 代码中, 事务一致性比较弱, 并且是异步完成, 无法满足强一致性与同步响应的业务场景, 而这种场景往往是普遍且核心的场景。
TCC是Try、Confirm、Cancel三个词语的缩写, 适用于一致性要求较强, 事务逻辑较短的业务场景。
下面我们用TCC 重写一下代码:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57
   | try {
      $gid = $this->tcc->init();
      Db::beginTransaction();
      $orderNo = OrderModel::create($order);
           $inventoryResult = $this->tcc->callBranch(         [             'order_no' => $orderNo,             'order_item' => $orderItem,         ],         $inventoryService . '/tcc/decrementTry',         $inventoryService . '/tcc/decrementConfirm',         $inventoryService . '/tcc/decrementCancel'     );
           $walletServiceResult = $this->tcc->callBranch(         [             'order_no' => $orderNo,             'amount' => $amount,             'wallet_id' => $walletId         ],         $walletService . '/tcc/decrementTry',         $walletService . '/tcc/decrementConfirm',         $walletService . '/tcc/decrementCancel'     );
           $smsServiceResult = $this->tcc->callBranch(         [             'order_no' => $orderNo,             'notication' => $notication,         ],         $smsService . '/tcc/sendTry',         $smsService . '/tcc/sendConfirm',         $smsService . '/tcc/sendCancel'     );
           $this->tcc->submit();        Db::commit();
      return "订单创建成功!"; } catch (\Throwable $throwable) {
           $this->tcc->abort();
      Db::rollBack();
      return "订单创建失败!"; }
  | 
 
上述代码可以看到TCC 的特点:
优点:
- 无需将本地事务异步化
 
- 同步获取分支事务执行结果
 
- 调用方比较简单…
 
缺点:
- 服务需要实现Try / Confirm / Cancel
 
- 流水表需要pending 状态
 
待定状态
每个操作接口都需要Try、Confirm、Cancel三个接口, 流水表需要pending 状态:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34
   | class WalletService {     #[Transaction]   	public function decrementTry(string $orderNo, int $walletId, int $amount)     {                  WalletLog::create([             'order_no' => $orderNo,              'amount' => $amount,              'wallet_id' => $walletId,             'status' => 'pending', # 未确认状态, 最终Confirm 或Cancel         ]);
                   return Wallet::find($walletId)->decrement($amount);     }      	public function decrementConfirm(string $orderNo)     {         WalletLog::where(['order_no' => $orderNo])->update(['status' => 'confirm']);         return true;     }        #[Transaction]   	public function decrementCancel(string $orderNo)     {                  WalletLog::where(['order_no' => $orderNo, 'status' => 'pending'])->update(['status' => 'cancel']);                         Wallet::find($walletId)->increment($amount);         return true;     } }
   | 
 
客户端的列表接口需要过滤peding、cancel 状态的流水, 不提供给用户查看。
资源先占后补
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34
   | class WalletService {     #[Transaction]   	public function incrementTry(string $orderNo, int $walletId, int $amount)     {         WalletLog::create([             'order_no' => $orderNo,              'amount' => $amount,              'wallet_id' => $walletId,             'status' => 'pending', # 未确认状态, 最终Confirm 或Cancel         ]);
                            return true;     }        #[Transaction]   	public function incrementConfirm(string $orderNo)     {                  WalletLog::where(['order_no' => $orderNo 'status' => 'pending'])->update(['status' => 'confirm']);                         Wallet::find($walletId)->increment($amount);         return true;     }      	public function incrementCancel(string $orderNo)     {         WalletLog::where(['order_no' => $orderNo])->update(['status' => 'cancel']);         return true;     } }
   | 
 
第三方操作
针对部分第三方操作无法回滚, 一般有两种办法:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53
   | class SmsService {   	public function sendTry(string $phone, string $notication)     {                  return Sms::check($phone, $notication);     }      	public function sendConfirm(string $phone, string $notication)     {                  return Sms::send([             'phone' => $phone,             'notication' => $notication,         ]);     }      	public function sendCancel()     {         return true;     } }
 
 
  class SmsService {   	public function sendTry(string $phone, string $notication)     {       	Sms::check($phone, $notication);         return Sms::create(             'phone' => $phone,             'notication' => $notication,             'status' => 'pending',         );     }      	public function sendConfirm(string $phone, string $notication)     {         return Sms::where([             'phone' => $phone,             'notication' => $notication,         ])->update(['status' => 'confirm']);     }      	public function sendCancel(string $phone, string $notication)     {         return Sms::where([             'phone' => $phone,             'notication' => $notication,         ])->update(['status' => 'cancel']);     } }
   | 
 
子事务屏障

业务处理请求4的时候,Cancel 在Try 之前执行,需要处理空补偿
 
业务处理请求6的时候,Cancel 重复执行,需要幂等
 
业务处理请求8的时候,Try 在Cancel 后执行,需要处理悬挂
 
空补偿: Cancel执行时,Try未执行,事务分支的Cancel操作需要判断出Try未执行,这时需要忽略Cancel中的业务数据更新,直接返回
 
悬挂: Try执行时,Cancel已执行完成,事务分支的Try操作需要判断出Cancel已执行,这时需要忽略Try中的业务数据更新,直接返回
 
幂等: 由于任何一个请求都可能出现网络异常,出现重复请求,所有的分布式事务分支操作,都需要保证幂等性
 

子事务屏障原理
原则
- 任何请求都要考虑失败
 
- 接口响应错误 != 业务失败
 
资料
DTM 文档
dtm-php-client