主頁 > 知識庫 > 一篇文章帶你從入門到精通:RabbitMQ

一篇文章帶你從入門到精通:RabbitMQ

熱門標簽:地圖標注如何弄全套標 在電子版地圖標注要收費嗎 實體店地圖標注怎么標 股票配資電銷機器人 外呼系統(tǒng)會封嗎 萬利達綜合醫(yī)院地圖標注點 武漢AI電銷機器人 南京電銷外呼系統(tǒng)哪家好 電銷機器人 深圳

1. 淺淺道來

1.1 什么是中間件?

IDC(互聯(lián)網(wǎng)數(shù)據(jù)中心)的定義:中間件是一種獨立的系統(tǒng)軟件服務程序,分布式應用軟件借助這種軟件在不同的技術之間共享資源,中間件位于客戶機服務器的操作系統(tǒng)之上,管理計算資源和網(wǎng)絡通信。

首先,中間件是某一類軟件的總稱,而不是某一種具體的軟件。它是一種位于平臺(操作系統(tǒng)硬件) 和 應用程序之間的通用服務,它屏蔽了底層操作系統(tǒng)的各種復雜性,減輕了開發(fā)人員的技術負擔,同時它的設計不針對某一具體目標,而是提供具有普遍通用特點的功能模塊服務,這些服務具有標準的程序接口和協(xié)議,根據(jù)平臺的不同,也可以有不同的實現(xiàn)。

通俗的例子(僅供參考,并不算完全一致):

我開了一家咖啡店,我身邊有 A B C 等 n 家咖啡豆的供應商,但是我肯定要挑選價格又實惠,質量還不錯的豆子,但是市場是受到多方面因素波動的,可能我現(xiàn)在的選擇,在一段時間后已經(jīng)不是最佳選項了。所以我專門找到一家市場中介,讓他幫我操心這一攤子事情,我只和你說清價格和質量要求,你去找就是了,過程我一點也不操心。這個中介的概念,就類似中間件的

1.1.1 分布式的概念(補充)

這一段,來自我之前寫的 Dubbo 入門的那篇文章哈

在百度以及維基中的定義都相對專業(yè)且晦澀,大部分博客或者教程經(jīng)常會使用《分布式系統(tǒng)原理和范型》中的定義,即:“分布式系統(tǒng)是若干獨立計算機的集合,這些計算機對于用戶來說就像是單個相關系統(tǒng)”

下面我們用一些篇幅來通俗的解釋一下什么叫做分布式

1.1.1.1 什么是集中式系統(tǒng)

提到分布式,不得不提的就是 “集中式系統(tǒng)”,這個概念最好理解了,它就是將功能,程序等安裝在同一臺設備上,就由這一臺主機設備向外提供服務

舉個最簡單的例子:你拿一臺PC主機,將其改裝成了一臺簡單的服務器,配置好各種內(nèi)容后,你將MySQL,Web服務器,F(xiàn)TP,Nginx 等等,全部安裝在其中,打包部署項目后,就可以對外提供服務了,但是一旦這臺機器無論是軟件還是硬件出現(xiàn)了問題,整個系統(tǒng)都會受到嚴重的牽連錯誤,雞蛋放在一個籃子里,要打就全打了

1.1.12 什么是分布式系統(tǒng)

既然集中式系統(tǒng)有這樣一種牽一發(fā)而動全身的問題,那么分布式的其中一個作用,自然是來解決這樣的問題了,正如定義中所知,分布式系統(tǒng)在用戶的體驗感官里,就像傳統(tǒng)的單系統(tǒng)一樣,一些變化都是這個系統(tǒng)本身內(nèi)部進行的,對于用戶并沒有什么太大的感覺

例如:淘寶,京東這種大型電商平臺,它們的主機都是數(shù)以萬計的,否則根本沒法處理大量的數(shù)據(jù)和請求,具體其中有什么劃分,以及操作,我們下面會說到,但是對于用戶的我們,我們不需要也不想關心這些,我們?nèi)钥梢詥渭兊恼J為,我們面對的就是 “淘寶” 這一臺 “主機”

所以分布式的一個相對專業(yè)一些的說法是這樣的(進程粒度)兩個或者多個程序,分別運行在不同的主機進程上,它們互相配合協(xié)調(diào),完成共同的功能,那么這幾個程序之間構成的系統(tǒng)就可以叫做分布式系統(tǒng)

這幾者都是相同的程序 —— 分布式這幾者都是不同的程序 —— 集群

1.2 什么是消息中間件/消息隊列(MQ)

消息中間件,顧名思義就是用來處理消息相關服務的中間件,它提供了一種系統(tǒng)之間通信交互的通道,例如發(fā)送方只需要把想傳輸?shù)男畔⒔唤o消息中間件,而發(fā)送的協(xié)議,方式,發(fā)送過程中出現(xiàn)的網(wǎng)絡,故障等等問題,都由中間件進行處理,因此它負責保證信息的可靠傳輸。

所以消息中間件,就是一種用來接受數(shù)據(jù),存儲數(shù)據(jù),發(fā)送數(shù)據(jù)的技術,它提供了各種功能,可以實現(xiàn)消息的高可用,高可靠,也提供了很好的容錯機制等??梢猿绦驅ο到y(tǒng)資源的占用,以及傳輸效率的提升有很大幫助。

常說的 MQ 就是指消息隊列,即 Message Quene,常見的消息隊列有,經(jīng)典的 ActivieMQ,熱門的 Kafka,阿里的 RocketMQ 等等,以及這里講解的 RabbitMQ。

不同的 MQ 有著不同的特點,以及其更加擅長的方向,倒也說不上誰好誰壞,只有誰更合適。

1.2.1 消息隊列應用場景

根據(jù)業(yè)務的需要,其實它可以有多種應用場景,例如解耦,削峰填谷,廣播等,我們舉兩個場景來梳理一下簡單的過程

1.2.1.1 業(yè)務解耦

最近在考慮買幾本書看,就以買書下訂單舉例,當我點擊購買之后,可能會有這么一串業(yè)務邏輯執(zhí)行,① 減去庫存容量 ② 生成訂單 ③ 支付 ④ 更新訂單狀態(tài) ⑤ 發(fā)送購買成功短信 ⑥ 更新商品快遞攬收狀態(tài)。在初期階段,我們完全可以讓這些業(yè)務同步執(zhí)行,但是后期為了提升效率,就可以將需要立即執(zhí)行的任務和可稍緩執(zhí)行的任務進行分離,例如 ⑤ 發(fā)送購買成功短信 ⑥ 更新商品快遞攬收狀態(tài),都可以考慮異執(zhí)行。在主流程執(zhí)行結束后,這些可稍緩的業(yè)務可以通過給 MQ 發(fā)送消息,就判定已經(jīng)執(zhí)行,保證流程先結束。然后再通過拉取 MQ 消息,或者 MQ 主動推送去異步執(zhí)行其他的業(yè)務。

1.2.1.2 削峰填谷

例如發(fā)送一條帶有已讀未讀標識的公告信息,所以需要對每一個用戶都寫一條這樣的公告消息,例如存到 MongoDB 中,即便 MongoDB 也支撐不下來瞬時寫入百萬、千萬記錄的情況,所以可以考慮使用消息隊列。比如說我們可以在Java后端系統(tǒng)上面,用異步多線程的方法,向消息隊列MQ中發(fā)送消息,這樣Web系統(tǒng)發(fā)布公告消息的時候就不占用數(shù)據(jù)庫正常的 CRUD 操作。系統(tǒng)消息保存在消息隊列中,我們只是用它來做削峰填谷,系統(tǒng)消息最終還是要存儲在數(shù)據(jù)庫上面。于是我們可以這樣設計,在用戶登陸系統(tǒng)的時候,用異步線程從消息隊列MQ中,接收該用戶的系統(tǒng)消息,然后把系統(tǒng)消息存儲在數(shù)據(jù)庫中,最后消息隊列MQ中的該條消息自動刪除。因為用戶的錯峰登錄,所以往數(shù)據(jù)庫中寫入消息的任務也變成了錯峰寫入。

1.3 什么是 RabbitMQ

RabbitMQ 是一個使用 Erlang 語言編寫,且遵循 AMQP協(xié)議的開源消息隊列系統(tǒng),支持多種客戶端(語言),用于在分布式系統(tǒng)中存儲消息,轉發(fā)消息,具有高可用,高可擴性,易用性等特征。

更詳細的介紹可以直接看一下官網(wǎng):https://www.rabbitmq.com/

總之這就是一種常見的消息隊列,它的這些特點,都會在后面逐條講解到,我們首先從入門下載安裝部分先說起,然后再到使用。

2. 下載與安裝

一般來說,安裝的方式有手動安裝和 Docker 安裝,大部分場景下,都會使用 Docker 安裝,但是作為學習階段,如果不是特別著急,學習一下手動安裝,也不是什么壞事。

注:云服務器和虛擬機都可以,演示的 Linux 版本為 CentOS 7.9

2.1 手動安裝

2.1.1 下載安裝過程

注:可以在 Linux 中通過 yum 直接下載安裝,這里選擇了在自己的 Windows 主機先下載文件,然后再通過 FTP 傳到 Linux 上,直接安裝。可以避免虛擬機上因為網(wǎng)絡而造成的一些下載問題。

首先打開官網(wǎng)的下載目錄,然后根據(jù)自己 Linux 的版本,選擇版本。

1.地址:https://www.rabbitmq.com/download.html

2.因為 RabbitMQ 是 Erlang 語言編寫的,所以還需要提供 Erlang 環(huán)境,接著去下載 Erlang。

  • 地址:https://www.erlang-solutions.com/downloads
    • A:此網(wǎng)站訪問速度極慢,請耐心等待,或者需要掛上梯子
    • B:Erlang 版本需要與 RabbitMQ 匹配(如圖,有最大和最小版本的限制)
      • 版本查看地址:https://www.rabbitmq.com/which-erlang.html
      • 這里選擇了 RabbitMQ 3.8.14 和 Erlang 23.2.3

