一区二区三区在线-一区二区三区亚洲视频-一区二区三区亚洲-一区二区三区午夜-一区二区三区四区在线视频-一区二区三区四区在线免费观看

服務器之家:專注于服務器技術及軟件下載分享
分類導航

PHP教程|ASP.NET教程|Java教程|ASP教程|編程技術|正則表達式|C/C++|IOS|C#|Swift|Android|VB|R語言|JavaScript|易語言|vb.net|

服務器之家 - 編程語言 - 編程技術 - RPC框架編寫實踐-RPC常見限流方法的實現

RPC框架編寫實踐-RPC常見限流方法的實現

2022-01-07 22:47博海拾貝diaryso1n 編程技術

在微服務中, 雖然服務間的調用都是可信的, 但是服務端也需要堤防一些流量, 防止被意外的流量擊垮, 而通過限流可以防止問題的發生。此外, 使用不同的限流規則還能根據系統間不同服務的請求進行限制, 解決某個函數被

RPC框架編寫實踐-RPC常見限流方法的實現

前記

微服務中, 雖然服務間的調用都是可信的, 但是服務端也需要堤防一些流量, 防止被意外的流量擊垮, 而通過限流可以防止問題的發生。此外, 使用不同的限流規則還能根據系統間不同服務的請求進行限制, 解決某個函數被頻繁調用而拖垮整個系統的問題。

NOTE: 雖然本文是在編寫RPC框架有感而發, 但是也適用于常見的Web服務等有流量進出的場景。

最新修訂見閱讀原文

1 限流的簡介

1.1 限流的作用和場景

對于后端服務來說, 他們提供的服務都有一個極限的QPS(除代碼邏輯外,也跟機器配置有關), 當服務端的壓力超過這個極限值的時候, 服務端的響應性能就會快速的下降, 然后無法提供服務, 所以服務端需要一個類似于可以限制請求數的功能, 使服務端能犧牲掉部分請求, 保證還能處理一定量的請求, 防止服務端出現壓力瓶頸,無法處理所有請求。

不過這個功能還需要盡量的智能, 在設計時可以根據流量場景不同來做有差別的限制, 使其在不影響其它請求的情況下, 實現部分請求的網絡流量整形, 達到減少系統資源消耗的效果, 常見的幾種需要做差別限制的場景如下:

場景 可能造成的影響 限流的作用
總體的API有大量的并發調用, 導致系統QPS超過設計值 機器可能會扛不住, 造成系統崩潰 減少進入業務的流量, 保證QPS被限制在某個合理值, 其它請求會被丟棄
某個API耗時比較長, 其它API的QPS位于合理范圍內 由于API耗時較長, 該API的調用次數變多的情況下, 會明顯消耗系統資源, 同時也可能造成數據競爭的情況 針對性的限制耗時API, 防止該API引起系統崩潰
總體API的QPS位于合理范圍內, 但是有部分參數會引起較大的系統資源消耗 比如某個篩選參數造成查全表的情況, 此時可能造成數據庫處理能力下降,進而造成后端服務無響應 針對性的根據耗時API的參數進行限制限制, 防止該API引起系統崩潰
總體API的QPS位于合理范圍內, 但某個API的某個參數被大多數人調用, 導致整個API無法提供服務, 比如微博的話題功能, 如果有個爆炸性話題, 這個話題就會成為熱點參數 造成整個API無法使用, 嚴重時會造成整個服務不可用 通過對熱點參數的限制, 保證其它功能能正常使用

1.2 限流的組件

通過上述場景可以看到, 在這些場景中限流的作用是差不多的, 一般只涉及到兩個維度:

  • 時間:對某個時間窗口進行限流
  • 資源:針對某個API或者某個API的參數進行限流,達到保護后方對應的資源。

限流可以保證在某段時間內的某個資源的請求數量不會超過設計值, 達到保護系統的作用, 不過不同場景主要差別是限制的資源維度不一樣, 資源維度的變化從總體服務到某個API到某個API的某個參數, 資源維度越來越細, 而這個資源維度區分也就是我們要實現限流的第一步--流量匹配, 只要流量匹配了, 限流系統就可以開始工作了, 一般的限流系統流程圖如下(其中他服務核心代表微服務核心):

