目錄
- 一、ZeroMQ概述
- 二、安裝
- 三、Request-Reply (請求響應模式)
- 3.1 Request-Reply模式概述:
- 3.2 Client端python實現(xiàn)
- 3.3 Server端python實現(xiàn)
- 四、Publish/Subscribe(訂閱-發(fā)布模式 )
- 五、Push/Pull(流水線模式)
- 5.1 流水線模式概述:
- 5.2 Ventilator
- 5.3 worker
- 5.4 sink
- 六、總結(jié)
一、ZeroMQ概述
- ZeroMQ(又名ØMQ,MQ,或zmq)像一個可嵌入的網(wǎng)絡庫,但其作用就像一個并發(fā)框架。
- ZeroMQ類似于標準Berkeley套接字,其提供了各種傳輸工具,如進程內(nèi)、進程間、TCP和組播中進行原子消息傳送的套接字
- 可以使用各種模式實現(xiàn)N對N的套接字連接,這些模式包括:發(fā)布-訂閱、任務分配、請求-應答。
- ZeroMQ的速度足夠快,因此可充當集群產(chǎn)品的結(jié)構(gòu)。
- ZeroMQ的異步I/O模型提供了可擴展的多核應用程序,用異步消息來處理任務
- ZeroMQ核心由C語言編寫,支持C、C++、java、python等多種編程語言的API,并可運行在大多數(shù)操作系統(tǒng)上
總結(jié)以下:ØMQ (ZeroMQ) 是一個基于消息隊列的多線程網(wǎng)絡庫,它封裝了網(wǎng)絡通信、消息隊列、線程調(diào)度等功能,向上層提供簡潔的API,應用程序通過加載庫文件,調(diào)用API函數(shù)來實現(xiàn)高性能網(wǎng)絡通信。
看起來有些抽象,下面我們結(jié)合ZeroMQ 的 Python 封裝———— pyzmp,用實例看一下ZeroMQ的三種最基本的工作模式。
二、安裝
安裝方法
查看是否安裝成功
>>> import zmq
>>> print(zmq.__version__)
22.0.3
三、Request-Reply (請求響應模式)
3.1 Request-Reply模式概述:
- 消息雙向的,有來有往。
- Client請求的消息,Server必須答復給Client。
- Client在請求后,Server必須回響應,注意:Server不返回響應會報錯。
- Server和Client都可以是1:N的模型。通常把1認為是Server,N認為是Client。
- 更底層的端點地址是對上層隱藏的,每個請求都隱含回應地址,而應用則不關心它。
- ZMQ 可以很好的支持路由功能(實現(xiàn)路由功能的組件叫做 Device),把 1:N 擴展為 N:M(只需要加入若干路由節(jié)點)。
3.2 Client端python實現(xiàn)
#client.py
import zmq
context = zmq.Context()
# Socket to talk to server
print("Connecting to hello world server…")
socket = context.socket(zmq.REQ)
socket.connect("tcp://localhost:5555")
socket.send(b"Hello")
# Get the reply.
message = socket.recv()
print(f"Received reply [ {message} ]")
3.3 Server端python實現(xiàn)
#server.py
import time
import zmq
context = zmq.Context()
socket = context.socket(zmq.REP)
socket.bind("tcp://*:5555")
while True:
# Wait for next request from client
message = socket.recv()
print("Received request: %s" % message)
# Do some 'work'
time.sleep(1)
# Send reply back to client
socket.send(b"World")
- 啟動client.py 首先會打印Connecting to hello world server… 但不會受到任何消息。
- 然后啟動server.py ,客戶端收到來自客戶端的request: b'Hello'
- 此時client端收到來自server端的 reply: [ b'World' ]
python client.py
Connecting to hello world server…
Received reply [ b'World' ]
python server.py
Received request: b'Hello'
可以試一下,多運行幾個client.py,看看情況是什么樣的。
四、Publish/Subscribe(訂閱-發(fā)布模式 )
4.1 Pub-Subs模式概述:
- 消息單向,有去無回
- 一個發(fā)布端,多個訂閱端;發(fā)布端只管產(chǎn)生數(shù)據(jù),發(fā)布端發(fā)布一條消息,可被多個訂閱端同時收到。
- 發(fā)布者不必關心訂閱者的加入和離開,消息會以 1:N 的方式擴散到每個訂閱者。
- 廣播所有client,沒有隊列緩存,斷開連接數(shù)據(jù)將永遠丟失。
- 如果Publish端開始發(fā)布信息時,Subscribe端尚未連接進來,則這些信息會被直接丟棄。
- PUB和SUB誰bind誰connect并無嚴格要求(雖本質(zhì)并無區(qū)別),但仍建議PUB使用bind,SUB使用connect
- 使用SUB設置一個訂閱時,必須使用zmq_setsockopt()對消息進行過濾
這里直接引用官方文檔的例子:
發(fā)布者:類似于一個天氣更新服務器,向訂閱者發(fā)送天氣更新,內(nèi)容包括郵政編碼、溫度、濕度等信息
#Publisher.py
import zmq
from random import randrange
context = zmq.Context()
socket = context.socket(zmq.PUB)
socket.bind("tcp://*:5556")
while True:
zipcode = randrange(1, 100000)
temperature = randrange(-80, 135)
relhumidity = randrange(10, 60)
socket.send_string("%i %i %i" % (zipcode, temperature, relhumidity))
訂閱者:它監(jiān)聽發(fā)布者更新的數(shù)據(jù)流,過濾只接收與特定郵政編碼相關的天氣信息,默認接收接收10條數(shù)據(jù)
#Subscribe.py
import sys
import zmq
# Socket to talk to server
context = zmq.Context()
socket = context.socket(zmq.SUB)
print("Collecting updates from weather server...")
socket.connect("tcp://localhost:5556")
# Subscribe to zipcode, default is NYC, 10001
zip_filter = sys.argv[1] if len(sys.argv) > 1 else "10001"
# Python 2 - ascii bytes to unicode str
if isinstance(zip_filter, bytes):
zip_filter = zip_filter.decode('ascii')
socket.setsockopt_string(zmq.SUBSCRIBE, zip_filter)
# Process 5 updates
total_temp = 0
for update_nbr in range(5):
string = socket.recv_string()
zipcode, temperature, relhumidity = string.split()
total_temp += int(temperature)
print(
"Average temperature for zipcode '%s' was %dF"
% (zip_filter, total_temp / (update_nbr + 1))
)
五、Push/Pull(流水線模式)
5.1 流水線模式概述:
- 主要用于多任務并行。
- 消息單向,有去無回。
- Push的任何一個消息,始終只會有一個Pull端收到消息。
- Push 端還是 Pull 端都可以做 server,bind 到某個地址等待對方訪問。
- 如果有多個PULL端同時連接到PUSH端,則PUSH端會在內(nèi)部做一個負載均衡,采用平均分配的算法,將所有消息均衡發(fā)布到PULL端上。
- 由三部分組成,Push進行數(shù)據(jù)推送,work進行數(shù)據(jù)緩存,Pull進行數(shù)據(jù)競爭獲取處理。
- 存在一個數(shù)據(jù)緩存和處理負載,當連接被斷開,數(shù)據(jù)不會丟失,重連后數(shù)據(jù)繼續(xù)發(fā)送到對端。
ventilator 使用的是 SOCKET_PUSH,將任務分發(fā)到 Worker 節(jié)點上。Worker 節(jié)點上,使用 SOCKET_PULL 從上游接受任務,并使用 SOCKET_PUSH 將結(jié)果匯集到 Sink。值得注意的是,任務的分發(fā)的時候也同樣有一個負載均衡的路由功能,worker 可以隨時自由加入,ventilator 可以均衡將任務分發(fā)出去。
Push/Pull模式還是蠻常用的,這里我們主要測試一下它的負載均衡。
5.2 Ventilator
# ventilator.py
import zmq
import time
context = zmq.Context()
socket = context.socket(zmq.PUSH)
socket.bind("tcp://*:5557")
while True:
socket.send(b"test")
print("已發(fā)送")
time.sleep(1)
5.3 worker
# worker.py
import zmq
context = zmq.Context()
recive = context.socket(zmq.PULL)
recive.connect('tcp://127.0.0.1:5557')
sender = context.socket(zmq.PUSH)
sender.connect('tcp://127.0.0.1:5558')
while True:
data = recive.recv()
print("work1 正在轉(zhuǎn)發(fā)...")
sender.send(data)
5.4 sink
# sink.py
import zmq
import sys
context = zmq.Context()
socket = context.socket(zmq.PULL)
socket.bind("tcp://*:5558")
while True:
response = socket.recv()
print("response: %s" % response)
打開4個Terminal,分別運行
python sink.py
python worker.py
python worker.py
python ventilator.py
六、總結(jié)
消息模型可以根據(jù)需要組合使用,后續(xù)的代理模式和路由模式等都是在三種基本模式上面的擴展或變異。
到此這篇關于Python網(wǎng)絡編程之ZeroMQ知識總結(jié)的文章就介紹到這了,更多相關Python ZeroMQ知識總結(jié)內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關文章希望大家以后多多支持腳本之家!
您可能感興趣的文章:- python神經(jīng)網(wǎng)絡編程之手寫數(shù)字識別
- python Socket網(wǎng)絡編程實現(xiàn)C/S模式和P2P
- python神經(jīng)網(wǎng)絡編程實現(xiàn)手寫數(shù)字識別
- python網(wǎng)絡編程:socketserver的基本使用方法實例分析
- python網(wǎng)絡編程socket實現(xiàn)服務端、客戶端操作詳解
- Python網(wǎng)絡編程之使用TCP方式傳輸文件操作示例
- Python 網(wǎng)絡編程之UDP發(fā)送接收數(shù)據(jù)功能示例【基于socket套接字】
- python網(wǎng)絡編程之多線程同時接受和發(fā)送
- python socket網(wǎng)絡編程之粘包問題詳解
- python 網(wǎng)絡編程要點總結(jié)