目錄
- 分布式事務(wù)
- SAGA
- SAGA實踐
- 處理網(wǎng)絡(luò)異常
- 處理回滾
- 小結(jié)
銀行跨行轉(zhuǎn)賬業(yè)務(wù)是一個典型分布式事務(wù)場景,假設(shè) A 需要跨行轉(zhuǎn)賬給 B,那么就涉及兩個銀行的數(shù)據(jù),無法通過一個數(shù)據(jù)庫的本地事務(wù)保證轉(zhuǎn)賬的 ACID,只能夠通過分布式事務(wù)來解決。
分布式事務(wù)
分布式事務(wù)在分布式環(huán)境下,為了滿足可用性、性能與降級服務(wù)的需要,降低一致性與隔離性的要求,一方面遵循 BASE 理論:
- 基本業(yè)務(wù)可用性( Basic Availability )
- 柔性狀態(tài)( Soft state )
- 最終一致性( Eventual consistency )
- 另一方面,分布式事務(wù)也部分遵循 ACID 規(guī)范:
- 原子性:嚴(yán)格遵循
- 一致性:事務(wù)完成后的一致性嚴(yán)格遵循;事務(wù)中的一致性可適當(dāng)放寬
- 隔離性:并行事務(wù)間不可影響;事務(wù)中間結(jié)果可見性允許安全放寬
- 持久性:嚴(yán)格遵循
SAGA
Saga 是這一篇數(shù)據(jù)庫論文SAGAS提到的一個分布式事務(wù)方案。其核心思想是將長事務(wù)拆分為多個本地短事務(wù),由 Saga 事務(wù)協(xié)調(diào)器協(xié)調(diào),如果各個本地事務(wù)成功完成那就正常完成,如果某個步驟失敗,則根據(jù)相反順序一次調(diào)用補償操作。
目前可用于 SAGA 的開源框架,主要為 Java 語言,其中以 seata 為代表。我們的例子采用 go 語言,使用的分布式事務(wù)框架為https://github.com/yedf/dtm,它對分布式事務(wù)的支持非常優(yōu)雅。下面來詳細(xì)講解 SAGA 的組成:
DTM 事務(wù)框架里,有 3 個角色,與經(jīng)典的 XA 分布式事務(wù)一樣:
- AP/應(yīng)用程序,發(fā)起全局事務(wù),定義全局事務(wù)包含哪些事務(wù)分支
- RM/資源管理器,負(fù)責(zé)分支事務(wù)各項資源的管理
- TM/事務(wù)管理器,負(fù)責(zé)協(xié)調(diào)全局事務(wù)的正確執(zhí)行,包括 SAGA 正向 /逆向操作的執(zhí)行
下面看一個成功完成的 SAGA 時序圖,就很容易理解 SAGA 分布式事務(wù):
SAGA實踐
對于我們要進(jìn)行的銀行轉(zhuǎn)賬的例子,我們將在正向操作中,進(jìn)行轉(zhuǎn)入轉(zhuǎn)出,在補償操作中,做相反的調(diào)整。
首先我們創(chuàng)建賬戶余額表:
CREATE TABLE dtm_busi.`user_account` (
`id` int(11) AUTO_INCREMENT PRIMARY KEY,
`user_id` int(11) not NULL UNIQUE ,
`balance` decimal(10,2) NOT NULL DEFAULT '0.00',
`create_time` datetime DEFAULT now(),
`update_time` datetime DEFAULT now()
);
我們先編寫核心業(yè)務(wù)代碼,調(diào)整用戶的賬戶余額
def saga_adjust_balance(cursor, uid, amount):
affected = utils.sqlexec(cursor, "update dtm_busi.user_account set balance=balance+%d where user_id=%d and balance >= -%d" %(amount, uid, amount))
if affected == 0:
raise Exception("update error, balance not enough")
下面我們來編寫具體的正向操作 /補償操作的處理函數(shù)
@app.post("/api/TransOutSaga")
def trans_out_saga():
saga_adjust_balance(c, out_uid, -30)
return {"dtm_result": "SUCCESS"}
@app.post("/api/TransOutCompensate")
def trans_out_compensate():
saga_adjust_balance(c, out_uid, 30)
return {"dtm_result": "SUCCESS"}
@app.post("/api/TransInSaga")
def trans_in_saga():
saga_adjust_balance(c, in_uid, 30)
return {"dtm_result": "SUCCESS"}
@app.post("/api/TransInCompensate")
def trans_in_compensate():
saga_adjust_balance(c, in_uid, -30)
return {"dtm_result": "SUCCESS"}
到此各個子事務(wù)的處理函數(shù)已經(jīng) OK 了,然后是開啟 SAGA 事務(wù),進(jìn)行分支調(diào)用
# 這是 dtm 服務(wù)地址
dtm = "http://localhost:8080/api/dtmsvr"
# 這是業(yè)務(wù)微服務(wù)地址
svc = "http://localhost:5000/api"
req = {"amount": 30}
s = saga.Saga(dtm, utils.gen_gid(dtm))
s.add(req, svc + "/TransOutSaga", svc + "/TransOutCompensate")
s.add(req, svc + "/TransInSaga", svc + "/TransInCompensate")
s.submit()
至此,一個完整的 SAGA 分布式事務(wù)編寫完成。
如果您想要完整運行一個成功的示例,那么參考這個例子yedf/dtmcli-py-sample,將它運行起來非常簡單
# 部署啟動 dtm
# 需要 docker 版本 18 以上
git clone https://github.com/yedf/dtm
cd dtm
docker-compose up
# 另起一個命令行
git clone https://github.com/yedf/dtmcli-py-sample
cd dtmcli-py-sample
pip3 install flask dtmcli requests
flask run
# 另起一個命令行
curl localhost:5000/api/fireSaga
處理網(wǎng)絡(luò)異常
假設(shè)提交給 dtm 的事務(wù)中,調(diào)用轉(zhuǎn)入操作時,出現(xiàn)短暫的故障怎么辦?按照 SAGA 事務(wù)的協(xié)議,dtm 會重試未完成的操作,這時我們要如何處理?故障有可能是轉(zhuǎn)入操作完成后出網(wǎng)絡(luò)故障,也有可能是轉(zhuǎn)入操作完成中出現(xiàn)機器宕機。如何處理才能夠保障賬戶余額的調(diào)整是正確無問題的?
這類網(wǎng)絡(luò)異常的妥當(dāng)處理,是分布式事務(wù)中的大難題,異常情況包括三類:重復(fù)請求、空補償、懸掛,都需要正確處理
DTM 提供了子事務(wù)屏障功能,保證上述異常情況下的業(yè)務(wù)邏輯,只會有一次正確順序下的成功提交。(子事務(wù)屏障詳情參考分布式事務(wù)最經(jīng)典的七種解決方案的子事務(wù)屏障環(huán)節(jié))
我們把處理函數(shù)調(diào)整為:
@app.post("/api/TransOutSaga")
def trans_out_saga():
with barrier.AutoCursor(conn_new()) as cursor:
def busi_callback(c):
saga_adjust_balance(c, out_uid, -30)
barrier_from_req(request).call(cursor, busi_callback)
return {"dtm_result": "SUCCESS"}
這里的 barrier_from_req(request).call(cursor, busi_callback)調(diào)用會使用子事務(wù)屏障技術(shù),保證 busi_callback 回調(diào)函數(shù)僅被提交一次
您可以嘗試多次調(diào)用這個 TransIn 服務(wù),僅有一次余額調(diào)整。
處理回滾
假如銀行將金額準(zhǔn)備轉(zhuǎn)入用戶 2 時,發(fā)現(xiàn)用戶 2 的賬戶異常,返回失敗,會怎么樣?我們調(diào)整處理函數(shù),讓轉(zhuǎn)入操作返回失敗
@app.post("/api/TransInSaga")
def trans_in_saga():
return {"dtm_result": "FAILURE"}
我們給出事務(wù)失敗交互的時序圖
這里有一點,TransIn 的正向操作什么都沒有做,就返回了失敗,此時調(diào)用 TransIn 的補償操作,會不會導(dǎo)致反向調(diào)整出錯了呢?
不用擔(dān)心,前面的子事務(wù)屏障技術(shù),能夠保證 TransIn 的錯誤如果發(fā)生在提交之前,則補償為空操作;TransIn 的錯誤如果發(fā)生在提交之后,則補償操作會將數(shù)據(jù)提交一次。
您可以將返回錯誤的 TransIn 改成:
@app.post("/api/TransInSaga")
def trans_in_saga():
with barrier.AutoCursor(conn_new()) as cursor:
def busi_callback(c):
saga_adjust_balance(c, in_uid, 30)
barrier_from_req(request).call(cursor, busi_callback)
return {"dtm_result": "FAILURE"}
最后的結(jié)果余額依舊會是對的,原理可以參考:分布式事務(wù)最經(jīng)典的七種解決方案的子事務(wù)屏障環(huán)節(jié)
小結(jié)
在這篇文章里,我們介紹了 SAGA 的理論知識,也通過一個例子,完整給出了編寫一個 SAGA 事務(wù)的過程,涵蓋了正常成功完成,異常情況,以及成功回滾的情況。相信讀者通過這邊文章,對 SAGA 已經(jīng)有了深入的理解。
文中使用的 dtm 是新開源的 Golang 分布式事務(wù)管理框架,功能強大,支持 TCC 、SAGA 、XA 、事務(wù)消息等事務(wù)模式,支持 Go 、python 、PHP 、node 、csharp 等語言的。同時提供了非常簡單易用的接口。
閱讀完此篇干貨,歡迎大家訪問項目https://github.com/yedf/dtm,給顆星星支持!
到此這篇關(guān)于帶你用Python實現(xiàn)Saga 分布式事務(wù)的問題的文章就介紹到這了,更多相關(guān)Python Saga 分布式事務(wù)內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
您可能感興趣的文章:- 詳解分布式系統(tǒng)中如何用python實現(xiàn)Paxos
- Python搭建Spark分布式集群環(huán)境
- python django框架中使用FastDFS分布式文件系統(tǒng)的安裝方法
- Python多進(jìn)程入門、分布式進(jìn)程數(shù)據(jù)共享實例詳解
- Python分布式進(jìn)程中你會遇到的問題解析
- 講解如何利用 Python完成 Saga 分布式事務(wù)