3.將文件上傳到 Linux 中(我這里指定位置是 /usr/local/bin/rabbitmq ,可以自己更改選擇)

  • 現(xiàn)在很多 Shell 軟件都自帶內(nèi)置的 FTP 上傳,例如 FinalShell,MobaXterm 等等
  • 上傳后的文件和目錄位置如下
[root@centos7 rabbitmq]# ls
esl-erlang_23.2.3-1_centos_7_amd64.rpm  rabbitmq-server-3.8.14-1.el7.noarch.rpm
[root@centos7 rabbitmq]# pwd
/usr/local/bin/rabbitmq

4.安裝 Erlang 、Socat 和 RabbitMQ

  • Erlang 、Socat 都是 RabbitMQ 所依賴的
# 安裝 Erlang,安裝后執(zhí)行 erl -v 顯示版本號則代表成功
rpm -ivh esl-erlang_23.2.3-1_centos_7_amd64.rpm
# 安裝 Socat 這里沒有下載源文件,而是直接通過 yum 在線安裝,因為它并不大
yum install -y socat
# 安裝 RabbitMQ
rpm -ivh rabbitmq-server-3.8.14-1.el7.noarch.rpm

5.安裝結束,啟動服務查看 RabbitMQ 是否可以啟動成功

# 啟動服務
systemctl start rabbitmq-server
# 開機自啟
systemctl enable rabbitmq-server
# 停止服務
systemctl stop rabbitmq-server
# 查看服務狀態(tài)
systemctl status rabbitmq-server.service

如圖所示,即安裝啟動成功

2.1.2 配置 Web 界面管理

上面的安裝其實已經(jīng)結束了,但是 RabbitMQ 提供給了我們一個 Web 形式的管理界面,默認是沒有的,需要進行安裝。

1.安裝 Web 管理插件,然后重啟服務

# 安裝命令
rabbitmq-plugins enable rabbitmq_management
# 重啟服務
systemctl restart rabbitmq-server

2.一定要開放 Linux 防火墻 的 15672 端口,否則就會無法訪問,在學習階段,你甚至可以去查詢命令把防火墻關掉

對應服務器(阿里云,騰訊云等)就是在安全組中開放 15672 端口

訪問 Linux IP:15672 ,例如 http://192.168.122.1:15672

# 查詢 15672 是否開放,一般默認都是 no
firewall-cmd --query-port=15672/tcp
# 開放指定端口 15672 
firewall-cmd --add-port=15672/tcp --permanent
# 重新載入
firewall-cmd --reload
# 再次查詢,結果就是 yes 了
firewall-cmd --query-port=15672/tcp

3.添加遠程登錄的賬戶

  • RabbitMQ 有一個默認賬號和密碼都是 guest 但是只能在 localhost 下訪問
# 新增用戶 用戶名和密碼都是 admin
rabbitmqctl add_user admin admin

4.為遠程登錄的賬戶添加權限

  • administrator(超級管理員):登錄控制臺、查看所有信息、操作用戶、操作策略
  • monitoring(監(jiān)控者): 登錄控制臺、查看所有信息
  • policymaker(策略制定者): 登錄控制臺、指定策略
  • managment(普通管理員): 登錄控制臺
    復制代碼 代碼如下:
# 設置用戶分配操作權限,admin 用戶的權限為 administrator
rabbitmqctl set_user_tags admin administrator

5.為用戶添加資源權限

  • 因為 admin 已經(jīng)是超級管理員權限了,所以其實不分配資源權限也可以,會默認去做。
# 命令格式為: set_permissions [-p vhostpath>] user> conf> write> read>
# 這里即為 admin 用戶開啟 配置文件和讀寫的權限
rabbitmqctl set_permissions -p / admin ".*"".*"".*"

6.訪問 Linux IP:15672 ,例如 http://192.168.122.1:15672 ,輸入剛才設置好的用戶名密碼 admin

  • 如圖:訪問成功

2.1.2.1 命令小結

1.添加用戶:rabbitmqctl add_user username> password>

2.修改密碼:rabbitmqctl change_password username> newpass>

3.刪除用戶:rabbitmqctl delete_user username>

4.用戶列表:rabbitmqctl list_users

5.設置用戶角色:rabbitmqctl set_user_tags username> tag1,tag2>

6.刪除用戶所有角色:rabbitmqctl set_user_tags username>

7.為用戶添加資源權限:set_permissions [-p vhostpath>] user> conf> write> read>

使用:輸入 rabbitmqctl ,則會提示可能使用的命令,然后 使用 rabbitmqctl hepl 命令> 可以查看具體命令的使用方法和參數(shù)。

2.1.3 簡單介紹 Web 界面管理

  • Connections(連接):此處用來管理與
  • RabbitMQ 建立連接后的生產(chǎn)者和消費者
  • Channels(通道):連接建立后,會形成通道,消息的投遞獲取依賴通道。
  • Exchanges(交換機):用來實現(xiàn)消息的路由
  • Queues(隊列):存放消息的隊列,消息等待被消費,消費后被移除隊列。
  • Admin(管理):用于對管理用戶,以及對應權限進行設置,如下圖所示

Tags 就是用來指定用戶的角色

  • administrator(超級管理員):登錄控制臺、查看所有信息、操作用戶、操作策略
  • monitoring(監(jiān)控者): 登錄控制臺、查看所有信息
  • policymaker(策略制定者): 登錄控制臺、指定策略
  • managment(普通管理員): 登錄控制臺

2.2 Docker 安裝

在 Docker 中安裝 RabbitMQ 不需要自己去考慮版本,環(huán)境等的各種沖突不兼容問題,是非常便捷的,我演示的這臺虛擬機是一個 CentOS 7.9 裸機,所以我們從更新 yum,到安裝 Docker 和 安裝 RabbitMQ 按步驟都講一下

2.2.1 配置 yum

1.更新 yum 到最新版

# 更新 yum
yum update
# 檢查yum依賴的幾個包 yum-utils 提供 yum-config-manager 功能, 后面兩個是 devicemapper 用到的
yum install -y yum-utils device-mapper-persistent-data lvm2

2.設置 yum 源為阿里云

yum-config-manager --add-repo http://mirrors.aliyun.com/docker-ce/linux/centos/docker-ce.repo

2.2.2 安裝 docker

2.2.2.1 步驟

1.使用 yum 安裝 docker

  • docker-ce 是社區(qū)版的意思,ee為企業(yè)版
yum install docker-ce -y

2.通過查看版本,檢查安裝是否成功

docker -v
sudo mkdir -p /etc/docker
sudo tee /etc/docker/daemon.json -'EOF'
{
  "registry-mirrors": ["https://你的ID>.mirror.aliyuncs.com"]
}
EOF
sudo systemctl daemon-reload
sudo systemctl restart docker

  • 國內(nèi)從 DockerHub 拉取鏡像有時會遇到困難,此時可以配置鏡像加速器。Docker 官方和國內(nèi)很多云服務商都提供了國內(nèi)加速器服務,例如:
    • 科大鏡像:https://docker.mirrors.ustc.edu.cn/
    • 網(wǎng)易:https://hub-mirror.c.163.com/
    • 阿里云:https://你的ID>.mirror.aliyuncs.com
    • 七牛云加速器:https://reg-mirror.qiniu.com

當配置某一個加速器地址之后,若發(fā)現(xiàn)拉取不到鏡像,請切換到另一個加速器地址。國內(nèi)各大云服務商均提供了 Docker 鏡像加速服務,建議根據(jù)運行 Docker 的云平臺選擇對應的鏡像加速服務。

阿里云鏡像獲取地址:https://cr.console.aliyun.com/cn-hangzhou/instances/mirrors,登陸后,左側菜單選中鏡像加速器就可以看到你的專屬地址了

2.2.2.2 Docker 常見命令
2.2.2.2.1 管理命令
  • 就啟動,停止,重啟這些簡單的命令使用 service 也是可以的,systemctl 功能稍微強大一些
# 啟動 docker
systemctl docker start
# 停止 docker
systemctl docker stop
# 重啟 docker
systemctl docker restart
# 查看 docker 狀態(tài)
systemctl status docker
# 開機自啟
systemctl enable docker
systemctl unenable docker
2.2.2.2.2 鏡像命令
# 導入鏡像文件
docker load  xxx.tar.gz
# 查看安裝的鏡像
docker images
# 刪除鏡像
docker rmi 鏡像名

2.2.3 安裝 RabbitMQ (任選其一)

注:直接用 2.2.3.2 一句話安裝 會更好一些

2.2.3.1 一步一步安裝獲取

1.RabbitMQ 的鏡像

docker pull rabbitmq:management

2.創(chuàng)建并運行容器(具體參數(shù)在 3 中介紹)

docker run -id --name 容器名 -p 15672:15672 -p 5672:5672 rabbitmq:management
2.2.3.2 一句話安裝

上面的安裝方式,就是先獲取到 RabbitMQ 鏡像后再開始安裝,這里是沒有問題的,創(chuàng)建時會有一個問題,因為我們要安裝 management 也就是它的 web 管理,如果不做一些處理,默認裝好的是沒有用戶的,所以還需要像前面一樣自己進去配置,而 Docker Hub 已經(jīng)給出了我們配置的示例,即使用 -e 代表配置,使用 RABBITMQ_DEFAULT_USERRABBITMQ_DEFAULT_PASS 配置用戶名和密碼

更多請查看 Docker Hub 官方給予例子中的 Setting default user and password 章節(jié)https://registry.hub.docker.com/_/rabbitmq/

1.執(zhí)行安裝

docker run -di --name myrabbitmq -e RABBITMQ_DEFAULT_USER=admin -e RABBITMQ_DEFAULT_PASS=admin -p 15672:15672 -p
 5672:5672 -p 25672:25672 -p 61613:61613 -p 1883:1883 rabbitmq:management

2.通過容器狀態(tài),查看是否運行成功

