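Python's multiprocessing package is the standard library's package for multi-process parallel computing. Its API closely mirrors that of threading (multithreading). Unlike threading, however, it runs work in separate processes. Each process has its own interpreter and its own GIL (Global Interpreter Lock), so tasks can run on different CPUs at the same time and the GIL no longer limits them.
Below we introduce the Pool and Process classes from multiprocessing.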
Pool
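A Pool (process pool) makes parallel task processing more convenient. We specify how many worker processes to run (often the number of CPUs), and the Pool automatically dispatches the submitted tasks to them. Pool offers several functions for running tasks in parallel.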
apply / apply_async
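apply submits a single task and blocks until its result is ready, so calling it in a loop executes the tasks one at a time. Python 3 removed the old built-in apply() function, but Pool.apply is still available. apply_async is the asynchronous version of apply: it returns an AsyncResult immediately, and the value is fetched later with .get(). For parallel computation, use apply_async.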
```python
import multiprocessing
import time
from random import randint, seed


def f(num):
    seed()  # reseed so that worker processes do not share one random sequence
    rand_num = randint(0, 10)  # a random pause of 0 to 10 seconds for each task
    time.sleep(rand_num)
    return (num, rand_num)


if __name__ == '__main__':
    cores = multiprocessing.cpu_count()
    pool = multiprocessing.Pool(processes=cores)
    pool_list = []
    start_time = time.time()
    for xx in range(10):
        # Only submit here; do not call .get() inside this loop (see below)
        pool_list.append(pool.apply_async(f, (xx,)))

    # Why not call result.get() directly inside the for loop? apply_async itself
    # returns immediately, but .get() blocks until that particular task has
    # finished. The next task would then not be submitted until the previous one
    # completed, which serializes the work. Collecting the return values after
    # the pool has been shut down avoids blocking the statements in between.
    # Finally, shut down the pool:
    pool.close()
    pool.join()
    result_list = [xx.get() for xx in pool_list]

    print(result_list)
    print('parallel time: %.2f' % (time.time() - start_time))
    print('serial time (sum of all pauses): %.2f' % sum(xx[1] for xx in result_list))
    # Output from one run (the pauses are random):
    # [(0, 8), (1, 2), (2, 4), (3, 9), (4, 0), (5, 1), (6, 8), (7, 3), (8, 4), (9, 6)]
    # parallel time: 14.11
    # serial time (sum of all pauses): 45.00
```
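apply_async's callback parameter is another way to collect results without blocking the submission loop. The sketch below contrasts it with pool.apply, which waits for each result before returning and therefore runs the tasks one at a time. The function names square, collect_with_callback and collect_with_apply are illustrative only.

```python
from multiprocessing import Pool


def square(x):
    return x * x


def collect_with_callback(numbers, processes=2):
    """Submit every task with apply_async; the callback receives each result."""
    results = []
    with Pool(processes) as pool:
        for n in numbers:
            # Returns an AsyncResult immediately; the callback runs in the parent
            # process (in the pool's result-handling thread) when the task finishes
            pool.apply_async(square, (n,), callback=results.append)
        pool.close()
        pool.join()  # after join(), every callback has been called
    return results  # completion order, not submission order


def collect_with_apply(numbers, processes=2):
    """pool.apply blocks until each result is ready, so tasks run one by one."""
    with Pool(processes) as pool:
        return [pool.apply(square, (n,)) for n in numbers]


if __name__ == "__main__":
    print(sorted(collect_with_callback([1, 2, 3, 4])))  # [1, 4, 9, 16]
    print(collect_with_apply([1, 2, 3]))                 # [1, 4, 9]
```

All callbacks run in a single result-handling thread of the parent, so they should finish quickly. A slow callback delays the handling of every other result.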
map / map_async
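map_async is the asynchronous version of map.
Unlike apply_async, whose args tuple can carry several arguments, map and map_async call the function with exactly one argument: one element of the iterable at a time.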
```python
import time
from multiprocessing import Pool


def run(fn):
    # fn: one element of the data list
    time.sleep(1)
    return fn * fn


if __name__ == "__main__":
    testFL = [1, 2, 3, 4, 5, 6]

    print('serial:')  # sequential execution in a single process
    s = time.time()
    for fn in testFL:
        run(fn)
    e1 = time.time()
    print("sequential time:", int(e1 - s))

    print('parallel:')  # create several processes and run in parallel
    pool = Pool(4)  # a pool with 4 worker processes
    # testFL: the data to process; run: the function applied to each element
    rl = pool.map(run, testFL)
    pool.close()  # close the pool: no new tasks are accepted
    pool.join()   # the main process blocks until the workers exit
    e2 = time.time()
    print("parallel time:", int(e2 - e1))
    print(rl)
    # serial:
    # sequential time: 6
    # parallel:
    # parallel time: 2
    # [1, 4, 9, 16, 25, 36]
```
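When the function needs several arguments, Pool.starmap (Python 3.3+) unpacks each tuple of the iterable into positional arguments. map_async returns an AsyncResult whose .get() blocks until all results are in. The sketch below shows both; power and square are illustrative functions, not part of the original code.

```python
from multiprocessing import Pool


def square(x):
    return x * x


def power(base, exp):
    return base ** exp


def run(processes=2):
    with Pool(processes) as pool:
        # map_async: one argument per call; returns an AsyncResult immediately
        async_result = pool.map_async(square, [1, 2, 3])
        # ... the main process could do other work here ...
        squares = async_result.get(timeout=60)  # blocks until every result is ready
        # starmap: each tuple is unpacked, i.e. power(2, 3), power(3, 2), power(10, 0)
        powers = pool.starmap(power, [(2, 3), (3, 2), (10, 0)])
    return squares, powers


if __name__ == "__main__":
    print(run())  # ([1, 4, 9], [8, 9, 1])
```

Both calls return results in input order, whichever worker finishes first. starmap_async is the asynchronous counterpart of starmap.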
Process
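When you use Process directly, each Process object creates one operating-system process. The OS schedules these processes across the CPUs, so you can start more processes than there are CPUs. For CPU-bound tasks, though, only about one process per core can actually run at a time.
Starting too many processes adds start-up and context-switching overhead, which lowers parallel efficiency, especially for CPU-bound work. A sensible default is to start no more processes than the CPU count.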
```python
# 16.6.1.1. The Process class
from multiprocessing import Process, cpu_count
import os
import time

start_time = time.time()


def info(title):
    # print(title)
    if hasattr(os, 'getppid'):  # only available on Unix
        print('parent process:', os.getppid())
    print('process id:', os.getpid())
    time.sleep(3)


def f(name):
    info('function f')
    print('hello', name)


if __name__ == '__main__':
    # info('main line')
    p_list = []  # holds the processes created with Process
    cpu_num = cpu_count()
    for xx in range(cpu_num):
        p_list.append(Process(target=f, args=('xx_%s' % xx,)))
    for xx in p_list:
        xx.start()
    for xx in p_list:
        xx.join()
    print('spend time: %.2f' % (time.time() - start_time))

# Output from one run on a 4-CPU machine (the order varies between runs):
# parent process: 11741
# parent process: 11741
# parent process: 11741
# process id: 12249
# process id: 12250
# parent process: 11741
# process id: 12251
# process id: 12252
# hello xx_1
# hello xx_0
# hello xx_2
# hello xx_3
# spend time: 3.04
```
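Following the one-process-per-CPU advice, a CPU-bound job can be split into os.cpu_count() slices, with one Process per slice. This sketch sums squares in parallel and sends each partial result back through a Queue. The function names are illustrative, and the strided slicing is just one simple way to divide the data.

```python
import os
from multiprocessing import Process, Queue


def partial_sum(chunk, out_queue):
    """Worker: sum the squares of one slice and report the result."""
    out_queue.put(sum(x * x for x in chunk))


def parallel_sum_of_squares(data, n_procs=None):
    n_procs = n_procs or os.cpu_count() or 1
    # Strided slices data[0::n], data[1::n], ... give roughly equal chunks
    chunks = [data[i::n_procs] for i in range(n_procs)]
    q = Queue()
    procs = [Process(target=partial_sum, args=(c, q)) for c in chunks]
    for p in procs:
        p.start()
    # Read every result BEFORE join(): a child whose queued data has not been
    # consumed yet may not be able to exit, which would make join() hang
    total = sum(q.get() for _ in procs)
    for p in procs:
        p.join()
    return total


if __name__ == "__main__":
    print(parallel_sum_of_squares(list(range(1000))))
```

The order (get everything, then join) follows the multiprocessing documentation's warning about joining processes that still have data buffered in a queue.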
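Inter-process communication
Processes can exchange data through two kinds of channels: Queues and Pipes. Both work directly with Process objects. A plain multiprocessing.Queue, however, cannot be passed as an argument to Pool tasks (doing so raises a RuntimeError), so Pool workers need a queue created by a Manager instead.
Queue
A queue is first-in, first-out (FIFO) and can be used safely by several processes.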
```python
# 16.6.1.2. Exchanging objects between processes
# Queues
from multiprocessing import Process, Queue


def f(q):
    q.put([42, None, 'hello'])


if __name__ == '__main__':
    q = Queue()
    p = Process(target=f, args=(q,))
    p.start()
    print(q.get())  # prints "[42, None, 'hello']"
    p.join()
```
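With a Pool, a queue created by a Manager can be passed as a task argument, because Manager().Queue() returns a picklable proxy object. A minimal sketch; worker and run are illustrative names.

```python
from multiprocessing import Manager, Pool


def worker(task_id, queue):
    # The manager queue is a proxy, so it can be pickled as a task argument
    queue.put((task_id, task_id * 10))


def run(n_tasks=5, processes=2):
    with Manager() as manager:
        q = manager.Queue()
        with Pool(processes) as pool:
            pool.starmap(worker, [(i, q) for i in range(n_tasks)])
        # All tasks have finished, so exactly n_tasks items are waiting
        items = [q.get() for _ in range(n_tasks)]
    return sorted(items)


if __name__ == "__main__":
    print(run())  # [(0, 0), (1, 10), (2, 20), (3, 30), (4, 40)]
```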
Pipe
```python
from multiprocessing import Process, Pipe


def f(conn):
    conn.send([42, None, 'hello'])
    conn.close()


if __name__ == '__main__':
    parent_conn, child_conn = Pipe()
    p = Process(target=f, args=(child_conn,))
    p.start()
    print(parent_conn.recv())  # prints "[42, None, 'hello']"
    p.join()
```
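Pipe() is duplex by default, so both ends can send and receive. The sketch below uses that for a simple request/response exchange, with None as an agreed-upon stop signal. The names echo_upper and run and the stop convention are illustrative assumptions.

```python
from multiprocessing import Pipe, Process


def echo_upper(conn):
    """Child: answer each message until it receives None."""
    while True:
        msg = conn.recv()
        if msg is None:  # stop signal chosen for this sketch
            break
        conn.send(msg.upper())
    conn.close()


def run(messages):
    parent_conn, child_conn = Pipe()  # duplex=True by default
    p = Process(target=echo_upper, args=(child_conn,))
    p.start()
    replies = []
    for m in messages:
        parent_conn.send(m)
        replies.append(parent_conn.recv())
    parent_conn.send(None)  # tell the child to finish
    p.join()
    return replies


if __name__ == "__main__":
    print(run(["hello", "pipe"]))  # ['HELLO', 'PIPE']
```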
Queue vs. Pipe
Pipe() can only have two endpoints.
Queue() can have multiple producers and consumers.
When to use them
If you need more than two points to communicate, use a Queue().
If you need absolute performance, a Pipe() is much faster because Queue() is built on top of Pipe().
Reference:
https://stackoverflow.com/questions/8463008/python-multiprocessing-pipe-vs-queue
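Sharing resources
Processes should avoid sharing resources where possible. With threads, sharing is easy, for example through global variables or passed-in arguments.
With processes, each one has its own independent memory space, so those approaches do not work: a change that one process makes to a global variable is invisible to the others.
Instead, resources can be shared through shared memory or a Manager.
Both add complexity to the program, and the synchronization they require reduces its efficiency.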
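Shared memory
Shared-memory objects (Value and Array) can be passed to a Process as arguments, but not as arguments to Pool tasks (that raises a RuntimeError). To use them with a Pool, the workers must inherit them when they start, for example through the Pool's initializer and initargs parameters.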
```python
# 16.6.1.4. Sharing state between processes
# Shared memory
from multiprocessing import Process, Value, Array


def f(n, a):
    n.value = 3.1415927
    for i in range(len(a)):
        a[i] = -a[i]


if __name__ == '__main__':
    num = Value('d', 0.0)        # 'd': double-precision float
    arr = Array('i', range(10))  # 'i': signed int
    p = Process(target=f, args=(num, arr))
    p.start()
    p.join()
    print(num.value)
    print(arr[:])
    # 3.1415927
    # [0, -1, -2, -3, -4, -5, -6, -7, -8, -9]
```
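To use a Value with a Pool, one option is to hand it to each worker through initializer/initargs rather than as a task argument. Because `+= 1` on `.value` is a read-modify-write, it should run under the Value's own lock (get_lock()). A sketch under those assumptions; the helper names are illustrative.

```python
from multiprocessing import Pool, Value

_counter = None  # set inside every Pool worker by _init_worker


def _init_worker(counter):
    global _counter
    _counter = counter


def add_one(_):
    # Hold the lock around the read-modify-write so no update is lost
    with _counter.get_lock():
        _counter.value += 1


def count_with_pool(n_tasks=100, processes=4):
    counter = Value('i', 0)  # shared C int, created with a lock by default
    # Inherited by the workers at start-up via initargs (not a task argument)
    with Pool(processes, initializer=_init_worker, initargs=(counter,)) as pool:
        pool.map(add_one, range(n_tasks))
    return counter.value


if __name__ == "__main__":
    print(count_with_pool())  # 100
```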
Manager Class
A Manager can be used both with Process and with a Pool, because the proxy objects it returns can be pickled and passed as arguments.
```python
from multiprocessing import Manager, Process


def f(d, l, ii):
    d[ii] = ii
    l.append(ii)


if __name__ == '__main__':
    manager = Manager()
    d = manager.dict()
    l = manager.list(range(10))
    p_list = []
    for xx in range(4):
        p_list.append(Process(target=f, args=(d, l, xx)))
    for xx in p_list:
        xx.start()
    for xx in p_list:
        xx.join()
    print(d)
    print(l)
    # {0: 0, 1: 1, 2: 2, 3: 3}
    # [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 0, 1, 2, 3]
    # (the order of the entries added by the child processes can vary between runs)
```
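The same kind of Manager dict also works with a Pool, since the proxy can be passed as a task argument. A minimal sketch in which each task writes a different key, so no extra lock is needed; record and run are illustrative names.

```python
from multiprocessing import Manager, Pool


def record(shared_dict, key):
    # Each task writes its own key, so tasks never overwrite each other
    shared_dict[key] = key * key


def run(keys, processes=2):
    with Manager() as manager:
        d = manager.dict()  # DictProxy: picklable, usable as a Pool task argument
        with Pool(processes) as pool:
            pool.starmap(record, [(d, k) for k in keys])
        return d.copy()  # copy into a normal dict before the manager shuts down


if __name__ == "__main__":
    print(run([1, 2, 3]))
```

If several tasks updated the same key (for example appending to one list), the read-modify-write would need a lock, as in the author's flush_process code further down.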
Supplement: timing a multiprocess Python program / writing data with multiple processes / reading data with multiple processes
```python
import time

time_start = time.time()
# ... the code being timed ...
time_end = time.time()
print('time cost', time_end - time_start, 's')
```
The result is in seconds; it can be converted to other units for output.
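time.time() follows the system clock, which can be adjusted while the program runs. For measuring intervals, time.perf_counter() is the usual choice. Below is a small context-manager sketch (the timer name is illustrative) that also prints the result in milliseconds.

```python
import time
from contextlib import contextmanager


@contextmanager
def timer(label="time cost"):
    """Measure how long the enclosed block takes."""
    result = {}
    start = time.perf_counter()  # monotonic, high resolution: suited to intervals
    try:
        yield result
    finally:
        result["seconds"] = time.perf_counter() - start
        print("%s: %.3f s (%.1f ms)" % (label, result["seconds"], result["seconds"] * 1000))


with timer("sleep") as t:
    time.sleep(0.1)
```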
When writing tests with unittest, test method names must start with test; otherwise the test runner will not run them.
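The default unittest loader only collects methods whose names begin with the prefix "test". A small sketch with two illustrative methods shows which one is picked up.

```python
import unittest


class FeatureTests(unittest.TestCase):
    def test_addition(self):        # collected: the name starts with "test"
        self.assertEqual(1 + 1, 2)

    def check_subtraction(self):    # ignored: no "test" prefix
        self.assertEqual(2 - 1, 1)


if __name__ == "__main__":
    print(unittest.TestLoader().getTestCaseNames(FeatureTests))  # ['test_addition']
```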
Issues with multiprocessing:
1) Saving data with multiple processes:
```python
import pandas as pd
from multiprocessing import Process


# A method of the author's unittest.TestCase subclass (excerpt)
def test_save_features_to_db(self):
    df1 = pd.read_csv('/home/sc/PycharmProjects/risk-model/xg_test/statis_data/shixin_company.csv')
    com_list = df1['company_name'].values.tolist()
    # com_list = com_list[400015:400019]
    # print('test_save_features_to_db')
    # print(com_list)
    p_list = []  # list of processes
    i = 1
    p_size = len(com_list)
    for company_name in com_list:
        # create a process
        p = Process(target=self.__save_data_iter_method, args=[company_name])
        # p.daemon = True
        p_list.append(p)
        # run the processes in batches
        if i % 20 == 0 or i == p_size:  # one batch every 20 items; the last batch takes the remainder
            for p in p_list:
                p.start()
            for p in p_list:
                p.join()  # wait for the processes to finish
            p_list = []  # clear the process list
        i += 1
```
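Summary: for this kind of multiprocess write, no lock and no return value are needed. Each process writes one company's row straight to the database, and nothing is passed back to the parent.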
The core line is p = Process(target=self.__save_data_iter_method, args=[company_name]). target points to one complete iteration of the work, and args is that iteration's input.
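Note that args=[company_name] is correct. The original code used args=company_name or args=(company_name), which failed with an error saying that the function takes 1 argument but 34 were given. The reason is that (company_name) is just a string in parentheses, not a tuple. Process calls target(*args), so every character of the name becomes a separate argument. args=(company_name,), with a trailing comma, also works.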
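Process.run() ultimately calls target(*args). The sketch below mimics that call with a hypothetical save_one function and a made-up company name, to show why (company_name) fails while (company_name,) and [company_name] work.

```python
def save_one(company_name):
    """Hypothetical stand-in for __save_data_iter_method."""
    return company_name


def call_like_process(target, args):
    # Process.run() calls self._target(*self._args, **self._kwargs)
    return target(*args)


name = "ACME Ltd"  # made-up name with 8 characters

print(type((name)).__name__)    # str: parentheses alone do not create a tuple
print(type((name,)).__name__)   # tuple: the trailing comma does
print(call_like_process(save_one, (name,)))  # ACME Ltd
print(call_like_process(save_one, [name]))   # ACME Ltd

try:
    call_like_process(save_one, (name))  # unpacks the string into 8 arguments
except TypeError as exc:
    print("TypeError:", exc)
```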
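The outer loop is driven by the input: there is one iteration, and one process, per input item. p.start() launches a process and returns at once, while p.join() blocks until that process has finished. Each batch of 20 is therefore started together and fully completed before the next batch begins.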
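The batching logic (20 processes at a time, with the remainder in the final batch) can also be written by slicing the input. In the sketch below, the batch size defaults to the CPU count, as suggested in the Process section. A Queue is used only so that the example has a result to check; the author's version writes to the database and returns nothing. work and run_in_batches are illustrative names.

```python
import os
from multiprocessing import Process, Queue


def work(item, out_q):
    # Stand-in for one complete iteration, e.g. saving one company's row
    out_q.put((item, len(item)))


def run_in_batches(items, batch_size=None):
    """Start at most batch_size processes, wait for all of them, then continue."""
    batch_size = batch_size or os.cpu_count() or 1
    out_q = Queue()
    results = []
    for start in range(0, len(items), batch_size):
        batch = [Process(target=work, args=(item, out_q))
                 for item in items[start:start + batch_size]]
        for p in batch:
            p.start()  # returns immediately; the child runs in parallel
        for _ in batch:
            results.append(out_q.get())  # drain the queue before joining
        for p in batch:
            p.join()  # blocks until that child has exited
    return results


if __name__ == "__main__":
    print(run_in_batches(["a", "bb", "ccc", "dddd", "eeeee"], batch_size=2))
```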
```python
import uuid

# ShiXinFeaturesDealSvc, ShixinFeaturesDto and PgUtil come from the author's project.


# One complete iteration: compute and save the features of one company
def __save_data_iter_method(self, com):
    # time_start = time.time()
    # print(com)
    f_d_t = ShiXinFeaturesDealSvc()
    res = f_d_t.get_time_features(company_name=com)
    # whether the company is listed as a dishonest debtor ("shixin")
    shixin_label = res.shixin_label
    key1 = res.shixin_time
    if key1:
        public_at = res.shixin_time
        company_name = res.time_map_features[key1].company_name
        # print(company_name)
        established_years = res.time_map_features[key1].established_years
        industry_dx_rate = res.time_map_features[key1].industry_dx_rate
        regcap_change_cnt = res.time_map_features[key1].regcap_change_cnt
        share_change_cnt = res.time_map_features[key1].share_change_cnt
        industry_dx_cnt = res.time_map_features[key1].industry_dx_cnt
        address_change_cnt = res.time_map_features[key1].address_change_cnt
        fr_change_cnt = res.time_map_features[key1].fr_change_cnt
        judgedoc_cnt = res.time_map_features[key1].judgedoc_cnt
        bidding_cnt = res.time_map_features[key1].bidding_cnt
        trade_mark_cnt = res.time_map_features[key1].trade_mark_cnt
        network_share_cancel_cnt = res.time_map_features[key1].network_share_cancel_cnt
        cancel_cnt = res.time_map_features[key1].cancel_cnt
        industry_all_cnt = res.time_map_features[key1].industry_all_cnt
        network_share_zhixing_cnt = res.time_map_features[key1].network_share_zhixing_cnt
        network_share_judge_doc_cnt = res.time_map_features[key1].network_share_judge_doc_cnt
        net_judgedoc_defendant_cnt = res.time_map_features[key1].net_judgedoc_defendant_cnt
        judge_doc_cnt = res.time_map_features[key1].judge_doc_cnt
        f_d_do = ShixinFeaturesDto(
            company_name=company_name,
            established_years=established_years,
            industry_dx_rate=industry_dx_rate,
            regcap_change_cnt=regcap_change_cnt,
            share_change_cnt=share_change_cnt,
            industry_all_cnt=industry_all_cnt,
            industry_dx_cnt=industry_dx_cnt,
            address_change_cnt=address_change_cnt,
            fr_change_cnt=fr_change_cnt,
            judgedoc_cnt=judgedoc_cnt,
            bidding_cnt=bidding_cnt,
            trade_mark_cnt=trade_mark_cnt,
            network_share_cancel_cnt=network_share_cancel_cnt,
            cancel_cnt=cancel_cnt,
            network_share_zhixing_cnt=network_share_zhixing_cnt,
            network_share_judge_doc_cnt=network_share_judge_doc_cnt,
            net_judgedoc_defendant_cnt=net_judgedoc_defendant_cnt,
            judge_doc_cnt=judge_doc_cnt,
            public_at=public_at,
            shixin_label=shixin_label)
        # time_end = time.time()
        # print('totally cost', time_end - time_start)
        self.cfdbsvc.save_or_update_features(f_d_do)


# Method of the database service class called through self.cfdbsvc
def save_or_update_features(self, shixin_features_dto):
    """
    Add or update: insert the row if it does not exist, otherwise update it.
    """
    self._pg_util = PgUtil()  # a fresh connection on every call (see the note below)
    p_id = None
    if isinstance(shixin_features_dto, ShixinFeaturesDto):
        p_id = str(uuid.uuid1())
        self._pg_util.execute_sql(
            self.s_b.insert_or_update_row(
                self.model.COMPANY_NAME,
                {
                    self.model.ID: p_id,
                    # company name
                    self.model.COMPANY_NAME: shixin_features_dto.company_name,
                    # date the company was listed as a dishonest debtor
                    self.model.PUBLIC_AT: shixin_features_dto.public_at,
                    self.model.SHIXIN_LABEL: shixin_features_dto.shixin_label,
                    self.model.ESTABLISHED_YEARS: shixin_features_dto.established_years,
                    self.model.INDUSTRY_DX_RATE: shixin_features_dto.industry_dx_rate,
                    self.model.REGCAP_CHANGE_CNT: shixin_features_dto.regcap_change_cnt,
                    self.model.SHARE_CHANGE_CNT: shixin_features_dto.share_change_cnt,
                    self.model.INDUSTRY_ALL_CNT: shixin_features_dto.industry_all_cnt,
                    self.model.INDUSTRY_DX_CNT: shixin_features_dto.industry_dx_cnt,
                    self.model.ADDRESS_CHANGE_CNT: shixin_features_dto.address_change_cnt,
                    self.model.NETWORK_SHARE_CANCEL_CNT: shixin_features_dto.network_share_cancel_cnt,
                    self.model.CANCEL_CNT: shixin_features_dto.cancel_cnt,
                    self.model.NETWORK_SHARE_ZHIXING_CNT: shixin_features_dto.network_share_zhixing_cnt,
                    self.model.FR_CHANGE_CNT: shixin_features_dto.fr_change_cnt,
                    self.model.JUDGEDOC_CNT: shixin_features_dto.judgedoc_cnt,
                    self.model.NETWORK_SHARE_JUDGE_DOC_CNT: shixin_features_dto.network_share_judge_doc_cnt,
                    self.model.BIDDING_CNT: shixin_features_dto.bidding_cnt,
                    self.model.TRADE_MARK_CNT: shixin_features_dto.trade_mark_cnt,
                    self.model.JUDGE_DOC_CNT: shixin_features_dto.judge_doc_cnt,
                },
                # columns to update when the row already exists
                [self.model.ADDRESS_CHANGE_CNT, self.model.BIDDING_CNT, self.model.CANCEL_CNT,
                 self.model.ESTABLISHED_YEARS, self.model.FR_CHANGE_CNT, self.model.INDUSTRY_ALL_CNT,
                 self.model.INDUSTRY_DX_RATE, self.model.INDUSTRY_DX_CNT, self.model.JUDGE_DOC_CNT,
                 self.model.JUDGEDOC_CNT, self.model.NETWORK_SHARE_CANCEL_CNT,
                 self.model.NETWORK_SHARE_JUDGE_DOC_CNT, self.model.NETWORK_SHARE_ZHIXING_CNT,
                 self.model.REGCAP_CHANGE_CNT, self.model.TRADE_MARK_CNT, self.model.SHARE_CHANGE_CNT,
                 self.model.SHIXIN_LABEL, self.model.PUBLIC_AT]
            )
        )
    return p_id
```
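The function re-creates self._pg_util = PgUtil() on every call; without this, SSL errors (including SSL decryption errors) occur. The author left the root cause for further investigation. A likely explanation is that a database connection opened in the parent is inherited by the forked child processes. Several processes then read and write the same SSL connection and corrupt its encryption state, so each process needs its own connection.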
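Instead of reconnecting on every call, another common pattern is to open one connection per worker process. The sketch below does this with a Pool initializer. FakeConnection is a hypothetical placeholder for the real connection class (for example PgUtil in the author's project), and the other names are illustrative too.

```python
import os
from multiprocessing import Pool


class FakeConnection:
    """Hypothetical stand-in for a real database connection such as PgUtil()."""

    def __init__(self):
        self.owner_pid = os.getpid()  # remember which process opened it


_conn = None  # one connection per worker process


def _init_worker():
    # Runs once inside each worker: open a connection that belongs to this process
    global _conn
    _conn = FakeConnection()


def save(row):
    # Use only this process's own connection, never one inherited from the parent
    return os.getpid(), _conn.owner_pid, row


def run(rows, processes=2):
    with Pool(processes, initializer=_init_worker) as pool:
        return pool.map(save, rows)


if __name__ == "__main__":
    for pid, owner_pid, row in run(["a", "b", "c"]):
        print(row, pid == owner_pid)  # True: each worker uses a connection it opened itself
```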
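2) Fetching data with multiple processes (worth thinking about: why fetch data with multiple processes at all?)
```python
from multiprocessing import Process

# Methods of the author's MulProcessHelper class (excerpt); self.__manager is a
# multiprocessing Manager and self.__process_data_list a list, created elsewhere.


def flush_process(self, lock):  # a lock must be passed in
    """
    Run the queued methods.
    :type lock Lock
    :return a dict
    """
    # process_pool = Pool(processes=20)
    # data_list = process_pool.map(one_process, self.__process_data_list)
    #
    # for (key, value) in data_list:
    #
    # overwrite the variable left over from the previous run
    self.__dct_share = self.__manager.Value('tmp', {})  # variable shared between processes
    p_list = []  # list of processes
    i = 1
    p_size = len(self.__process_data_list)
    for process_data in self.__process_data_list:
        # Loop over the queued requests. self.__process_data_list holds several
        # ProcessData objects, each with three attributes (fcn, params, data_key);
        # a list of class instances can be iterated like any other list.
        # create a process
        p = Process(target=self.__one_process, args=(process_data, lock))  # the lock is passed in
        # p.daemon = True
        p_list.append(p)
        # run the processes in batches
        if i % 20 == 0 or i == p_size:  # one batch every 20 items; the last batch takes the remainder
            for p in p_list:
                p.start()
            for p in p_list:
                p.join()  # wait for the processes to finish
            p_list = []  # clear the process list
        i += 1
    # end for
    self.__process_data_list = []  # clear the queued requests
    return self.__dct_share.value


def __one_process(self, process_data, lock):  # one iteration
    """
    Handle one request in a child process.
    :param process_data: the method, its parameters, etc.
    :param lock: lock protecting the shared variable
    """
    fcn = process_data.fcn
    params = process_data.params
    data_key = process_data.data_key
    if isinstance(params, tuple):
        # Note the difference: fcn(*params) unpacks the tuple into separate
        # positional arguments, while fcn(params) passes a single argument
        data = fcn(*params)
    else:
        data = fcn(params)
    with lock:
        # .value returns a copy of the dict, so in-place changes would be lost:
        # copy, modify and assign it back while holding the lock
        temp_dct = dict(self.__dct_share.value)
        if data_key not in temp_dct:
            temp_dct[data_key] = []
        temp_dct[data_key].append(data)
        self.__dct_share.value = temp_dct
```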
Calling it from the main program:
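```python
from multiprocessing import Lock

# MulProcessHelper and f_e_t_svc come from the author's project.


def exe_process(self, company_name, open_from, time_nodes):
    """
    Run the pre-subscribed (queued) data requests with multiple processes.
    :param company_name: company name
    :return:
    """
    mul_process_helper = MulProcessHelper()
    lock = Lock()
    self.__get_time_bidding_statistic(company_name, mul_process_helper)
    data = mul_process_helper.flush_process(lock)
    return data


def __get_time_bidding_statistic(self, company_name, mul_process_helper):
    # bidding / tender information
    # ProcessData is a small class bundling the API method to call (functions
    # are objects, so they can be stored and called later), its parameters,
    # and the key under which the result is stored
    process_data = ProcessData(f_e_t_svc.get_bidding_statistic_time_node_api, company_name,
                               self.__BIDDING_STATISTIC_TIME)
    # Queue the request; flush_process later runs every queued request in its
    # own process, so several APIs (or companies) can be queried at the same time
    mul_process_helper.add_process_data_list(process_data)


# Method of MulProcessHelper
def add_process_data_list(self, process_data):
    """
    Add a method to the queue of work for the processes.
    :type process_data ProcessData
    :param process_data:
    :return:
    """
    self.__process_data_list.append(process_data)


class ProcessData(object):
    """
    Data for one process to handle.
    """

    def __init__(self, fcn, params, data_key):
        self.fcn = fcn  # the method
        self.params = params  # its parameters
        self.data_key = data_key  # name under which the result is stored in the shared variable
```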
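The commented-out lines at the top of flush_process hint at a Pool-based alternative. Below is a minimal sketch of that idea, assuming the same three ProcessData fields. Each task returns (data_key, data) and the parent groups the results, so no Manager value or Lock is needed. The add and shout functions stand in for the real API methods, and flush and _run_one are illustrative names.

```python
from collections import defaultdict
from multiprocessing import Pool


class ProcessData:
    """Same three fields as the author's ProcessData."""

    def __init__(self, fcn, params, data_key):
        self.fcn = fcn
        self.params = params
        self.data_key = data_key


def _run_one(process_data):
    params = process_data.params
    # A tuple is unpacked into several positional arguments; anything else is one argument
    if isinstance(params, tuple):
        data = process_data.fcn(*params)
    else:
        data = process_data.fcn(params)
    return process_data.data_key, data


def flush(process_data_list, processes=4):
    """Run every queued call in a Pool and group the results by data_key."""
    grouped = defaultdict(list)
    with Pool(processes) as pool:
        # Results come back as return values, in submission order
        for key, data in pool.imap(_run_one, process_data_list):
            grouped[key].append(data)
    return dict(grouped)


def add(a, b):
    return a + b


def shout(s):
    return s.upper()


if __name__ == "__main__":
    tasks = [ProcessData(add, (1, 2), "sum"), ProcessData(add, (3, 4), "sum"),
             ProcessData(shout, "hi", "text")]
    print(flush(tasks, processes=2))  # {'sum': [3, 7], 'text': ['HI']}
```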
Original article: https://lipidong.blog.csdn.net/article/details/77090119