RPC框架編寫實踐-RPC常見限流方法的實現

限流

流程圖中第一步是規則匹配, 它會通過一個函數把流量提取出來, 當做Key, 這個Key等于某個資源, 然后判斷這個Key是否匹配到規則, 如果命中規則就開始執行規則并結合這段規則和限流算法來判斷該流量是否限流, 如果限流就丟棄或者等待, 如果沒被限流, 就直接放行。

此外, 流程圖的最下層有一個很大的Backend, 它可以用來存儲規則以及存儲一些限流相關的計算變量。其中,限流相關的計算變量都是跟時間相關的, 且每次都要進行讀寫, 最好的情況是放在內存之中,不過它不能跟請求綁定在一起, 因為跟當前請求的生命周期不一樣, 不能在發送請求結束后就把變量回收了, 這些變量也需要有個容器可以存儲, 供不同的請求讀寫, 但是在一個集群服務中, 每個機器都只存儲自己的計算變量則會導致多臺機器沒辦法共享數據而造成限流失敗。

比如針對某個用戶可以調用某個API的規則是一秒內可以請求十次, 目前有十臺機器, 他們不會互相共享自己的限流計算變量, 那么在最壞的情況下, 用戶可以在1秒內訪問100次請求而不被限流, 這樣是達不了限流的效果的, 所以限流必定是一個中心化的應用。目前兩個比較主流的限流方案分別是網關限流和中間件限流, 網關限流場景下所有入站流量都會經過網關這個單體, 然后由網關決定是否放行;而中間件限流則是把計算變量都存在某個中間件存儲中, 然后每個服務的限流組件都可以從中間件實時寫入和讀取數據, 其中最常用的中間件是Redis, 因為Redis的速度快, 能讓限流組件很快的判斷是否需要限流, 對機器的性能開銷占比也不是很多, 同時Redis支持的數據結構和功能非常的多, 我們可以很容易的基于它來實現不同的限流算法。

至于限流的規則, 由于它只要寫入一次, 后面都是以讀為主, 所以在網關場景下都存在于內存之中, 但在中間件場景下規則都是存在一個集中式存儲中, 如Etcd, 然后每個服務會同步集中式存儲的規則, 并寫入到自己的內存中。

在實際的落地要選擇網關限流還是中間件限流主要還是取決于是服務的應用場景, 比如接口外層都有加一層網關, 那采用網關限流即可, 如果是內部服務或者該服務的通信協議是自定義的, 則采用中間件方式, 有比較強的自定義性。

在使用Redis下的某些情況下(取決于搭建方式), 有可能造成數據不準的情況, 但是限流的頻率是允許有些許誤差的, 比如限流的規則是1秒可以訪問100次, 但在某些時候只實現了1秒訪問110次也是沒太大關系的。

1.3 限流算法

上面所說的都是一些簡單的概念, 而限流的核心是在于限流算法的實現, 常見的限流算法有以下幾種(由于大多數都限流backend默認是Redis, 所以以可以在Redis運行的lua代碼示例):

1.3.1 固定窗口

固定窗口的原理比較簡單,就是將時間切分成若干個時間片,每個時間片內固定處理若干個請求。比如限流規則是10秒內最多處理5個請求, 那么就會有一個容器來統計這10秒內的請求數, 如果容器的統計數量大于5, 那么后續的請求都會被拒絕, 然后每隔10秒重置這個容器的統計。這種實現非常簡單, 但不是非常嚴謹, 假如限制規則是1秒限制100個, 但在最壞的情況下, 在第一個窗口的0.5秒后到第二個窗口的0.5秒前的這個時間點共計會放行200個請求, 所以固定窗口只適用于一些要求不嚴格的場景, 通過下圖的左圖可以看到限流的流程, 通過下圖的右圖可以看到整個限流曲線不平滑。

RPC框架編寫實踐-RPC常見限流方法的實現