# 查看容器運行狀態(tài)docker ps -a# 啟動docker start 容器名# 停止docker stop 容器名# 退出命令行,不停止exit# 進入到node容器(如果開啟了 -t 的情況)docker exec -it 容器名 bash

2.2.3.2.1 參數(shù)介紹

下面分別講解一下這些參數(shù)的說明:

  • -i:表示運行容器。
  • -t:表示為容器保留交互的方式(命令行),即分配一個偽終端。所以常常會見到 -it 這樣的搭配。
  • --name :為容器起個名字。
  • -v:表示目錄映射關系(前者是宿主機目錄,后者是映射到宿主機上的目錄),可以使用多個 -v 做多個目錄或文件映射。注意:推薦做目錄映射,在宿主機上做修改,然后共享到容器上。
  • -d:表示創(chuàng)建一個守護式容器在后臺運行(這樣創(chuàng)建容器后不會自動登錄容器,如果只加 -i -t 兩個參數(shù),創(chuàng)建后就會自動進去容器),即后端掛起運行。
  • -p:表示端口映射,前者是宿主機端口,后者是容器內(nèi)的映射端口。可以使用多個 -p 做多個端口映射,只有做了端口映射,才能被外界訪問。

給大家舉個例子:

# 查看容器運行狀態(tài)
docker ps -a
# 啟動
docker start 容器名
# 停止
docker stop 容器名
# 退出命令行,不停止
exit
# 進入到node容器(如果開啟了 -t 的情況)
docker exec -it 容器名 bash

因為使用了 -t 這個參數(shù),所以可以分配到一個偽終端,通過 docker exec -it 容器名 bash 進入命令行
-v 目錄映射后,進入容器后,也會有一個一模一樣的 demo 文件夾,例如在其中可以執(zhí)行 python 程序

2.2.3.2.1 端口介紹

4369 :erlang發(fā)現(xiàn)端口

5672:client端通信端口

15672:管理界面ui端口

25672:server間內(nèi)部通信端口

61613:不帶TLS和帶TLS的STOMP客戶端

1883:不啟用和啟用TLS的MQTT客戶端

比較關鍵的就是 5672 和 15672

更多端口詳情可以訪問官網(wǎng)文檔https://www.rabbitmq.com/networking.html

注:如果要通過遠程連接,例如訪問 web 管理頁面的 15672 端口,Java 客戶端連接的 5672 端口, 一定要進行一個開放操作,否則都連接不到。

  • 以下為基于 CentOS 7.9 開放 15672 端口的例子
# 查詢 15672 是否開放,一般默認都是 no
firewall-cmd --query-port=15672/tcp
# 開放指定端口 15672 
firewall-cmd --add-port=15672/tcp --permanent
# 重新載入
firewall-cmd --reload
# 再次查詢,結果就是 yes 了
firewall-cmd --query-port=15672/tcp

]以下是關閉防火墻的命令

systemctl disable firewalld
systemctl stop firewalld   

3. RabbitMQ 協(xié)議和模型

安裝結束后,就要進入主題,即用 Java 或者 Springboot 代碼來實現(xiàn) RabbitMQ的幾種方式,但是想要很好的理解這幾種路由交換方式,就需要對它的協(xié)議和架構模型有所了解。

3.1 協(xié)議

3.1.1 什么是協(xié)議?

協(xié)議,網(wǎng)絡協(xié)議的簡稱,網(wǎng)絡協(xié)議是通信計算機雙方必須共同遵從的一組約定。如怎么樣建立連接、怎么樣互相識別等。只有遵守這個約定,計算機之間才能相互通信交流。它的三要素是:語法、語義、時序。

為了使數(shù)據(jù)在網(wǎng)絡上從源到達目的,網(wǎng)絡通信的參與方必須遵循相同的規(guī)則,這套規(guī)則稱為協(xié)議(protocol),它最終體現(xiàn)為在網(wǎng)絡上傳輸?shù)臄?shù)據(jù)包的格式。

3.1.1.1 網(wǎng)絡協(xié)議的三要素

1.語法:數(shù)據(jù)與控制信息的結構和格式,以及數(shù)據(jù)出現(xiàn)的順序。

2.語義:解釋控制信息每個部分的意義,以及規(guī)定了需要發(fā)出何種控制信息以及完成的動作做出何種響應。

3.時序:對事件發(fā)生順序的詳細說明。

人們形象地把這三個要素描述為:做什么,怎么做,做的順序。

舉個例子 HTTP 協(xié)議

語法:HTTP 規(guī)定了請求報文和響應報文的格式
語義:客戶端主動發(fā)起請求稱為請求,服務端隨之返回數(shù)據(jù),稱為響應
時序: 一個請求對應一個響應,而且先有請求后有響應

3.1.1.1.1 面試題:為什么消息中間件不直接使用 HTTP 協(xié)議

對于一個消息中間件來說,其主要責任就是負責數(shù)據(jù)傳遞,存儲,分發(fā),高性能和簡潔才是我們所追求的,而 HTTP 請求報文頭和響應報文頭是比較復雜的,包含了Cookie,數(shù)據(jù)的加密解密,窗臺嗎,響應碼等附加的功能,我們并不需要這么復雜的功能。

同時大部分情況下 HTTP 大部分都是短鏈接,在實際的交互過程中,一個請求到響應都很有可能會中斷,中斷以后就不會執(zhí)行持久化,就會造成請求的丟失。這樣就不利于消息中間件的業(yè)務場景,因為消息中間件可能是一個長期的獲取信息的過程,出現(xiàn)問題和故障要對數(shù)據(jù)或消息執(zhí)行持久化等,目的是為了保證消息和數(shù)據(jù)的高可靠和穩(wěn)健的運行

3.1.2 RabbitMQ 的 AMQP 協(xié)議

RabbitMQ 的使用的協(xié)議是 AMQP(advanced message queuing protocol),它在2003年時被提出,最早用于解決金融領不同平臺之間的消息傳遞交互問題。

AMQP 更準確的說是一種 binary wire-level protocol(鏈接協(xié)議)。這是其和 JMS 的本質差別,AMQP 不從 API 層進行限定,而是直接定義網(wǎng)絡交換的數(shù)據(jù)格式。這使得實現(xiàn)了AMQP的 Provider(Producer) 天然性就是跨平臺的。

相比較其它消息協(xié)議,其特性為:

1.分布式事務支持

2.消息的持久化支持

3.高性能和高可靠的消息處理優(yōu)勢

3.1.3 架構模型

想要學習后面的幾種消息具體的發(fā)送模式,這個模型圖就必須理解清楚,因為這幾種方式就是對這個模型不同程度的選擇和縮減

  • Producer:消息的生產(chǎn)者(發(fā)送消息的程序)。
  • Connection:應用程序與Broker之間的網(wǎng)絡連接。
  • Channel:信道,即信息傳輸?shù)耐ǖ?,可以建立多個 Channel,每個 Channel 代表一個會話任務。
    • 信道是建立在 TCP 連接內(nèi)的虛擬連接,信息的讀寫都通過信道傳輸,因為對于操縱系統(tǒng)而言,建立和銷毀 TCP 是非常昂貴的,所以引入了信道的概念,以復用一條 TCP 連接。
  • Broker(Server) :標識消息隊列服務器實體,例如這里就是 RabbitMQ Server。
  • Virtual Host:虛擬主機,一個 Broker 中可以設置多個 Virtual Host,用作不同用戶的權限隔離。
    • Broker 可以理解為整個數(shù)據(jù)庫服務,而 Virtual Host 就是其中每個數(shù)據(jù)庫的感覺,不同項目可以對應不同的數(shù)據(jù)庫,其中有著項目所屬的業(yè)務表等等。
    • 每個 Virtual Host 中,可以有若干個 Exchange 和 Queue。
  • Exchange:交換機,用來接收生產(chǎn)者發(fā)送的消息,然后將這些消息根據(jù)路由鍵發(fā)送到隊列。
  • Binding:Exchange 和 Queue 之間的虛擬連接,Binding 中可以包括多個 Routing key。
  • Routing key:路由規(guī)則,虛擬機用它來確認如何路由一個特定消息。
  • Queue:消息隊列,它是消息的容器,用來保存消息,每一條消息都能傳入一個或者多個隊列中,等待消費者消費,即取出這個消息。
  • Consumer:消息的消費者(接收消息的程序)。

4. Java 實現(xiàn) RabbitMQ

4.1 環(huán)境搭建

官網(wǎng)介紹幾種模型:https://www.rabbitmq.com/getstarted.html

截止目前為止,官網(wǎng)一共提供了 7 中模型的介紹,我們主要介紹前五種基本的模式,也有人將 Direct 和 Topic模式都歸入 Routing 模式,也可以看做四大種。

4.1.1 創(chuàng)建 Java 項目

首先創(chuàng)建好一個不使用骨架的 Maven 項目,然后引入 RabbitMQ 依賴,還有單元測試依賴即可

dependency>
   groupId>com.rabbitmq/groupId>
    artifactId>amqp-client/artifactId>
    version>5.10.0/version>
/dependency>
dependency>
    groupId>junit/groupId>
    artifactId>junit/artifactId>
    version>4.11/version>
/dependency>

4.1.2 創(chuàng)建虛擬主機(可選)

在這里,我們創(chuàng)建了一個新的 Virtual Hosts,用來為這個Java項目服務,大家還可以創(chuàng)建一個新的用戶,然后對其開啟這個 Virtual Hosts 的訪問權限(即將虛擬主機與用戶綁定)。我們這里還是用 admin(我之前創(chuàng)建的一個管理員權限用戶) 來演示。

注:這部分不去做也可以,直接用 / 和 admin 用戶也行

4.1.3 創(chuàng)建連接工具類

