在使用 Celery 的時候發現有的時候 Celery 會將同一個任務執行兩遍,我遇到的情況是相同的任務在不同的 worker 中被分別執行,并且時間只相差幾毫秒。這問題我一直以為是自己哪里處理的邏輯有問題,后來發現其他人 也有類似的問題,然后基本上出問題的都是使用 Redis 作為 Broker 的,而我這邊一方面不想將 Redis 替換掉,就只能在 task 執行的時候加分布式鎖了。
不過在 Celery 的 issue 中搜索了一下,有人使用 Redis 實現了分布式鎖,然后也有人使用了 Celery Once。 大致看了一下 Celery Once ,發現非常符合現在的情況,就用了下。
Celery Once 也是利用 Redis 加鎖來實現, Celery Once 在 Task 類基礎上實現了 QueueOnce 類,該類提供了任務去重的功能,所以在使用時,我們自己實現的方法需要將 QueueOnce 設置為 base
@task(base=QueueOnce, once={'graceful': True})
后面的 once 參數表示,在遇到重復方法時的處理方式,默認 graceful 為 False,那樣 Celery 會拋出 AlreadyQueued 異常,手動設置為 True,則靜默處理。
另外如果要手動設置任務的 key,可以指定 keys 參數
1
2
3
4
|
@celery .task(base = QueueOnce, once = { 'keys' : [ 'a' ]}) def slow_add(a, b): sleep( 30 ) return a + b |
總得來說,分為幾步
第一步,安裝
pip install -U celery_once
第二步,增加配置
1
2
3
4
5
6
7
8
9
10
11
12
|
from celery import Celery from celery_once import QueueOnce from time import sleep celery = Celery( 'tasks' , broker = 'amqp://guest@localhost//' ) celery.conf.ONCE = { 'backend' : 'celery_once.backends.Redis' , 'settings' : { 'url' : 'redis://localhost:6379/0' , 'default_timeout' : 60 * 60 } } |
第三步,修改 delay 方法
1
2
3
|
example.delay( 10 ) # 修改為 result = example.apply_async(args = ( 10 )) |
第四步,修改 task 參數
1
2
3
4
|
@celery .task(base = QueueOnce, once = { 'graceful' : True , keys ': [' a']}) def slow_add(a, b): sleep( 30 ) return a + b |
參考鏈接 https://github.com/cameronmaske/celery-once
到此這篇關于使用 Celery Once 來防止 Celery 重復執行同一個任務的文章就介紹到這了,更多相關 Celery 重復執行同一個任務內容請搜索服務器之家以前的文章或繼續瀏覽下面的相關文章希望大家以后多多支持服務器之家!
原文鏈接:https://blog.csdn.net/qq_41333582/article/details/83899884