固定窗口

固定窗口的實現很簡單, 在Redis中的lua代碼如下:


  1. -- keys為傳入的命令, 其中keys[1]為限流的key  
  2. -- argv為傳入的參數, ARGV[1]為窗口限制量, ARGV[2]為窗口時間  
  3. local count 
  4. local limit = ARGV[1] 
  5. count = redis.call("incr",KEYS[1]) 
  6. if tonumber(count) == 1 then 
  7.     -- 返回1代表是第一個, 該key剛被創建, 需要設置過期時間 
  8.     redis.call("expire",KEYS[1], ARGV[2]]) 
  9. end 
  10. return count<limit  

1.3.2 滑動窗口

滑動窗口是固定窗口的改進方法, 他是通過增加窗口數量使限流算法更順滑, 本身從一個窗口變為一個先進先出的隊列, 隊列的內容是更加精細的窗口,比如原來是10秒一個窗口, 現在會改為1秒一個窗口, 然后每隔一秒鐘滑動一個窗口。只寫入最新的窗口而讀取判斷時都是取最近10個窗口, 這樣就可以通過減小粒度來讓限流算法更加精細, 可以看到波動幅度會變小(取決于精細程度):

RPC框架編寫實踐-RPC常見限流方法的實現

滑動窗口的實現也是很簡單的, 具體見:RPC框架編寫實踐--服務治理的基石, 在Redis可以采用Zset數據結構進行實現, 這里就不做代碼示例了。滑動窗口是犧牲一定的內存來讓限流變得平滑,窗口數量越多, 限流速率越精細, 占用的內存就越大, 同時獲取數據時都是獲取一批窗口的數據, 相比于固定窗口來說,它的時間復雜度也會跟著變多(O(k))。

1.3.3 漏桶

漏桶的出現可以完美的解決參差不齊的速率限制問題, 漏桶算法的核心原理是進入漏桶的請求量不限制, 但能漏出去的速率請求是恒定的, 這樣就能完美的控制請求的速率, 如果桶滿了, 在漏桶里的請求就會溢出去, 達到丟棄請求的目的, 如下圖, 整個請求的速率都是很平滑的, 沒有多少毛尖:

RPC框架編寫實踐-RPC常見限流方法的實現

