一区二区三区在线-一区二区三区亚洲视频-一区二区三区亚洲-一区二区三区午夜-一区二区三区四区在线视频-一区二区三区四区在线免费观看

腳本之家,腳本語(yǔ)言編程技術(shù)及教程分享平臺(tái)!
分類導(dǎo)航

Python|VBS|Ruby|Lua|perl|VBA|Golang|PowerShell|Erlang|autoit|Dos|bat|

服務(wù)器之家 - 腳本之家 - Python - Python多進(jìn)程之進(jìn)程同步及通信詳解

Python多進(jìn)程之進(jìn)程同步及通信詳解

2022-03-08 00:10程序員-夏天 Python

這篇文章主要為大家介紹了Python多進(jìn)程之進(jìn)程同步及通信,具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下,希望能夠給你帶來幫助

上篇文章介紹了什么是進(jìn)程、進(jìn)程與程序的關(guān)系、進(jìn)程的創(chuàng)建與使用、創(chuàng)建進(jìn)程池等,接下來就來介紹一下進(jìn)程同步及進(jìn)程通信

進(jìn)程同步

當(dāng)多個(gè)進(jìn)程使用同一份數(shù)據(jù)資源的時(shí)候,因?yàn)檫M(jìn)程的運(yùn)行沒有順序,運(yùn)行起來也無法控制,如果不加以干預(yù),往往會(huì)引發(fā)數(shù)據(jù)安全或順序混亂的問題,所以要在多個(gè)進(jìn)程讀寫共享數(shù)據(jù)資源的時(shí)候加以適當(dāng)?shù)牟呗裕瑏肀WC數(shù)據(jù)的一致性問題。

Lock(鎖)

一個(gè)Lock對(duì)象有兩個(gè)方法:acquire()和release()來控制共享數(shù)據(jù)的讀寫權(quán)限, 看下面這張圖片,使用多進(jìn)程的時(shí)候會(huì)經(jīng)常出現(xiàn)這種情況,這是因?yàn)槎鄠€(gè)進(jìn)程都在搶占輸出資源,共享同一打印終端,從而造成了輸出信息的錯(cuò)亂。

Python多進(jìn)程之進(jìn)程同步及通信詳解

那么就可以使用Lock機(jī)制:

import multiprocessing
import random
import time
def work(lock, i):
  lock.acquire()
  print("work'{}'執(zhí)行中......".format(i), multiprocessing.current_process().name, multiprocessing.current_process().pid)
  time.sleep(random.randint(0, 2))
  print("work'{}'執(zhí)行完畢......".format(i))
  lock.release()
if __name__ == '__main__':
  lock = multiprocessing.Lock()
  for i in range(5):
      p = multiprocessing.Process(target=work, args=(lock, i))
      p.start()

由于引入了Lock機(jī)制,同一時(shí)間只能有一個(gè)進(jìn)程搶占到輸出資源,其他進(jìn)程等待該進(jìn)程結(jié)束,鎖釋放到,才可以搶占,這樣會(huì)解決多進(jìn)程間資源競(jìng)爭(zhēng)導(dǎo)致數(shù)據(jù)錯(cuò)亂的問題,但是由并發(fā)執(zhí)行變成了串行執(zhí)行,會(huì)犧牲運(yùn)行效率。

 

進(jìn)程通信

上篇文章說過,進(jìn)程之間互相隔離,數(shù)據(jù)是獨(dú)立的,默認(rèn)情況下互不影響,那要如何實(shí)現(xiàn)進(jìn)程間通信呢?Python提供了多種進(jìn)程通信的方式,下面就來說一下。

Queue(隊(duì)列)

multiprocessing模塊提供的Queue多進(jìn)程安全的消息隊(duì)列,可以實(shí)現(xiàn)多進(jìn)程之間的數(shù)據(jù)傳遞。

