前言
由于公司提供的隊(duì)列實(shí)在太過于蛋疼而且還限制不能使用其他隊(duì)列,但為了保證數(shù)據(jù)安全性需要一個可以有ack功能的隊(duì)列。
原生的redis中通過L/R PUSH/POP方式來實(shí)現(xiàn)隊(duì)列的功能,這個當(dāng)然是沒辦法滿足需求的(沒有ack功能),所以需要自己對redis的list(隊(duì)列)做個小小的調(diào)整。
大體思路為在POP時將pop出的數(shù)據(jù)放到備份的地方,當(dāng)有ACK請求(確認(rèn)消息被消耗)后將備份的信息刪除掉;每次在pop前需要檢查備份隊(duì)列中有沒有過期的數(shù)據(jù)沒有ack的,如果有則PUSH到list中后再從list中POP出來。
以下腳本使用lua實(shí)現(xiàn),只需要在執(zhí)行前加載到redis中即可。
消息本身需要包含id屬性
push沒什么問題,原生即可(此處以LPUSH為例)
pop時腳本
local not_empty = function(x) return (type(x) == "table") and (not x.err) and (#x ~= 0) end local qName = ARGV[1] --隊(duì)列名稱 local currentTime = ARGV[2] --當(dāng)前時間,這個需要從外部傳入,不能使用redis自身時間,如果使用自身時間可能導(dǎo)致redis本身的backup在重放請求時出現(xiàn)不一致性 local considerAsFailMaxTimeSpan = ARGV[3] --超時時間設(shè)定,當(dāng)消息超過一定時間還沒有ack則認(rèn)為此消息需要再次入隊(duì) local zsetName= qName ..'BACKUP' local hashName= qName ..'CONTEXT' local tmp = redis.call('ZRANGEBYSCORE',zsetName , '-INF', tonumber(currentTime) - tonumber(considerAsFailMaxTimeSpan), 'LIMIT', 0, 1) if (not_empty(tmp)) then redis.call('ZREM', zsetName, tmp[1]) --此處拿出的為消息的唯一id redis.call('LPUSH', qName, redis.call('HGET', hashName, tmp[1])) end tmp = redis.call('RPOP', qName) if (tmp) then local msg = cjson.decode(tmp) local id = msg['id'] redis.call('ZADD', zsetName, tonumber(currentTime), id) redis.call('HSET',hashName , id, tmp) end return tmp
ack時候比較簡單,只需要將指定id從set和hash中刪除即可
local key = ARGV[1] local qName=ARGV[2] redis.call('ZREM', qName..'BACKUP', key) redis.call('HDEL', qName..'CONTEXT', key)
在程序中使用前需要顯示load這兩個腳本,后面直接調(diào)用這兩個腳本的sha值即可執(zhí)行。
總結(jié)
以上就是這篇文章的全部內(nèi)容了,希望本文的內(nèi)容對大家的學(xué)習(xí)或者工作能帶來一定的幫助,如果有疑問大家可以留言交流,謝謝大家對腳本之家的支持。
標(biāo)簽:澳門 贛州 景德鎮(zhèn) 香港 唐山 廣東 揚(yáng)州 林芝
巨人網(wǎng)絡(luò)通訊聲明:本文標(biāo)題《詳解redis是如何實(shí)現(xiàn)隊(duì)列消息的ack》,本文關(guān)鍵詞 詳解,redis,是,如何,實(shí)現(xiàn),;如發(fā)現(xiàn)本文內(nèi)容存在版權(quán)問題,煩請?zhí)峁┫嚓P(guān)信息告之我們,我們將及時溝通與處理。本站內(nèi)容系統(tǒng)采集于網(wǎng)絡(luò),涉及言論、版權(quán)與本站無關(guān)。