在使用Celery統計每日訪問數量的時候,發現一個任務會同時執行兩次,發現同一時間內(1s內)竟然同時發送了兩次任務,也就是同時產生了兩個worker,造成統計兩次,一直找不到原因。
參考:http://m.ythuaji.com.cn/article/225461.html
有人使用 Redis 實現了分布式鎖,然后也有人使用了 Celery Once。
Celery Once 也是利用 Redis 加鎖來實現, Celery Once 在 Task 類基礎上實現了 QueueOnce 類,該類提供了任務去重的功能,所以在使用時,我們自己實現的方法需要將 QueueOnce 設置為 base
1
|
@task (base = QueueOnce, once = { 'graceful' : True }) |
后面的 once 參數表示,在遇到重復方法時的處理方式,默認 graceful 為 False,那樣 Celery 會拋出 AlreadyQueued 異常,手動設置為 True,則靜默處理。
另外如果要手動設置任務的 key,可以指定 keys 參數
1
2
3
4
|
@celery .task(base = QueueOnce, once = { 'keys' : [ 'a' ]}) def slow_add(a, b): sleep( 30 ) return a + b |
解決步驟
Celery One允許你將Celery任務排隊,防止多次執行
安裝
pip install -U celery_once
要求,需要Celery4.0,老版本可能運行,但不是官方支持的。
使用celery_once,tasks需要繼承一個名為QueueOnce的抽象base tasks
Once安裝完成后,需要配置一些關于ONCE的選項在Celery配置中
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
|
from celery import Celery from celery_once import QueueOnce from time import sleep celery = Celery( 'tasks' , broker = 'amqp://guest@localhost//' ) # 一般之前的配置沒有這個,需要添加上 celery.conf.ONCE = { 'backend' : 'celery_once.backends.Redis' , 'settings' : { 'url' : 'redis://localhost:6379/0' , 'default_timeout' : 60 * 60 } } # 在原本沒有參數的里面加上base @celery .task(base = QueueOnce) def slow_task(): sleep( 30 ) return "Done!" |
要確定配置,需要取決于使用哪個backend進行鎖定,查看Backends
在后端,這將覆蓋apply_async和delay。它不影響直接調用任務。
在運行任務時,celery_once檢查是否沒有鎖定(針對Redis鍵)。否則,任務將正常運行。一旦任務完成(或由于異常而結束),鎖將被清除。如果在任務完成之前嘗試再次運行該任務,將會引發AlreadyQueued異常。
example.delay(10)
example.delay(10)
Traceback (most recent call last):
..
AlreadyQueued()
result = example.apply_async(args=(10))
result = example.apply_async(args=(10))
Traceback (most recent call last):
..
AlreadyQueued()
graceful:如果在任務的選項中設置了once={'graceful': True},或者在運行時設置了apply_async,則任務可以返回None,而不是引發AlreadyQueued異常。
1
2
3
4
5
6
7
8
9
10
11
12
13
|
from celery_once import AlreadyQueued # Either catch the exception, try : example.delay( 10 ) except AlreadyQueued: pass # Or, handle it gracefully at run time. result = example. apply (args = ( 10 ), once = { 'graceful' : True }) # or by default. @celery .task(base = QueueOnce, once = { 'graceful' : True }) def slow_task(): sleep( 30 ) return "Done!" |
其他功能請訪問:https://pypi.org/project/celery_once/
到此這篇關于通過celery_one避免Celery定時任務重復執行的文章就介紹到這了,更多相關Celery定時任務重復執行內容請搜索服務器之家以前的文章或繼續瀏覽下面的相關文章希望大家以后多多支持服務器之家!
原文鏈接:https://www.jianshu.com/p/285dc3d703f4