由于我們后面要演示多種例子,而每一次獲取連接和釋放連接、關閉資源等操作代碼都是一致的,為了防止代碼冗余,優(yōu)化代碼,更易理解,提取出一個工具類,這樣大家將重心放在不同實現(xiàn)方式的對比上就行了。

  • RabbitMqUtil 工具類
    public class RabbitMqUtil {
        /**
         * 主機名 即 Linux IP地址
         */
        private static String host = "";
        /**
         *  端口號 客戶端訪問默認都是 5672
         */
        private static int port = 0;
        /**
         * 虛擬主機 可以設置為默認的 / 或者自己創(chuàng)建出指定的虛擬主機
         */
        private static String virtualHost = "";
        /**
         * 用戶名
         */
        private static String username = "";
        /**
         * 密碼
         */
        private static String password = "";
        // 使用靜態(tài)代碼塊為Properties對象賦值
        static {
            try {
                //實例化對象
                Properties properties = new Properties();
                //獲取properties文件的流對象
                InputStream in = RabbitMqUtil.class.getClassLoader().getResourceAsStream("rabbitmq.properties");
                properties.load(in);
                // 分別獲取 value
                host = properties.getProperty("host");
                port = Integer.parseInt(properties.getProperty("port"));
                virtualHost = properties.getProperty("virtualHost");
                username = properties.getProperty("username");
                password = properties.getProperty("password");
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
        /**
         * 獲取連接
         *
         * @return 連接
         */
        public static Connection getConnection() {
            try {
                // 創(chuàng)建連接工廠
                ConnectionFactory connectionFactory = new ConnectionFactory();
                // 設置連接 rabbitmq 主機
                connectionFactory.setHost(host);
                // 設置端口號
                connectionFactory.setPort(port);
                // 設置連接的虛擬主機(數(shù)據(jù)庫的感覺)
                connectionFactory.setVirtualHost(virtualHost);
                // 設置訪問虛擬主機的用戶名和密碼
                connectionFactory.setUsername(username);
                connectionFactory.setPassword(password);
                // 返回一個新連接
                return connectionFactory.newConnection();
            } catch (Exception e) {
                e.printStackTrace();
            }
            return null;
        }
        /**
         * 關閉通道和釋放連接
         *
         * @param channel    channel
         * @param connection connection
         */
        public static void close(Channel channel, Connection connection) {
            try {
                if (channel != null) {
                    channel.close();
                }
                if (connection != null) {
                    connection.close();
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }
  • properties
host=192.168.122.1
port=5672
virtualHost=/rabbitmq_maven_01
username=admin
password=adminv

4.2 五種實現(xiàn)方式

說明:

  • 隊列名,消息等等字符串內(nèi)容,更推薦定義成變量傳入,我文中都是直接寫在參數(shù)中的,這種魔法值的寫法,并不是很優(yōu)美。
  • 生產(chǎn)者中使用了 Junit 單元測試,但是消費者中卻在 main 函數(shù)中編寫,這是因為,我們希望消費者處于一個持續(xù)運行等待的狀態(tài),如果使用 Junit 會導致,程序在執(zhí)行一次后結束掉.
    • 除了在 main 函數(shù)中編寫,還可以考慮使用 sleep 等待或者 while(true) 讓程序不要直接終止掉。

4.2.1 簡單隊列模式(Hello Word)

  • Producer:消息的生產(chǎn)者(發(fā)送消息的程序)。
  • Queue:消息隊列,理解為一個容器,生產(chǎn)者向它發(fā)送消息,它把消息存儲,等待消費者消費。
  • Consumer:消息的消費者(接收消息的程序)。
4.2.1.1 如何理解

由圖所示,簡單隊列模式,一個生產(chǎn)者,經(jīng)過一個隊列,對應一個消費者??梢钥醋鍪屈c對點的一種傳輸方式,相較與 3.1.3 中的模型圖,最主要的特點就是看不到 Exchange(交換機) 和 routekey(路由鍵) ,正是因為這種模式簡單,所以并不會涉及到復雜的條件分發(fā)等等,因此也不需要用戶去顯式的考慮交換機和路由鍵的問題。

  • 但是要注意,這種模式并不是生產(chǎn)者直接對接隊列,而是用了默認的交換機,默認的交換機會把消息發(fā)送到和 routekey 名稱相同的隊列中去,這也是我們在后面代碼中在 routekey 位置填寫了隊列名稱的原因
4.2.1.2 代碼實現(xiàn)
4.2.1.2.1 生產(chǎn)者代碼
public class Producer {
    @Test
    public void sendMessage() throws IOException, TimeoutException {
        // 通過工具類獲取連接
        Connection connection = RabbitMqUtil.getConnection();
        // 獲取連接通道
        Channel channel = connection.createChannel();
        // 通道綁定消息隊列
        channel.queueDeclare("queue1",false,false,false,null);
        // 發(fā)布消息
        channel.basicPublish("","queue1",null,"This is rabbitmq message 001 !".getBytes());
        // 通過工具關閉channel和釋放連接
        RabbitMqUtil.close(channel,connection);
    }
}public class Producer {
    @Test
    public void sendMessage() throws IOException, TimeoutException {
        // 通過工具類獲取連接
        Connection connection = RabbitMqUtil.getConnection();
        // 獲取連接通道
        Channel channel = connection.createChannel();
        // 通道綁定消息隊列
        channel.queueDeclare("queue1",false,false,false,null);
        // 發(fā)布消息
        channel.basicPublish("","queue1",null,"This is rabbitmq message 001 !".getBytes());
        // 通過工具關閉channel和釋放連接
        RabbitMqUtil.close(channel,connection);
    }
}

1.通過工具類獲取連接

2.獲取連接通道:根據(jù) 3.1.3 的模型圖可知,生產(chǎn)者需要在獲取到連接后,再獲取信道,才能去訪問后面的交換機隊列等。

3.通道綁定消息隊列:綁定隊列前,應該綁定交換機,但是此模式中隱蔽了交換機的概念,背后使用了默認的交換機,所以直接綁定隊列。

  • queueDeclare 方法解釋
    • 參數(shù)1:queue(隊列名稱),如果隊列不存在,則自動創(chuàng)建。
    • 參數(shù)2:durable(隊列是否持久化),持久化可以保證服務器重啟后此隊列仍然存在。
    • 參數(shù)3:exclusive(排他隊列)即是否獨占隊列,如果此項為 true,該隊列僅對首次申明它的連接可見,并在連接斷開時自動刪除。
    • 參數(shù)4:autoDelete(自動刪除),最后一個消費者將消息消費完畢后,自動刪除隊列。
    • 參數(shù)5:arguments(攜帶附加屬性)。

4.發(fā)布消息:此處可以指定消息隊列的發(fā)送方法,以及內(nèi)容等,因為此模式比較簡單,所以沒有涉及到全部參數(shù),后面的模式會有詳細的講解

  • basicPublish 方法解釋
    • 參數(shù)1:exchange(交換機名稱)。
    • 參數(shù)2:routingKey(路由key),此處填寫隊列名,可理解為把消息發(fā)送到和 routekey 名稱相同的隊列中去。
    • 參數(shù)3:props(消息的控制狀態(tài)),可以在此處控制消息的持久化。參數(shù)為:MessageProperties.PERSISTENT_TEXT_PLAIN參數(shù)4:body(消息主體),類型是一個字節(jié)數(shù)組,要轉一下類型。

5.通過工具關閉channel和釋放連接:先關閉通道,再釋放連接。

4.2.1.2.2 消費者代碼
public class Consumer {
    public static void main(String[] args) throws IOException, TimeoutException{
        // 通過工具類獲取連接
        Connection connection = RabbitMqUtil.getConnection();
        // 獲取連接通道
        Channel channel = connection.createChannel();
        // 通道綁定消息隊列
        channel.queueDeclare("queue1", false, false, false, null);
        // 消費消息
        channel.basicConsume("queue1", true, new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("new String(body): " + new String(body));
            }
        });
    }
}

1.通過工具類獲取連接

2.獲取連接通道

3.通道綁定消息隊列

4.消費消息:此處用來指定消費哪個隊列的消息,以及一些機制和回調(diào)

  • basicConsume 方法解釋
    • 參數(shù)1:queue(隊列名稱),即消費哪個隊列的消息 。
    • 參數(shù)2:autoAck(自動應答)開始消息的自動確認機制,只要消費了就從隊列刪除消息。
    • 參數(shù)3:callback(消費時的回調(diào)接口),callback 的類型是 Consumer 這里使用了 DefaultConsumer 就是 Consumer 的一個實現(xiàn)類。其中重寫 handleDelivery 方法,就可以獲取到消費的數(shù)據(jù)內(nèi)容了,這里主要使用了其中的 body,即查看消息主體,其他三個參數(shù)暫時還沒用到,有興趣可以先打印輸出一下,能先有個大概的了解。4.2.2 工作隊列模式(Work Queue)

  • Producer:消息的生產(chǎn)者(發(fā)送消息的程序)。
  • Queue:消息隊列,理解為一個容器,生產(chǎn)者向它發(fā)送消息,它把消息存儲,等待消費者消費。
  • Consumer:消息的消費者(接收消息的程序)。
    • 此處我們假設 Consumer1、Consumer2、Consumer3 分別為完成任務速度不一樣快的消費者,這會引出此模式的一個重點問題。
4.2.2.1 如何理解

工作模式由圖可以看出,就是在簡單隊列模式的基礎上,增加了多個消費者,也就是讓多個消費者綁定同一個隊列,共同去消費,這樣能解決簡單隊列模式中,如果生產(chǎn)速速遠大于消費速度,而導致的消息堆積現(xiàn)象。

  • 因為消息被消費后就會消失,所以不必擔心任務會重復執(zhí)行。
4.2.2.2 代碼實現(xiàn)

注:工作隊列模式有兩種

輪詢模式:每個消費者均分消息公平分發(fā)模式(能者多勞):按能力分發(fā),處理速度快的分發(fā)的多,處理速度慢的分發(fā)的少

我們首先演示的是輪詢模式,根據(jù)它的缺點,又能引出公平分發(fā)模式

下面只描述與上面有差異的部分,在簡單模式中,這些基本的方法都有介紹過

4.2.2.2.1 輪詢模式-生產(chǎn)者代碼
public class Producer {
    @Test
    public void sendMessage() throws IOException, TimeoutException {
        // 通過工具類獲取連接
        Connection connection = RabbitMqUtil.getConnection();
        // 獲取連接通道
        Channel channel = connection.createChannel();
        // 通道綁定消息隊列
        channel.queueDeclare("work", true, false, false, null);
        for (int i = 1; i = 20; i++) {
            // 發(fā)布消息
            channel.basicPublish("", "work", null, (i + "號消息").getBytes());
        }
        // 通過工具關閉channel和釋放連接
        RabbitMqUtil.close(channel, connection);
    }
}

流程和簡單隊列模式基本一致,有一些小小的改動,生產(chǎn)者中主要就是加了層循環(huán),因為有多個消費者,所以多發(fā)送一些消息,可以看出一些特點和問題。

4.2.2.2.2 輪詢模式-消費者代碼

消費者 1

public class Consumer1 {
    public static void main(String[] args) throws IOException {
        // 通過工具類獲取連接
        Connection connection = RabbitMqUtil.getConnection();
        // 獲取連接通道
        final Channel channel = connection.createChannel();
        // 通道綁定消息隊列
        channel.queueDeclare("work", true, false, false, null);
        // 消費消息
        channel.basicConsume("work", true, new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                try {
                    Thread.sleep(2000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println("消費者1號:消費-" + new String(body));
            }
        });
    }
}

消費者 2

public class Consumer2 {
    public static void main(String[] args) throws IOException {
        // 通過工具類獲取連接
        Connection connection = RabbitMqUtil.getConnection();
        // 獲取連接通道
        final Channel channel = connection.createChannel();
        // 通道綁定消息隊列
        channel.queueDeclare("work", true, false, false, null);
        // 消費消息
        channel.basicConsume("work", true, new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("消費者2號:消費-" + new String(body));
            }
        });
    }

上述兩個消費者都在 basicConsume中開啟了自動 Ack 應答,這一點下面會詳述,同時在消費者 1 中,增加了 sleep 2s 的語句,模擬消費者1處理消息速度慢,而消費者2處理消息速度快的場景。

運行結果:

Consumer1

消費者1號:消費-1號消息

消費者1號:消費-3號消息

消費者1號:消費-5號消息

消費者1號:消費-7號消息

消費者1號:消費-9號消息

消費者1號:消費-11號消息

消費者1號:消費-13號消息

消費者1號:消費-15號消息

消費者1號:消費-17號消息

消費者1號:消費-19號消息

Consumer2

消費者2號:消費-2號消息

消費者2號:消費-4號消息

消費者2號:消費-6號消息

消費者2號:消費-8號消息

消費者2號:消費-10號消息

消費者2號:消費-12號消息

消費者2號:消費-14號消息

消費者2號:消費-16號消息

消費者2號:消費-18號消息

消費者2號:消費-20號消息

觀察執(zhí)行過程:發(fā)現(xiàn)兩個消費者雖然每個人最后都各自處理了一半的消息,而且是按照一人一條分配的,但是消費者2號處理速度快,一下子就全部處理完了,但是消費者1號,每一次處理都需要 2s 所以,只能緩慢的處理,而消費者2號就處于一個空閑浪費的情況了。

如何切換為公平分發(fā)模式呢?

這就和 basicConsume 中的第二個參數(shù),開啟自動確認消費有關了,它默認是 true,也就代表只要一旦拿到隊列中分發(fā)給這個消費者的消息,我就會自動返回一個確認消費的標識,隊列收到后就會自動刪除掉隊列中的消息。

  • 但是這其中有一個很重要的問題,這種方式就是將風險交給了消費者,例如消費者收到了自己需要處理的 10 條消息,剛消費了 4 個,消費者宕機,掛掉了,后面的 6 個消息就丟失了。

如果想要修改為按能力分配的方式,有兩個要點

1.設置通道一次只能消費一個消息

2.關閉消息的自動確認,手動確認消息

4.2.2.2.3 公平分發(fā)模式-生產(chǎn)者代碼
public class Producer {
    @Test
    public void sendMessage() throws IOException, TimeoutException {
        // 通過工具類獲取連接
        Connection connection = RabbitMqUtil.getConnection();
        // 獲取連接通道
        Channel channel = connection.createChannel();
        // 一次只發(fā)送一條消息
        channel.basicQos(1);
        // 通道綁定消息隊列
        channel.queueDeclare("work", true, false, false, null);
        for (int i = 1; i = 20; i++) {
            // 發(fā)布消息
            channel.basicPublish("", "work", null, (i + "號消息").getBytes());
        }
        // 通過工具關閉channel和釋放連接
        RabbitMqUtil.close(channel, connection);
    }
4.2.2.2.4 公平分發(fā)模式-消費者代碼

消費者1

public class Consumer1 {
    public static void main(String[] args) throws IOException {
        // 通過工具類獲取連接
        Connection connection = RabbitMqUtil.getConnection();
        // 獲取連接通道
        final Channel channel = connection.createChannel();
        // 一次只接受一條未確認的消息
        channel.basicQos(1);
        // 通道綁定消息隊列
        channel.queueDeclare("work", true, false, false, null);
        // 消費消息
        channel.basicConsume("work", false, new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                try {
                    Thread.sleep(2000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println("消費者1號:消費-" + new String(body));
                // 返回 deliveryTag 代表隊列可以刪除此消息了
                channel.basicAck(envelope.getDeliveryTag(), false);
            }
        });
    }
}

消費者2

public class Consumer2 {
    public static void main(String[] args) throws IOException {
        // 通過工具類獲取連接
        Connection connection = RabbitMqUtil.getConnection();
        // 獲取連接通道
        final Channel channel = connection.createChannel();
        //步驟一:一次只接受一條未確認的消息
        channel.basicQos(1);
        // 通道綁定消息隊列
        channel.queueDeclare("work", true, false, false, null);
        // 消費消息
        channel.basicConsume("work", false, new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("消費者2號:消費-" + new String(body));
                channel.basicAck(envelope.getDeliveryTag(), false);
            }
        });
    }
public class Consumer2 {
    public static void main(String[] args) throws IOException {
        // 通過工具類獲取連接
        Connection connection = RabbitMqUtil.getConnection();
        // 獲取連接通道
        final Channel channel = connection.createChannel();
        //步驟一:一次只接受一條未確認的消息
        channel.basicQos(1);
        // 通道綁定消息隊列
        channel.queueDeclare("work", true, false, false, null);
        // 消費消息
        channel.basicConsume("work", false, new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("消費者2號:消費-" + new String(body));
                channel.basicAck(envelope.getDeliveryTag(), false);
            }
        });
    }

運行結果:

Consumer1

消費者1號:消費-1號消息

Consumer2

消費者2號:消費-2號消息

消費者2號:消費-3號消息

消費者2號:消費-4號消息

消費者2號:消費-5號消息

消費者2號:消費-6號消息

消費者2號:消費-7號消息

消費者2號:消費-8號消息

消費者2號:消費-9號消息

消費者2號:消費-10號消息

消費者2號:消費-11號消息

消費者2號:消費-12號消息

消費者2號:消費-13號消息

消費者2號:消費-14號消息

消費者2號:消費-15號消息

消費者2號:消費-16號消息

消費者2號:消費-17號消息

消費者2號:消費-18號消息

消費者2號:消費-19號消息

消費者2號:消費-20號消息

4.2.3 發(fā)布與訂閱模式(Fanout 廣播)

  • Producer:消息的生產(chǎn)者(發(fā)送消息的程序)。
  • Exchange :交換機,負責發(fā)送消息給指定隊列。
  • Queue:消息隊列,理解為一個容器,生產(chǎn)者向它發(fā)送消息,它把消息存儲,等待消費者消費。
  • Consumer:消息的消費者(接收消息的程序)。
4.2.3.1 如何理解

Fanout 直譯為 “扇出” 但是大家更多的會把它叫做廣播或者發(fā)布與訂閱,它是一種沒有路由key的模式,生產(chǎn)者將消息發(fā)送給交換機,交換機會把所有消息復制同步到所有與它綁定過的隊列上,而每個隊列只能有一個消費者拿到這條消息,如果在一個消費者連接中,創(chuàng)建多個通道,則會出現(xiàn)爭搶消息的結果。

4.2.3.2 代碼實現(xiàn)

注:下面只描述與上面有差異的部分,在簡單模式中,這些基本的方法都有介紹過

4.2.3.2.1 生產(chǎn)者代碼
public class Producer {
    @Test
    public void sendMessage() throws IOException, TimeoutException {
        // 通過工具類獲取連接
        Connection connection = RabbitMqUtil.getConnection();
        // 獲取連接通道
        final Channel channel = connection.createChannel();
        // 聲明交換機
        channel.exchangeDeclare("order", "fanout");
        for (int i = 1; i = 20; i++) {
            // 發(fā)布消息
            channel.basicPublish("order", "", null, "fanout!".getBytes());
        }
        // 通過工具關閉channel和釋放連接
        RabbitMqUtil.close(channel, connection);
    }
}

1.聲明交換機

  • exchangeDeclare 方法解釋
    • 參數(shù)1:exchange(交換機名稱),如果交換機不存在,則自動創(chuàng)建
    • 參數(shù)2:type(類型),此處選擇 fanout 模式

2.發(fā)布消息:在 basicPublish 方法的第一個參數(shù)中輸入上述定義好的交換機的名字,第二個參數(shù),路由鍵為空

  • 循環(huán) 20 條是為了演示消費者
4.2.3.2.2 消費者代碼
public class Consumer1 {
    public static void main(String[] args) throws IOException {
        // 通過工具類獲取連接
        Connection connection = RabbitMqUtil.getConnection();
        Channel channel = connection.createChannel();
        // 聲明交換機
        channel.exchangeDeclare("order", "fanout");
        // 創(chuàng)建臨時隊列
        String queue = channel.queueDeclare().getQueue();
        // 綁定臨時隊列和交換機
        channel.queueBind(queue, "order", "");
        // 消費消息
        channel.basicConsume(queue, true, new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("消費者1號:消費-" + new String(body));
            }
        });
    }
}