說明

  • 初始化Queue()對(duì)象時(shí)(例如:q=Queue()),若括號(hào)中沒有指定最?可接收的消息數(shù)量,或數(shù)量為負(fù)值,那么就代表可接受的消息數(shù)量沒有上限(直到內(nèi)存的盡頭)。
  • Queue.qsize():返回當(dāng)前隊(duì)列包含的消息數(shù)量。
  • Queue.empty():如果隊(duì)列為空,返回True,反之False。
  • Queue.full():如果隊(duì)列滿了,返回True,反之False。
  • Queue.get(block, timeout):獲取隊(duì)列中的?條消息,然后將其從列隊(duì)中移除,block默認(rèn)值為True。如果block使?默認(rèn)值,且沒有設(shè)置timeout(單位秒),消息列隊(duì)如果為空,此時(shí)程序?qū)⒈蛔枞ㄍT谧x取狀態(tài)),直到從消息列隊(duì)讀到消息為?,如果設(shè)置了timeout,則會(huì)等待timeout秒,若還沒讀取到任何消息,則拋出Queue.Empty異常;如果block值為False,消息列隊(duì)如果為空,則會(huì)?刻拋出Queue.Empty異常。
  • Queue.get_nowait():相當(dāng)Queue.get(False)。
  • Queue.put(item, block, timeout):將item消息寫?隊(duì)列,block默認(rèn)值為True,如果block使?默認(rèn)值,且沒有設(shè)置timeout(單位秒),消息列隊(duì)如果已經(jīng)沒有空間可寫?,此時(shí)程序?qū)⒈蛔枞ㄍT趯?狀態(tài)),直到消息列隊(duì)騰出空間為?,如果設(shè)置了timeout,則會(huì)等待timeout秒,若還沒空間,則拋出Queue.Full異常;如果block值為False,消息列隊(duì)如果沒有空間可寫?,則會(huì)?刻拋出Queue.Full異常。
  • Queue.put_nowait(item):相當(dāng)于Queue.put(item, False)。
from multiprocessing import Process, Queue
import time
def write_task(queue):
  """
  向隊(duì)列中寫入數(shù)據(jù)
  :param queue: 隊(duì)列
  :return:
  """
  for i in range(5):
      if queue.full():
          print("隊(duì)列已滿!")
      message = "消息{}".format(str(i))
      queue.put(message)
      print("消息{}寫入隊(duì)列".format(str(i)))
def read_task(queue):
  """
  從隊(duì)列讀取數(shù)據(jù)
  :param queue: 隊(duì)列
  :return:
  """
  while True:
      print("從隊(duì)列讀取:{}".format(queue.get(True)))
if __name__ == '__main__':
  print("主進(jìn)程執(zhí)行......")
  # 主進(jìn)程創(chuàng)建Queue,最大消息數(shù)量為3
  queue = Queue(3)
  pw = Process(target=write_task, args=(queue, ))
  pr = Process(target=read_task, args=(queue, ))
  pw.start()
  pr.start()

運(yùn)行結(jié)果為:

Python多進(jìn)程之進(jìn)程同步及通信詳解

從結(jié)果我們可以看出,隊(duì)列最大可以放入3條消息,后面再來消息,要等read_task從隊(duì)列里取出后才行。

Pipe(管道)

Pipe常用于兩個(gè)進(jìn)程,兩個(gè)進(jìn)程分別位于管道的兩端,Pipe(duplex)方法返回(conn1,conn2)代表一個(gè)管道的兩端,duplex參數(shù)默認(rèn)為True,即全雙工模式,若為False,conn1只負(fù)責(zé)接收信息,conn2負(fù)責(zé)發(fā)送。

send()和recv()方法分別是發(fā)送和接受消息的方法。

import multiprocessing
import time
import random
def proc_send(pipe):
  """
  發(fā)送消息
  :param pipe:管道一端
  :return:
  """
  for i in range(10):
      print("process send:{}".format(str(i)))
      pipe.send(i)
      time.sleep(random.random())
def proc_recv(pipe):
  """
  接收消息
  :param pipe:管道一端
  :return:
  """
  while True:
      print("Process recv:{}".format(pipe.recv()))
      time.sleep(random.random())
if __name__ == '__main__':
  # 主進(jìn)程創(chuàng)建pipe
  pipe = multiprocessing.Pipe()
  p1 = multiprocessing.Process(target=proc_send,args=(pipe[0], ))
  p2 = multiprocessing.Process(target=proc_recv,args=(pipe[1], ))
  p1.start()
  p2.start()
  p1.join()
  p2.terminate()

執(zhí)行結(jié)果為:

Python多進(jìn)程之進(jìn)程同步及通信詳解

Semaphore(信號(hào)量)

Semaphore用來控制對(duì)共享資源的訪問數(shù)量,和進(jìn)程池的最大連接數(shù)類似。

import multiprocessing
import random
import time
def work(s, i):
  s.acquire()
  print("work'{}'執(zhí)行中......".format(i), multiprocessing.current_process().name, multiprocessing.current_process().pid)
  time.sleep(i*2)
  print("work'{}'執(zhí)行完畢......".format(i))
  s.release()
