方案一:數據庫樂觀鎖
樂觀鎖通常實現基于數據版本(version)的記錄機制實現的,比如有一張紅包表(t_bonus),有一個字段(left_count)記錄禮物的剩余個數,用戶每領取一個獎品,對應的left_count減1,在并發的情況下如何要保證left_count不為負數,樂觀鎖的實現方式為在紅包表上添加一個版本號字段(version),默認為0。
異常實現流程
1
2
3
4
5
6
7
8
9
10
11
12
|
-- 可能會發生的異常情況 -- 線程 1 查詢,當前left_count為 1 ,則有記錄 select * from t_bonus where id = 10001 and left_count > 0 -- 線程 2 查詢,當前left_count為 1 ,也有記錄 select * from t_bonus where id = 10001 and left_count > 0 -- 線程 1 完成領取記錄,修改left_count為 0 , update t_bonus set left_count = left_count - 1 where id = 10001 -- 線程 2 完成領取記錄,修改left_count為- 1 ,產生臟數據 update t_bonus set left_count = left_count - 1 where id = 10001 |
通過樂觀鎖實現
1
2
3
4
5
6
7
8
9
10
11
12
13
14
|
-- 添加版本號控制字段 ALTER TABLE table ADD COLUMN version INT DEFAULT '0' NOT NULL AFTER t_bonus; -- 線程 1 查詢,當前left_count為 1 ,則有記錄,當前版本號為 1234 select left_count, version from t_bonus where id = 10001 and left_count > 0 -- 線程 2 查詢,當前left_count為 1 ,有記錄,當前版本號為 1234 select left_count, version from t_bonus where id = 10001 and left_count > 0 -- 線程 1 ,更新完成后當前的version為 1235 ,update狀態為 1 ,更新成功 update t_bonus set version = 1235 , left_count = left_count- 1 where id = 10001 and version = 1234 -- 線程 2 ,更新由于當前的version為 1235 ,udpate狀態為 0 ,更新失敗,再針對相關業務做異常處理 update t_bonus set version = 1235 , left_count = left_count- 1 where id = 10001 and version = 1234 |
方案二:基于Redis的分布式鎖
SETNX命令(SET if Not eXists)\
語法:SETNX key value\
功能:原子性操作,當且僅當 key 不存在,將 key 的值設為 value ,并返回1;若給定的 key 已經存在,則 SETNX 不做任何動作,并返回0。\
Expire命令\
語法:expire(key, expireTime)\
功能:key設置過期時間\
GETSET命令\
語法:GETSET key value\
功能:將給定 key 的值設為 value ,并返回 key 的舊值 (old value),當 key 存在但不是字符串類型時,返回一個錯誤,當key不存在時,返回nil。\
GET命令\
語法:GET key\
功能:返回 key 所關聯的字符串值,如果 key 不存在那么返回特殊值 nil 。\
DEL命令\
語法:DEL key [KEY …]\
功能:刪除給定的一個或多個 key ,不存在的 key 會被忽略。
第一種:使用redis的setnx()、expire()方法,用于分布式鎖
- setnx(lockkey, 1) 如果返回0,則說明占位失敗;如果返回1,則說明占位成功
- expire()命令對lockkey設置超時時間,為的是避免死鎖問題。
- 執行完業務代碼后,可以通過delete命令刪除key。
這個方案其實是可以解決日常工作中的需求的,但從技術方案的探討上來說,可能還有一些可以完善的地方。比如,如果在第一步setnx執行成功后,在expire()命令執行成功前,發生了宕機的現象,那么就依然會出現死鎖的問題
第二種:使用redis的setnx()、get()、getset()方法,用于分布式鎖,解決死鎖問題
- setnx(lockkey, 當前時間+過期超時時間) ,如果返回1,則獲取鎖成功;如果返回0則沒有獲取到鎖,轉向2。
- get(lockkey)獲取值oldExpireTime ,并將這個value值與當前的系統時間進行比較,如果小于當前系統時間,則認為這個鎖已經超時,可以允許別的請求重新獲取,轉向3。
- 計算newExpireTime=當前時間+過期超時時間,然后getset(lockkey, newExpireTime) 會返回當前lockkey的值currentExpireTime。
- 判斷currentExpireTime與oldExpireTime 是否相等,如果相等,說明當前getset設置成功,獲取到了鎖。如果不相等,說明這個鎖又被別的請求獲取走了,那么當前請求可以直接返回失敗,或者繼續重試。
- 在獲取到鎖之后,當前線程可以開始自己的業務處理,當處理完畢后,比較自己的處理時間和對于鎖設置的超時時間,如果小于鎖設置的超時時間,則直接執行delete釋放鎖;如果大于鎖設置的超時時間,則不需要再鎖進行處理。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
|
import cn.com.tpig.cache.redis.RedisService; import cn.com.tpig.utils.SpringUtils; /** * Created by IDEA * User: shma1664 * Date: 2016-08-16 14:01 * Desc: redis分布式鎖 */ public final class RedisLockUtil { private static final int defaultExpire = 60 ; private RedisLockUtil() { // } /** * 加鎖 * @param key redis key * @param expire 過期時間,單位秒 * @return true:加鎖成功,false,加鎖失敗 */ public static boolean lock(String key, int expire) { RedisService redisService = SpringUtils.getBean(RedisService. class ); long status = redisService.setnx(key, "1" ); if (status == 1 ) { redisService.expire(key, expire); return true ; } return false ; } public static boolean lock(String key) { return lock2(key, defaultExpire); } /** * 加鎖 * @param key redis key * @param expire 過期時間,單位秒 * @return true:加鎖成功,false,加鎖失敗 */ public static boolean lock2(String key, int expire) { RedisService redisService = SpringUtils.getBean(RedisService. class ); long value = System.currentTimeMillis() + expire; long status = redisService.setnx(key, String.valueOf(value)); if (status == 1 ) { return true ; } long oldExpireTime = Long.parseLong(redisService.get(key, "0" )); if (oldExpireTime < System.currentTimeMillis()) { //超時 long newExpireTime = System.currentTimeMillis() + expire; long currentExpireTime = Long.parseLong(redisService.getSet(key, String.valueOf(newExpireTime))); if (currentExpireTime == oldExpireTime) { return true ; } } return false ; } public static void unLock1(String key) { RedisService redisService = SpringUtils.getBean(RedisService. class ); redisService.del(key); } public static void unLock2(String key) { RedisService redisService = SpringUtils.getBean(RedisService. class ); long oldExpireTime = Long.parseLong(redisService.get(key, "0" )); if (oldExpireTime > System.currentTimeMillis()) { redisService.del(key); } } } |
1
2
3
4
5
6
7
8
9
10
11
12
13
14
|
public void drawRedPacket( long userId) { String key = "draw.redpacket.userid:" + userId; boolean lock = RedisLockUtil.lock2(key, 60 ); if (lock) { try { //領取操作 } finally { //釋放鎖 RedisLockUtil.unLock(key); } } else { new RuntimeException( "重復領取獎勵" ); } } |
Spring AOP基于注解方式和SpEL實現開箱即用的redis分布式鎖策略
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
|
import java.lang.annotation.ElementType; import java.lang.annotation.Retention; import java.lang.annotation.RetentionPolicy; import java.lang.annotation.Target; /** * RUNTIME * 定義注解 * 編譯器將把注釋記錄在類文件中,在運行時 VM 將保留注釋,因此可以反射性地讀取。 * @author shma1664 * */ @Retention (RetentionPolicy.RUNTIME) @Target (ElementType.METHOD) public @interface RedisLockable { String[] key() default "" ; long expiration() default 60 ; } |
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
|
import javax.annotation.Resource; import java.lang.reflect.Method; import com.autohome.api.dealer.util.cache.RedisClient; import com.google.common.base.Joiner; import org.aspectj.lang.ProceedingJoinPoint; import org.aspectj.lang.Signature; import org.aspectj.lang.annotation.Around; import org.aspectj.lang.annotation.Aspect; import org.aspectj.lang.annotation.Pointcut; import org.aspectj.lang.reflect.MethodSignature; import org.springframework.expression.EvaluationContext; import org.springframework.expression.Expression; import org.springframework.expression.ExpressionParser; import org.springframework.expression.spel.standard.SpelExpressionParser; import org.springframework.expression.spel.support.StandardEvaluationContext; import org.springframework.stereotype.Component; /** * Created by IDEA * User: mashaohua * Date: 2016-09-28 18:08 * Desc: */ @Aspect @Component public class RedisLockAop { @Resource private RedisClient redisClient; @Pointcut ( "execution(* com.autohome.api.dealer.tuan.service.*.*(..))" ) public void pointcut(){} @Around ( "pointcut()" ) public Object doAround(ProceedingJoinPoint point) throws Throwable{ Signature signature = point.getSignature(); MethodSignature methodSignature = (MethodSignature) signature; Method method = methodSignature.getMethod(); String targetName = point.getTarget().getClass().getName(); String methodName = point.getSignature().getName(); Object[] arguments = point.getArgs(); if (method != null && method.isAnnotationPresent(RedisLockable. class )) { RedisLockable redisLock = method.getAnnotation(RedisLockable. class ); long expire = redisLock.expiration(); String redisKey = getLockKey(targetName, methodName, redisLock.key(), arguments); boolean isLock = RedisLockUtil.lock2(redisKey, expire); if (!isLock) { try { return point.proceed(); } finally { unLock2(redisKey); } } else { throw new RuntimeException( "您的操作太頻繁,請稍后再試" ); } } return point.proceed(); } private String getLockKey(String targetName, String methodName, String[] keys, Object[] arguments) { StringBuilder sb = new StringBuilder(); sb.append( "lock." ).append(targetName).append( "." ).append(methodName); if (keys != null ) { String keyStr = Joiner.on( "." ).skipNulls().join(keys); String[] parameters = ReflectParamNames.getNames(targetName, methodName); ExpressionParser parser = new SpelExpressionParser(); Expression expression = parser.parseExpression(keyStr); EvaluationContext context = new StandardEvaluationContext(); int length = parameters.length; if (length > 0 ) { for ( int i = 0 ; i < length; i++) { context.setVariable(parameters[i], arguments[i]); } } String keysValue = expression.getValue(context, String. class ); sb.append( "#" ).append(keysValue); } return sb.toString(); } |
1
2
3
4
5
6
|
<!-- https://mvnrepository.com/artifact/javassist/javassist --> < dependency > < groupId >org.javassist</ groupId > < artifactId >javassist</ artifactId > < version >3.18.1-GA</ version > </ dependency > |
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
|
import javassist.*; import javassist.bytecode.CodeAttribute; import javassist.bytecode.LocalVariableAttribute; import javassist.bytecode.MethodInfo; import org.apache.log4j.Logger; /** * Created by IDEA * User: mashaohua * Date: 2016-09-28 18:39 * Desc: */ public class ReflectParamNames { private static Logger log = Logger.getLogger(ReflectParamNames. class ); private static ClassPool pool = ClassPool.getDefault(); static { ClassClassPath classPath = new ClassClassPath(ReflectParamNames. class ); pool.insertClassPath(classPath); } public static String[] getNames(String className,String methodName) { CtClass cc = null ; try { cc = pool.get(className); CtMethod cm = cc.getDeclaredMethod(methodName); // 使用javaassist的反射方法獲取方法的參數名 MethodInfo methodInfo = cm.getMethodInfo(); CodeAttribute codeAttribute = methodInfo.getCodeAttribute(); LocalVariableAttribute attr = (LocalVariableAttribute) codeAttribute.getAttribute(LocalVariableAttribute.tag); if (attr == null ) return new String[ 0 ]; int begin = 0 ; String[] paramNames = new String[cm.getParameterTypes().length]; int count = 0 ; int pos = Modifier.isStatic(cm.getModifiers()) ? 0 : 1 ; for ( int i = 0 ; i < attr.tableLength(); i++){ // 為什么 加這個判斷,發現在windows 跟linux執行時,參數順序不一致,通過觀察,實際的參數是從this后面開始的 if (attr.variableName(i).equals( "this" )){ begin = i; break ; } } for ( int i = begin+ 1 ; i <= begin+paramNames.length; i++){ paramNames[count] = attr.variableName(i); count++; } return paramNames; } catch (Exception e) { e.printStackTrace(); } finally { try { if (cc != null ) cc.detach(); } catch (Exception e2) { log.error(e2.getMessage()); } } return new String[ 0 ]; } } |
在需要使用分布式鎖的地方添加注解
1
2
3
4
5
6
7
8
9
10
11
12
|
/** * 抽獎接口 * 添加redis分布式鎖保證一個訂單只有一個請求處理,防止用戶刷禮物,支持SpEL表達式 * redisLockKey:lock.com.autohome.api.dealer.tuan.service.impl.drawBonus#orderId * @param orderId 訂單id * @return 抽中的獎品信息 */ @RedisLockable (key = { "#orderId" }, expiration = 120 ) @Override public BonusConvertBean drawBonus(Integer orderId) throws BonusException{ // 業務邏輯 } |
第三種方案:基于Zookeeper的分布式鎖
利用節點名稱的唯一性來實現獨占鎖
ZooKeeper機制規定同一個目錄下只能有一個唯一的文件名,zookeeper上的一個znode看作是一把鎖,通過createznode的方式來實現。所有客戶端都去創建/lock/${lock_name}_lock節點,最終成功創建的那個客戶端也即擁有了這把鎖,創建失敗的可以選擇監聽繼續等待,還是放棄拋出異常實現獨占鎖。
package com.shma.example.zookeeper.lock;
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
|
import java.io.IOException; import java.util.ArrayList; import java.util.Collections; import java.util.List; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.Lock; import org.apache.zookeeper.*; import org.apache.zookeeper.data.Stat; /** * Created by IDEA * User: mashaohua * Date: 2016-09-30 16:09 * Desc: */ public class ZookeeperLock implements Lock, Watcher { private ZooKeeper zk; private String root = "/locks" ; //根 private String lockName; //競爭資源的標志 private String myZnode; //當前鎖 private int sessionTimeout = 30000 ; private List<Exception> exception = new ArrayList<Exception>(); /** * 創建分布式鎖,使用前請確認config配置的zookeeper服務可用 * @param config 127.0.0.1:2181 * @param lockName 競爭資源標志,lockName中不能包含單詞lock */ public ZookeeperLock(String config, String lockName){ this .lockName = lockName; // 創建一個與服務器的連接 try { zk = new ZooKeeper(config, sessionTimeout, this ); Stat stat = zk.exists(root, false ); if (stat == null ){ // 創建根節點 zk.create(root, new byte [ 0 ], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); } } catch (IOException e) { exception.add(e); } catch (KeeperException e) { exception.add(e); } catch (InterruptedException e) { exception.add(e); } } @Override public void lock() { if (exception.size() > 0 ){ throw new LockException(exception.get( 0 )); } if (!tryLock()) { throw new LockException( "您的操作太頻繁,請稍后再試" ); } } @Override public void lockInterruptibly() throws InterruptedException { this .lock(); } @Override public boolean tryLock() { try { myZnode = zk.create(root + "/" + lockName, new byte [ 0 ], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); return true ; } catch (KeeperException e) { e.printStackTrace(); } catch (InterruptedException e) { e.printStackTrace(); } return false ; } @Override public boolean tryLock( long time, TimeUnit unit) throws InterruptedException { return tryLock(); } @Override public void unlock() { try { zk.delete(myZnode, - 1 ); myZnode = null ; zk.close(); } catch (InterruptedException e) { e.printStackTrace(); } catch (KeeperException e) { e.printStackTrace(); } } @Override public Condition newCondition() { return null ; } @Override public void process(WatchedEvent watchedEvent) { // } } |
1
2
3
4
5
6
7
8
9
10
11
|
ZookeeperLock lock = null ; try { lock = new ZookeeperLock( "127.0.0.1:2182" , "test1" ); lock.lock(); //業務邏輯處理 } catch (LockException e) { throw e; } finally { if (lock != null ) lock.unlock(); } |
利用臨時順序節點控制時序實現
/lock已經預先存在,所有客戶端在它下面創建臨時順序編號目錄節點,和選master一樣,編號最小的獲得鎖,用完刪除,依次方便。\
算法思路:對于加鎖操作,可以讓所有客戶端都去/lock目錄下創建臨時順序節點,如果創建的客戶端發現自身創建節點序列號是/lock/目錄下最小的節點,則獲得鎖。否則,監視比自己創建節點的序列號小的節點(比自己創建的節點小的最大節點),進入等待。
對于解鎖操作,只需要將自身創建的節點刪除即可。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
|
package com.shma.example.zookeeper.lock; import java.io.IOException; import java.util.ArrayList; import java.util.Collections; import java.util.List; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.Lock; import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.WatchedEvent; import org.apache.zookeeper.Watcher; import org.apache.zookeeper.ZooDefs; import org.apache.zookeeper.ZooKeeper; import org.apache.zookeeper.data.Stat; /** * Created by IDEA * User: mashaohua * Date: 2016-09-30 16:09 * Desc: */ public class DistributedLock implements Lock, Watcher{ private ZooKeeper zk; private String root = "/locks" ; //根 private String lockName; //競爭資源的標志 private String waitNode; //等待前一個鎖 private String myZnode; //當前鎖 private CountDownLatch latch; //計數器 private int sessionTimeout = 30000 ; private List<Exception> exception = new ArrayList<Exception>(); /** * 創建分布式鎖,使用前請確認config配置的zookeeper服務可用 * @param config 127.0.0.1:2181 * @param lockName 競爭資源標志,lockName中不能包含單詞lock */ public DistributedLock(String config, String lockName){ this .lockName = lockName; // 創建一個與服務器的連接 try { zk = new ZooKeeper(config, sessionTimeout, this ); Stat stat = zk.exists(root, false ); if (stat == null ){ // 創建根節點 zk.create(root, new byte [ 0 ], ZooDefs.Ids.OPEN_ACL_UNSAFE,CreateMode.PERSISTENT); } } catch (IOException e) { exception.add(e); } catch (KeeperException e) { exception.add(e); } catch (InterruptedException e) { exception.add(e); } } /** * zookeeper節點的監視器 */ public void process(WatchedEvent event) { if ( this .latch != null ) { this .latch.countDown(); } } public void lock() { if (exception.size() > 0 ){ throw new LockException(exception.get( 0 )); } try { if ( this .tryLock()){ System.out.println( "Thread " + Thread.currentThread().getId() + " " +myZnode + " get lock true" ); return ; } else { waitForLock(waitNode, sessionTimeout); //等待鎖 } } catch (KeeperException e) { throw new LockException(e); } catch (InterruptedException e) { throw new LockException(e); } } public boolean tryLock() { try { String splitStr = "_lock_" ; if (lockName.contains(splitStr)) throw new LockException( "lockName can not contains \\u000B" ); //創建臨時子節點 myZnode = zk.create(root + "/" + lockName + splitStr, new byte [ 0 ], ZooDefs.Ids.OPEN_ACL_UNSAFE,CreateMode.EPHEMERAL_SEQUENTIAL); System.out.println(myZnode + " is created " ); //取出所有子節點 List<String> subNodes = zk.getChildren(root, false ); //取出所有lockName的鎖 List<String> lockObjNodes = new ArrayList<String>(); for (String node : subNodes) { String _node = node.split(splitStr)[ 0 ]; if (_node.equals(lockName)){ lockObjNodes.add(node); } } Collections.sort(lockObjNodes); System.out.println(myZnode + "==" + lockObjNodes.get( 0 )); if (myZnode.equals(root+ "/" +lockObjNodes.get( 0 ))){ //如果是最小的節點,則表示取得鎖 return true ; } //如果不是最小的節點,找到比自己小1的節點 String subMyZnode = myZnode.substring(myZnode.lastIndexOf( "/" ) + 1 ); waitNode = lockObjNodes.get(Collections.binarySearch(lockObjNodes, subMyZnode) - 1 ); } catch (KeeperException e) { throw new LockException(e); } catch (InterruptedException e) { throw new LockException(e); } return false ; } public boolean tryLock( long time, TimeUnit unit) { try { if ( this .tryLock()){ return true ; } return waitForLock(waitNode,time); } catch (Exception e) { e.printStackTrace(); } return false ; } private boolean waitForLock(String lower, long waitTime) throws InterruptedException, KeeperException { Stat stat = zk.exists(root + "/" + lower, true ); //判斷比自己小一個數的節點是否存在,如果不存在則無需等待鎖,同時注冊監聽 if (stat != null ){ System.out.println( "Thread " + Thread.currentThread().getId() + " waiting for " + root + "/" + lower); this .latch = new CountDownLatch( 1 ); this .latch.await(waitTime, TimeUnit.MILLISECONDS); this .latch = null ; } return true ; } public void unlock() { try { System.out.println( "unlock " + myZnode); zk.delete(myZnode,- 1 ); myZnode = null ; zk.close(); } catch (InterruptedException e) { e.printStackTrace(); } catch (KeeperException e) { e.printStackTrace(); } } public void lockInterruptibly() throws InterruptedException { this .lock(); } public Condition newCondition() { return null ; } public class LockException extends RuntimeException { private static final long serialVersionUID = 1L; public LockException(String e){ super (e); } public LockException(Exception e){ super (e); } } } |
以上就是本文的全部內容,希望本文的內容對大家的學習或者工作能帶來一定的幫助,同時也希望多多支持服務器之家!
原文鏈接:http://www.jianshu.com/p/535efcab356d