分布式事务极速入门

举个栗子

假如你正在开发一个电商系统, 该系统采用微服务架构, 订单、库存、支付服务分别部署, 各自独立数据库。
下单时需要创建订单、扣减商品库存、扣减用户资金、发送短信, 你会如何编写代码?

1
2
3
4
5
6
7
8
9
10
11
12
13
# 创建订单
OrderModel::create($order);

# 扣减库存
$this->inventoryService->decrement($orderItems);

# 扣减资金
$this->walletService->decrement($wallet_id, $amount);

# 发送短信
$this->smsService->send($notication);

return "订单创建成功!";

让我们假设每行代码都可能出现异常, 阁下该如何应对?

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
try {
Db::beginTransaction();

OrderModel::create($order);

$inventoryDecrement = $this->inventoryService->decrement($orderItem);

$walletDecrement = $this->walletService->decrement($walltId, $amount);

$this->smsService->send($notication);

Db::commit();

return "订单创建成功!";
} catch (\Throwable $throwable) {

if ($inventoryDecrement) {
$this->inventoryService->increment($orderItem);
}

if ($walletDecrement) {
$this->walletService->increment($walltId, $amount);
}

Db::rollBack();

return "订单创建失败!";
}

假如在事务Commit 时, 恰巧Pod 重启了, 阁下又该如何应对?

假如在回滚资金时, 恰巧出现了请求超时, 阁下叒该如何应对?

假如短信发送没有补偿接口, 阁下又双叒叕该如何应对?


SAGA

在上述的例子中, 其实已经有了初步的分布式事务的概念。

库存扣减资金扣减失败后, 手动进行补偿的思想, 就是SAGA 型事务!


我们用Saga 重写一下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
# 获取全局事务ID
$gid = $this->saga->init();

# 编排业务流程, Service 均是URL
$this->saga->add(
$orderService . '/create',
$orderService . '/delete',
['order_info' => $order],
)->add(
$inventoryService . '/decrement', # 分支事务
$inventoryService . '/increment',
['items' => $orderItem],
)->add(
$walletService . '/decrement',
$walletService . '/increment',
['wallet_id' => $walletId, 'amount' => $amount],
)->add(
$smsService . '/send',
'',
['notication' => $notication],
);

# 提交全局事务
return $this->saga->submit(); // {"dtm_result":"SUCCESS"}

本地事务封装为远程调用, 由DTM 回调触发!

需要注意, 在Saga 事务Submit 时, 返回的响应仅仅是DTM 接收请求并落库的结果, 而非分支事务的响应结果! 且DTM 在判定分支事务是否成功时, 是根据特定的StatusCode, 有且仅有409 代表失败!

由于分布式事务涉及分布式协作,某些参与者可能出现暂时不可用或者返回500等异常情况是在所难免的。这些暂时不可用和500,与业务上的失败有非常大的区别。

  • SUCCESS: 状态码 200 StatusOK
  • FAILURE: 状态码 409 StatusConflict
  • ONGOING: 状态码 425 StatusTooEarly , 表示未完成,还在正常进行中,此时DTM 服务器需要采用固定间隔重试,而不是指数退避算法重试
  • 其他: 表示临时错误,采用指数退避算法重试,避免出现大量重试,导致负载过高

DTM 在SAGA 型事务中扮演的角色, 其实仅仅是一个数据库, 外加请求重试。。。
image.png