1.聲明交換機

2.創(chuàng)建臨時隊列

3..綁定臨時隊列和交換機

  • queueBind 方法解釋
    • 參數(shù)1:queue(臨時隊列)
    • 參數(shù)2:exchange(交換機)
    • 參數(shù)3:routingKey(路由key)
  • 消費者2:演示了一個連接中,多個通道的情況
public class Consumer2 {
    public static void main(String[] args) throws IOException {
       // 通過工具類獲取連接
        Connection connection = RabbitMqUtil.getConnection();
        // 獲取連接通道
        Channel channel = connection.createChannel();
        Channel channel2 = connection.createChannel();
        // 聲明交換機
        channel.exchangeDeclare("order", "fanout");
        channel2.exchangeDeclare("order", "fanout");
        // 創(chuàng)建臨時隊列
        String queue = channel.queueDeclare().getQueue();
        System.out.println(queue);
        // 綁定臨時隊列和交換機
        channel.queueBind(queue, "order", "");
        channel2.queueBind(queue, "order", "");
        // 消費消息
        channel.basicConsume(queue, true, new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("消費者2號:消費-" + new String(body));
            }
        });
        // 消費消息
        channel2.basicConsume(queue, true, new DefaultConsumer(channel2) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("消費者2-2號:消費-" + new String(body));
            }
        });
    }
}