從圖中可以看到, 漏桶的原理很像一個FIFO隊列, 然后有個定時器會以恒定的速率把請求取出來, 使用Python代碼實現如下:


  1. import asyncio 
  2.  
  3. # 假設容量只有10 
  4. import time 
  5.  
  6. leaky_bucket: asyncio.Queue = asyncio.Queue(10) 
  7. loop: asyncio.AbstractEventLoop = asyncio.get_event_loop() 
  8.  
  9.  
  10. async def demo_request(cnt: int) -> None: 
  11.     """模仿請求""" 
  12.     msg: str = f"I'm mock request:{cnt}" 
  13.     future: asyncio.Future = asyncio.Future() 
  14.     try: 
  15.         leaky_bucket.put_nowait(future) 
  16.     except asyncio.QueueFull: 
  17.         # 代表桶滿了, 溢出來, 該請求要提前拋棄 
  18.         print(f"Fail Request:{msg}"
  19.     else
  20.         # 等待放行 
  21.         await future 
  22.         print(time.time(), msg) 
  23.  
  24.  
  25. def timer() -> None: 
  26.     """定時放行請求""" 
  27.     try: 
  28.         # 放行該請求 
  29.         future: asyncio.Future = leaky_bucket.get_nowait() 
  30.         future.set_result(True
  31.     except asyncio.QueueEmpty: 
  32.         pass 
  33.     # 一秒執行一次 
  34.     loop.call_later(1, timer) 
  35.  
  36. timer() 
  37. # 模擬并發12個請求 
  38. loop.run_until_complete(asyncio.gather(*[demo_request(i) for i in range(12)])) 

但是, 這樣實現的漏桶算法依然需要占用一些空間用來存儲等待放行的請求, 直到放行才被釋放。為了解決空間占用的問題, 可以采用GCRA算法, 它從另外一個角度看起來跟漏桶算法很像(GRRA應該被認為是計量器實現的漏桶版本, 而不是上面所說的隊列形漏桶), 但很省空間占用, 因為無論漏桶多大, 它的空間占用都是恒定的, 只需要存漏水速率(可以認為是該時間段可以放行的請求量)以及桶目前的容量即可。

使用GCRA算法之所以能這樣省空間, 主要還是它是基于虛擬調度實現的, 它只需要存一個漏水速率,然后每次有請求進來時判斷現在可否可以漏水, 如果可以就放行, 如果不可以則判斷桶是否滿, 滿則拋棄請求, 沒滿則讓請求等待, 直到可以放行為止。常見的GCRA限流實現一般都考慮使用redis-cell, 它的使用方法如下:


  1. # 第一個參數為命令, 第二個參數是要限流的key, 第三個參數是桶容量, 第四第五綜合起來為漏桶速率, 第五個為每次漏多少 
  2.  
  3. # 第一次請求放行, 可以發現容量變多了一個 
  4. 127.0.0.1:6379> CL.THROTTLE demo_leaky_bucket 2 1 10 1 
  5. 1) "0"     # 0表示允許, 1表示拒絕 
  6. 2) "3"     # 漏桶容量(會比輸入的多1) 
  7. 3) "2"     # 漏斗剩余空間 
  8. 4) "-1"    # 如果拒絕, 需要多長時間后重試, 單位秒 
  9. 5) "10"    # 多少時間后,漏桶完全空了, 單位秒 
  10.  
  11. # 第二次請求被放行, 但是漏斗已經被占了一個空位 
  12. 127.0.0.1:6379> CL.THROTTLE demo_leaky_bucket 2 1 10 1 
  13. 1) "0" 
  14. 2) "3" 
  15. 3) "1" 
  16. 4) "-1" 
  17. 5) "18" 
  18. # 第三次請求被放行, 但是漏斗已經被占了兩個個空位 
  19. 127.0.0.1:6379> CL.THROTTLE demo_leaky_bucket 2 1 10 1 
  20. 1) "0" 
  21. 2) "3" 
  22. 3) "0" 
  23. 4) "-1" 
  24. 5) "27" 
  25. # 第四次請求不被放行, 但是漏斗沒有空位了 
  26. 127.0.0.1:6379> CL.THROTTLE demo_leaky_bucket 2 1 10 1 
  27. 1) "1" 
  28. 2) "3" 
  29. 3) "0" 
  30. 4) "6" 
  31. 5) "26" 

從命令可以看出, 即使漏斗中還有數據沒漏出去, 返回值得第一個也還是0, 表示放行, 這樣并不是一個完善的GCRA。為了實現一個完備的GCRA, 我們需要額外的在代碼判斷漏桶是否全空, 如果放行且桶不是全空, 則需要在代碼判斷多久后才能會為空, 這個時間也就是請求的等待放行時間, 在等待這段時間后才能放行請求, 如果不是放行, 則直接丟棄請求即可。

不過, 如果不做任何判斷, 直接使用返回值的第一個值來判斷是否放行請求, 那這個實現就很像下面所說的令牌桶的實現。

1.3.4 令牌桶

漏桶能很好的控制速率, 使其變得平滑, 但是它沒辦法應對突發流量, 比如我們把規則定義為10秒內可以請求10次, 對于漏桶來說, 它會控制為1秒放行一個請求, 如果同時收到10個請求時它則會分開10秒放行每個請求。然而10秒內可以請求10次的含義是10秒內總共可以請求10次, 也就是允許在這10秒內的某個瞬間同時放行10個請求, 對于這個問題可以使用令牌桶來解決, 令牌桶和漏桶很像, 只是漏桶控制的是請求, 令牌桶控制的是令牌發放速度。

