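Python's multiprocessing package is the standard library's package for process-based parallel computing. Its API closely mirrors that of threading, but because the work is spread across separate processes that can run on different CPUs, it is not constrained by the GIL (Global Interpreter Lock).
Below we introduce the Pool and Process classes from multiprocessing.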
Pool
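A Pool (process pool) is the more convenient way to run tasks in parallel: we specify how many worker processes to use, and the Pool automatically dispatches tasks to them. Pool provides several methods for parallel execution.
apply and apply_async
apply runs one task at a time and blocks until that task returns, so on its own it gives no parallelism. (It is the built-in apply() function that was removed in Python 3; Pool.apply still exists.) apply_async is the asynchronous version of apply and returns a result handle immediately. For parallel computation with this pair, use apply_async.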
import multiprocessing
import time
from random import randint, seed

def f(num):
    seed()  # reseed in each worker so the processes do not share a random state
    rand_num = randint(0, 10)  # pick a random pause length each time
    time.sleep(rand_num)
    return (num, rand_num)

if __name__ == '__main__':
    cores = multiprocessing.cpu_count()
    pool = multiprocessing.Pool(processes=cores)
    pool_list = []
    start_time = time.time()
    for xx in range(10):
        # Do not call get() here: it would block until this task has finished
        pool_list.append(pool.apply_async(f, (xx,)))
    # Why not call result.get() directly inside the for loop? Because get() blocks
    # until its task is done, so the next task would not even be submitted until the
    # previous one had finished. Collect the return values only after every task has
    # been submitted (this can also be done after the pool has been closed and joined).
    result_list = [xx.get() for xx in pool_list]
    # Finally, shut the pool down:
    pool.close()
    pool.join()
    print(result_list)
    print('parallel time %.2f' % (time.time() - start_time))
    print('serial time %.2f' % sum(xx[1] for xx in result_list))
# [(0, 8), (1, 2), (2, 4), (3, 9), (4, 0), (5, 1), (6, 8), (7, 3), (8, 4), (9, 6)]
# parallel time 14.11
# serial time 45.00
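To make the point about get() concrete, here is a small sketch (not from the original example) that submits the same tasks in two ways. The names slow_square, run_blocking and run_deferred are invented for illustration, and the 0.2-second sleep simply stands in for real work.

```python
import time
from multiprocessing import Pool


def slow_square(x):
    """Pretend to do 0.2 s of work, then return x squared."""
    time.sleep(0.2)
    return x * x


def run_blocking(pool, n):
    """Call get() right after each submit: tasks run one after another."""
    return [pool.apply_async(slow_square, (i,)).get() for i in range(n)]


def run_deferred(pool, n):
    """Submit everything first, then collect: tasks can overlap."""
    handles = [pool.apply_async(slow_square, (i,)) for i in range(n)]
    return [h.get() for h in handles]


if __name__ == "__main__":
    with Pool(4) as pool:
        for func in (run_blocking, run_deferred):
            start = time.perf_counter()
            results = func(pool, 8)
            elapsed = time.perf_counter() - start
            print(func.__name__, results, "%.2f s" % elapsed)
```

Both functions return the same list; only the elapsed time differs. With 4 workers, run_blocking should take roughly 8 x 0.2 s while run_deferred should need about two rounds of 0.2 s, though exact timings depend on the machine.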
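map and map_async
map_async is the asynchronous version of map.
Unlike apply_async, which accepts a tuple of arguments, map and map_async pass exactly one argument (one element of the iterable) to the function on each call.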
import time
from multiprocessing import Pool

def run(fn):
    # fn: one element of the data list
    time.sleep(1)
    return fn * fn

if __name__ == "__main__":
    testFL = [1, 2, 3, 4, 5, 6]
    print('serial:')  # sequential execution in a single process
    s = time.time()
    for fn in testFL:
        run(fn)
    e1 = time.time()
    print("sequential time:", int(e1 - s))
    print('parallel:')  # run in several processes at once
    pool = Pool(4)  # create a pool of 4 worker processes
    # testFL: the list of data to process; run: the function applied to each element
    rl = pool.map(run, testFL)
    pool.close()  # close the pool; no new tasks are accepted
    pool.join()  # the main process blocks until the workers exit
    e2 = time.time()
    print("parallel time:", int(e2 - e1))
    print(rl)
# serial:
# sequential time: 6
# parallel:
# parallel time: 2
# [1, 4, 9, 16, 25, 36]
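Because map hands the function only one argument per call, a function that needs several arguments requires a workaround. The sketch below shows two common options in Python 3: Pool.starmap, which unpacks each tuple into positional arguments, and functools.partial, which fixes some arguments in advance. The function power and the helper names are hypothetical.

```python
from functools import partial
from multiprocessing import Pool


def power(base, exponent):
    """A two-argument function, which plain map cannot call directly."""
    return base ** exponent


def power_pairs(pool, pairs):
    """starmap unpacks each tuple into positional arguments."""
    return pool.starmap(power, pairs)


def squares(pool, bases):
    """partial fixes exponent=2, leaving a one-argument function for map."""
    return pool.map(partial(power, exponent=2), bases)


if __name__ == "__main__":
    with Pool(2) as pool:
        print(power_pairs(pool, [(2, 3), (3, 2), (10, 0)]))  # [8, 9, 1]
        print(squares(pool, [1, 2, 3]))  # [1, 4, 9]
```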
Process
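When using Process, keep in mind that each Process object creates one operating-system process, and a CPU-bound process can keep one CPU core busy. It is therefore usually best to start no more processes than there are CPU cores.
Nothing prevents starting more, but too many processes, especially for CPU-bound tasks, compete for the cores and reduce parallel efficiency.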
# 16.6.1.1. The Process class
from multiprocessing import Process, cpu_count
import os
import time

def info(title):
    # print(title)
    if hasattr(os, 'getppid'):  # not available on every platform
        print('parent process:', os.getppid())
    print('process id:', os.getpid())
    time.sleep(3)

def f(name):
    info('function f')
    print('hello', name)

if __name__ == '__main__':
    # info('main line')
    start_time = time.time()
    p_list = []  # holds the Process objects we create
    cpu_num = cpu_count()
    for xx in range(cpu_num):
        p_list.append(Process(target=f, args=('xx_%s' % xx,)))
    for xx in p_list:
        xx.start()
    for xx in p_list:
        xx.join()
    print('spend time: %.2f' % (time.time() - start_time))
# Sample output from a run with 4 processes (line order varies between runs):
# parent process: 11741
# parent process: 11741
# parent process: 11741
# process id: 12249
# process id: 12250
# parent process: 11741
# process id: 12251
# process id: 12252
# hello xx_1
# hello xx_0
# hello xx_2
# hello xx_3
# spend time: 3.04
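The rule of thumb about process counts can be written down as a tiny helper. This is only a sketch: the name worker_count and the io_factor multiplier are assumptions made for illustration, not part of multiprocessing and not a tuned recommendation.

```python
import os


def worker_count(n_tasks, cpu_bound=True, io_factor=4):
    """Suggest how many processes to start for n_tasks tasks.

    io_factor is an arbitrary illustrative multiplier for I/O-bound work.
    """
    cores = os.cpu_count() or 1  # os.cpu_count() may return None
    limit = cores if cpu_bound else cores * io_factor
    return max(1, min(n_tasks, limit))


if __name__ == "__main__":
    print("CPU-bound, 100 tasks:", worker_count(100))
    print("I/O-bound, 100 tasks:", worker_count(100, cpu_bound=False))
```

For CPU-bound work the result never exceeds the number of cores; for tasks that mostly wait on I/O, a somewhat larger number is often acceptable.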
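Inter-process communication
multiprocessing supports two kinds of communication channel between processes: Queues and Pipes. Both can be passed directly to a Process. With a Pool, a plain multiprocessing.Queue cannot be sent as a task argument, so a Manager().Queue() is the usual substitute.
Queue
A queue is first in, first out, and can be used across several processes.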
# 16.6.1.2. Exchanging objects between processes
# Queues
from multiprocessing import Process, Queue

def f(q):
    q.put([42, None, 'hello'])

if __name__ == '__main__':
    q = Queue()
    p = Process(target=f, args=(q,))
    p.start()
    print(q.get())  # prints "[42, None, 'hello']"
    p.join()
Pipe
from multiprocessing import Process, Pipe

def f(conn):
    conn.send([42, None, 'hello'])
    conn.close()

if __name__ == '__main__':
    parent_conn, child_conn = Pipe()
    p = Process(target=f, args=(child_conn,))
    p.start()
    print(parent_conn.recv())  # prints "[42, None, 'hello']"
    p.join()
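The example above only sends data from child to parent. A Pipe is duplex by default, so the same pair of connections can carry a request and its reply. The following sketch shows that round trip; the function names and the use of None as a stop signal are assumptions made for this illustration.

```python
from multiprocessing import Pipe, Process


def echo_upper(conn):
    """Child: answer each message in upper case until None arrives."""
    while True:
        msg = conn.recv()
        if msg is None:  # None is our (assumed) stop signal
            break
        conn.send(msg.upper())
    conn.close()


def ask(messages):
    """Parent: send each message down the pipe and collect the replies."""
    parent_conn, child_conn = Pipe()  # duplex by default
    p = Process(target=echo_upper, args=(child_conn,))
    p.start()
    replies = []
    for msg in messages:
        parent_conn.send(msg)
        replies.append(parent_conn.recv())
    parent_conn.send(None)
    p.join()
    return replies


if __name__ == "__main__":
    print(ask(["hello", "pipe"]))  # ['HELLO', 'PIPE']
```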
Queue vs. Pipe
Pipe() can only have two endpoints.
Queue() can have multiple producers and consumers.
When to use them
If you need more than two points to communicate, use a Queue().
If you need absolute performance, a Pipe() is much faster because Queue() is built on top of Pipe().
Reference:
https://stackoverflow.com/questions/8463008/python-multiprocessing-pipe-vs-queue
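As an illustration of "multiple producers", the sketch below starts several producer processes that all write to one Queue while the parent acts as the single consumer. The names producer and collect are hypothetical.

```python
from multiprocessing import Process, Queue


def producer(q, producer_id, n_items):
    """Put n_items tagged tuples on the shared queue."""
    for i in range(n_items):
        q.put((producer_id, i))


def collect(n_producers, n_items):
    """Start several producers and drain everything they put on one queue."""
    q = Queue()
    procs = [Process(target=producer, args=(q, pid, n_items))
             for pid in range(n_producers)]
    for p in procs:
        p.start()
    # Drain the queue before joining: a process that has put items on a
    # queue may not exit until those items have been consumed.
    items = [q.get() for _ in range(n_producers * n_items)]
    for p in procs:
        p.join()
    return sorted(items)  # arrival order is not deterministic, so sort


if __name__ == "__main__":
    print(collect(n_producers=3, n_items=2))
```

The same thing is awkward with a Pipe, which has only two endpoints, so each producer would need its own pipe.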
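Sharing resources
Multi-process programs should avoid sharing resources where possible. With threads, sharing is fairly easy, for example through global variables or by passing arguments.
With processes, each process has its own independent memory space, so those approaches do not work.
Instead, resources can be shared through shared memory or through a Manager.
Doing so makes the program more complex, however, and the synchronization it requires reduces efficiency.
Shared memory
Shared-memory objects (Value and Array) are meant to be handed to Process objects. They cannot be passed as task arguments to a Pool; a Pool worker can only receive them by inheritance, for example through the pool's initializer.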
# 16.6.1.4. Sharing state between processes
# Shared memory
from multiprocessing import Process, Value, Array

def f(n, a):
    n.value = 3.1415927
    for i in range(len(a)):
        a[i] = -a[i]

if __name__ == '__main__':
    num = Value('d', 0.0)  # 'd': double-precision float
    arr = Array('i', range(10))  # 'i': signed integer
    p = Process(target=f, args=(num, arr))
    p.start()
    p.join()
    print(num.value)
    print(arr[:])
# 3.1415927
# [0, -1, -2, -3, -4, -5, -6, -7, -8, -9]
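Shared memory and Pool can still be combined if the shared object reaches the workers at creation time rather than as a task argument. Below is a minimal sketch of that approach using the pool's initializer; the module-level _counter and the helper names are assumptions made for this example.

```python
from multiprocessing import Pool, Value

_counter = None  # set in each worker by _init_worker


def _init_worker(shared_counter):
    """Runs once in every worker; stores the inherited Value in a global."""
    global _counter
    _counter = shared_counter


def count_one(_):
    """Increment the shared counter under its built-in lock."""
    with _counter.get_lock():
        _counter.value += 1


def count_in_pool(n_tasks, n_workers=2):
    """Run n_tasks increments across a pool and return the final count."""
    counter = Value('i', 0)
    with Pool(n_workers, initializer=_init_worker, initargs=(counter,)) as pool:
        pool.map(count_one, range(n_tasks))
    return counter.value


if __name__ == "__main__":
    print(count_in_pool(100))  # 100
```

The lock matters: `_counter.value += 1` is a read followed by a write, and without get_lock() two workers could overwrite each other's update.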
Manager class
A Manager can be used with both Process and Pool.
from multiprocessing import Manager, Process

def f(d, l, ii):
    d[ii] = ii
    l.append(ii)

if __name__ == '__main__':
    manager = Manager()
    d = manager.dict()
    l = manager.list(range(10))
    p_list = []
    for xx in range(4):
        p_list.append(Process(target=f, args=(d, l, xx)))
    for xx in p_list:
        xx.start()
    for xx in p_list:
        xx.join()
    print(d)
    print(l)
# Sample output (the order of the four added items depends on scheduling):
# {0: 0, 1: 1, 2: 2, 3: 3}
# [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 0, 1, 2, 3]
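Since the example above uses Process, here is a companion sketch showing a Manager dict used from a Pool. Manager proxies can be passed as ordinary task arguments. The names record and fill_with_pool are hypothetical.

```python
from multiprocessing import Manager, Pool


def record(shared, key):
    """Store key squared in the managed dict; runs in a pool worker."""
    shared[key] = key * key


def fill_with_pool(keys, n_workers=2):
    """Fill a Manager dict from pool workers and return a plain-dict copy."""
    with Manager() as manager:
        shared = manager.dict()
        with Pool(n_workers) as pool:
            pool.starmap(record, [(shared, k) for k in keys])
        return dict(shared)  # copy out before the manager shuts down


if __name__ == "__main__":
    print(sorted(fill_with_pool(range(4)).items()))
    # [(0, 0), (1, 1), (2, 4), (3, 9)]
```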
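Addendum: timing a multi-process Python program, writing data with multiple processes, and reading data with multiple processes
import time
time_start = time.time()
# ... the code being timed goes here ...
time_end = time.time()
print('time cost', time_end - time_start, 's')
The result is in seconds and can be converted to other units for output.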
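The start/end pattern above can be wrapped once and reused. The following is one possible sketch using a context manager; the name timed and its optional sink list are assumptions for illustration. It uses time.perf_counter, which is intended for measuring intervals.

```python
import time
from contextlib import contextmanager


@contextmanager
def timed(label, sink=None):
    """Time the enclosed block; append the seconds to sink if one is given."""
    start = time.perf_counter()
    try:
        yield
    finally:
        elapsed = time.perf_counter() - start
        if sink is not None:
            sink.append(elapsed)
        # report both seconds and milliseconds
        print("%s: %.3f s (%.1f ms)" % (label, elapsed, elapsed * 1000))


if __name__ == "__main__":
    with timed("sleep"):
        time.sleep(0.1)
```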
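Note that when writing tests, function names must start with test; otherwise the test runner will not collect and run them.
Issues encountered with multiple processes:
1) Writing data with multiple processes: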
# Method of a test class; assumes `import pandas as pd` and
# `from multiprocessing import Process` at module level.
def test_save_features_to_db(self):
    df1 = pd.read_csv('/home/sc/PycharmProjects/risk-model/xg_test/statis_data/shixin_company.csv')
    com_list = df1['company_name'].values.tolist()
    # com_list = com_list[400015:400019]
    # print 'test_save_features_to_db'
    # print(com_list)
    p_list = []  # list of pending processes
    i = 1
    p_size = len(com_list)
    for company_name in com_list:
        # create one process per company
        p = Process(target=self.__save_data_iter_method, args=[company_name])
        # p.daemon = True
        p_list.append(p)
        # run the processes in batches
        if i % 20 == 0 or i == p_size:  # start a batch of 20; the last batch takes the remainder
            for p in p_list:
                p.start()
            for p in p_list:
                p.join()  # wait for the processes to finish
            p_list = []  # clear the process list
        i += 1
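Summary: when writing from multiple processes in this way, no lock is needed and no return value is needed.
The core is p = Process(target=self.__save_data_iter_method, args=[company_name]), where target is the function that performs one complete iteration of the work and args is the input to that iteration.
Note that args=[company_name] (or the tuple args=(company_name,)) is the correct form. Writing args=company_name or args=(company_name) passes a bare string, which Process unpacks character by character, so it fails with an error saying that only 1 argument was expected but 34 were given.
The outer loop is driven by the input: there are as many iterations as there are inputs. Make sure you understand p.start and p.join.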
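The args pitfall can be reproduced without starting any process. The sketch below imitates what Process does with args (it stores tuple(args) and later calls target(*args)); call_like_process is a hypothetical stand-in, and save_one and the company name are made up.

```python
def save_one(company_name):
    """Stand-in for the real per-company worker."""
    return "saved " + company_name


def call_like_process(target, args):
    """Imitate Process: args is turned into a tuple and unpacked."""
    return target(*tuple(args))


if __name__ == "__main__":
    name = "Acme"  # hypothetical company name
    print(call_like_process(save_one, [name]))   # saved Acme
    print(call_like_process(save_one, (name,)))  # saved Acme
    try:
        # (name) is just the string "Acme": it unpacks into 4 characters
        call_like_process(save_one, (name))
    except TypeError as exc:
        print("TypeError:", exc)
```

The parentheses in (name) do not create a tuple; only the trailing comma in (name,) does.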
def __save_data_iter_method(self, com):
    # time_start = time.time()
    # print(com)
    f_d_t = ShiXinFeaturesDealSvc()
    res = f_d_t.get_time_features(company_name=com)
    # whether the company is on the dishonest (shixin) list
    shixin_label = res.shixin_label
    key1 = res.shixin_time
    if key1:
        public_at = res.shixin_time
        feat = res.time_map_features[key1]  # features at the shixin time point
        # print(feat.company_name)
        f_d_do = ShixinFeaturesDto(company_name=feat.company_name, established_years=feat.established_years,
                                   industry_dx_rate=feat.industry_dx_rate, regcap_change_cnt=feat.regcap_change_cnt,
                                   share_change_cnt=feat.share_change_cnt, industry_all_cnt=feat.industry_all_cnt,
                                   industry_dx_cnt=feat.industry_dx_cnt, address_change_cnt=feat.address_change_cnt,
                                   fr_change_cnt=feat.fr_change_cnt, judgedoc_cnt=feat.judgedoc_cnt,
                                   bidding_cnt=feat.bidding_cnt, trade_mark_cnt=feat.trade_mark_cnt,
                                   network_share_cancel_cnt=feat.network_share_cancel_cnt, cancel_cnt=feat.cancel_cnt,
                                   network_share_zhixing_cnt=feat.network_share_zhixing_cnt,
                                   network_share_judge_doc_cnt=feat.network_share_judge_doc_cnt,
                                   net_judgedoc_defendant_cnt=feat.net_judgedoc_defendant_cnt,
                                   judge_doc_cnt=feat.judge_doc_cnt, public_at=public_at, shixin_label=shixin_label)
        # time_end = time.time()
        # print('totally cost', time_end - time_start)
        self.cfdbsvc.save_or_update_features(f_d_do)
def save_or_update_features(self, shixin_features_dto):
    """
    Insert or update:
    insert one row; if the row does not exist it is inserted, otherwise it is updated
    """
    self._pg_util = PgUtil()  # create a fresh PgUtil in the calling process
    p_id = None
    if isinstance(shixin_features_dto, ShixinFeaturesDto):
        p_id = str(uuid.uuid1())
        self._pg_util.execute_sql(
            self.s_b.insert_or_update_row(
                self.model.COMPANY_NAME,
                {
                    self.model.ID: p_id,
                    # company name
                    self.model.COMPANY_NAME: shixin_features_dto.company_name,
                    # time the company was listed as dishonest
                    self.model.PUBLIC_AT: shixin_features_dto.public_at,
                    self.model.SHIXIN_LABEL: shixin_features_dto.shixin_label,
                    self.model.ESTABLISHED_YEARS: shixin_features_dto.established_years,
                    self.model.INDUSTRY_DX_RATE: shixin_features_dto.industry_dx_rate,
                    self.model.REGCAP_CHANGE_CNT: shixin_features_dto.regcap_change_cnt,
                    self.model.SHARE_CHANGE_CNT: shixin_features_dto.share_change_cnt,
                    self.model.INDUSTRY_ALL_CNT: shixin_features_dto.industry_all_cnt,
                    self.model.INDUSTRY_DX_CNT: shixin_features_dto.industry_dx_cnt,
                    self.model.ADDRESS_CHANGE_CNT: shixin_features_dto.address_change_cnt,
                    self.model.NETWORK_SHARE_CANCEL_CNT: shixin_features_dto.network_share_cancel_cnt,
                    self.model.CANCEL_CNT: shixin_features_dto.cancel_cnt,
                    self.model.NETWORK_SHARE_ZHIXING_CNT: shixin_features_dto.network_share_zhixing_cnt,
                    self.model.FR_CHANGE_CNT: shixin_features_dto.fr_change_cnt,
                    self.model.JUDGEDOC_CNT: shixin_features_dto.judgedoc_cnt,
                    self.model.NETWORK_SHARE_JUDGE_DOC_CNT: shixin_features_dto.network_share_judge_doc_cnt,
                    self.model.BIDDING_CNT: shixin_features_dto.bidding_cnt,
                    self.model.TRADE_MARK_CNT: shixin_features_dto.trade_mark_cnt,
                    self.model.JUDGE_DOC_CNT: shixin_features_dto.judge_doc_cnt
                },
                [self.model.ADDRESS_CHANGE_CNT, self.model.BIDDING_CNT, self.model.CANCEL_CNT,
                 self.model.ESTABLISHED_YEARS, self.model.FR_CHANGE_CNT, self.model.INDUSTRY_ALL_CNT,
                 self.model.INDUSTRY_DX_RATE, self.model.INDUSTRY_DX_CNT, self.model.JUDGE_DOC_CNT,
                 self.model.JUDGEDOC_CNT, self.model.NETWORK_SHARE_CANCEL_CNT, self.model.NETWORK_SHARE_JUDGE_DOC_CNT,
                 self.model.NETWORK_SHARE_ZHIXING_CNT, self.model.REGCAP_CHANGE_CNT, self.model.TRADE_MARK_CNT,
                 self.model.SHARE_CHANGE_CNT, self.model.SHIXIN_LABEL, self.model.PUBLIC_AT]
            )
        )
    return p_id
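The function re-creates self._pg_util = PgUtil() on every call; without this, SSL error and SSL decryption errors are raised. The underlying cause still needs investigation, though a likely explanation is that a database connection opened in the parent is inherited by the forked child processes, which then all share one SSL session.
2) Reading data with multiple processes (it is worth asking why reading data needs multiple processes at all)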
def flush_process(self, lock):  # a lock must be passed in
    """
    Run the queued functions that are waiting to be processed
    :type lock Lock
    :return a dict
    """
    # process_pool = Pool(processes=20)
    # data_list = process_pool.map(one_process, self.__process_data_list)
    #
    # for (key, value) in data_list:
    #
    # overwrite the value left over from the previous run
    self.__dct_share = self.__manager.Value('tmp', {})  # variable shared between processes
    p_list = []  # list of pending processes
    i = 1
    p_size = len(self.__process_data_list)
    # self.__process_data_list is a plain list of ProcessData objects (each holding
    # fcn, params and data_key), so it can be iterated like any other list
    for process_data in self.__process_data_list:
        # create a process
        p = Process(target=self.__one_process, args=(process_data, lock))  # the lock is passed as an argument
        # p.daemon = True
        p_list.append(p)
        # run the processes in batches
        if i % 20 == 0 or i == p_size:  # start a batch of 20; the last batch takes the remainder
            for p in p_list:
                p.start()
            for p in p_list:
                p.join()  # wait for the processes to finish
            p_list = []  # clear the process list
        i += 1
    # end for
    self.__process_data_list = []  # clear the queued items
    return self.__dct_share.value
def __one_process(self, process_data, lock):  # the per-item worker function
    """
    Worker run in each process
    :param process_data: the function, its parameters, and so on
    :param lock: protecting lock
    """
    fcn = process_data.fcn
    params = process_data.params
    data_key = process_data.data_key
    if isinstance(params, tuple):
        data = fcn(*params)  # note: *params unpacks the tuple into separate arguments; params passes it as one
    else:
        data = fcn(params)
    with lock:
        temp_dct = dict(self.__dct_share.value)
        if data_key not in temp_dct:
            temp_dct[data_key] = []
        temp_dct[data_key].append(data)
        self.__dct_share.value = temp_dct
Called from the main program:
def exe_process(self, company_name, open_from, time_nodes):
    """
    Run the data requests queued beforehand, using multiple processes
    :param company_name: company name
    :return:
    """
    mul_process_helper = MulProcessHelper()
    lock = Lock()
    self.__get_time_bidding_statistic(company_name, mul_process_helper)
    data = mul_process_helper.flush_process(lock)
    return data

def __get_time_bidding_statistic(self, company_name, mul_process_helper):
    # bidding information
    # ProcessData is a class that bundles a function, its parameters and a result key
    process_data = ProcessData(f_e_t_svc.get_bidding_statistic_time_node_api, company_name,
                               self.__BIDDING_STATISTIC_TIME)
    # queue the API call; all queued calls are later run in parallel by flush_process
    mul_process_helper.add_process_data_list(process_data)
def add_process_data_list(self, process_data):
    """
    Append to the queue of functions to be run in processes
    :type process_data ProcessData
    :param process_data:
    :return:
    """
    self.__process_data_list.append(process_data)

class ProcessData(object):
    """
    Data describing one unit of work for a process
    """
    def __init__(self, fcn, params, data_key):
        self.fcn = fcn  # function
        self.params = params  # parameters
        self.data_key = data_key  # key under which the result is stored in the shared variable
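The fragments above depend on project classes that are not shown, so here is a self-contained sketch of the same pattern: ProcessData items run in batches of processes, with results gathered under a lock. It is a simplification under stated assumptions rather than the project's code: it uses a Manager dict instead of Manager().Value, and the function add merely stands in for a slow API call.

```python
from multiprocessing import Lock, Manager, Process


class ProcessData(object):
    """One unit of work: a function, its parameters, and a result key."""

    def __init__(self, fcn, params, data_key):
        self.fcn = fcn
        self.params = params
        self.data_key = data_key


def add(a, b):  # hypothetical stand-in for a slow API call
    return a + b


def one_process(process_data, shared, lock):
    """Run one ProcessData item and append its result under its key."""
    params = process_data.params
    if isinstance(params, tuple):
        data = process_data.fcn(*params)
    else:
        data = process_data.fcn(params)
    with lock:  # the read-modify-write below must not interleave
        results = shared.get(process_data.data_key, [])
        results.append(data)
        shared[process_data.data_key] = results


def flush(items, batch_size=20):
    """Run items in batches of batch_size processes; return a plain dict."""
    lock = Lock()
    with Manager() as manager:
        shared = manager.dict()
        for start in range(0, len(items), batch_size):
            batch = [Process(target=one_process, args=(item, shared, lock))
                     for item in items[start:start + batch_size]]
            for p in batch:
                p.start()
            for p in batch:
                p.join()
        return dict(shared)


if __name__ == "__main__":
    work = [ProcessData(add, (1, 2), "sums"), ProcessData(abs, -5, "abs")]
    print(flush(work))
```

Slicing the list by batch_size replaces the manual counter (i % 20) used in the article's loops. Several items may share one data_key, in which case their results accumulate in the same list in completion order.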
The above is based on personal experience; I hope it serves as a useful reference. If anything is wrong or incomplete, corrections are welcome.