運行結果:

消費者2號:消費-2號消息

消費者2號:消費-3號消息

消費者2號:消費-4號消息

消費者2號:消費-5號消息

消費者2號:消費-6號消息

消費者2號:消費-7號消息

消費者2號:消費-8號消息

消費者2號:消費-9號消息

消費者2號:消費-10號消息

消費者2號:消費-11號消息

消費者2號:消費-12號消息

消費者2號:消費-13號消息

消費者2號:消費-14號消息

消費者2號:消費-15號消息

消費者2號:消費-16號消息

消費者2號:消費-17號消息

消費者2號:消費-18號消息

消費者2號:消費-19號消息

消費者2號:消費-20號消息

4.2.3.2.3 為什么消費者中也聲明交換機?

從上面的代碼中可以看出,在 Producer 和 Conusmer 中我們都分別聲明了交換機,但是消費者由圖可知,并不會與交換機有直接的接觸,為什么消費者中也聲明交換機呢?

這是為了保證 Producer 或者 Producer 執(zhí)行的時候,永遠不會因為交換機還沒被聲明而出錯,例如你只在 Producer 聲明了交換機,那么你就必須先啟動 Producer ,如果直接執(zhí)行 Conusmer 此時交換機就還不存在,就會報錯。而全部寫入聲明,則可以保證不論先啟動誰,都會聲明到交換機。

4.2.4 路由模式( Routing / Direct)

  • Producer:消息的生產(chǎn)者(發(fā)送消息的程序)。
  • Exchange :交換機,負責發(fā)送消息給指定隊列。
  • routingKey:路由key,即上圖的 key1,key2 等,相當于在交換機和隊列之間又加了一層限制
  • Queue:消息隊列,理解為一個容器,生產(chǎn)者向它發(fā)送消息,它把消息存儲,等待消費者消費。
  • Consumer:消息的消費者(接收消息的程序)。
4.2.4.1 如何理解

路由模式的交換機類型是 direct,與 fanout 模式相比,多了路由 key 這個概念。生產(chǎn)者發(fā)送攜帶指定 routingKey(路由key) 的消息到交換機,交換機拿著此 routingKey 去找到綁定了這個 routingKey 的隊列,然后發(fā)送到此隊列,一個隊列可以綁定多個 routingKey 。

4.2.4.2 代碼實現(xiàn)
4.2.4.2.1 生產(chǎn)者代碼
public class Producer {
    @Test
    public void sendMessage() throws IOException, TimeoutException {
        // 通過工具類獲取連接
        Connection connection = RabbitMqUtil.getConnection();
        // 獲取連接通道
        Channel channel = connection.createChannel();
        // 聲明交換機
        channel.exchangeDeclare("order_direct", "direct");
        // 指定 routingKey 
        String key = "info";
        // 發(fā)布消息
        channel.basicPublish("order_direct", key, null, ("發(fā)送給指定路由" + key + "的消息").getBytes());
        // 通過工具關閉channel和釋放連接
        RabbitMqUtil.close(channel, connection);
    }
}

1.指定 routingKey ,即在 basicPublish 方法 的第二個參數(shù)中,指定 key 的值

4.2.4.2.2 消費者代碼
  • 消費者 1
public class Consumer1 {
    public static void main(String[] args) throws IOException {
        // 通過工具類獲取連接
        Connection connection = RabbitMqUtil.getConnection();
        Channel channel = connection.createChannel();
        // 聲明交換機
        channel.exchangeDeclare("order_direct", "direct");
        // 獲取臨時隊列
        String queue = channel.queueDeclare().getQueue();
        // 綁定臨時隊列和交換機
        channel.queueBind(queue, "order_direct", "info");
        channel.queueBind(queue, "order_direct", "error");
        channel.queueBind(queue, "order_direct", "warn");
        // 消費消息
        channel.basicConsume(queue, true, new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("消費者1:消費-" + new String(body));
            }
        });
    }
}

1.只是在綁定隊列和交換機的時候,增加了 key 這個值

  • 消費者2
public class Consumer2 {
    public static void main(String[] args) throws IOException {
         // 通過工具類獲取連接
        Connection connection = RabbitMqUtil.getConnection();
        Channel channel = connection.createChannel();
        // 聲明交換機
        channel.exchangeDeclare("order_direct", "direct");
        // 獲取臨時隊列
        String queue = channel.queueDeclare().getQueue();
        // 綁定臨時隊列和交換機
        channel.queueBind(queue, "order_direct", "error");
        // 消費消息
        channel.basicConsume(queue, true, new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("消費者2:消費-" + new String(body));
            }
        });
    }
}

運行結果:只有消費者 1 收到了消息

[code]消費者1:消費-發(fā)送給指定路由info的消息

4.2.5 通配符匹配模式(Topic)

  • Producer:消息的生產(chǎn)者(發(fā)送消息的程序)。
  • Exchange :交換機,負責發(fā)送消息給指定隊列。
  • routingKey:路由key,即上圖的 key1,key2 等,相當于在交換機和隊列之間又加了一層限制但是 Topic 中的 key 為通配符的形式,這樣可以大大的提高效率
  • Queue:消息隊列,理解為一個容器,生產(chǎn)者向它發(fā)送消息,它把消息存儲,等待消費者消費。
  • Consumer:消息的消費者(接收消息的程序)。
4.2.5.1 如何理解

通配符匹配模式的交換機類型為 topic,因為它與 Direct 模式很相似,所以大家有時候也會把 Direct 模式和 Topic 共同歸入路由模式下,它們的區(qū)別就是,Direct 模式的 routingKey 是一個指定的值,而 Topic 模式的 routingKey 可以使用通配符, 而且一般都是由一個或多個單詞組成,多個單詞之間以”.”分割,例如: ideal.insert。

  • * :匹配正好一個詞,例如: order.* 可以匹配到 order.insert
  • #:匹配一個或者多個詞,例如:order.# 可以匹配到 order.insert.common
    • # 就像一個多層的概念,而 * 只是一個單層的概念
4.2.5.2 代碼實現(xiàn)
4.2.5.2.1 生產(chǎn)者代碼
public class Producer {
    @Test
    public void sendMessage() throws IOException, TimeoutException {
        // 通過工具類獲取連接
        Connection connection = RabbitMqUtil.getConnection();
        // 獲取連接通道
        Channel channel = connection.createChannel();
        channel.exchangeDeclare("order_topic", "topic");
        // 聲明交換機
        String key = "user.query.all";
        // 發(fā)布消息
        channel.basicPublish("order_topic", key, null, ("發(fā)送給指定路由" + key + "的消息").getBytes());
        RabbitMqUtil.close(channel, connection);
    }
}
4.2.5.2.2 消費者代碼
  • 消費者1
public class Producer {
    @Test
    public void sendMessage() throws IOException, TimeoutException {
        // 通過工具類獲取連接
        Connection connection = RabbitMqUtil.getConnection();
        // 獲取連接通道
        Channel channel = connection.createChannel();
        channel.exchangeDeclare("order_topic", "topic");
        // 聲明交換機
        String key = "user.query.all";
        // 發(fā)布消息
        channel.basicPublish("order_topic", key, null, ("發(fā)送給指定路由" + key + "的消息").getBytes());
        RabbitMqUtil.close(channel, connection);
    }
}

消費者2

public class Producer {
    @Test
    public void sendMessage() throws IOException, TimeoutException {
        // 通過工具類獲取連接
        Connection connection = RabbitMqUtil.getConnection();
        // 獲取連接通道
        Channel channel = connection.createChannel();
        channel.exchangeDeclare("order_topic", "topic");
        // 聲明交換機
        String key = "user.query.all";
        // 發(fā)布消息
        channel.basicPublish("order_topic", key, null, ("發(fā)送給指定路由" + key + "的消息").getBytes());
        RabbitMqUtil.close(channel, connection);
    }
}

運行結果:只有消費者 2 收到了消息,因為消息是一個多層的結構,只有 user.# 能匹配到

消費者2:消費-發(fā)送給指定路由user.query.all的消息

5. Springboot 實現(xiàn) RabbitMQ

