一区二区三区在线-一区二区三区亚洲视频-一区二区三区亚洲-一区二区三区午夜-一区二区三区四区在线视频-一区二区三区四区在线免费观看

服務器之家:專注于服務器技術及軟件下載分享
分類導航

PHP教程|ASP.NET教程|Java教程|ASP教程|編程技術|正則表達式|C/C++|IOS|C#|Swift|Android|VB|R語言|JavaScript|易語言|vb.net|

服務器之家 - 編程語言 - Java教程 - 一日一技:使用 Asyncio 如何限制協程的并發數

一日一技:使用 Asyncio 如何限制協程的并發數

2021-08-01 23:46未聞Codekingname Java教程

如果大家要限制協程的并發數,那么最簡單的辦法就是使用asyncio.Semaphore。但需要注意的是,只能在啟動協程之前初始化它,然后傳給協程。要確保所有并發協程拿到的是同一個Semaphore對象。

一日一技:使用 Asyncio 如何限制協程的并發數

在昨天的直播中,有同學問道,如果使用 asyncio + httpx 實現并發請求,怎么限制請求的頻率呢?怎么限制最多只能有 x 個請求同時發出呢?我們今天給出兩種方案。

提出問題

 

假設如果我們同時發起12個請求,每個請求的時間不同,那么總共的請求時間大概跟最長耗時的請求差不多。我們先來寫一個用于測試的例子:

  1. import asyncio 
  2. import httpx 
  3. import time 
  4.  
  5.  
  6. async def req(delay): 
  7.     print(f'請求一個延遲為{delay}秒的接口'
  8.     async with httpx.AsyncClient(timeout=20) as client: 
  9.         resp = await client.get(f'http://127.0.0.1:8000/sleep/{delay}'
  10.         result = resp.json() 
  11.         print(result) 
  12.  
  13.  
  14. async def main(): 
  15.     start = time.time() 
  16.     delay_list = [3, 6, 1, 8, 2, 4, 5, 2, 7, 3, 9, 8] 
  17.     task_list = [] 
  18.     for delay in delay_list: 
  19.         task = asyncio.create_task(req(delay)) 
  20.         task_list.append(task) 
  21.     await asyncio.gather(*task_list) 
  22.     end = time.time() 
  23.     print(f'一共耗時:{end - start}'
  24.  
  25. asyncio.run(main()) 

這段代碼,使用 for 循環創建了12個協程任務,這些任務幾乎同時運行,于是,請求完成所有的接口,總共耗時如下圖所示:

一日一技:使用 Asyncio 如何限制協程的并發數

現在的問題是,由于網站有反爬蟲機制,最多只能同時發起3個請求。那么我們怎么確保同一時間最多只有3個協程在請求網絡呢?

限制協程任務數

 

第一個方案跟以前限制多線程的線程數的方案相同。我們創建一個列表,確保列表里面最多只有3個任務,然后持續循環檢查,發現有任務完成了,就移除這個完成的任務,并加入一個新的任務,直到待爬的列表為空,這個任務列表也為空。代碼如下:

  1. import asyncio 
  2. import httpx 
  3. import time 
  4.  
  5.  
  6. async def req(delay): 
  7.     print(f'請求一個延遲為{delay}秒的接口'
  8.     async with httpx.AsyncClient(timeout=20) as client: 
  9.         resp = await client.get(f'http://127.0.0.1:8000/sleep/{delay}'
  10.         result = resp.json() 
  11.         print(result) 
  12.  
  13.  
  14. async def main(): 
  15.     start = time.time() 
  16.     delay_list = [3, 6, 1, 8, 2, 4, 5, 2, 7, 3, 9, 8] 
  17.     task_list = [] 
  18.     while True
  19.         if not delay_list and not task_list: 
  20.             break 
  21.         while len(task_list) < 3: 
  22.             if delay_list: 
  23.                 delay = delay_list.pop() 
  24.                 task = asyncio.create_task(req(delay)) 
  25.                 task_list.append(task) 
  26.             else
  27.                 break 
  28.         task_list = [task for task in task_list if not task.done()] 
  29.         await asyncio.sleep(1) 
  30.     end = time.time() 
  31.     print(f'一共耗時:{end - start}'
  32.  
  33. asyncio.run(main()) 

運行效果如下圖所示:

一日一技:使用 Asyncio 如何限制協程的并發數

總共耗時大概28秒左右。比串行需要的58秒快了一半,但比全部同時并發多了一倍。

使用 Semaphore

 

asyncio 實際上自帶了一個限制協程數量的類,叫做Semaphore。我們只需要初始化它,傳入最大允許的協程數量,然后就可以通過上下文管理器來使用。我們看一下代碼:

  1. import asyncio 
  2. import httpx 
  3. import time 
  4.  
  5.  
  6. async def req(delay, sem): 
  7.     print(f'請求一個延遲為{delay}秒的接口'
  8.     async with sem: 
  9.         async with httpx.AsyncClient(timeout=20) as client: 
  10.             resp = await client.get(f'http://127.0.0.1:8000/sleep/{delay}'
  11.             result = resp.json() 
  12.             print(result) 
  13.  
  14.  
  15. async def main(): 
  16.     start = time.time() 
  17.     delay_list = [3, 6, 1, 8, 2, 4, 5, 2, 7, 3, 9, 8] 
  18.     task_list = [] 
  19.     sem = asyncio.Semaphore(3) 
  20.     for delay in delay_list: 
  21.         task = asyncio.create_task(req(delay, sem)) 
  22.         task_list.append(task) 
  23.     await asyncio.gather(*task_list) 
  24.  
  25.     end = time.time() 
  26.     print(f'一共耗時:{end - start}'
  27.  
  28. asyncio.run(main()) 

運行效果如下圖所示:

一日一技:使用 Asyncio 如何限制協程的并發數

耗時為22秒,比第一個方案更快。

我們來看看Semaphore的用法,它的格式為:

  1. sem = asyncio.Semaphore(同時運行的協程數量) 
  2.  
  3. async def func(sem): 
  4.     async with sem: 
  5.         這里是并發執行的代碼 
  6.  
  7. task_list = [] 
  8. for _ in range(總共需要執行的任務數): 
  9.     task = asyncio.create_task(func(sem)) 
  10.     task_list.append(task) 
  11. await asyncio.gather(*task_list) 

當我們要限制一個協程的并發數的時候,可以在調用協程之前,先初始化一個Semaphore對象。然后把這個對象傳到需要限制并發的協程里面,在協程里面,使用異步上下文管理器包住你的正式代碼:

  1. async with sem: 
  2.     正式代碼 

這樣一來,如果并發數沒有達到限制,那么async with sem會瞬間執行完成,進入里面的正式代碼中。如果并發數已經達到了限制,那么其他的協程會阻塞在async with sem這個地方,直到正在運行的某個協程完成了,退出了,才會放行一個新的協程去替換掉這個已經完成的協程。

這個寫法其實跟多線程的加鎖很像。只不過鎖是確保同一個時間只有一個線程在運行,而Semaphore可以人為指定能有多少個協程同時運行。

如何限制1分鐘內能夠運行的協程數

 

可能同學看了上面的例子以后,只知道如何限制同時運行的協程數。但是怎么限制在一段時間里同時運行的協程數呢?

其實非常簡單,在并發的協程里面加個 asyncio.sleep 就可以了。例如上面的例子,我想限制每分鐘只能有3個協程,那么可以把代碼改為:

  1. async def req(delay, sem): 
  2.     print(f'請求一個延遲為{delay}秒的接口'
  3.     async with sem: 
  4.         async with httpx.AsyncClient(timeout=20) as client: 
  5.             resp = await client.get(f'http://127.0.0.1:8000/sleep/{delay}'
  6.             result = resp.json() 
  7.             print(result) 
  8.     await asyncio.sleep(60) 

總結

 

如果大家要限制協程的并發數,那么最簡單的辦法就是使用asyncio.Semaphore。但需要注意的是,只能在啟動協程之前初始化它,然后傳給協程。要確保所有并發協程拿到的是同一個Semaphore對象。

當然,你的程序里面,可能有多個不同的部分,有些部分限制并發數為 a,有些部分限制并發數為 b。那么你可以初始化多個Semaphore對象,分別傳給不同的協程。

原文鏈接:https://mp.weixin.qq.com/s/zs319rzFDE5y7v4egbZOUQ

延伸 · 閱讀

精彩推薦
主站蜘蛛池模板: 日本在线看 | 国产啪精品视频网给免丝袜 | 欧美日韩国产一区二区三区欧 | 婷婷日韩 | 久久一本岛在免费线观看2020 | 12345国产精品高清在线 | 好男人影视社区www在线观看 | 国产91素人搭讪系列天堂 | 无码精品AV久久久奶水 | 77成人影视 | 2020国产精品亚洲综合网 | 成人网18免费网 | 波多野结衣亚洲一区 | 俺去啦最新地址 | 国产va免费精品高清在线观看 | 日本www视频在线观看 | 国产精品在线 | 日本乱中文字幕系列在线观看 | 91制片厂 果冻传媒 天美传媒 | 日韩免费一级毛片 | 午夜性色一区二区三区不卡视频 | 9热在线精品视频观看 | 亚洲国产成人精品 | 欧美成人福利视频 | 欧美一级特黄刺激大片视频 | 四虎影在线永久免费观看 | 饭冈加奈子在线播放观看 | 成人精品福利 | 天美网站传媒入口网址 | 99久久99久久免费精品蜜桃 | 国产精品suv | 国内视频一区二区三区 | 免费在线观看日韩 | 逼逼狗影院| 男人的j伸到女人的屁股眼 男人吃奶动态图 | a一级毛片录像带 录像片 | 青青国产成人久久91网 | 精品国产一区二区三区在线 | 精品免费tv久久久久久久 | 日韩大片在线 | 免费欧美一级 |