本文的完整代碼在github.com/hdt3213/godis/redis/client
通常 TCP 客戶端的通信模式都是阻塞式的: 客戶端發(fā)送請(qǐng)求 -> 等待服務(wù)端響應(yīng) -> 發(fā)送下一個(gè)請(qǐng)求。因?yàn)樾枰却W(wǎng)絡(luò)傳輸數(shù)據(jù),完成一次請(qǐng)求循環(huán)需要等待較多時(shí)間。
我們能否不等待服務(wù)端響應(yīng)直接發(fā)送下一條請(qǐng)求呢?答案是肯定的。
TCP 作為全雙工協(xié)議可以同時(shí)進(jìn)行上行和下行通信,不必?fù)?dān)心客戶端和服務(wù)端同時(shí)發(fā)包會(huì)導(dǎo)致沖突。
p.s. 打電話的時(shí)候兩個(gè)人同時(shí)講話就會(huì)沖突聽不清,只能輪流講。這種通信方式稱為半雙工。廣播只能由電臺(tái)發(fā)送到收音機(jī)不能反向傳輸,這種方式稱為單工。
我們?yōu)槊恳粋€(gè) tcp 連接分配了一個(gè) goroutine 可以保證先收到的請(qǐng)求先先回復(fù)。另一個(gè)方面,tcp 協(xié)議會(huì)保證數(shù)據(jù)流的有序性,同一個(gè) tcp 連接上先發(fā)送的請(qǐng)求服務(wù)端先接收,先回復(fù)的響應(yīng)客戶端先收到。因此我們不必?fù)?dān)心混淆響應(yīng)所對(duì)應(yīng)的請(qǐng)求。
這種在服務(wù)端未響應(yīng)時(shí)客戶端繼續(xù)向服務(wù)端發(fā)送請(qǐng)求的模式稱為 Pipeline 模式。因?yàn)闇p少等待網(wǎng)絡(luò)傳輸?shù)臅r(shí)間,Pipeline 模式可以極大的提高吞吐量,減少所需使用的 tcp 鏈接數(shù)。
pipeline 模式的 redis 客戶端需要有兩個(gè)后臺(tái)協(xié)程程負(fù)責(zé) tcp 通信,調(diào)用方通過 channel 向后臺(tái)協(xié)程發(fā)送指令,并阻塞等待直到收到響應(yīng),這是一個(gè)典型的異步編程模式。
我們先來定義 client 的結(jié)構(gòu):
type Client struct { conn net.Conn // 與服務(wù)端的 tcp 連接 pendingReqs chan *Request // 等待發(fā)送的請(qǐng)求 waitingReqs chan *Request // 等待服務(wù)器響應(yīng)的請(qǐng)求 ticker *time.Ticker // 用于觸發(fā)心跳包的計(jì)時(shí)器 addr string ctx context.Context cancelFunc context.CancelFunc writing *sync.WaitGroup // 有請(qǐng)求正在處理不能立即停止,用于實(shí)現(xiàn) graceful shutdown } type Request struct { id uint64 // 請(qǐng)求id args [][]byte // 上行參數(shù) reply redis.Reply // 收到的返回值 heartbeat bool // 標(biāo)記是否是心跳請(qǐng)求 waiting *wait.Wait // 調(diào)用協(xié)程發(fā)送請(qǐng)求后通過 waitgroup 等待請(qǐng)求異步處理完成 err error }
調(diào)用者將請(qǐng)求發(fā)送給后臺(tái)協(xié)程,并通過 wait group 等待異步處理完成:
func (client *Client) Send(args [][]byte) redis.Reply { request := request{ args: args, heartbeat: false, waiting: wait.Wait{}, } request.waiting.Add(1) client.working.Add(1) defer client.working.Done() client.pendingReqs - request // 請(qǐng)求入隊(duì) timeout := request.waiting.WaitWithTimeout(maxWait) // 等待響應(yīng)或者超時(shí) if timeout { return reply.MakeErrReply("server time out") } if request.err != nil { return reply.MakeErrReply("request failed") } return request.reply }
client 的核心部分是后臺(tái)的讀寫協(xié)程。先從寫協(xié)程開始:
// 寫協(xié)程入口 func (client *Client) handleWrite() { for req := range client.pendingReqs { client.doRequest(req) } } // 發(fā)送請(qǐng)求 func (client *Client) doRequest(req *request) { if req == nil || len(req.args) == 0 { return } // 序列化請(qǐng)求 re := reply.MakeMultiBulkReply(req.args) bytes := re.ToBytes() _, err := client.conn.Write(bytes) i := 0 // 失敗重試 for err != nil i 3 { err = client.handleConnectionError(err) if err == nil { _, err = client.conn.Write(bytes) } i++ } if err == nil { // 發(fā)送成功等待服務(wù)器響應(yīng) client.waitingReqs - req } else { req.err = err req.waiting.Done() } }
讀協(xié)程是我們熟悉的協(xié)議解析器模板, 不熟悉的朋友可以到解析Redis Cluster原理了解更多。
// 收到服務(wù)端的響應(yīng) func (client *Client) finishRequest(reply redis.Reply) { defer func() { if err := recover(); err != nil { debug.PrintStack() logger.Error(err) } }() request := -client.waitingReqs if request == nil { return } request.reply = reply if request.waiting != nil { request.waiting.Done() } } // 讀協(xié)程是個(gè) RESP 協(xié)議解析器 func (client *Client) handleRead() error { ch := parser.ParseStream(client.conn) for payload := range ch { if payload.Err != nil { client.finishRequest(reply.MakeErrReply(payload.Err.Error())) continue } client.finishRequest(payload.Data) } return nil }
最后編寫 client 的構(gòu)造器和啟動(dòng)異步協(xié)程的代碼:
func MakeClient(addr string) (*Client, error) { conn, err := net.Dial("tcp", addr) if err != nil { return nil, err } ctx, cancel := context.WithCancel(context.Background()) return Client{ addr: addr, conn: conn, sendingReqs: make(chan *Request, chanSize), waitingReqs: make(chan *Request, chanSize), ctx: ctx, cancelFunc: cancel, writing: sync.WaitGroup{}, }, nil } func (client *Client) Start() { client.ticker = time.NewTicker(10 * time.Second) go client.handleWrite() go func() { err := client.handleRead() logger.Warn(err) }() go client.heartbeat() }
關(guān)閉 client 的時(shí)候記得等待請(qǐng)求完成:
func (client *Client) Close() { // 先阻止新請(qǐng)求進(jìn)入隊(duì)列 close(client.sendingReqs) // 等待處理中的請(qǐng)求完成 client.writing.Wait() // 釋放資源 _ = client.conn.Close() // 關(guān)閉與服務(wù)端的連接,連接關(guān)閉后讀協(xié)程會(huì)退出 client.cancelFunc() // 使用 context 關(guān)閉讀協(xié)程 close(client.waitingReqs) // 關(guān)閉隊(duì)列 }
測試一下:
func TestClient(t *testing.T) { client, err := MakeClient("localhost:6379") if err != nil { t.Error(err) } client.Start() result = client.Send([][]byte{ []byte("SET"), []byte("a"), []byte("a"), }) if statusRet, ok := result.(*reply.StatusReply); ok { if statusRet.Status != "OK" { t.Error("`set` failed, result: " + statusRet.Status) } } result = client.Send([][]byte{ []byte("GET"), []byte("a"), }) if bulkRet, ok := result.(*reply.BulkReply); ok { if string(bulkRet.Arg) != "a" { t.Error("`get` failed, result: " + string(bulkRet.Arg)) } } }
Keep working, we will find a way out.This is Finley, welcome to join us.
到此這篇關(guān)于Golang 實(shí)現(xiàn) Redis系列(六)如何實(shí)現(xiàn) pipeline 模式的 redis 客戶端的文章就介紹到這了,更多相關(guān)Golang實(shí)現(xiàn)pipeline模式的redis客戶端內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
標(biāo)簽:黑龍江 張掖 武漢 新余 江西 嘉峪關(guān) 延邊 宜賓
巨人網(wǎng)絡(luò)通訊聲明:本文標(biāo)題《Golang 實(shí)現(xiàn) Redis系列(六)如何實(shí)現(xiàn) pipeline 模式的 redis 客戶端》,本文關(guān)鍵詞 Golang,實(shí)現(xiàn),Redis,系列,六,;如發(fā)現(xiàn)本文內(nèi)容存在版權(quán)問題,煩請(qǐng)?zhí)峁┫嚓P(guān)信息告之我們,我們將及時(shí)溝通與處理。本站內(nèi)容系統(tǒng)采集于網(wǎng)絡(luò),涉及言論、版權(quán)與本站無關(guān)。