令牌桶算法規定每個請求需要從桶里拿到并消耗一個令牌才可以放行, 拿不到則會被拋棄, 同時令牌桶本身會以恒定的速率產生令牌, 直到桶滿為止, 這樣就可以保證限流的平緩, 同時又能應對突發請求, 令牌桶的原理圖和限流曲線圖如下, 其中限流曲線圖表示初始時桶里面放滿了令牌, 所以放行的請求很多, 隨著令牌被逐漸消耗并消耗光了, 限流的曲率會穩定在一條線上, 也就是令牌的生產速率:

RPC框架編寫實踐-RPC常見限流方法的實現

同樣的, 在實現令牌桶時為了減少空間的占用, 也會使用虛擬調度方法, 只存一個時間和容量到內存中, 每次收到請求時都會根據請求的時間和在內存中的時間差值再乘以速率計算這段時間應該產生的令牌數量并存到內存中, 然后再判斷是否有足夠的令牌來判斷是否放行請求, 具體的Redis lua代碼實現如下:


  1. local key = KEYS[1] -- key 
  2. local current_time = redis.call('TIME')[1] -- redis時間戳 
  3. local interval_per_token = tonumber(ARGV[1]) --每個單位產生多少個token 
  4. local max_token = tonumber(ARGV[2]) -- 桶最大的量 
  5. local init_token = tonumber(ARGV[3]) -- 桶初始量 
  6. local tokens 
  7. -- 上次請求時保留的桶數據 
  8. local bucket = redis.call("hmget"key"last_time""last_token"
  9. local last_time= bucket[1] 
  10. local last_token = bucket[2] 
  11. if last_time == false or last_token == false then 
  12.     -- 如果沒數據, 則代表該資源是第一次訪問, 進行初始化 
  13.     tokens = init_token 
  14.     redis.call('hset'key'last_time'current_time
  15. else 
  16.     -- 算出間隔時間 
  17.     local this_interval = current_time - tonumber(last_time) 
  18.     if this_interval > 1 then 
  19.         -- 算出該時間應該產生的令牌 
  20.         local tokens_to_add = math.floor(this_interval * interval_per_token) 
  21.         -- 算出真實可以擁有的令牌 
  22.         tokens = math.min(last_token + tokens_to_add, max_token) 
  23.         -- 保存數據 
  24.         redis.call('hset'key'last_time'current_time
  25.     else 
  26.         tokens = tonumber(last_token) 
  27.     end 
  28. end 
  29. if tokens < 1 then 
  30.     -- 令牌不夠消費 
  31.     redis.call('hset'key'last_token', tokens) 
  32.     return -1 
  33. else 
  34.     -- 消費令牌并返回令牌數, 代表可以消費 
  35.     tokens = tokens - 1 
  36.     redis.call('hset'key'last_token', tokens) 
  37.     return tokens 
  38. end 

2.具體實現

上面說完了算法實現后, 接下來來看看該如何結合算法進行實現, 由于代碼會隨時更新,具體源碼更新見:https://github.com/so1n/rap/tree/master/rap/server/plugin/processor/limit

項目的代碼結構如下, 在常見的后端服務中需要占用空間少, 然后速度盡量快點限流組件, 所以一般只用漏桶或者令牌桶且基于Redis的實現, 這里就不會去實現窗口相關的限流了:


  1. ├── backend  # 算法 
  2. │   ├── base.py  # 封裝的協議 
  3. │   └── redis.py  # 基于redis當做banckend的算法實現 
  4. ├── core.py  # 核心判斷代碼, 實際上是一個中間流量處理 
  5. ├── rule.py  # 規則聲明 
  6. └── util.py  # 其它小代碼 

首先是rule.py里的規則類, 它主要是聲明了限流速率, 初始化token數量, 最多的tokens數量以及停用時間, 其中停用時間是用來防止惡意用戶頻繁刷新, 它的邏輯是當漏桶已經滿了或者令牌桶沒有令牌的時候, 限流組件會在停用時間內不再提供服務。

然后就是backend.base.py, 它是一個限流算法的統一封裝, 代碼如下:


  1. from typing import Any, Coroutine, Union 
  2.  
  3. from rap.server.plugin.processor.limit.rule import Rule 
  4.  
  5.  
  6. class BaseLimitBackend(object): 
  7.     def can_requests(self, key: str, ruleRule, token_num: int = 1) -> Union[bool, Coroutine[AnyAny, bool]]: 
  8.         raise NotImplementedError 
  9.  
  10.     def expected_time(self, key: str, ruleRule) -> Union[float, Coroutine[AnyAnyfloat]]: 
  11.         raise NotImplementedError 

這個類它聲明了兩個方法, 一個是can_request, 它會根據算法來判斷是否放行, 如果需要等待, 則會在這個方法里進行等待, 直到到時間后才返回放行標記, 其中can_request還內嵌了一個block_time的邏輯;另外一個是expected_time, 用來獲取下次可用的時間, 具體的實現以RedisCellBackend為例子, 它是一個子類。

它的最上層實現是BaseLimitBackend, 然后就是繼承于BaseLimitBackend的BaseRedisBackend, 這個組件Redis限流算法的基礎實現, 主要是實現了一個停用時間的邏輯, 當發現不放行請求的時候, 會啟用停用邏輯, 以停用后續相同key的請求:


  1. class BaseRedisBackend(BaseLimitBackend, ABC): 
  2.     def __init__(self, redis: Union[StrictRedis, StrictRedisCluster]): 
  3.         # 初始化Redis模塊 
  4.         self._redis: "Union[StrictRedis, StrictRedisCluster]" = redis 
  5.  
  6.     async def _block_time_handle(self, key: str, ruleRule, func: Callable[..., Awaitable[bool]]) -> bool: 
  7.         """處理block_time邏輯""" 
  8.         block_time_key: str = f"{key}:block_time" 
  9.         bucket_block_time: Optional[int] = rule.block_time 
  10.  
  11.         if bucket_block_time is not None and await self._redis.exists(block_time_key): 
  12.             # 啟用block_time邏輯, 且key已經存在, 那么直接返回False告訴該請求應該被拒絕  
  13.             return False 
  14.  
  15.         # 執行正真的判斷是否限流邏輯 
  16.         can_requests: bool = await func() 
  17.  
  18.         if not can_requests and bucket_block_time is not None: 
  19.             # 啟用block_time邏輯且被限流時, 正式啟用block time邏輯  
  20.             await self._redis.set(block_time_key, bucket_block_time, ex=bucket_block_time) 
  21.  
  22.         return can_requests 

接著就是繼承于BaseRedisBackend的BaseRedisCellBackend, 它主要是提供一個命令調用的封裝以及獲取還有多久后才能請求的封裝:


  1. class BaseRedisCellBackend(BaseRedisBackend): 
  2.     ""
  3.     use redis-cell module 
  4.     learn more:https://github.com/brandur/redis-cell 
  5.  
  6.     input: CL.THROTTLE user123 15 30 60 1 
  7.         # param  |  desc 
  8.         # user123 key 
  9.         # 15 maxburst 
  10.         # 30 token 
  11.         # 60 seconds 
  12.         # 1 apply 1token 
  13.     output
  14.         1) (integer) 0        # is allowed 
  15.         2) (integer) 16       # total bucket num 
  16.         3) (integer) 15       # the remaining limit of the key
  17.         4) (integer) -1       # the number of seconds until the user should retry, 
  18.                               #   and always -1 if the action was allowed. 
  19.         5) (integer) 2        # The number of seconds until the limit will reset to its maximum capacity 
  20.     ""
  21.  
  22.     async def _call_cell(self, key: str, ruleRule, token_num: int = 1) -> List[int]: 
  23.         """調用redis_cell""" 
  24.         result: List[int] = await self._redis.execute_command( 
  25.             "CL.THROTTLE"keyrule.max_token - 1, rule.gen_token, int(rule.total_second), token_num 
  26.         ) 
  27.         return result 
  28.  
  29.     def expected_time(self, key: str, ruleRule) -> Union[float, Coroutine[AnyAnyfloat]]: 
  30.         """獲取下次可請求時間""" 
  31.         async def _expected_time() -> float
  32.             block_time_key: str = key + ":block_time" 
  33.             block_time = await self._redis.get(block_time_key) 
  34.             if block_time: 
  35.                 return await self._redis.ttl(block_time_key) 
  36.  
  37.             result: List[int] = await self._call_cell(keyrule, 0) 
  38.             return float(max(result[3], 0)) 
  39.  
  40.         return _expected_time() 

