本文主要是基于 redis 6.2 源碼進行分析定時事件的數據結構和常見操作。
數據結構
在 redis 中通過 aeTimeEvent 結構來創建定時任務事件,代碼如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
|
/* Time event structure */ typedef struct aeTimeEvent { // 標識符 long long id; /* time event identifier. */ // 定時納秒數 monotime when; // 定時回調函數 aeTimeProc *timeProc; // 注銷定時器時候的回調函數 aeEventFinalizerProc *finalizerProc; void *clientData; struct aeTimeEvent *prev; struct aeTimeEvent *next; int refcount; /* refcount to prevent timer events from being * freed in recursive time event calls. */ } aeTimeEvent; |
常見操作
1. 創建定時事件
redis 中最重要的定時函數且是周期執行的函數,使用的是 serverCron 函數。在 redis 中由于定時任務比較少,因此并沒有嚴格的按照過期時間來排序的,而是按照 id自增 + 頭插法 來保證基本有序。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
|
if (aeCreateTimeEvent(server.el, 1, serverCron, NULL, NULL) == AE_ERR) { serverPanic( "Can't create event loop timers." ); exit (1); } //創建定時器對象 long long aeCreateTimeEvent(aeEventLoop *eventLoop, long long milliseconds, aeTimeProc *proc, void *clientData, aeEventFinalizerProc *finalizerProc) { long long id = eventLoop->timeEventNextId++; aeTimeEvent *te; te = zmalloc( sizeof (*te)); if (te == NULL) return AE_ERR; te->id = id; te->when = getMonotonicUs() + milliseconds * 1000; te->timeProc = proc; te->finalizerProc = finalizerProc; te->clientData = clientData; te->prev = NULL; // 頭插法 te->next = eventLoop->timeEventHead; te->refcount = 0; if (te->next) te->next->prev = te; eventLoop->timeEventHead = te; return id; } |
2. 觸發定時事件
redis 中是采用 IO 復用來進行定時任務的。
查找距離現在最近的定時事件,見 usUntilEarliestTimer
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
|
? /* How many microseconds until the first timer should fire. * If there are no timers, -1 is returned. * * Note that's O(N) since time events are unsorted. * Possible optimizations (not needed by Redis so far, but...): * 1) Insert the event in order, so that the nearest is just the head. * Much better but still insertion or deletion of timers is O(N). * 2) Use a skiplist to have this operation as O(1) and insertion as O(log(N)). */ static int64_t usUntilEarliestTimer(aeEventLoop *eventLoop) { aeTimeEvent *te = eventLoop->timeEventHead; if (te == NULL) return -1; aeTimeEvent *earliest = NULL; while (te) { if (!earliest || te->when < earliest->when) earliest = te; te = te->next; } monotime now = getMonotonicUs(); return (now >= earliest->when) ? 0 : earliest->when - now; } ? |
這里時間復雜度可能比較高,實際中需要結合具體場景使用。
更新剩余過期時間,想想為啥呢?因為我們前面提到過,IO 復用有可能因為 IO 事件返回,所以需要更新。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
|
if (flags & AE_TIME_EVENTS && !(flags & AE_DONT_WAIT)) usUntilTimer = usUntilEarliestTimer(eventLoop); if (usUntilTimer >= 0) { tv.tv_sec = usUntilTimer / 1000000; tv.tv_usec = usUntilTimer % 1000000; tvp = &tv; } else { if (flags & AE_DONT_WAIT) { // 不等待 tv.tv_sec = tv.tv_usec = 0; tvp = &tv; } else { /* Otherwise we can block */ tvp = NULL; /* wait forever */ } } |
3. 執行定時事件
一次性的執行完直接刪除,周期性的執行完在重新添加到鏈表。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
|
/* Process time events */ static int processTimeEvents(aeEventLoop *eventLoop) { int processed = 0; aeTimeEvent *te; long long maxId; te = eventLoop->timeEventHead; maxId = eventLoop->timeEventNextId-1; monotime now = getMonotonicUs(); // 刪除定時器 while (te) { long long id; // 下一輪中對事件進行刪除 /* Remove events scheduled for deletion. */ if (te->id == AE_DELETED_EVENT_ID) { aeTimeEvent *next = te->next; /* If a reference exists for this timer event, * don't free it. This is currently incremented * for recursive timerProc calls */ if (te->refcount) { te = next; continue ; } if (te->prev) te->prev->next = te->next; else eventLoop->timeEventHead = te->next; if (te->next) te->next->prev = te->prev; if (te->finalizerProc) { te->finalizerProc(eventLoop, te->clientData); now = getMonotonicUs(); } zfree(te); te = next; continue ; } if (te->id > maxId) { te = te->next; continue ; } if (te->when <= now) { int retval; id = te->id; te->refcount++; // timeProc 函數返回值 retVal 為時間事件執行的間隔 retval = te->timeProc(eventLoop, id, te->clientData); te->refcount--; processed++; now = getMonotonicUs(); if (retval != AE_NOMORE) { te->when = now + retval * 1000; } else { // 如果超時了,那么標記為刪除 te->id = AE_DELETED_EVENT_ID; } } // 執行下一個 te = te->next; } return processed; } |
總結
優點:實現簡單
缺點:如果定時任務很多,效率比較低。
到此這篇關于Redis定時任務原理的實現的文章就介紹到這了,更多相關Redis定時任務內容請搜索服務器之家以前的文章或繼續瀏覽下面的相關文章希望大家以后多多支持服務器之家!
原文鏈接:https://juejin.cn/post/7072016644272291847