SpringBoot 提供 Spring For RabbitMQ 的啟動器,同時提供了一系列注解以及 RabbitTemplate 供我們使用,能夠極大的簡化開發(fā) RabbitMQ 的步驟,下面分別演示了【5.1 基于純注解】 以及【 5.2 基于注解 + 配置類】 的寫法,其使用方式大同小異,只是聲明和綁定隊列交換機等的位置不同。一般認為后者更好維護管理,任選其一即可。

環(huán)境準備:

1.首先創(chuàng)建 SprinBoot 項目,然后選擇 RabbitMQ 的啟動器,以及單元測試等基本啟動器

2.編寫 yml 配置文件,編寫連接 RabbitMQ 需要的數(shù)據(jù)

RabbitMQ 依賴

dependency>
  groupId>org.springframework.boot/groupId>
  artifactId>spring-boot-starter-amqp/artifactId>
/dependency>

yml 配置文件

spring:
  rabbitmq:
    host: 192.168.122.1 # 服務器地址
    port: 5672 # tcp端口
    username: admin # 用戶名
    password: admin # 用戶密碼
    virtual-host: /rabbitmq_springboot_01 # 虛擬主機

5.1 基于純注解

注:此方式?jīng)]有創(chuàng)建配置類來管理隊列以及交換機的聲明和綁定等,而是全部通過注解的方式直接在消費者中寫入

5.1.1 簡單隊列模式

所有生產(chǎn)消息的代碼,我們都放到 Test 中去做

  • 生產(chǎn)者
@SpringBootTest(classes = RabbitmqSpringbootApplication.class)
@RunWith(SpringRunner.class)
public class RabbitMqTest {
    /**
     * 注入 RabbitTemplate
     */
    @Autowired
    private RabbitTemplate rabbitTemplate;
    @Test
    public void testSimpleSendMessage() {
        rabbitTemplate.convertAndSend("simple_queue", "This is a message !");
    }
}

第一步就是注入 SpringBoot 提供給我們的 RabbitTemplate

通過 RabbitTemplate 的 convertAndSend 方法用來發(fā)送消息,他有多種重載方式,今天分別會用到 2 個 和 3 個參數(shù)的

  • convertAndSend 方法詳解(兩個參數(shù))
    • 參數(shù)1:routingKey(路由key)
    • 參數(shù)2:object(發(fā)送的消息正文)
  • convertAndSend 方法詳解(三個參數(shù))
    • 參數(shù)1:exchange(交換機)
    • 參數(shù)2:routingKey(路由key)
    • 參數(shù)3:object(發(fā)送的消息正文)
  • 消費者
// 注入容器
@Component
// 監(jiān)聽 RabbitMQ
@RabbitListener(queuesToDeclare = @Queue(value = "simple_queue", durable = "true", exclusive = "false", autoDelete = "false"))
public class SimpleConsumer {
    // 自動回調(diào)
    @RabbitHandler
    public void receiveMessage(String message) {
        System.out.println("消費者:" + message);
    }
}

1.注入容器

2.監(jiān)聽 RabbitMQ,在 @RabbitListener 注解中,可以實現(xiàn),隊列的聲明,以及后面交換機與隊列的綁定等

  • @Queue 可以有四個參數(shù),因為其各有默認值,所以只給定 value 值,就會按照 持久化,非獨占,非自動刪除的方式默認創(chuàng)建
    • 參數(shù)1:value(隊列名)
    • 參數(shù)2:durable ( 持久化消息隊列)RabbitMQ 重啟后,隊列仍存在,默認 true
    • 參數(shù)3:exclusive(是否獨占) 表示該消息隊列是否只在當前 Connection 生效,默認是 false
    • 參數(shù)4:auto-delete(自動刪除)表示消息隊列沒有在使用時將被自動刪除,默認是 false

3.在方法上添加 @RabbitHandler 注解,就能夠實現(xiàn)自動回調(diào),這樣我們就能拿到生產(chǎn)者中的消息了

  • 注:receiveMessage 這個方法的參數(shù)類型,取決于你在生產(chǎn)者有發(fā)送了什么類型的數(shù)據(jù)

5.1.2 工作隊列模式

5.1.2.1 輪詢模式

