進(jìn)程
什么是進(jìn)程
進(jìn)程指的是一個(gè)程序的運(yùn)行過程,或者說一個(gè)正在執(zhí)行的程序
所以說進(jìn)程一種虛擬的概念,該虛擬概念起源操作系統(tǒng)
一個(gè)CPU 同一時(shí)刻只能執(zhí)行一件事
開啟一個(gè)進(jìn)程
from multiprocessing import Process import time def task(name): print('%s is running'%name) time.sleep(3) print('%s is done'%name) # 開啟子進(jìn)程的操作必須放到 # if __name__ == '__main__'的子代碼中 # 子進(jìn)程不會(huì)再次加載 if __name__ == '__main__': p=Process(target=task,args=('小王',)) # p=Process(target=task,kwargs={'name':'小王'}) # print(p) p.start() # 主進(jìn)程只是向操作系統(tǒng)發(fā)送了一個(gè)開啟子進(jìn)程的信號(hào) # p.start() # 1.操作系統(tǒng)先申請內(nèi)存空間 # 2.把主進(jìn)程的數(shù)據(jù)拷貝到子進(jìn)程里面 # 3.調(diào)用cup才能運(yùn)行里面的代碼 # 創(chuàng)造進(jìn)程的開銷大 print('主')
JOIN方法
當(dāng)前進(jìn)程jion別的進(jìn)程。當(dāng)前進(jìn)程就會(huì)等到別的進(jìn)程執(zhí)行完畢了才會(huì)繼續(xù)開始往下執(zhí)行
from multiprocessing import Process import time def task(name, n): print('%s is running' % name) time.sleep(n) print('%s is done' % name) if __name__ == '__main__': start = time.time() p_l = [] for i in range(1, 4): p = Process(target=task, args=('小王%s' % i, i)) p_l.append(p) p.start() # 主進(jìn)程等待子進(jìn)程 for p in p_l: p.join() print('主', (time.time() - start))
進(jìn)程之間空間隔離
from multiprocessing import Process # 這個(gè)n是主進(jìn)程里面的值 n = 100 def task(): global n # 改的是子進(jìn)程里面的全局變量 # 主進(jìn)程里面沒有改 n = 0 if __name__ == '__main__': p=Process(target=task) p.start() p.join() print(n)
進(jìn)程的常用方法
current_process 查看pid(進(jìn)程id)
# 1. 進(jìn)程pid:每一個(gè)進(jìn)程在操作系統(tǒng)內(nèi)都有一個(gè)唯一的id號(hào),稱之為pid from multiprocessing import Process, current_process import time def task(): print('%s is running' % current_process().pid) time.sleep(3) print('%s is done' % current_process().pid) # 開啟子進(jìn)程的操作必須放到 # if __name__ == '__main__'的子代碼中 # 子進(jìn)程不會(huì)再次加載 if __name__ == '__main__': p = Process(target=task) p.start() print('主', current_process().pid)
os.getpid() 查看進(jìn)程id
# os模塊也可以 from multiprocessing import Process, current_process import time, os def task(): print('%s is running 爹是%s' % (os.getpid(), os.getppid())) time.sleep(3) print('%s is done爹是%s' % (os.getpid(), os.getppid())) # 開啟子進(jìn)程的操作必須放到 # if __name__ == '__main__'的子代碼中 # 子進(jìn)程不會(huì)再次加載 if __name__ == '__main__': p = Process(target=task) p.start() # 誰把主進(jìn)程創(chuàng)造出來的 # 用pycharm就是pycharm創(chuàng)造的 print('主%s爹是%s' % (os.getpid(), os.getppid()))
進(jìn)程其他方法和屬性
from multiprocessing import Process,current_process import time,os def task(): print('%s is running 爹是%s'%(os.getpid(),os.getppid())) time.sleep(30) print('%s is done爹是%s'%(os.getpid(),os.getppid())) # 開啟子進(jìn)程的操作必須放到 # if __name__ == '__main__'的子代碼中 # 子進(jìn)程不會(huì)再次加載 if __name__ == '__main__': p=Process(target=task) p.start() # 誰把主進(jìn)程創(chuàng)造出來的 # 用pycharm就是pycharm創(chuàng)造的 # 進(jìn)程的名字 print(p.name) # 殺死子進(jìn)程 p.terminate() # 需要時(shí)間 time.sleep(0.1) # 判斷子進(jìn)程是否存活 print(p.is_alive()) print('主%s爹是%s'%(os.getpid(),os.getppid()))
守護(hù)進(jìn)程
本質(zhì)就是一個(gè)"子進(jìn)程",該"子進(jìn)程"的生命周期<=被守護(hù)進(jìn)程的生命周期
當(dāng)被守護(hù)的進(jìn)程執(zhí)行完了。它也會(huì)被殺死
# 主進(jìn)程運(yùn)行完了,子進(jìn)程沒有存在的意義 # 皇帝和太監(jiān)不是同生,但是是同死 from multiprocessing import Process import time def task(name): print('%s活著'%name) time.sleep(3) print('%s正常死亡'%name) if __name__ == '__main__': p1=Process(target=task,args=('老太監(jiān)',)) # 聲明子進(jìn)程為守護(hù)進(jìn)程 p1.daemon = True p1.start() time.sleep(1) print('皇帝正在死亡')
互斥鎖
進(jìn)程之間內(nèi)存空間互相隔離,怎樣實(shí)現(xiàn)共享數(shù)據(jù)
進(jìn)程之間內(nèi)存數(shù)據(jù)不共享,但是共享同一套文件系統(tǒng),所以訪問同一個(gè)文件,是沒有問題的,
而共享帶來的是競爭,競爭帶來的結(jié)果就是錯(cuò)亂,如何控制,就是加鎖處理
''' 搶票 查票 購票 互斥鎖: 在程序中進(jìn)行加鎖處理 必須要釋放鎖下一個(gè)鎖才能獲取,所以程序在合適的時(shí)候必須要有釋放鎖 所以用文件來處理共享數(shù)據(jù) 1.速度慢 2.必須有互斥鎖 ''' import json import time,random from multiprocessing import Process,Lock # 查票 def search(name): with open('db.json','rt',encoding='utf-8')as f: dic = json.load(f) # 模擬查票時(shí)間 time.sleep(1) print('%s 查看到余票為 %s'%(name,dic['count'])) # 購票 # 第二個(gè)get子進(jìn)程不會(huì)是第一個(gè)get子進(jìn)程修改后count的結(jié)果 # 加互斥鎖,把這一部分并發(fā)變成串行, # 但是犧牲了效率,保證了數(shù)據(jù)安全 def get(name): with open('db.json','rt',encoding='utf-8')as f: dic = json.load(f) if dic['count']>0: dic['count']-=1 time.sleep(random.randint(1,3)) with open('db.json', 'wt', encoding='utf-8')as f: json.dump(dic,f) print('%s 購票成功'%name) else: print('%s 查看到?jīng)]有票了'%name) def task(name,mutex): # 并發(fā) search(name) # 串行 # 加互斥鎖 mutex.acquire() get(name) # 釋放互斥鎖 mutex.release() # if __name__ == '__main__': # for i in range(10): # p=Process(target=task,args=('路人%s'%i,)) # p.start() # # join只能將進(jìn)程的任務(wù)整體變成串行 # # 互斥鎖可以局部串行 # p.join() # # 數(shù)據(jù)安全,是指讀的時(shí)候無所謂,寫的(改的)時(shí)候必須安全 # # 寫的時(shí)候是串行,讀的時(shí)候并發(fā) # 加鎖 if __name__ == '__main__': # 主進(jìn)程加鎖 mutex=Lock() for i in range(10): # 鎖傳入子進(jìn)程 p=Process(target=task,args=('路人%s'%i,mutex)) p.start() # join只能將進(jìn)程的任務(wù)整體變成串行 # 互斥鎖可以局部串行 # p.join() # 數(shù)據(jù)安全,是指讀的時(shí)候無所謂,寫的(改的)時(shí)候必須安全 # 寫的時(shí)候是串行,讀的時(shí)候并發(fā)
db.json 中只有10張票。如果沒有加鎖。則可能會(huì)出現(xiàn)票唄多賣的情況
進(jìn)程間通信(IPC機(jī)制)
''' 速度快 鎖問題解決 ipc機(jī)制 進(jìn)程彼此之間互相隔離,要實(shí)現(xiàn)進(jìn)程間通信(IPC), multiprocessing模塊支持兩種形式:隊(duì)列和管道,這兩種方式都是使用消息傳遞的 共享內(nèi)存空間 隊(duì)列=管道+鎖 ''' from multiprocessing import Queue # 占用的內(nèi)存,最好小數(shù)據(jù),消息數(shù)據(jù),下載地址 # Queue(限制隊(duì)列里面的個(gè)數(shù)) # 先進(jìn)先出 q=Queue(3) # 添加 q.put('a') q.put('b') q.put({'x':2}) print('籃子滿了') # 隊(duì)列滿了,相當(dāng)于鎖了 # q.put({'x':2}) # 提取 print(q.get()) print(q.get()) print(q.get()) # # 隊(duì)列為空,等待加入,也會(huì)阻塞,相當(dāng)于鎖了 print('隊(duì)列為空') print(q.get())
隊(duì)列被取完了 后面的q.get() 會(huì)阻塞直到有新的元素。所以程序不會(huì)結(jié)束
JoinableQueue 來實(shí)現(xiàn)生產(chǎn)消費(fèi)者
JoinableQueue#task_done()方法當(dāng)隊(duì)列里面沒有元素會(huì)結(jié)束線程
''' 小王和小周每人生產(chǎn)10分包子和土豆絲 小戴和小楊一直吃,當(dāng)隊(duì)列里面沒有食物時(shí)。終結(jié)進(jìn)程 ''' import time, random from multiprocessing import Process, JoinableQueue def producer(name, food, q): for i in range(10): res = '%s%s' % (food, i) # 模擬生產(chǎn)數(shù)據(jù)的時(shí)間 time.sleep(random.randint(1, 3)) q.put(res) print('廚師%s生成了%s' % (name, res)) def consumer(name, q): while True: # 訂單都沒了還在等,隊(duì)列里面空了 res = q.get() # 模擬處理數(shù)據(jù)的時(shí)間 time.sleep(random.randint(1, 3)) print('吃貨%s吃了%s' % (name, res)) # 1每次完成隊(duì)列取一次,往q.join() ,取干凈了q.join()運(yùn)行完 q.task_done() # 多個(gè)生產(chǎn)者和消費(fèi)者 if __name__ == '__main__': q = JoinableQueue() # 生產(chǎn)者 p1 = Process(target=producer, args=('小王', '包子', q)) p3 = Process(target=producer, args=('小周', '土豆絲', q)) # 消費(fèi)者 c1 = Process(target=consumer, args=('小戴', q)) c2 = Process(target=consumer, args=('小楊', q)) # #3.守護(hù)進(jìn)程的作用: 主進(jìn)程死了,消費(fèi)者子進(jìn)程也跟著死 # #把消費(fèi)者變成守護(hù)進(jìn)程 c1.daemon = True c2.daemon = True p1.start() p3.start() c1.start() c2.start() p1.join() p3.join() # 2消費(fèi)者task_done給q.join()發(fā)信號(hào) q.join() print('主') # 生產(chǎn)者運(yùn)行完?1,2 # 消費(fèi)者運(yùn)行完?1,2
當(dāng)隊(duì)列為空時(shí),不會(huì)傻傻等待而是結(jié)束進(jìn)程
總結(jié)
本篇文章就到這里了,希望能夠給你帶來幫助,也希望您能夠多多關(guān)注服務(wù)器之家的更多內(nèi)容!
原文鏈接:https://blog.csdn.net/weixin_39313241/article/details/120607154