redis redisson 限流器實(shí)例
作用:限制一段時(shí)間內(nèi)對(duì)數(shù)據(jù)的訪問數(shù)量
相關(guān)接口
RRateLimiter
1
2
3
4
5
6
7
8
9
10
11
|
public interface RRateLimiter extends RRateLimiterAsync, RObject { boolean trySetRate(RateType var1, long var2, long var4, RateIntervalUnit var6); //設(shè)置訪問速率,var2為訪問數(shù),var4為單位時(shí)間,var6為時(shí)間單位 void acquire(); //訪問數(shù)據(jù) void acquire( long var1); //占var1的速度計(jì)算值 boolean tryAcquire(); //嘗試訪問數(shù)據(jù) boolean tryAcquire( long var1); //嘗試訪問數(shù)據(jù),占var1的速度計(jì)算值 boolean tryAcquire( long var1, TimeUnit var3); //嘗試訪問數(shù)據(jù),設(shè)置等待時(shí)間var3 boolean tryAcquire( long var1, long var3, TimeUnit var5); //嘗試訪問數(shù)據(jù),占數(shù)據(jù)計(jì)算值var1,設(shè)置等待時(shí)間var3 RateLimiterConfig getConfig(); } |
RateType
:速度類型
1
2
3
4
5
6
7
|
public enum RateType { OVERALL, //所有客戶端加總限流 PER_CLIENT; //每個(gè)客戶端單獨(dú)計(jì)算流量 private RateType() { } } |
RateInternalUnit
:速度單位
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
|
public enum RateIntervalUnit { MILLISECONDS { public long toMillis( long value) { return value; } }, SECONDS { public long toMillis( long value) { return TimeUnit.SECONDS.toMillis(value); } }, MINUTES { public long toMillis( long value) { return TimeUnit.MINUTES.toMillis(value); } }, HOURS { public long toMillis( long value) { return TimeUnit.HOURS.toMillis(value); } }, DAYS { public long toMillis( long value) { return TimeUnit.DAYS.toMillis(value); } }; private RateIntervalUnit() { } public abstract long toMillis( long var1); } |
示例
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
|
public class MyTest8 { public static void main(String[] args){ Config config= new Config(); config.useSingleServer().setAddress( "redis://192.168.57.120:6379" ).setPassword( "123456" ); RedissonClient client= Redisson.create(config); RRateLimiter rateLimiter=client.getRateLimiter( "rate_limiter" ); rateLimiter.trySetRate(RateType.PER_CLIENT, 5 , 2 , RateIntervalUnit.MINUTES); ExecutorService executorService= Executors.newFixedThreadPool( 10 ); for ( int i= 0 ;i< 10 ;i++){ executorService.submit(()->{ try { rateLimiter.acquire(); System.out.println( "線程" +Thread.currentThread().getId()+ "進(jìn)入數(shù)據(jù)區(qū):" +System.currentTimeMillis()); } catch (Exception e){ e.printStackTrace(); } }); } } } |
控制臺(tái)輸出
線程49進(jìn)入數(shù)據(jù)區(qū):1574672546522
線程55進(jìn)入數(shù)據(jù)區(qū):1574672546522
線程56進(jìn)入數(shù)據(jù)區(qū):1574672546526
線程50進(jìn)入數(shù)據(jù)區(qū):1574672546523
線程48進(jìn)入數(shù)據(jù)區(qū):1574672546523
線程51進(jìn)入數(shù)據(jù)區(qū):1574672666627
線程53進(jìn)入數(shù)據(jù)區(qū):1574672666627
線程54進(jìn)入數(shù)據(jù)區(qū):1574672666627
線程57進(jìn)入數(shù)據(jù)區(qū):1574672666628
線程52進(jìn)入數(shù)據(jù)區(qū):1574672666628
說明:兩分鐘之內(nèi)最多只有5個(gè)線程在執(zhí)行
分布式限流redission RRateLimiter使用及原理
前提:
最近公司在做有需求在做分布式限流,調(diào)研的限流框架大概有
- 1、spring cloud gateway集成redis限流,但屬于網(wǎng)關(guān)層限流
- 2、阿里Sentinel,功能強(qiáng)大、帶監(jiān)控平臺(tái)
- 3、srping cloud hystrix,屬于接口層限流,提供線程池與信號(hào)量?jī)煞N方式
- 4、其他:redission、手?jǐn)]代碼
實(shí)際需求情況屬于業(yè)務(wù)端限流,redission更加方便,使用更加靈活,下面介紹下redission分布式限流如何使用及原理:
一、使用
使用很簡(jiǎn)單、如下
1
2
3
4
5
6
7
8
|
// 1、 聲明一個(gè)限流器 RRateLimiter rateLimiter = redissonClient.getRateLimiter(key); // 2、 設(shè)置速率,5秒中產(chǎn)生3個(gè)令牌 rateLimiter.trySetRate(RateType.OVERALL, 3 , 5 , RateIntervalUnit.SECONDS); // 3、試圖獲取一個(gè)令牌,獲取到返回true rateLimiter.tryAcquire( 1 ) |
二、原理
1、getRateLimiter
1
2
|
// 聲明一個(gè)限流器 名稱 叫key redissonClient.getRateLimiter(key) |
2、trySetRate
trySetRate方法跟進(jìn)去底層實(shí)現(xiàn)如下:
1
2
3
4
5
6
7
8
|
@Override public RFuture<Boolean> trySetRateAsync(RateType type, long rate, long rateInterval, RateIntervalUnit unit) { return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN, "redis.call('hsetnx', KEYS[1], 'rate', ARGV[1]);" + "redis.call('hsetnx', KEYS[1], 'interval', ARGV[2]);" + "return redis.call('hsetnx', KEYS[1], 'type', ARGV[3]);" , Collections.<Object>singletonList(getName()), rate, unit.toMillis(rateInterval), type.ordinal()); } |
舉個(gè)例子,更容易理解:
比如下面這段代碼,5秒中產(chǎn)生3個(gè)令牌,并且所有實(shí)例共享(RateType.OVERALL所有實(shí)例共享、RateType.CLIENT單實(shí)例端共享)
1
|
trySetRate(RateType.OVERALL, 3 , 5 , RateIntervalUnit.SECONDS); |
那么redis中就會(huì)設(shè)置3個(gè)參數(shù):
hsetnx,key,rate,3
hsetnx,key,interval,5
hsetnx,key,type,0
接著看tryAcquire(1)方法:底層源碼如下
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
|
private <T> RFuture<T> tryAcquireAsync(RedisCommand<T> command, Long value) { return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, command, "local rate = redis.call('hget', KEYS[1], 'rate');" //1 + "local interval = redis.call('hget', KEYS[1], 'interval');" //2 + "local type = redis.call('hget', KEYS[1], 'type');" //3 + "assert(rate ~= false and interval ~= false and type ~= false, 'RateLimiter is not initialized')" //4 + "local valueName = KEYS[2];" //5 + "if type == 1 then " + "valueName = KEYS[3];" //6 + "end;" + "local currentValue = redis.call('get', valueName); " //7 + "if currentValue ~= false then " + "if tonumber(currentValue) < tonumber(ARGV[1]) then " //8 + "return redis.call('pttl', valueName); " + "else " + "redis.call('decrby', valueName, ARGV[1]); " //9 + "return nil; " + "end; " + "else " //10 + "redis.call('set', valueName, rate, 'px', interval); " + "redis.call('decrby', valueName, ARGV[1]); " + "return nil; " + "end;" , Arrays.<Object>asList(getName(), getValueName(), getClientValueName()), value, commandExecutor.getConnectionManager().getId().toString()); } |
第1、2、3備注行是獲取上一步set的3個(gè)值:rate、interval、type,如果這3個(gè)值沒有設(shè)置,直接返回rateLimiter沒有被初始化。
第5備注行聲明一個(gè)變量叫valueName 值為KEYS[2],KEYS[2]對(duì)應(yīng)的值是getValueName()方法,getValueName()返回的就是上面第一步getRateLimiter我們?cè)O(shè)置的key;如果type=1,表示全局共享,那么valueName 的值改為取KEYS[3],KEYS[3]對(duì)應(yīng)的值為getClientValueName(),查看getClientValueName()源碼:
1
2
3
|
String getClientValueName() { return suffixName(getValueName(), commandExecutor.getConnectionManager().getId().toString()); } |
ConnectionManager().getId()如下:
1
2
3
4
|
public interface ConnectionManager { UUID getId(); 省略... } |
這個(gè)getId()是每個(gè)客戶端初始化的時(shí)候生成的UUID,即每個(gè)客戶端的getId是唯一的,這也就驗(yàn)證了trySetRate方法中RateType.ALL與RateType.PER_CLIENT的作用。
- 接著看第7標(biāo)準(zhǔn)行,獲取valueName對(duì)應(yīng)的值currentValue;首次獲取肯定為空,那么看第10標(biāo)準(zhǔn)行else的邏輯
- set valueName 3 px 5,設(shè)置key=valueName value=3 過期時(shí)間為5秒
- decrby valueName 1,將上面valueName的值減1
- 那么如果第二次訪問,第7標(biāo)注行返回的值存在,將會(huì)走第8標(biāo)注行,緊接著走如下判斷
- 如果當(dāng)前valueName的值也就是3,小于要獲得的令牌數(shù)量(tryAcquire方法中的入?yún)?,那么說明當(dāng)前時(shí)間內(nèi)(key的有效期5秒內(nèi)),令牌的數(shù)量已經(jīng)被用完,返回pttl(key的剩余過期時(shí)間);反之說明桶中有足夠的令牌,獲取之后將會(huì)把桶中的令牌數(shù)量減1,至此結(jié)束。
總結(jié):
redission分布式限流采用令牌桶思想和固定時(shí)間窗口,trySetRate方法設(shè)置桶的大小,利用redis key過期機(jī)制達(dá)到時(shí)間窗口目的,控制固定時(shí)間窗口內(nèi)允許通過的請(qǐng)求量。
以上為個(gè)人經(jīng)驗(yàn),希望能給大家一個(gè)參考,也希望大家多多支持服務(wù)器之家。
原文鏈接:https://blog.csdn.net/weixin_43931625/article/details/103234206