生產(chǎn)者:沒什么好說的,因為工作模式有多個消費者,所以多發(fā)送幾條消息

    @SpringBootTest(classes = RabbitmqSpringbootApplication.class)
    @RunWith(SpringRunner.class)
    public class RabbitMqTest {
        /**
         * 注入 RabbitTemplate
         */
        @Autowired
        @Test
        public void testWorkSendMessage() {
            for (int i = 0; i  20; i++) {
                rabbitTemplate.convertAndSend("work_queue", "This is a message !, 序號:" + i);
            }
        }
    }@SpringBootTest(classes = RabbitmqSpringbootApplication.class)
    @RunWith(SpringRunner.class)
    public class RabbitMqTest {
        /**
         * 注入 RabbitTemplate
         */
        @Autowired
        @Test
        public void testWorkSendMessage() {
            for (int i = 0; i  20; i++) {
                rabbitTemplate.convertAndSend("work_queue", "This is a message !, 序號:" + i);
            }
        }
    }
  • 消費者
    @Component
    public class WorkConsumer {  
        // 監(jiān)聽 RabbitMQ
        @RabbitListener(queuesToDeclare = @Queue("work_queue"))
        // 消費者1
        public void receiveMessage1(String message) {
            System.out.println("消費者1:" + message);
        // 監(jiān)聽 RabbitMQ
        @RabbitListener(queuesToDeclare = @Queue("work_queue")
        // 消費者2
        public void receiveMessage2(String message) {
            System.out.println("消費者2:" + message);
        }    
    }
  • 1.@RabbitListener 注解,既可以放在類上,也可以放在方法上,例如上述代碼,我們就分別放在了兩個方法上,用來指代不同的消費者。
    • 但是如果在類上加入 @RabbitListener 注解,而在下面兩個方法中,添加 @RabbitHandler 注解則會報錯,需要分別為每個消費者都創(chuàng)建一個類
5.1.2.2 公平模式(按能力分配)
5.1.2.2.1 修改配置文件的方式
  • 生產(chǎn)者不變
  • 修改配置文件 yml / properties
spring:
  rabbitmq:
    host: 192.168.122.1 # 服務器地址
    port: 5672 # tcp端口
    username: admin # 用戶名
    password: admin # 用戶密碼
    virtual-host: /rabbitmq_springboot_01 # 虛擬主機
	# 新增部分
    listener:
      simple:
        acknowledge-mode: manual # 開啟 ack 手動應答
        prefetch: 1 # 每次只能消費 1 條消息

acknowledge-mode 選項介紹

auto:自動確認,為默認選項

manual:手動確認(按能力分配就需要設置為手動確認)

none:不確認,發(fā)送后自動丟棄

  • 消費者
@Component
public class WorkConsumer {
	// 監(jiān)聽 RabbitMQ
    @RabbitListener(queuesToDeclare = @Queue("work_queue"))
    // 消費者 1
    public void receiveMessage(String body, Message message, Channel channel) throws IOException {
        try {
            // 打印輸出消息主題
            System.out.println("消費者1:" + body);
            // 返回 deliveryTag 代表隊列可以刪除此消息了
            channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
        } catch (IOException e) {
            e.printStackTrace();
            // 消費者告訴隊列信息消費失敗
            channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);
        }
    }
   // 監(jiān)聽 RabbitMQ
    @RabbitListener(queuesToDeclare = @Queue("work_queue"))
    // 消費者 2
    public void receiveMessage2(String body, Message message, Channel channel) throws IOException{
        try {
            // 延遲 2s 代表處理業(yè)務慢
            Thread.sleep(2000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        try {
			// 打印輸出消息主題
            System.out.println("消費者2:" + body);
            // 返回 deliveryTag 代表隊列可以刪除此消息了
            channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
        } catch (IOException e) {
            e.printStackTrace();
            // 消費者告訴隊列信息消費失敗
            channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);
        }
    }
}

1.因為在 yml 配置中開啟了手動確認,所以,需要在成功和失敗后分別返回確認消息

2.basicAck 方法解釋

  • 參數(shù)1:deliveryTag(交付標志,即該消息的index),返回即代表確認收到消息,隊列可以刪除此消息了
  • 參數(shù)2:mutiple(是否批量)選擇 true 將一次性拒絕所有小于 deliveryTag 的消息

3.basicNack 方法解釋

  • 參數(shù) 1 |參數(shù) 2 同上
  • 參數(shù)3:requeue(被拒絕的是否重新進入隊列)

運行結果:

消費者1:This is a message !, 序號:2

消費者1:This is a message !, 序號:3

消費者1:This is a message !, 序號:4

消費者1:This is a message !, 序號:5

消費者1:This is a message !, 序號:6

消費者1:This is a message !, 序號:7

消費者1:This is a message !, 序號:8

消費者1:This is a message !, 序號:9

消費者1:This is a message !, 序號:10

消費者1:This is a message !, 序號:11

消費者1:This is a message !, 序號:12

消費者1:This is a message !, 序號:13

消費者1:This is a message !, 序號:14

消費者1:This is a message !, 序號:15

消費者1:This is a message !, 序號:16

消費者1:This is a message !, 序號:17

消費者1:This is a message !, 序號:18

消費者1:This is a message !, 序號:19

消費者1:This is a message !, 序號:20

消費者2:This is a message !, 序號:1

到現(xiàn)在已經(jīng)實現(xiàn)了修改配置文件的方式實現(xiàn)按能力分配,補充幾個配置的內(nèi)容,我們上面只用了一部分,其他的方便大家參考,yml 和 properties 大家自己選擇即可

# 發(fā)送確認
spring.rabbitmq.publisher-confirm-type=correlated
# spring.rabbitmq.publisher-confirms=true(舊版)
# 發(fā)送回調(diào)
spring.rabbitmq.publisher-returns=true
# 消費手動確認
spring.rabbitmq.listener.direct.acknowledge-mode=manual
spring.rabbitmq.listener.simple.acknowledge-mode=manual
# 并發(fā)消費者初始化值
spring.rabbitmq.listener.simple.concurrency=1
# 并發(fā)消費者的最大值
spring.rabbitmq.listener.simple.max-concurrency=10
# 每個消費者每次監(jiān)聽時可拉取處理的消息數(shù)量
# 在單個請求中處理的消息個數(shù),他應該大于等于事務數(shù)量(unack的最大數(shù)量)
spring.rabbitmq.listener.simple.prefetch=1
# 是否支持重試
spring.rabbitmq.listener.simple.retry.enabled=true
5.1.2.2.1 配置工廠的方式
    /**
     * 設置消費者的確認機制,并達到能者多勞的效果
     *
     * @param connectionFactory 連接工廠
     * @return
     */
    @Bean("workListenerFactory")
    public RabbitListenerContainerFactory myFactory(ConnectionFactory connectionFactory) {
        SimpleRabbitListenerContainerFactory containerFactory =
            new SimpleRabbitListenerContainerFactory();
        containerFactory.setConnectionFactory(connectionFactory);
        // 修改為手動確認
        containerFactory.setAcknowledgeMode(AcknowledgeMode.MANUAL);
        // 拒絕策略,true 回到隊列 false丟棄,默認是true
        containerFactory.setDefaultRequeueRejected(true);
        // 默認的PrefetchCount是250 修改為 1
        containerFactory.setPrefetchCount(1);
        return containerFactory;
    }
  • 消費者修改
@RabbitListener(queuesToDeclare = @Queue("work_queue"))
// 將上面的監(jiān)聽,增加 containerFactory 屬性,然后將配置好的工廠傳入
@RabbitListener(queuesToDeclare = @Queue("work_queue"), containerFactory = "workListenerFactory")

5.1.3 發(fā)布與訂閱模式

  • 生產(chǎn)者
@SpringBootTest(classes = RabbitmqSpringbootApplication.class)
@RunWith(SpringRunner.class)
public class RabbitMqTest {
    /**
     * 注入 RabbitTemplate
     */
    @Autowired
    @Test
    public void testFanoutSendMessage() {
        rabbitTemplate.convertAndSend("order_exchange", "", "This is a message !");
    }
}

1.因為從這個模式開始,就涉及到交換機了,所以用的是三個參數(shù)的方法

  • 消費者
@Component
public class FanoutConsumer {
    // 綁定臨時隊列和交換機
    @RabbitListener(bindings = {
            @QueueBinding(
                value = @Queue(), // 臨時隊列
                exchange = @Exchange(name = "order_exchange", type = "fanout") // 交換機與類型
            )
    })
    public void receiveMessage1(String message) {
        System.out.println("消費者1:" + message);
    }
    // 綁定臨時隊列和交換機
    @RabbitListener(bindings = {
            @QueueBinding(
                value = @Queue(), // 臨時隊列
                exchange = @Exchange(name = "order_exchange", type = "fanout") // 交換機與類型
            )
    })
    public void receiveMessage2(String message) {
        System.out.println("消費者2:" + message);
    }
}

5.1.4 路由模式(Direct)

  • 生產(chǎn)者
    @SpringBootTest(classes = RabbitmqSpringbootApplication.class)
    @RunWith(SpringRunner.class)
    public class RabbitMqTest {
        /**
         * 注入 RabbitTemplate
         */
        @Autowired
        @Test
        public void testDirectSendMessage() {
            rabbitTemplate.convertAndSend("direct_exchange", "info", "This is a message !");
        }
    }
  • 消費者
@Component
public class DirectConsumer {
    // 綁定臨時隊列和交換機
    @RabbitListener(bindings = {
            @QueueBinding(
                    value = @Queue(), // 臨時隊列
                    exchange = @Exchange(name = "direct_exchange", type = "direct"), // 交換機和類型
                    key = {"info", "warn", "error"} // 路由key
            )
    })
    public void receiveMessage1(String message) {
        System.out.println("消費者1:" + message);
    }
     // 綁定臨時隊列和交換機
    @RabbitListener(bindings = {
            @QueueBinding(
                    value = @Queue(), // 臨時隊列
                    exchange = @Exchange(name = "direct_exchange", type = "direct"), // 交換機和類型
                    key = {"info", "warn", "error"} // 路由key
            )
    })
    public void receiveMessage2(String message) {
        System.out.println("消費者2:" + message);
    }
}

5.1.5 主題模式

  • 生產(chǎn)者
    @SpringBootTest(classes = RabbitmqSpringbootApplication.class)
    @RunWith(SpringRunner.class)
    public class RabbitMqTest {
        /**
         * 注入 RabbitTemplate
         */
        @Autowired
        @Test
        public void testTopicSendMessage() {
            rabbitTemplate.convertAndSend("topic_exchange", "order.insert.common", "This is a message !");
        }
    }
  • 消費者
@Component
public class TopicConsumer {
    // 綁定臨時隊列和交換機
    @RabbitListener(bindings = {
            @QueueBinding(
                    value = @Queue(), // 臨時隊列
                    exchange = @Exchange(name = "topic_exchange", type = "topic"), // 交換機和類型
                    key = {"order.*"} // 通配符路由key
            )
    })
    public void receiveMessage1(String message) {
        System.out.println("消費者1:" + message);
    }
    // 綁定臨時隊列和交換機
    @RabbitListener(bindings = {
            @QueueBinding(
                    value = @Queue(), // 臨時隊列
                    exchange = @Exchange(name = "topic_exchange", type = "topic"), // 交換機和類型
                    key = {"order.*"} // 通配符路由key
            )
    })
    public void receiveMessage2(String message) {
        System.out.println("消費者2:" + message);
    }
}

5.2 基于注解 + 配置類

其實這種方式,就是將交換機,隊列的聲明和綁定都在配置類中進行,一個是消費者中的注解變的簡潔了,再有就是統(tǒng)一管理,更加條理,而且生產(chǎn)者和消費者引用的時候也更加方便,日后修改的時候,也不需要對每一處都修改。

由于篇幅過長了,這里演示最復雜的 Topic 方式,其他的也是信手拈來。

配置類

@Configuration
public class RabbitMqConfiguration {
    public static final String TOPIC_EXCHANGE = "topic_order_exchange";
    public static final String TOPIC_QUEUE_NAME_1 = "test_topic_queue_1";
    public static final String TOPIC_QUEUE_NAME_2 = "test_topic_queue_2";
    public static final String TOPIC_ROUTINGKEY_1 = "test.*";
    public static final String TOPIC_ROUTINGKEY_2 = "test.#";
    @Bean
    public TopicExchange topicExchange() {
        return new TopicExchange(TOPIC_EXCHANGE);
    }
    @Bean
    public Queue topicQueue1() {
        return new Queue(TOPIC_QUEUE_NAME_1);
    }
    @Bean
    public Queue topicQueue2() {
        return new Queue(TOPIC_QUEUE_NAME_2);
    }
    @Bean
    public Binding bindingTopic1(){
        return BindingBuilder.bind(topicQueue1())
                .to(topicExchange())
                .with(TOPIC_ROUTINGKEY_1);
    }
    @Bean
    public Binding bindingTopic2(){
        return BindingBuilder.bind(topicQueue2())
                .to(topicExchange())
                .with(TOPIC_ROUTINGKEY_2);
    }
}

1.添加 @Configuration 注解:表明這是一個配置類

2.定義常量:將交換機名,隊列名,路由key 等都可以創(chuàng)建為常量,調(diào)用,管理和修改都非常方便,還可以創(chuàng)建出一個專門的 RabbitMQ 的常量類。

3.定義交換機:因為這個例子是 Topic 所以選擇 TopicExchange 類型

4.定義隊列:傳入隊列名常量即可,因為持久化等存在默認值,也可以自己自定持久化,是否獨占等參數(shù)

5.綁定交換機和隊列:利用 BindingBuilder 的 bind 方法綁定隊列,to 綁定到指定交換機,with 傳入路由key

  • 生產(chǎn)者
    @SpringBootTest(classes = RabbitmqSpringbootApplication.class)
    @RunWith(SpringRunner.class)
    public class RabbitMqTest {
        /**
         * 注入 RabbitTemplate
         */
        @Autowired
        @Test
        public void testTopicSendMessage() {
            rabbitTemplate.convertAndSend(RabbitMqConfiguration.TOPIC_EXCHANGE, "test.order.insert", "This is a message !");
        }
    }
  • 消費者
@Component
public class TopicConsumer {
	// 綁定隊列即可
    @RabbitListener(queues = {RabbitMqConfiguration.TOPIC_QUEUE_NAME_1})
    public void receiveMessage1(String message) {
        System.out.println("消費者1:" + message);
    }
    // 綁定隊列即可
    @RabbitListener(queues = {RabbitMqConfiguration.TOPIC_QUEUE_NAME_2})
    public void receiveMessage2(String message) {
        System.out.println("消費者2:" + message);
    }
}

5、總結

這篇文章就到這里了,希望大家可以多多關注腳本之家的其他文章!

您可能感興趣的文章:
  • docker部署rabbitmq集群的實現(xiàn)方法
  • Docker集群的創(chuàng)建與管理實例詳解
  • 深入淺析RabbitMQ鏡像集群原理

標簽:武威 臺州 廣東 安徽 濟寧 濟源 泰安 汕頭

巨人網(wǎng)絡通訊聲明:本文標題《一篇文章帶你從入門到精通:RabbitMQ》,本文關鍵詞  一篇,文章,帶你,從,入門,;如發(fā)現(xiàn)本文內(nèi)容存在版權問題,煩請?zhí)峁┫嚓P信息告之我們,我們將及時溝通與處理。本站內(nèi)容系統(tǒng)采集于網(wǎng)絡,涉及言論、版權與本站無關。
  • 相關文章
  • 下面列出與本文章《一篇文章帶你從入門到精通:RabbitMQ》相關的同類信息!
  • 本頁收集關于一篇文章帶你從入門到精通:RabbitMQ的相關信息資訊供網(wǎng)民參考!
  • 推薦文章