最近上線的項目中數據庫數據已經臨近飽和,最大的一張表數據已經接近3000w,百萬數據的表也有幾張,項目要求讀數據(select)時間不能超過0.05秒,但實際情況已經不符合要求,explain建立索引,使用redis,ehcache緩存技術也已經滿足不了要求,所以開始使用讀寫分離技術,可能以后數據量上億或者更多的時候,需要再去考慮分布式數據庫的部署,但目前來看,讀寫分離+緩存+索引+表分區+sql優化+負載均衡是可以滿足億級數據量的查詢工作的,現在就一起來看一下親測可用的使用spring實現讀寫分離的步驟:
1. 背景
我們一般應用對數據庫而言都是“讀多寫少”,也就說對數據庫讀取數據的壓力比較大,有一個思路就是說采用數據庫集群的方案,
其中一個是主庫,負責寫入數據,我們稱之為:寫庫;
其它都是從庫,負責讀取數據,我們稱之為:讀庫;
那么,對我們的要求是:
1、讀庫和寫庫的數據一致;(這個是很重要的一個問題,處理業務邏輯要放在service層去處理,不要在dao或者mapper層面去處理)
2、寫數據必須寫到寫庫;
3、讀數據必須到讀庫;
2. 方案
解決讀寫分離的方案有兩種:應用層解決和中間件解決。
2.1. 應用層解決:
優點:
1、多數據源切換方便,由程序自動完成;
2、不需要引入中間件;
3、理論上支持任何數據庫;
缺點:
1、由程序員完成,運維參與不到;
2、不能做到動態增加數據源;
2.2. 中間件解決
優缺點:
優點:
1、源程序不需要做任何改動就可以實現讀寫分離;
2、動態添加數據源不需要重啟程序;
缺點:
1、程序依賴于中間件,會導致切換數據庫變得困難;
2、由中間件做了中轉代理,性能有所下降;
3. 使用spring基于應用層實現
3.1. 原理
在進入service之前,使用aop來做出判斷,是使用寫庫還是讀庫,判斷依據可以根據方法名判斷,比如說以query、find、get等開頭的就走讀庫,其他的走寫庫。
3.2. dynamicdatasource
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
|
import org.springframework.jdbc.datasource.lookup.abstractroutingdatasource; /** * 定義動態數據源,實現通過集成spring提供的abstractroutingdatasource,只需要實現determinecurrentlookupkey方法即可 * * 由于dynamicdatasource是單例的,線程不安全的,所以采用threadlocal保證線程安全,由dynamicdatasourceholder完成。 * * @author zhijun * */ public class dynamicdatasource extends abstractroutingdatasource{ @override protected object determinecurrentlookupkey() { // 使用dynamicdatasourceholder保證線程安全,并且得到當前線程中的數據源key return dynamicdatasourceholder.getdatasourcekey(); } } |
3.3. dynamicdatasourceholder
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
|
/** * * 使用threadlocal技術來記錄當前線程中的數據源的key * * @author zhijun * */ public class dynamicdatasourceholder { //寫庫對應的數據源key private static final string master = "master" ; //讀庫對應的數據源key private static final string slave = "slave" ; //使用threadlocal記錄當前線程的數據源key private static final threadlocal<string> holder = new threadlocal<string>(); /** * 設置數據源key * @param key */ public static void putdatasourcekey(string key) { holder.set(key); } /** * 獲取數據源key * @return */ public static string getdatasourcekey() { return holder.get(); } /** * 標記寫庫 */ public static void markmaster(){ putdatasourcekey(master); } /** * 標記讀庫 */ public static void markslave(){ putdatasourcekey(slave); } } |
3.4. datasourceaspect
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
|
import org.apache.commons.lang3.stringutils; import org.aspectj.lang.joinpoint; /** * 定義數據源的aop切面,通過該service的方法名判斷是應該走讀庫還是寫庫 * * @author zhijun * */ public class datasourceaspect { /** * 在進入service方法之前執行 * * @param point 切面對象 */ public void before(joinpoint point) { // 獲取到當前執行的方法名 string methodname = point.getsignature().getname(); if (isslave(methodname)) { // 標記為讀庫 dynamicdatasourceholder.markslave(); } else { // 標記為寫庫 dynamicdatasourceholder.markmaster(); } } /** * 判斷是否為讀庫 * * @param methodname * @return */ private boolean isslave(string methodname) { // 方法名以query、find、get開頭的方法名走從庫 return stringutils.startswithany(methodname, "query" , "find" , "get" ); } } |
3.5. 配置2個數據源
3.5.1. jdbc.properties
1
2
3
4
5
6
7
8
9
|
jdbc.master.driver=com.mysql.jdbc.driver jdbc.master.url=jdbc:mysql: //127.0.0.1:3306/mybatis_1128?useunicode=true&characterencoding=utf8&autoreconnect=true&allowmultiqueries=true jdbc.master.username=root jdbc.master.password= 123456 jdbc.slave01.driver=com.mysql.jdbc.driver jdbc.slave01.url=jdbc:mysql: //127.0.0.1:3307/mybatis_1128?useunicode=true&characterencoding=utf8&autoreconnect=true&allowmultiqueries=true jdbc.slave01.username=root jdbc.slave01.password= 123456 |
3.5.2. 定義連接池
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
|
<!-- 配置連接池 --> <bean id= "masterdatasource" class = "com.jolbox.bonecp.bonecpdatasource" destroy-method= "close" > <!-- 數據庫驅動 --> <property name= "driverclass" value= "${jdbc.master.driver}" /> <!-- 相應驅動的jdbcurl --> <property name= "jdbcurl" value= "${jdbc.master.url}" /> <!-- 數據庫的用戶名 --> <property name= "username" value= "${jdbc.master.username}" /> <!-- 數據庫的密碼 --> <property name= "password" value= "${jdbc.master.password}" /> <!-- 檢查數據庫連接池中空閑連接的間隔時間,單位是分,默認值: 240 ,如果要取消則設置為 0 --> <property name= "idleconnectiontestperiod" value= "60" /> <!-- 連接池中未使用的鏈接最大存活時間,單位是分,默認值: 60 ,如果要永遠存活設置為 0 --> <property name= "idlemaxage" value= "30" /> <!-- 每個分區最大的連接數 --> <property name= "maxconnectionsperpartition" value= "150" /> <!-- 每個分區最小的連接數 --> <property name= "minconnectionsperpartition" value= "5" /> </bean> <!-- 配置連接池 --> <bean id= "slave01datasource" class = "com.jolbox.bonecp.bonecpdatasource" destroy-method= "close" > <!-- 數據庫驅動 --> <property name= "driverclass" value= "${jdbc.slave01.driver}" /> <!-- 相應驅動的jdbcurl --> <property name= "jdbcurl" value= "${jdbc.slave01.url}" /> <!-- 數據庫的用戶名 --> <property name= "username" value= "${jdbc.slave01.username}" /> <!-- 數據庫的密碼 --> <property name= "password" value= "${jdbc.slave01.password}" /> <!-- 檢查數據庫連接池中空閑連接的間隔時間,單位是分,默認值: 240 ,如果要取消則設置為 0 --> <property name= "idleconnectiontestperiod" value= "60" /> <!-- 連接池中未使用的鏈接最大存活時間,單位是分,默認值: 60 ,如果要永遠存活設置為 0 --> <property name= "idlemaxage" value= "30" /> <!-- 每個分區最大的連接數 --> <property name= "maxconnectionsperpartition" value= "150" /> <!-- 每個分區最小的連接數 --> <property name= "minconnectionsperpartition" value= "5" /> </bean> |
3.5.3. 定義datasource
1
2
3
4
5
6
7
8
9
10
11
12
13
|
<!-- 定義數據源,使用自己實現的數據源 --> <bean id= "datasource" class = "cn.itcast.usermanage.spring.dynamicdatasource" > <!-- 設置多個數據源 --> <property name= "targetdatasources" > <map key-type= "java.lang.string" > <!-- 這個key需要和程序中的key一致 --> <entry key= "master" value-ref= "masterdatasource" /> <entry key= "slave" value-ref= "slave01datasource" /> </map> </property> <!-- 設置默認的數據源,這里默認走寫庫 --> <property name= "defaulttargetdatasource" ref= "masterdatasource" /> </bean> |
3.6. 配置事務管理以及動態切換數據源切面
3.6.1. 定義事務管理器
1
2
3
4
5
|
<!-- 定義事務管理器 --> <bean id= "transactionmanager" class = "org.springframework.jdbc.datasource.datasourcetransactionmanager" > <property name= "datasource" ref= "datasource" /> </bean> |
3.6.2. 定義事務策略
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
|
<!-- 定義事務策略 --> <tx:advice id= "txadvice" transaction-manager= "transactionmanager" > <tx:attributes> <!--定義查詢方法都是只讀的 --> <tx:method name= "query*" read-only= "true" /> <tx:method name= "find*" read-only= "true" /> <tx:method name= "get*" read-only= "true" /> <!-- 主庫執行操作,事務傳播行為定義為默認行為 --> <tx:method name= "save*" propagation= "required" /> <tx:method name= "update*" propagation= "required" /> <tx:method name= "delete*" propagation= "required" /> <!--其他方法使用默認事務策略 --> <tx:method name= "*" /> </tx:attributes> </tx:advice> |
3.6.3. 定義切面
1
2
3
4
5
6
7
8
9
10
11
12
13
|
<!-- 定義aop切面處理器 --> <bean class = "cn.itcast.usermanage.spring.datasourceaspect" id= "datasourceaspect" /> <aop:config> <!-- 定義切面,所有的service的所有方法 --> <aop:pointcut id= "txpointcut" expression= "execution(* xx.xxx.xxxxxxx.service.*.*(..))" /> <!-- 應用事務策略到service切面 --> <aop:advisor advice-ref= "txadvice" pointcut-ref= "txpointcut" /> <!-- 將切面應用到自定義的切面處理器上,- 9999 保證該切面優先級最高執行 --> <aop:aspect ref= "datasourceaspect" order= "-9999" > <aop:before method= "before" pointcut-ref= "txpointcut" /> </aop:aspect> </aop:config> |
4. 改進切面實現,使用事務策略規則匹配
之前的實現我們是將通過方法名匹配,而不是使用事務策略中的定義,我們使用事務管理策略中的規則匹配。
4.1. 改進后的配置
1
2
3
4
5
6
7
|
<!-- 定義aop切面處理器 --> <bean class = "cn.itcast.usermanage.spring.datasourceaspect" id= "datasourceaspect" > <!-- 指定事務策略 --> <property name= "txadvice" ref= "txadvice" /> <!-- 指定slave方法的前綴(非必須) --> <property name= "slavemethodstart" value= "query,find,get" /> </bean> |
4.2. 改進后的實現
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
|
import java.lang.reflect.field; import java.util.arraylist; import java.util.list; import java.util.map; import org.apache.commons.lang3.stringutils; import org.aspectj.lang.joinpoint; import org.springframework.transaction.interceptor.namematchtransactionattributesource; import org.springframework.transaction.interceptor.transactionattribute; import org.springframework.transaction.interceptor.transactionattributesource; import org.springframework.transaction.interceptor.transactioninterceptor; import org.springframework.util.patternmatchutils; import org.springframework.util.reflectionutils; /** * 定義數據源的aop切面,該類控制了使用master還是slave。 * * 如果事務管理中配置了事務策略,則采用配置的事務策略中的標記了readonly的方法是用slave,其它使用master。 * * 如果沒有配置事務管理的策略,則采用方法名匹配的原則,以query、find、get開頭方法用slave,其它用master。 * * @author zhijun * */ public class datasourceaspect { private list<string> slavemethodpattern = new arraylist<string>(); private static final string[] defaultslavemethodstart = new string[]{ "query" , "find" , "get" }; private string[] slavemethodstart; /** * 讀取事務管理中的策略 * * @param txadvice * @throws exception */ @suppresswarnings ( "unchecked" ) public void settxadvice(transactioninterceptor txadvice) throws exception { if (txadvice == null ) { // 沒有配置事務管理策略 return ; } //從txadvice獲取到策略配置信息 transactionattributesource transactionattributesource = txadvice.gettransactionattributesource(); if (!(transactionattributesource instanceof namematchtransactionattributesource)) { return ; } //使用反射技術獲取到namematchtransactionattributesource對象中的namemap屬性值 namematchtransactionattributesource matchtransactionattributesource = (namematchtransactionattributesource) transactionattributesource; field namemapfield = reflectionutils.findfield(namematchtransactionattributesource. class , "namemap" ); namemapfield.setaccessible( true ); //設置該字段可訪問 //獲取namemap的值 map<string, transactionattribute> map = (map<string, transactionattribute>) namemapfield.get(matchtransactionattributesource); //遍歷namemap for (map.entry<string, transactionattribute> entry : map.entryset()) { if (!entry.getvalue().isreadonly()) { //判斷之后定義了readonly的策略才加入到slavemethodpattern continue ; } slavemethodpattern.add(entry.getkey()); } } /** * 在進入service方法之前執行 * * @param point 切面對象 */ public void before(joinpoint point) { // 獲取到當前執行的方法名 string methodname = point.getsignature().getname(); boolean isslave = false ; if (slavemethodpattern.isempty()) { // 當前spring容器中沒有配置事務策略,采用方法名匹配方式 isslave = isslave(methodname); } else { // 使用策略規則匹配 for (string mappedname : slavemethodpattern) { if (ismatch(methodname, mappedname)) { isslave = true ; break ; } } } if (isslave) { // 標記為讀庫 dynamicdatasourceholder.markslave(); } else { // 標記為寫庫 dynamicdatasourceholder.markmaster(); } } /** * 判斷是否為讀庫 * * @param methodname * @return */ private boolean isslave(string methodname) { // 方法名以query、find、get開頭的方法名走從庫 return stringutils.startswithany(methodname, getslavemethodstart()); } /** * 通配符匹配 * * return if the given method name matches the mapped name. * <p> * the default implementation checks for "xxx*", "*xxx" and "*xxx*" matches, as well as direct * equality. can be overridden in subclasses. * * @param methodname the method name of the class * @param mappedname the name in the descriptor * @return if the names match * @see org.springframework.util.patternmatchutils#simplematch(string, string) */ protected boolean ismatch(string methodname, string mappedname) { return patternmatchutils.simplematch(mappedname, methodname); } /** * 用戶指定slave的方法名前綴 * @param slavemethodstart */ public void setslavemethodstart(string[] slavemethodstart) { this .slavemethodstart = slavemethodstart; } public string[] getslavemethodstart() { if ( this .slavemethodstart == null ){ // 沒有指定,使用默認 return defaultslavemethodstart; } return slavemethodstart; } } |
5. 一主多從的實現
很多實際使用場景下都是采用“一主多從”的架構的,所以我們現在對這種架構做支持,目前只需要修改dynamicdatasource即可。
5.1. 實現
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
|
import java.lang.reflect.field; import java.util.arraylist; import java.util.list; import java.util.map; import java.util.concurrent.atomic.atomicinteger; import javax.sql.datasource; import org.slf4j.logger; import org.slf4j.loggerfactory; import org.springframework.jdbc.datasource.lookup.abstractroutingdatasource; import org.springframework.util.reflectionutils; /** * 定義動態數據源,實現通過集成spring提供的abstractroutingdatasource,只需要實現determinecurrentlookupkey方法即可 * * 由于dynamicdatasource是單例的,線程不安全的,所以采用threadlocal保證線程安全,由dynamicdatasourceholder完成。 * * @author zhijun * */ public class dynamicdatasource extends abstractroutingdatasource { private static final logger logger = loggerfactory.getlogger(dynamicdatasource. class ); private integer slavecount; // 輪詢計數,初始為-1,atomicinteger是線程安全的 private atomicinteger counter = new atomicinteger(- 1 ); // 記錄讀庫的key private list<object> slavedatasources = new arraylist<object>( 0 ); @override protected object determinecurrentlookupkey() { // 使用dynamicdatasourceholder保證線程安全,并且得到當前線程中的數據源key if (dynamicdatasourceholder.ismaster()) { object key = dynamicdatasourceholder.getdatasourcekey(); if (logger.isdebugenabled()) { logger.debug( "當前datasource的key為: " + key); } return key; } object key = getslavekey(); if (logger.isdebugenabled()) { logger.debug( "當前datasource的key為: " + key); } return key; } @suppresswarnings ( "unchecked" ) @override public void afterpropertiesset() { super .afterpropertiesset(); // 由于父類的resolveddatasources屬性是私有的子類獲取不到,需要使用反射獲取 field field = reflectionutils.findfield(abstractroutingdatasource. class , "resolveddatasources" ); field.setaccessible( true ); // 設置可訪問 try { map<object, datasource> resolveddatasources = (map<object, datasource>) field.get( this ); // 讀庫的數據量等于數據源總數減去寫庫的數量 this .slavecount = resolveddatasources.size() - 1 ; for (map.entry<object, datasource> entry : resolveddatasources.entryset()) { if (dynamicdatasourceholder.master.equals(entry.getkey())) { continue ; } slavedatasources.add(entry.getkey()); } } catch (exception e) { logger.error( "afterpropertiesset error! " , e); } } /** * 輪詢算法實現 * * @return */ public object getslavekey() { // 得到的下標為:0、1、2、3…… integer index = counter.incrementandget() % slavecount; if (counter.get() > 9999 ) { // 以免超出integer范圍 counter.set(- 1 ); // 還原 } return slavedatasources.get(index); } } |
6. mysql主從復制
6.1. 原理
mysql主(稱master)從(稱slave)復制的原理:
1、master將數據改變記錄到二進制日志(binarylog)中,也即是配置文件log-bin指定的文件(這些記錄叫做二進制日志事件,binary log events)
2、slave將master的binary logevents拷貝到它的中繼日志(relay log)
3、slave重做中繼日志中的事件,將改變反映它自己的數據(數據重演)
6.2. 主從配置需要注意的地方
1、主db server和從db server數據庫的版本一致
2、主db server和從db server數據庫數據一致[ 這里就會可以把主的備份在從上還原,也可以直接將主的數據目錄拷貝到從的相應數據目錄]
3、主db server開啟二進制日志,主db server和從db server的server_id都必須唯一
6.3. 主庫配置(windows,linux下也類似)
可能有些朋友主從數據庫的ip地址、用戶名和賬號配置不是很清楚,下面是我測試的主從配置,ip都是127.0.0.1,我在講完自己的例子后,還會寫
一個主從ip是不相同的配置的例子,大家可以通過這個例子去更加直觀的了解配置方法。
在my.ini [mysqld] 下面修改(從庫也是如此):
1
2
3
4
5
6
|
#開啟主從復制,主庫的配置 log-bin= mysql3306-bin #指定主庫serverid server-id= 101 #指定同步的數據庫,如果不指定則同步全部數據庫 binlog- do -db=mybatis_1128 |
(my.ini中輸入的這些命令一定要和下面有一行空格,不然mysql不識別)
執行sql語句查詢狀態:show master status
需要記錄下position值,需要在從庫中設置同步起始值。
另外我再說一點,如果您在mysql執行show master status 發現配置在my.ini中的內容沒有起到效果,可能原因是并沒有選擇對my.ini文件,也可能是您沒有重啟服務,很大概率是后者造成的原因,
要想使配置生效,必須關掉mysql服務,再重新啟動。
關閉服務的方法:
win鍵打開,輸入services.msc調出服務:
再啟動sqlyog,發現配置已經生效了。
6.4. 在主庫創建同步用戶
1
2
3
|
#授權用戶slave01使用 123456 密碼登錄mysql grant replication slave on *.* to 'slave01' @ '127.0.0.1' identified by '123456' ; flush privileges; |
6.5. 從庫配置
在my.ini修改:
1
2
|
#指定serverid,只要不重復即可,從庫也只有這一個配置,其他都在sql語句中操作 server-id= 102 |
以下執行sql(使用從機的root賬戶執行):
1
2
3
4
5
6
7
|
changematerto mater_hot= '127.0.0.1' , //主機的ip地址 mater_uer= 'lave01' , //主機的用戶(就是剛剛在主機通過ql創建出來的賬戶) mater_paword= '123456' , mater_port= 3306 , mater_log_file= 'myql3306-bin.000006' , //file mater_log_po= 1120 ; //poition |
1
2
3
4
|
#啟動slave同步 start slave; #查看同步狀態 show slave status; |
下面是ip不同的兩臺電腦的主從配置方法:
主數據庫所在的操作系統:win7
主數據庫的版本:5.0
主數據庫的ip地址:192.168.1.111
從數據庫所在的操作系統:linux
從數據的版本:5.0
從數據庫的ip地址:192.168.1.112
介紹完了環境,就聊聊配置步驟:
1、確保主數據庫與從數據庫一模一樣。
例如:主數據庫里的a的數據庫里有b,c,d表,那從數據庫里的就應該有一個模子刻出來的a的數據庫和b,c,d表
2、在主數據庫上創建同步賬號。
GRANT REPLICATION SLAVE,FILE ON *.* TO 'mstest'@'192.168.1.112' IDENTIFIED BY '123456';
192.168.1.112:是運行使用該用戶的ip地址
mstest:是新創建的用戶名
123456:是新創建的用戶名的密碼
以上命令的詳細解釋,最好百度一下,寫太多反到更加更不清思路。
3、配置主數據庫的my.ini(因為是在window下,所以是my.ini不是my.cnf)。
1
2
3
4
5
|
[mysqld] server-id= 1 log-bin=log binlog- do -db=mstest //要同步的mstest數據庫,要同步多個數據庫,就多加幾個binlog-do-db=數據庫名 binlog-ignore-db=mysql //要忽略的數據庫 |
4、配置從數據庫的my.cnf。
1
2
3
4
5
6
7
8
9
|
[mysqld] server-id= 2 master-host= 192.168 . 1.111 master-user=mstest //第一步創建賬號的用戶名 master-password= 123456 //第一步創建賬號的密碼 master-port= 3306 master-connect-retry= 60 replicate- do -db=mstest //要同步的mstest數據庫,要同步多個數據庫,就多加幾個replicate-do-db=數據庫名 replicate-ignore-db=mysql //要忽略的數據庫 |
5、驗證是否成功
進入mysql,后輸入命令:show slave status\g。將顯示下圖。如果slave_io_running和slave_sql_running都為yes,那么表明可以成功同步了
6、測試同步數據。
進入主數據庫輸入命令:insert into one(name) values('beijing');
然后進入從數據庫輸入命令:select * from one;
如果此時從數據庫有獲取到數據,說明同步成功了,主從也就實現了
以上就是本文的全部內容,希望對大家的學習有所幫助,也希望大家多多支持服務器之家。
原文鏈接:http://www.cnblogs.com/fengwenzhee/p/7193218.html