举个栗子
假如你正在开发一个电商系统, 该系统采用微服务架构, 订单、库存、支付服务分别部署, 各自独立数据库。
下单时需要创建订单、扣减商品库存、扣减用户资金、发送短信, 你会如何编写代码?
1 2 3 4 5 6 7 8 9 10 11 12 13
| OrderModel::create($order);
$this->inventoryService->decrement($orderItems);
$this->walletService->decrement($wallet_id, $amount);
$this->smsService->send($notication);
return "订单创建成功!";
|
让我们假设每行代码都可能出现异常, 阁下该如何应对?
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28
| try { Db::beginTransaction(); OrderModel::create($order); $inventoryDecrement = $this->inventoryService->decrement($orderItem);
$walletDecrement = $this->walletService->decrement($walltId, $amount); $this->smsService->send($notication); Db::commit(); return "订单创建成功!"; } catch (\Throwable $throwable) { if ($inventoryDecrement) { $this->inventoryService->increment($orderItem); } if ($walletDecrement) { $this->walletService->increment($walltId, $amount); } Db::rollBack(); return "订单创建失败!"; }
|
假如在事务Commit 时, 恰巧Pod 重启了, 阁下又该如何应对?
假如在回滚资金时, 恰巧出现了请求超时, 阁下叒该如何应对?
假如短信发送没有补偿接口, 阁下又双叒叕该如何应对?
SAGA
在上述的例子中, 其实已经有了初步的分布式事务的概念。
如库存扣减
与资金扣减
失败后, 手动进行补偿的思想, 就是SAGA 型事务!
我们用Saga 重写一下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24
| $gid = $this->saga->init();
$this->saga->add( $orderService . '/create', $orderService . '/delete', ['order_info' => $order], )->add( $inventoryService . '/decrement', # 分支事务 $inventoryService . '/increment', ['items' => $orderItem], )->add( $walletService . '/decrement', $walletService . '/increment', ['wallet_id' => $walletId, 'amount' => $amount], )->add( $smsService . '/send', '', ['notication' => $notication], );
return $this->saga->submit();
|
本地事务封装为远程调用, 由DTM 回调触发!
需要注意, 在Saga 事务Submit 时, 返回的响应仅仅是DTM 接收请求并落库的结果, 而非分支事务的响应结果! 且DTM 在判定分支事务是否成功时, 是根据特定的StatusCode, 有且仅有409 代表失败!
由于分布式事务涉及分布式协作,某些参与者可能出现暂时不可用或者返回500等异常情况是在所难免的。这些暂时不可用和500,与业务上的失败有非常大的区别。
- SUCCESS: 状态码 200 StatusOK
- FAILURE: 状态码 409 StatusConflict
- ONGOING: 状态码 425 StatusTooEarly , 表示未完成,还在正常进行中,此时DTM 服务器需要采用固定间隔重试,而不是指数退避算法重试
- 其他: 表示临时错误,采用指数退避算法重试,避免出现大量重试,导致负载过高
DTM 在SAGA 型事务中扮演的角色, 其实仅仅是一个数据库, 外加请求重试。。。
假如在事务提交前, 恰巧Pod 重启了, 阁下叒该如何应对?
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
| try { Db::beginTransaction(); OrderModel::create($order); $inventoryDecrement = $this->inventoryService->decrement($orderItem);
$walletDecrement = $this->walletService->decrement($walletId, $amount); $this->smsService->send($notication); Db::commit(); return "订单创建成功!"; } catch (\Throwable $throwable) {
|
由于 Saga 事务不保证隔离性, 在极端情况下可能由于脏写无法完成回滚操作, 比如举一个极端的例子, 分布式事务内先给用户A充值, 再给用户B扣减余额。如果给A用户充值成功, 在B扣减成功之前, A用户把余额消费掉了, 这时事务发生回滚, 则无法进行补偿了。这就是缺乏隔离性造成的典型的问题, 实践中一般的应对方法是:
- 业务流程设计时遵循“宁可长款, 不可短款”的原则, 长款意思是客户少了钱机构多了钱, 以机构信誉可以给客户退款, 反之则是短款, 少的钱可能追不回来了。所以在业务流程设计上一定是先扣款。
总结
Saga 仅能最低限度保证事务性, 一般用在业务时间较长、业务重要性较低或者非同步响应的场景。
幂等
假如在回滚资金时, 恰巧出现了请求超时, 阁下叒该如何应对?
1 2 3 4
| if ($walletDecrement) { $this->walletService->increment($walletId, $amount); }
|
重试吗? 若超时的请求实际上已经成功了, 会出现用户资金多次增加;
不重试吗? 超时的请求没有成功的话, 资金会凭空丢失, 用户直接”C 语言”投诉;
定义
聊聊幂等
任意多次执行所产生的影响均与一次执行的影响相同。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
| function makeZero(int $num): int { return $num * 0; }
function setTrue(): bool { return $this->foo = true; }
function increment(int $amount): int { return $this->amount += $amount; }
|
场景举例, 博客 / 微博系统点赞:
1
| UPDATE post SET likes = likes + 1 WHERE post_id = 1981273;
|
上述设计为反面案例。正常情况下应该设计为:
1 2 3 4 5 6 7 8
| # 幂等 START TRANSACTION;
INSERT INTO post_likes (user_id, post_id) VALUES (10086, 1981273); # 唯一索引 UPDATE post SET likes = likes + 1 WHERE post_id = 1981273;
COMMIT;
|
那我们来重新设计一下资产服务:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27
| class WalletService { public function increment(int $walletId, string $orderNo, int $amount): bool { try { Db::beginTransaction(); WalletLog::create(['order_no' => $orderNo, 'amount' => $amount, 'wallet_id' => $walletId]);
Wallet::find($walletId)->increment($amount); Db::commit(); return true; } catch (UniqueIndexException $exception) { Db::rollback(); return true; } catch (\Throwable $throwable) { Db::rollback(); return false; } } }
|
流水表可以反推出任意时间点的用户余额, 流水重于余额。
总结
通过在事务里, 向具有唯一索引的流水表插入数据, 确保操作数值幂等, 是常用的幂等策略。
还有其他的实现方式, 比如请求中携带request_id
进行系统级幂等、DTM 使用gid
全局事务ID 进行幂等…
TCC
在上面SAGA 代码中, 事务一致性比较弱, 并且是异步完成, 无法满足强一致性与同步响应的业务场景, 而这种场景往往是普遍且核心的场景。
TCC是Try、Confirm、Cancel三个词语的缩写, 适用于一致性要求较强, 事务逻辑较短的业务场景。
下面我们用TCC 重写一下代码:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57
| try {
$gid = $this->tcc->init();
Db::beginTransaction();
$orderNo = OrderModel::create($order);
$inventoryResult = $this->tcc->callBranch( [ 'order_no' => $orderNo, 'order_item' => $orderItem, ], $inventoryService . '/tcc/decrementTry', $inventoryService . '/tcc/decrementConfirm', $inventoryService . '/tcc/decrementCancel' );
$walletServiceResult = $this->tcc->callBranch( [ 'order_no' => $orderNo, 'amount' => $amount, 'wallet_id' => $walletId ], $walletService . '/tcc/decrementTry', $walletService . '/tcc/decrementConfirm', $walletService . '/tcc/decrementCancel' );
$smsServiceResult = $this->tcc->callBranch( [ 'order_no' => $orderNo, 'notication' => $notication, ], $smsService . '/tcc/sendTry', $smsService . '/tcc/sendConfirm', $smsService . '/tcc/sendCancel' );
$this->tcc->submit(); Db::commit();
return "订单创建成功!"; } catch (\Throwable $throwable) {
$this->tcc->abort();
Db::rollBack();
return "订单创建失败!"; }
|
上述代码可以看到TCC 的特点:
优点:
- 无需将本地事务异步化
- 同步获取分支事务执行结果
- 调用方比较简单…
缺点:
- 服务需要实现Try / Confirm / Cancel
- 流水表需要pending 状态
待定状态
每个操作接口都需要Try、Confirm、Cancel三个接口, 流水表需要pending 状态:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34
| class WalletService { #[Transaction] public function decrementTry(string $orderNo, int $walletId, int $amount) { WalletLog::create([ 'order_no' => $orderNo, 'amount' => $amount, 'wallet_id' => $walletId, 'status' => 'pending', # 未确认状态, 最终Confirm 或Cancel ]);
return Wallet::find($walletId)->decrement($amount); } public function decrementConfirm(string $orderNo) { WalletLog::where(['order_no' => $orderNo])->update(['status' => 'confirm']); return true; } #[Transaction] public function decrementCancel(string $orderNo) { WalletLog::where(['order_no' => $orderNo, 'status' => 'pending'])->update(['status' => 'cancel']); Wallet::find($walletId)->increment($amount); return true; } }
|
客户端的列表接口需要过滤peding、cancel 状态的流水, 不提供给用户查看。
资源先占后补
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34
| class WalletService { #[Transaction] public function incrementTry(string $orderNo, int $walletId, int $amount) { WalletLog::create([ 'order_no' => $orderNo, 'amount' => $amount, 'wallet_id' => $walletId, 'status' => 'pending', # 未确认状态, 最终Confirm 或Cancel ]);
return true; } #[Transaction] public function incrementConfirm(string $orderNo) { WalletLog::where(['order_no' => $orderNo 'status' => 'pending'])->update(['status' => 'confirm']); Wallet::find($walletId)->increment($amount); return true; } public function incrementCancel(string $orderNo) { WalletLog::where(['order_no' => $orderNo])->update(['status' => 'cancel']); return true; } }
|
第三方操作
针对部分第三方操作无法回滚, 一般有两种办法:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53
| class SmsService { public function sendTry(string $phone, string $notication) { return Sms::check($phone, $notication); } public function sendConfirm(string $phone, string $notication) { return Sms::send([ 'phone' => $phone, 'notication' => $notication, ]); } public function sendCancel() { return true; } }
class SmsService { public function sendTry(string $phone, string $notication) { Sms::check($phone, $notication); return Sms::create( 'phone' => $phone, 'notication' => $notication, 'status' => 'pending', ); } public function sendConfirm(string $phone, string $notication) { return Sms::where([ 'phone' => $phone, 'notication' => $notication, ])->update(['status' => 'confirm']); } public function sendCancel(string $phone, string $notication) { return Sms::where([ 'phone' => $phone, 'notication' => $notication, ])->update(['status' => 'cancel']); } }
|
子事务屏障
业务处理请求4的时候,Cancel 在Try 之前执行,需要处理空补偿
业务处理请求6的时候,Cancel 重复执行,需要幂等
业务处理请求8的时候,Try 在Cancel 后执行,需要处理悬挂
空补偿: Cancel执行时,Try未执行,事务分支的Cancel操作需要判断出Try未执行,这时需要忽略Cancel中的业务数据更新,直接返回
悬挂: Try执行时,Cancel已执行完成,事务分支的Try操作需要判断出Cancel已执行,这时需要忽略Try中的业务数据更新,直接返回
幂等: 由于任何一个请求都可能出现网络异常,出现重复请求,所有的分布式事务分支操作,都需要保证幂等性
子事务屏障原理
原则
- 任何请求都要考虑失败
- 接口响应错误 != 业务失败
资料
DTM 文档
dtm-php-client