假如在事务提交前, 恰巧Pod 重启了, 阁下叒该如何应对?

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
try {
Db::beginTransaction();

OrderModel::create($order);

$inventoryDecrement = $this->inventoryService->decrement($orderItem);

$walletDecrement = $this->walletService->decrement($walletId, $amount);

$this->smsService->send($notication);

Db::commit();

return "订单创建成功!";
} catch (\Throwable $throwable) {

由于 Saga 事务不保证隔离性, 在极端情况下可能由于脏写无法完成回滚操作, 比如举一个极端的例子, 分布式事务内先给用户A充值, 再给用户B扣减余额。如果给A用户充值成功, 在B扣减成功之前, A用户把余额消费掉了, 这时事务发生回滚, 则无法进行补偿了。这就是缺乏隔离性造成的典型的问题, 实践中一般的应对方法是:

  • 业务流程设计时遵循“宁可长款, 不可短款”的原则, 长款意思是客户少了钱机构多了钱, 以机构信誉可以给客户退款, 反之则是短款, 少的钱可能追不回来了。所以在业务流程设计上一定是先扣款。

总结

Saga 仅能最低限度保证事务性, 一般用在业务时间较长、业务重要性较低或者非同步响应的场景。


幂等

假如在回滚资金时, 恰巧出现了请求超时, 阁下叒该如何应对?

1
2
3
4
if ($walletDecrement) {
# 请求超时...
$this->walletService->increment($walletId, $amount);
}

重试吗? 若超时的请求实际上已经成功了, 会出现用户资金多次增加;

不重试吗? 超时的请求没有成功的话, 资金会凭空丢失, 用户直接”C 语言”投诉;

定义

聊聊幂等

任意多次执行所产生的影响均与一次执行的影响相同。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
function makeZero(int $num): int
{
return $num * 0;
}


function setTrue(): bool
{
return $this->foo = true;
}


function increment(int $amount): int
{
return $this->amount += $amount;
}

场景举例, 博客 / 微博系统点赞:

1
UPDATE post SET likes = likes + 1 WHERE post_id = 1981273;

上述设计为反面案例。正常情况下应该设计为:

1
2
3
4
5
6
7
8
# 幂等
START TRANSACTION;

INSERT INTO post_likes (user_id, post_id) VALUES (10086, 1981273); # 唯一索引

UPDATE post SET likes = likes + 1 WHERE post_id = 1981273;

COMMIT;

那我们来重新设计一下资产服务:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
class WalletService 
{
public function increment(int $walletId, string $orderNo, int $amount): bool
{
try {
Db::beginTransaction();

# order_no 唯一索引
WalletLog::create(['order_no' => $orderNo, 'amount' => $amount, 'wallet_id' => $walletId]);

# 流水优先于余额
Wallet::find($walletId)->increment($amount);

Db::commit();

return true;
} catch (UniqueIndexException $exception) {
Db::rollback();

return true; # 注意这里
} catch (\Throwable $throwable) {
Db::rollback();

return false;
}
}
}

流水表可以反推出任意时间点的用户余额, 流水重于余额。

总结

通过在事务里, 向具有唯一索引的流水表插入数据, 确保操作数值幂等, 是常用的幂等策略。
还有其他的实现方式, 比如请求中携带request_id进行系统级幂等、DTM 使用gid全局事务ID 进行幂等…


TCC

在上面SAGA 代码中, 事务一致性比较弱, 并且是异步完成, 无法满足强一致性与同步响应的业务场景, 而这种场景往往是普遍且核心的场景。

TCC是Try、Confirm、Cancel三个词语的缩写, 适用于一致性要求较强, 事务逻辑较短的业务场景。
下面我们用TCC 重写一下代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
try {

$gid = $this->tcc->init();

Db::beginTransaction();

$orderNo = OrderModel::create($order);

# 调用库存服务, 同步获取扣减相应
$inventoryResult = $this->tcc->callBranch(
[
'order_no' => $orderNo,
'order_item' => $orderItem,
],
$inventoryService . '/tcc/decrementTry',
$inventoryService . '/tcc/decrementConfirm',
$inventoryService . '/tcc/decrementCancel'
);

# 调用资金服务, 同步获取扣减相应
$walletServiceResult = $this->tcc->callBranch(
[
'order_no' => $orderNo,
'amount' => $amount,
'wallet_id' => $walletId
],
$walletService . '/tcc/decrementTry',
$walletService . '/tcc/decrementConfirm',
$walletService . '/tcc/decrementCancel'
);

# 调用短信服务, 同步获取相应(资源预占, 不发送)
$smsServiceResult = $this->tcc->callBranch(
[
'order_no' => $orderNo,
'notication' => $notication,
],
$smsService . '/tcc/sendTry',
$smsService . '/tcc/sendConfirm',
$smsService . '/tcc/sendCancel'
);

# DTM 回调各分支事务Confirm 接口
$this->tcc->submit();

Db::commit();

return "订单创建成功!";
} catch (\Throwable $throwable) {

# DTM 回调各分支事务Cancel 接口
$this->tcc->abort();

Db::rollBack();

return "订单创建失败!";
}

上述代码可以看到TCC 的特点:
优点:

  • 无需将本地事务异步化
  • 同步获取分支事务执行结果
  • 调用方比较简单…

缺点:

  • 服务需要实现Try / Confirm / Cancel
  • 流水表需要pending 状态

待定状态

每个操作接口都需要Try、Confirm、Cancel三个接口, 流水表需要pending 状态:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
class WalletService
{
#[Transaction]
public function decrementTry(string $orderNo, int $walletId, int $amount)
{
# order_no 唯一索引
WalletLog::create([
'order_no' => $orderNo,
'amount' => $amount,
'wallet_id' => $walletId,
'status' => 'pending', # 未确认状态, 最终Confirm 或Cancel
]);

# 资源预占, 负数报错
return Wallet::find($walletId)->decrement($amount);
}

public function decrementConfirm(string $orderNo)
{
WalletLog::where(['order_no' => $orderNo])->update(['status' => 'confirm']);
return true;
}

#[Transaction]
public function decrementCancel(string $orderNo)
{
# 乐观锁
WalletLog::where(['order_no' => $orderNo, 'status' => 'pending'])->update(['status' => 'cancel']);

# 资源解冻
Wallet::find($walletId)->increment($amount);
return true;
}
}

客户端的列表接口需要过滤peding、cancel 状态的流水, 不提供给用户查看。

资源先占后补

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
class WalletService
{
#[Transaction]
public function incrementTry(string $orderNo, int $walletId, int $amount)
{
WalletLog::create([
'order_no' => $orderNo,
'amount' => $amount,
'wallet_id' => $walletId,
'status' => 'pending', # 未确认状态, 最终Confirm 或Cancel
]);

# 增加资源切勿直接加
# Wallet::find($walletId)->increment($amount);
return true;
}

#[Transaction]
public function incrementConfirm(string $orderNo)
{
# 乐观锁
WalletLog::where(['order_no' => $orderNo 'status' => 'pending'])->update(['status' => 'confirm']);

# 资源增加
Wallet::find($walletId)->increment($amount);
return true;
}

public function incrementCancel(string $orderNo)
{
WalletLog::where(['order_no' => $orderNo])->update(['status' => 'cancel']);
return true;
}
}

第三方操作

针对部分第三方操作无法回滚, 一般有两种办法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
class SmsService
{
public function sendTry(string $phone, string $notication)
{
# 检查是否可以发送, 确保Confirm 不会失败
return Sms::check($phone, $notication);
}

public function sendConfirm(string $phone, string $notication)
{
# 调用N 次可不能发送N 次
return Sms::send([
'phone' => $phone,
'notication' => $notication,
]);
}

public function sendCancel()
{
return true;
}
}


# 仅录入数据库, 异步执行
class SmsService
{
public function sendTry(string $phone, string $notication)
{
Sms::check($phone, $notication);
return Sms::create(
'phone' => $phone,
'notication' => $notication,
'status' => 'pending',
);
}

public function sendConfirm(string $phone, string $notication)
{
return Sms::where([
'phone' => $phone,
'notication' => $notication,
])->update(['status' => 'confirm']);
}

public function sendCancel(string $phone, string $notication)
{
return Sms::where([
'phone' => $phone,
'notication' => $notication,
])->update(['status' => 'cancel']);
}
}

子事务屏障

  • 业务处理请求4的时候,Cancel 在Try 之前执行,需要处理空补偿

  • 业务处理请求6的时候,Cancel 重复执行,需要幂等

  • 业务处理请求8的时候,Try 在Cancel 后执行,需要处理悬挂

  • 空补偿: Cancel执行时,Try未执行,事务分支的Cancel操作需要判断出Try未执行,这时需要忽略Cancel中的业务数据更新,直接返回

  • 悬挂: Try执行时,Cancel已执行完成,事务分支的Try操作需要判断出Cancel已执行,这时需要忽略Try中的业务数据更新,直接返回

  • 幂等: 由于任何一个请求都可能出现网络异常,出现重复请求,所有的分布式事务分支操作,都需要保证幂等性

子事务屏障原理

原则

  • 任何请求都要考虑失败
  • 接口响应错误 != 业务失败

资料

DTM 文档
dtm-php-client