if __name__ == '__main__':
  s = multiprocessing.Semaphore(2)
  for i in range(1, 7):
      p = multiprocessing.Process(target=work, args=(s, i))
      p.start()

上面的代碼中使用Semaphore限制了最多有2個(gè)進(jìn)程同時(shí)執(zhí)行,那么來一個(gè)進(jìn)程獲得一把鎖,計(jì)數(shù)加1,當(dāng)計(jì)數(shù)等于2時(shí),后面再來的進(jìn)程均需要等待,等前面的進(jìn)程釋放掉,才可以獲得鎖。

信號(hào)量與進(jìn)程池的概念上類似,但是要區(qū)分開來,信號(hào)量涉及到加鎖的概念。

Event(事件)

Event用來實(shí)現(xiàn)進(jìn)程間同步通信的。運(yùn)行的機(jī)制是:全局定義了一個(gè)flag,如果flag值為False,當(dāng)程序執(zhí)行event.wait()方法時(shí)就會(huì)阻塞,如果flag值為True時(shí),程序執(zhí)行event.wait()方法時(shí)不會(huì)阻塞繼續(xù)執(zhí)行。

Event常?函數(shù):

  • event.wait():在進(jìn)程中插入一個(gè)標(biāo)記(flag),默認(rèn)為False,可以設(shè)置timeout。
  • event.set():使flag為Ture。
  • event.clear():使flag為False。
  • event.is_set():判斷flag是否為True。
import multiprocessing
import time
def wait_for_event(e):
  print("wait_for_event執(zhí)行")
  e.wait()
  print("wait_for_event: e.is_set():{}".format(e.is_set()))
def wait_for_event_timeout(e, t):
  print("wait_for_event_timeout執(zhí)行")
  # 只會(huì)阻塞2s
  e.wait(t)
  print("wait_for_event_timeout:e.is_set:{}".format(e.is_set()))
if __name__ == "__main__":
  e = multiprocessing.Event()
  p1 = multiprocessing.Process(target=wait_for_event, args=(e,))
  p1.start()
  p2 = multiprocessing.Process(target=wait_for_event_timeout, args=(e, 2))
  p2.start()
  time.sleep(4)
  # 4s之后使用e.set()將flag設(shè)為Ture
  e.set()
  print("主進(jìn)程:flag設(shè)置為True")

執(zhí)行結(jié)果如下:

Python多進(jìn)程之進(jìn)程同步及通信詳解

 

總結(jié)

本篇文章就到這里了,希望能夠給你帶來幫助,也希望您能夠多多關(guān)注服務(wù)器之家的更多內(nèi)容!

原文鏈接:https://blog.csdn.net/weixin_50097774/article/details/121428521

延伸 · 閱讀

精彩推薦
主站蜘蛛池模板: 免费高清资源黄网站在线观看 | 亚洲成人黄色 | 国产女乱淫真高清免费视频 | 亚洲不卡视频在线观看 | 成人免费观看网欧美片 | 好大好爽好硬我要喷水了 | 深夜视频免费看 | 亚洲 小说 欧美 激情 另类 | 羞羞色男人的天堂伊人久久 | 激情视频亚洲 | 国产成人咱精品视频免费网站 | 色偷偷亚洲男人 | 日韩在线成人 | 99久久国产综合精品1尤物 | 久久视频这有精品63在线国产 | 好大用力深一点视频 | 国内亚州视频在线观看 | 免费理伦片手机在线播放 | 国产香蕉视频在线观看 | 欧美高清乌克兰精品另类 | 欧美成人aletta ocean | 91精品手机国产在线观 | 忘忧草研究院一二三 | 亚洲欧美日韩综合在线 | 91传媒在线观看 | 精品一区二区三区中文 | 韩国女主播在线大尺无遮挡 | 亚洲精品午夜久久aaa级久久久 | 日本免费不卡在线一区二区三区 | 日本一区三区 | 亚洲高清在线天堂精品 | 日韩精品一二三区 | 国产视频a区 | 日本韩国推理片免费观看网站 | china国产bbw| bl双性小说 | 人禽l交免费视频观看+视频 | 亚洲色图综合网 | 亚洲福利在线观看 | 欧美一区二区三区四区五区六区 | 亚洲AV无码专区国产乱码网站 |