最后就是真正的對外使用的限流組件實現, 這個實現是基于漏桶算法的, 它繼承于BaseRedisCellBackend(另外一個繼承于BaseRedisCellBackend的實現是基于令牌桶算法的, 可以通過源碼了解), 可以看到非常的簡單, 本質上是基于redis-cell的返回判斷是否放行。


  1. class RedisCellBackend(BaseRedisCellBackend): 
  2.  
  3.     def can_requests(self, key: str, ruleRule, token_num: int = 1) -> Union[bool, Coroutine[AnyAny, bool]]: 
  4.         """通過redis-cell判斷是否可以請求,以及是否需要休眠等待, 如果需要則休眠固定的時間后再放行""" 
  5.         async def _can_requests() -> bool: 
  6.             result: List[int] = await self._call_cell(keyrule, token_num) 
  7.             can_requests: bool = result[0] == 0 
  8.             if can_requests and result[4]: 
  9.                 await asyncio.sleep(result[4]) 
  10.             return can_requests 
  11.  
  12.         return self._block_time_handle(keyrule, _can_requests) 

了解完算法的實現后, 接下來就是核心的判斷邏輯, 具體見注釋:


  1. class LimitProcessor(BaseProcessor): 
  2.     def __init__(self, backend: BaseLimitBackend, rule_list: List[Tuple[RULE_FUNC_TYPE, Rule]]): 
  3.         """初始化規則和算法邏輯, 這里的規則之所以是使用傳參的方式是僅供參考, 后續整個框架的配置都會抽離成一個config, 供其它組件調用""" 
  4.         self._backend: BaseLimitBackend = backend 
  5.         self._rule_list: List[Tuple[RULE_FUNC_TYPE, Rule]] = rule_list 
  6.  
  7.     async def process_request(self, request: Request) -> Request: 
  8.         # not limit client event 
  9.         if request.msg_type == constant.CLIENT_EVENT: 
  10.             # 屏蔽event請求 
  11.             return request 
  12.  
  13.         for func, rule in self._rule_list: 
  14.             # 獲取該請求的key 
  15.             if inspect.iscoroutinefunction(func): 
  16.                 key, is_ignore_limit = await func(request)  # type: ignore 
  17.             else
  18.                 key, is_ignore_limit = func(request) 
  19.             if is_ignore_limit: 
  20.                 # 如果該請求不應該限流, 直接跳過限流邏輯 
  21.                 return request 
  22.             if key
  23.                 # 匹配到key, 進入限流邏輯 
  24.                 break 
  25.         else
  26.             raise TooManyRequest() 
  27.  
  28.         # 通過backend判斷是否限流 
  29.         key = f"rap:processor:{self.__class__.__name__}:{key}" 
  30.         can_requests: Union[bool, Awaitable[bool]] = self._backend.can_requests(keyrule
  31.         if inspect.isawaitable(can_requests): 
  32.             can_requests = await can_requests  # type: ignore 
  33.         if not can_requests: 
  34.             # 如果被限流, 返回異常, 并告知要何時后才可以再次請求 
  35.             expected_time: Union[float, Awaitable[float]] = self._backend.expected_time(keyrule
  36.             if inspect.isawaitable(expected_time): 
  37.                 expected_time = await expected_time  # type: ignore 
  38.             raise TooManyRequest(extra_msg=f"expected time: {expected_time}"
  39.         return request 

至此, 整個限流邏輯實現完畢, 本章內容完。

3.其它碎碎念

3.1.熱點參數實現

由于大部分的限流實現的backend都只要依賴于Redis, 所以代碼倉里面只有Redis一種類型的backend,但是有一些限流實現需要依賴于一些特殊的backend,比如熱點參數限流, 還有蜜罐之類的場景。

以熱點參數限流場景為例子, 熱點參數是一個寫大于讀的應用場景, 而且跟時間強相關, 所以選用時序數據庫做backend,之前選用過Graphite當做backend, 具體實現如圖后端服務會把每次請求參數都記錄到時序數據庫中, 并使用一個定時腳本每隔一段時間把最近的熱點參數數據拉取到緩存中, 供后端服務的限流組件判斷是否該放行。其中, 這個間隔一般控制在1秒左右, 所以這是一個近實時的實現, 具體的實現圖如下:

RPC框架編寫實踐-RPC常見限流方法的實現

當請求進來的時候, 限流中間件會通過異步的方法把數據記錄到時序數據庫中, 比如一個請求為http://127.0.0.1:80?q=1&b=2,中間件就會發送一個以{prefix}.hot_param.b=2&b=2為key, value為1的標準Statsd的count類型數據到Statsd組件中。

這個Key采用標準的Statsd命令, 以.分割有三個值, 第一個是前綴它與業務相關, 如業務名, 函數名,namespace等等; 第二個是代表是熱點參數的業務;第三個是參數Key, 這里以&為分割號, 然后按Key順序排序,重新拼接為一個字符串, 這樣即使請求時順序不一致也能識別到時同種請求。

Statsd組件收到了數據后會自行進行統計, 統計一個時間區間都數據并寫入到Graphite中, 然后通過定時腳本使用Graphite API拉取統計次數大于條件的數據寫入到Redis緩存中, 其中Statsd組件的時間區間和定時腳本的定時時間都會控制在一秒左右, 所以這是一個近實時的實現, 在計算性能消耗和實現效果直接做取舍。

數據寫入到Redis后, 這個寫入數據和統計的異步流程就結束了, 中間件在記錄數據后, 會通過Redis判斷是否是熱點參數, 并根據規則判斷是否放行, 到了這里就跟上面的限流流程差不多了。

3.2.限流算法拓展

限流算法不止是用于算法, 也可以用于別的地方,比如有一些游戲活動,體力值滿時為5, 然后玩家每次出發活動會減少1個體力值, 然后可以使用限流算法每隔一個固定時間則增加一個體力值等等。在遇到業務需求有跟時間相關且像上述所說的體力值會恢復的情況時, 可以往限流算法思考。

原文鏈接:https://mp.weixin.qq.com/s/AmBcmyLNq_3UqL5a4wXcSw

延伸 · 閱讀

精彩推薦
主站蜘蛛池模板: 日产免费自线一二区 | free性丰满hd性欧美厨房 | 深夜福利在线播放 | 车上小婕子系列辣文小说 | 福利视频一区二区思瑞 | 91污污视频 | 好男人在线观看免费高清2019韩剧 | 精品福利一区二区免费视频 | 干美女视频 | 欧洲网色偷偷亚洲男人的天堂 | 无人影院免费观看 | 狠狠做五月深爱婷婷天天综合 | 国产 日韩欧美 | 亚洲va欧美va国产综合久久 | 青青草原在线 | 亚洲国产精品嫩草影院永久 | 91精品国产色综合久久不卡蜜 | 日韩毛片在线视频 | 我的妹妹最近有点怪在线观看 | 亚洲欧美乱| 95在线观看精品视频 | 日韩在线观看网址 | 国产精品福利在线观看免费不卡 | 青草国产福利视频免费观看 | 暖暖的免费观看高清视频韩国 | 暖暖免费高清完整版观看日本 | 30分钟的高清视频在线观看 | 热99re久久精品精品免费 | 啪一啪在线视频 | 男人捅女人漫画 | 天天干天天色综合网 | 亚洲高清在线视频 | 男人j放进女人的p视频免费 | 2020精品极品国产色在线观看 | 日本护士撒尿xxxx18 | 国产亚洲成归v人片在线观看 | 韩国伊人 | 性刺激欧美三级在线现看中文 | 亚洲欧美自偷自拍另类小说 | 国产免费看黄的私人影院 | 午夜久久精品 |