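Principle
The lock is built on MongoDB's findAndModify, which is atomic on a single document and therefore safe under concurrent callers.
Implementation
Define the document that stores the lock: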
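import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.springframework.data.annotation.Id;
import org.springframework.data.mongodb.core.mapping.Document;

/**
 * MongoDB distributed lock document.
 */
@Data
@NoArgsConstructor
@AllArgsConstructor
@Document(collection = "distributed-lock-doc")
public class LockDocument {
    /** Lock key. */
    @Id
    private String id;
    /** Expiry time in epoch milliseconds. */
    private long expireAt;
    /** Token identifying the current holder. */
    private String token;
}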
Define the lock API:
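public interface LockService {

    // Try to take the lock; returns the holder's token, or null if the lock is held by someone else.
    String acquire(String key, long expiration);

    // Release the lock; succeeds only if the token matches the current holder.
    boolean release(String key, String token);

    // Extend the expiry of a held lock; succeeds only if the token matches.
    boolean refresh(String key, String token, long expiration);
}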
Acquiring the lock:
@Override
public String acquire(String key, long expiration) {
    Query query = Query.query(Criteria.where("_id").is(key));
    String token = this.generateToken();
    // setOnInsert only takes effect when the upsert creates the document
    Update update = new Update()
        .setOnInsert("_id", key)
        .setOnInsert("expireAt", System.currentTimeMillis() + expiration)
        .setOnInsert("token", token);

    FindAndModifyOptions options = new FindAndModifyOptions().upsert(true)
                                                             .returnNew(true);
    LockDocument doc = mongoTemplate.findAndModify(query, update, options,
                                                   LockDocument.class);
    // We hold the lock only if the stored token is the one we just generated
    boolean locked = doc.getToken() != null && doc.getToken().equals(token);

    // If the existing lock has expired
    if (!locked && doc.getExpireAt() < System.currentTimeMillis()) {
        DeleteResult deleted = this.mongoTemplate.remove(
            Query.query(Criteria.where("_id").is(key)
                                .and("token").is(doc.getToken())
                                .and("expireAt").is(doc.getExpireAt())),
            LockDocument.class);
        if (deleted.getDeletedCount() >= 1) {
            // The stale lock was removed; try to acquire again
            return this.acquire(key, expiration);
        }
    }

    log.debug("Tried to acquire lock for key {} with token {}. Locked: {}",
              key, token, locked);
    return locked ? token : null;
}
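How it works:
- First try to upsert the lock document; if the document that comes back carries our token, we hold the lock
- Otherwise the attempt failed
- If we did not get the lock but the existing lock has expired, try to delete it
  - If the delete succeeds, try to acquire the lock again
  - If the delete fails, the lock has probably been refreshed in the meantime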
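The steps above can be tried out without a database. The following is a minimal in-memory sketch, not the article's MongoLockService: a ConcurrentHashMap stands in for the collection, putIfAbsent plays the role of the upsert with setOnInsert, and the two-argument remove plays the role of the conditional delete. The class name InMemoryLockSketch and the use of a random UUID as the token are assumptions made for illustration; the article does not show how generateToken() is implemented.

```java
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

/** In-memory stand-in for the lock collection; illustrative only. */
class InMemoryLockSketch {

    /** Immutable lock record mirroring LockDocument's token and expireAt fields. */
    static final class Entry {
        final String token;
        final long expireAt;

        Entry(String token, long expireAt) {
            this.token = token;
            this.expireAt = expireAt;
        }
    }

    private final ConcurrentMap<String, Entry> locks = new ConcurrentHashMap<>();

    /** Returns the holder's token, or null if someone else holds a live lock. */
    String acquire(String key, long expirationMillis) {
        while (true) {
            // Assumption: a random UUID is unique enough to identify the holder
            String token = UUID.randomUUID().toString();
            long now = System.currentTimeMillis();
            Entry mine = new Entry(token, now + expirationMillis);

            // Insert only if no entry exists (the upsert + setOnInsert step)
            Entry existing = locks.putIfAbsent(key, mine);
            if (existing == null) {
                return token; // our entry was stored, so we hold the lock
            }
            if (existing.expireAt >= now) {
                return null; // held by someone else and not yet expired
            }
            // Expired: delete it only if it is still the exact entry we saw
            if (!locks.remove(key, existing)) {
                return null; // it changed underneath us, e.g. it was replaced
            }
            // Stale entry removed; loop and try to acquire again
        }
    }

    /** Removes the lock only when the caller presents the current token. */
    boolean release(String key, String token) {
        Entry existing = locks.get(key);
        return existing != null
            && existing.token.equals(token)
            && locks.remove(key, existing);
    }
}
```

Entry does not override equals, so remove(key, existing) compares by identity. That is the in-memory counterpart of matching on both token and expireAt in the delete query: a lock that was replaced after we read it is left alone.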
Releasing and refreshing the lock:
@Override
public boolean release(String key, String token) {
    // Match on the token so that only the current holder can release
    Query query = Query.query(Criteria.where("_id").is(key)
                                      .and("token").is(token));
    DeleteResult deleted = mongoTemplate.remove(query, LockDocument.class);
    boolean released = deleted.getDeletedCount() == 1;
    if (released) {
        log.debug("Remove query successfully affected 1 record for key {} with token {}",
                  key, token);
    } else if (deleted.getDeletedCount() > 0) {
        log.error("Unexpected result from release for key {} with token {}, released {}",
                  key, token, deleted);
    } else {
        log.error("Remove query did not affect any records for key {} with token {}",
                  key, token);
    }

    return released;
}

@Override
public boolean refresh(String key, String token, long expiration) {
    // Match on the token so that only the current holder can extend the expiry
    Query query = Query.query(Criteria.where("_id").is(key)
                                      .and("token").is(token));
    Update update = Update.update("expireAt",
                                  System.currentTimeMillis() + expiration);
    UpdateResult updated =
        mongoTemplate.updateFirst(query, update, LockDocument.class);

    final boolean refreshed = updated.getModifiedCount() == 1;
    if (refreshed) {
        log.debug("Refresh query successfully affected 1 record for key {} "
                  + "with token {}", key, token);
    } else if (updated.getModifiedCount() > 0) {
        log.error("Unexpected result from refresh for key {} with token {}, "
                  + "result {}", key, token, updated);
    } else {
        log.warn("Refresh query did not affect any records for key {} with token {}. "
                 + "This is possible when refresh interval fires for the final time "
                 + "after the lock has been released", key, token);
    }

    return refreshed;
}
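The warning in refresh mentions a refresh interval, which suggests the holder renews the lock periodically while it works. The article does not show that part, so the following is only a sketch under assumptions: the class name LockKeepAlive, the choice of refreshing every third of the expiration, and the isLost() flag are all hypothetical. The LockService interface is repeated so the block compiles on its own.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

/** Same contract as the article's LockService, repeated for self-containment. */
interface LockService {
    String acquire(String key, long expiration);
    boolean release(String key, String token);
    boolean refresh(String key, String token, long expiration);
}

/** Hypothetical helper: renews a held lock until closed or until a refresh fails. */
class LockKeepAlive implements AutoCloseable {

    // Daemon thread so a forgotten keep-alive does not block JVM shutdown
    private final ScheduledExecutorService scheduler =
        Executors.newSingleThreadScheduledExecutor(r -> {
            Thread t = new Thread(r, "lock-keep-alive");
            t.setDaemon(true);
            return t;
        });

    private volatile boolean lost = false;

    LockKeepAlive(LockService lockService, String key, String token,
                  long expirationMillis) {
        // Assumption: refreshing at one third of the expiration leaves room
        // for a couple of failed or delayed attempts before the lock expires
        long interval = Math.max(1, expirationMillis / 3);
        scheduler.scheduleAtFixedRate(() -> {
            if (!lockService.refresh(key, token, expirationMillis)) {
                // The token no longer matches: the lock was released or taken over
                lost = true;
                scheduler.shutdown(); // stops further periodic runs
            }
        }, interval, interval, TimeUnit.MILLISECONDS);
    }

    /** True once a refresh has failed, meaning the caller no longer holds the lock. */
    boolean isLost() {
        return lost;
    }

    /** Stops refreshing; call this before releasing the lock. */
    @Override
    public void close() {
        scheduler.shutdownNow();
    }
}
```

A caller would create the keep-alive right after acquire returns a token, check isLost() before any step that must not run without the lock, and close it before calling release.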
Usage
private LockService lockService;

private void tryAcquireLockAndSchedule() throws InterruptedException {
    while (!this.stopSchedule) {
        // Try to take the lock
        this.token = this.lockService.acquire(SCHEDULER_LOCK, 20000);
        if (this.token != null) {
            // Lock acquired
        } else {
            // Wait for LOCK_EXPIRATION, then try again
            Thread.sleep(LOCK_EXPIRATION);
        }
    }
}
- First try to take the lock; a non-null token means the lock was acquired
- Otherwise sleep for a while and try again
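The loop above retries for as long as the scheduler runs. When a caller would rather give up after a bounded wait, the same try-then-sleep pattern can be wrapped in a small helper. This is a sketch only: LockRetry and acquireWithTimeout are hypothetical names that do not appear in the article, and the attempt is passed in as a Supplier so the helper does not depend on any particular LockService implementation.

```java
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

/** Hypothetical helper, not part of the article's code. */
class LockRetry {

    /**
     * Calls tryAcquire until it returns a token or the timeout elapses.
     * Returns the token, or null on timeout or interruption.
     */
    static String acquireWithTimeout(Supplier<String> tryAcquire,
                                     long timeoutMillis,
                                     long retryIntervalMillis) {
        long deadline = System.nanoTime()
            + TimeUnit.MILLISECONDS.toNanos(timeoutMillis);
        while (true) {
            String token = tryAcquire.get();
            if (token != null) {
                return token; // lock acquired
            }
            long remainingMillis =
                TimeUnit.NANOSECONDS.toMillis(deadline - System.nanoTime());
            if (remainingMillis <= 0) {
                return null; // out of time
            }
            try {
                // Never sleep past the deadline
                Thread.sleep(Math.min(retryIntervalMillis, remainingMillis));
            } catch (InterruptedException e) {
                // Preserve the interrupt for the caller and stop waiting
                Thread.currentThread().interrupt();
                return null;
            }
        }
    }
}
```

With the article's service, a call might look like `LockRetry.acquireWithTimeout(() -> lockService.acquire(SCHEDULER_LOCK, 20000), 60000, 1000)`, where the timeout and interval values are arbitrary examples.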
The complete code is available on GitHub: https://github.com/jadepeng/docker-pipeline/blob/main/pipeline-master/src/main/java/com/github/jadepeng/pipeline/service/impl/MongoLockService.java
Original article: https://www.cnblogs.com/xiaoqi/p/mongodb-lock.html