一区二区三区在线-一区二区三区亚洲视频-一区二区三区亚洲-一区二区三区午夜-一区二区三区四区在线视频-一区二区三区四区在线免费观看

腳本之家,腳本語言編程技術及教程分享平臺!
分類導航

Python|VBS|Ruby|Lua|perl|VBA|Golang|PowerShell|Erlang|autoit|Dos|bat|

服務器之家 - 腳本之家 - Python - Python使用signal定時結束AsyncIOScheduler任務的問題

Python使用signal定時結束AsyncIOScheduler任務的問題

2021-12-15 10:32臨淵(v:superz-han) Python

這篇文章主要介紹了Python使用signal定時結束AsyncIOScheduler任務,在使用aiohttp結合apscheduler的AsyncIOScheduler模擬定點并發的時候遇到兩個問題,針對每個問題給大家詳細介紹,需要的朋友可以參考下

在使用aiohttp結合apscheduler的AsyncIOScheduler模擬定點并發的時候遇到兩個問題

  1. 在調度器scheduler.start()后,程序直接退出(在Jupiter中任務可以正常啟動)
  2. 如何在指定時間調用scheduler.shutdown()? (因為程序直接退出了)

原調試代碼如下:

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
from datetime import datetime, timedelta
 
import aiohttp
from apscheduler.schedulers.asyncio import AsyncIOScheduler
from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore
async def get(session):
    url = 'https://httpbin.org/get?a=1'
    async with session.get(url) as res:
        print('get', res.status)
        return await res.text()
 
async def post(session):
    url = 'https://httpbin.org/post?b=2'
    async with session.post(url) as res:
        print('post', res.status)
        return await res.text()
async def main():
    async with aiohttp.ClientSession() as session:
        await get(session)
        await post(session)
 
if __name__ == '__main__':
    jobstores = {'default': SQLAlchemyJobStore(url='sqlite:///jobs.sqlite')}
    scheduler = AsyncIOScheduler(jobstores=jobstores)
    for i in range(10):  # 添加10個任務
        job = scheduler.add_job(main, 'date', run_date=datetime.now() + timedelta(seconds=10))
    scheduler.start()

Google后發現AsyncIOScheduler的使用需要在scheduler啟動后,需要自己調用asyncio.get_event_loop().run_forever()來啟動協程任務。
但是一旦run_forever()則就會阻塞至死。除非有KeyboardInterrupt, SystemExit等異常或者強殺來停止其運行。
此時想到使用Python的signal來定時發送信號,修改后程序如下,可以正常延遲停止(感覺有點像模擬Go的defer)。

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
# -*- coding: utf-8 -*-
"""
@Time : 2021/7/23
@Auth : hanzhichao
@Desc:
"""
from datetime import datetime, timedelta
import signal
import asyncio
 
import aiohttp
from apscheduler.schedulers.asyncio import AsyncIOScheduler
from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore
 
async def get(session):
    url = 'https://httpbin.org/get?a=1'
    async with session.get(url) as res:
        print('get', res.status)
        return await res.text()
 
async def post(session):
    url = 'https://httpbin.org/post?b=2'
    async with session.post(url) as res:
        print('post', res.status)
        return await res.text()
 
async def main():
    async with aiohttp.ClientSession() as session:
        await get(session)
        await post(session)
 
if __name__ == '__main__':
    jobstores = {'default': SQLAlchemyJobStore(url='sqlite:///jobs.sqlite')}
    scheduler = AsyncIOScheduler(jobstores=jobstores)
    for i in range(10):  # 添加10個任務
        job = scheduler.add_job(main, 'date', run_date=datetime.now() + timedelta(seconds=10))
    scheduler.start()
    signal.alarm(20# 20秒后終止程序
    asyncio.get_event_loop().run_forever()  # 永遠運行

到此這篇關于Python使用signal定時結束AsyncIOScheduler任務的文章就介紹到這了,更多相關Python定時結束AsyncIOScheduler任務內容請搜索服務器之家以前的文章或繼續瀏覽下面的相關文章希望大家以后多多支持服務器之家!

原文鏈接:https://www.cnblogs.com/superhin/p/15060818.html

延伸 · 閱讀

精彩推薦
主站蜘蛛池模板: 亚洲国产网址 | 成人 在线欧美亚洲 | 校园高h | tubehdxx丝袜正片| 成人欧美一区二区三区 | 日韩资源在线 | 青青久久精品国产免费看 | 女人张开腿让男人做爽爽 | 亚洲六月丁香婷婷综合 | 欧美性一区二区三区 | 欧美高清乌克兰精品另类 | fuqer老师| 经典WC女厕所里TV | 亚洲国产一区 | 国内精品视频一区二区三区 | 亚洲 日本 天堂 国产 在线 | 亚洲精品国产AV成人毛片 | 午夜伦午夜伦锂电影 | 秋霞一级 | 国产成人激情 | 欧美一级欧美三级 | 九九大香尹人视频免费 | 久久se视频精品视频在线 | 福利片成人午夜在线 | 日韩夫妻性生活 | 奇米影视777最新在线 | 美女扒开胸罩露出奶 | 性刺激欧美三级在线现看中文 | 欧美日韩中文国产一区二区三区 | 2018久久精品热在线观看 | 久久综合久综合久久鬼色 | tube8老师| 鬼吹灯天星术在线高清观看 | 果冻传媒在线播放观看228集 | 日韩欧美不卡片 | 91制片厂制作传媒网站 | 欧美色阁 | 成年女人毛片免费观看97 | 亚洲日本中文字幕天堂网 | 高h孕交| 男人把j放进女人的p里视频 |