一区二区三区在线-一区二区三区亚洲视频-一区二区三区亚洲-一区二区三区午夜-一区二区三区四区在线视频-一区二区三区四区在线免费观看

服務器之家:專注于服務器技術及軟件下載分享
分類導航

Mysql|Sql Server|Oracle|Redis|MongoDB|PostgreSQL|Sqlite|DB2|mariadb|Access|數據庫技術|

服務器之家 - 數據庫 - Mysql - Mysql數據庫監聽binlog的開啟步驟

Mysql數據庫監聽binlog的開啟步驟

2020-11-28 16:44呼延十 Mysql

這篇文章主要給大家介紹了關于Mysql數據庫監聽binlog的相關資料,文中通過示例代碼介紹的非常詳細,對大家學習或者使用Mysql具有一定的參考學習價值,需要的朋友們下面來一起學習學習吧

前言

我們經常需要根據用戶對自己數據的一些操作來做一些事情.

比如如果用戶刪除了自己的賬號,我們就給他發短信罵他,去發短信求他回來.

類似于這種功能,當然可以在業務邏輯層實現,在收到用戶的刪除請求之后執行這一操作,但是數據庫binlog為我們提供了另外一種操作方法.

監聽binlog,需要兩步,第一步當然是你的mysql需要開啟這一個功能,第二個是要寫程序來對日志進行讀取.

mysql開啟binlog.

首先mysql的binlog日常是不打開的,因此我們需要:

找到mysql的配置文件my.cnf,這個因操作系統不一樣,位置也不一定一樣,可以自己找一下,

在其中加入以下內容:

?
1
2
3
4
[mysqld]
server_id = 1
log-bin = mysql-bin
binlog-format = ROW

之后重啟mysql.

?
1
2
3
4
/ ubuntu
service mysql restart
// mac
mysql.server restart

監測是否開啟成功

進入mysql命令行,執行:

?
1
show variables like '%log_bin%' ;

如果結果如下圖,則說明成功了:

Mysql數據庫監聽binlog的開啟步驟

查看正在寫入的binlog狀態:

Mysql數據庫監聽binlog的開啟步驟

代碼讀取binlog

引入依賴

我們使用開源的一些實現,這里因為一些奇怪的原因,我選用了mysql-binlog-connector-java這個包,(官方github倉庫)[github.com/shyiko/mysq…]具體依賴如下:

?
1
2
3
4
5
6
<!-- https://mvnrepository.com/artifact/com.github.shyiko/mysql-binlog-connector-java -->
 <dependency>
 <groupId>com.github.shyiko</groupId>
 <artifactId>mysql-binlog-connector-java</artifactId>
 <version>0.17.0</version>
 </dependency>

當然,對binlog的處理有很多開源實現,阿里的cancl就是一個,也可以使用它.

寫個demo

根據官方倉庫中readme里面,來簡單的寫個demo.

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
public static void main(String[] args) {
BinaryLogClient client = new BinaryLogClient("hostname", 3306, "username", "passwd");
EventDeserializer eventDeserializer = new EventDeserializer();
eventDeserializer.setCompatibilityMode(
EventDeserializer.CompatibilityMode.DATE_AND_TIME_AS_LONG,
EventDeserializer.CompatibilityMode.CHAR_AND_BINARY_AS_BYTE_ARRAY
);
client.setEventDeserializer(eventDeserializer);
client.registerEventListener(new BinaryLogClient.EventListener() {
 
@Override
public void onEvent(Event event) {
// TODO
dosomething();
logger.info(event.toString());
}
});
client.connect();
}

這個完全是根據官方教程里面寫的,在onEvent里面可以寫自己的業務邏輯,由于我只是測試,所以我在里面將每一個event都打印了出來.

之后我手動登錄到mysql,分別進行了增加,修改,刪除操作,監聽到的log如下:

00:23:13.331 [main] INFO util.MysqlBinLog - Event{header=EventHeaderV4{timestamp=0, eventType=ROTATE, serverId=1, headerLength=19, dataLength=28, nextPosition=0, flags=32}, data=RotateEventData{binlogFilename='mysql-bin.000001', binlogPosition=886}}
00:23:13.334 [main] INFO util.MysqlBinLog - Event{header=EventHeaderV4{timestamp=1556468403000, eventType=FORMAT_DESCRIPTION, serverId=1, headerLength=19, dataLength=100, nextPosition=0, flags=0}, data=FormatDescriptionEventData{binlogVersion=4, serverVersion='5.7.23-0ubuntu0.16.04.1-log', headerLength=19, dataLength=95}}
00:23:23.715 [main] INFO util.MysqlBinLog - Event{header=EventHeaderV4{timestamp=1556468603000, eventType=ANONYMOUS_GTID, serverId=1, headerLength=19, dataLength=46, nextPosition=951, flags=0}, data=null}
00:23:23.716 [main] INFO util.MysqlBinLog - Event{header=EventHeaderV4{timestamp=1556468603000, eventType=QUERY, serverId=1, headerLength=19, dataLength=51, nextPosition=1021, flags=8}, data=QueryEventData{threadId=4, executionTime=0, errorCode=0, database='pf', sql='BEGIN'}}
00:23:23.721 [main] INFO util.MysqlBinLog - Event{header=EventHeaderV4{timestamp=1556468603000, eventType=TABLE_MAP, serverId=1, headerLength=19, dataLength=32, nextPosition=1072, flags=0}, data=TableMapEventData{tableId=108, database='pf', table='student', columnTypes=15, 3, columnMetadata=135, 0, columnNullability={}}}
00:23:23.724 [main] INFO util.MysqlBinLog - Event{header=EventHeaderV4{timestamp=1556468603000, eventType=EXT_WRITE_ROWS, serverId=1, headerLength=19, dataLength=23, nextPosition=1114, flags=0}, data=WriteRowsEventData{tableId=108, includedColumns={0, 1}, rows=[
    [[B@546a03af, 2]
]}}
00:23:23.725 [main] INFO util.MysqlBinLog - Event{header=EventHeaderV4{timestamp=1556468603000, eventType=XID, serverId=1, headerLength=19, dataLength=12, nextPosition=1145, flags=0}, data=XidEventData{xid=28}}
00:23:55.872 [main] INFO util.MysqlBinLog - Event{header=EventHeaderV4{timestamp=1556468635000, eventType=ANONYMOUS_GTID, serverId=1, headerLength=19, dataLength=46, nextPosition=1210, flags=0}, data=null}
00:23:55.872 [main] INFO util.MysqlBinLog - Event{header=EventHeaderV4{timestamp=1556468635000, eventType=QUERY, serverId=1, headerLength=19, dataLength=51, nextPosition=1280, flags=8}, data=QueryEventData{threadId=4, executionTime=0, errorCode=0, database='pf', sql='BEGIN'}}
00:23:55.873 [main] INFO util.MysqlBinLog - Event{header=EventHeaderV4{timestamp=1556468635000, eventType=TABLE_MAP, serverId=1, headerLength=19, dataLength=32, nextPosition=1331, flags=0}, data=TableMapEventData{tableId=108, database='pf', table='student', columnTypes=15, 3, columnMetadata=135, 0, columnNullability={}}}
00:23:55.875 [main] INFO util.MysqlBinLog - Event{header=EventHeaderV4{timestamp=1556468635000, eventType=EXT_UPDATE_ROWS, serverId=1, headerLength=19, dataLength=31, nextPosition=1381, flags=0}, data=UpdateRowsEventData{tableId=108, includedColumnsBeforeUpdate={0, 1}, includedColumns={0, 1}, rows=[
    {before=[[B@6833ce2c, 1], after=[[B@725bef66, 3]}
]}}
00:23:55.875 [main] INFO util.MysqlBinLog - Event{header=EventHeaderV4{timestamp=1556468635000, eventType=XID, serverId=1, headerLength=19, dataLength=12, nextPosition=1412, flags=0}, data=XidEventData{xid=41}}
00:24:22.333 [main] INFO util.MysqlBinLog - Event{header=EventHeaderV4{timestamp=1556468662000, eventType=ANONYMOUS_GTID, serverId=1, headerLength=19, dataLength=46, nextPosition=1477, flags=0}, data=null}
00:24:22.334 [main] INFO util.MysqlBinLog - Event{header=EventHeaderV4{timestamp=1556468662000, eventType=QUERY, serverId=1, headerLength=19, dataLength=51, nextPosition=1547, flags=8}, data=QueryEventData{threadId=4, executionTime=0, errorCode=0, database='pf', sql='BEGIN'}}
00:24:22.334 [main] INFO util.MysqlBinLog - Event{header=EventHeaderV4{timestamp=1556468662000, eventType=TABLE_MAP, serverId=1, headerLength=19, dataLength=32, nextPosition=1598, flags=0}, data=TableMapEventData{tableId=108, database='pf', table='student', columnTypes=15, 3, columnMetadata=135, 0, columnNullability={}}}
00:24:22.335 [main] INFO util.MysqlBinLog - Event{header=EventHeaderV4{timestamp=1556468662000, eventType=EXT_DELETE_ROWS, serverId=1, headerLength=19, dataLength=23, nextPosition=1640, flags=0}, data=DeleteRowsEventData{tableId=108, includedColumns={0, 1}, rows=[
    [[B@1888ff2c, 3]
]}}
00:24:22.335 [main] INFO util.MysqlBinLog - Event{header=EventHeaderV4{timestamp=1556468662000, eventType=XID, serverId=1, headerLength=19, dataLength=12, nextPosition=1671, flags=0}, data=XidEventData{xid=42}}

根據自己的業務,封裝一個更好使,更定制的工具類

開始的時候打算貼代碼的,,,但是代碼越寫越多,索性傳在github上了,這里只貼部分的實現.代碼傳送門

實現思路

  1. 支持對單個表的監聽,因為我們不想真的對所有數據庫中的所有數據表進行監聽.
  2. 可以多線程消費.
  3. 把監聽到的內容轉換成我們喜聞樂見的形式(文中的數據結構不一定很好,我沒想到更加合適的了).

所以實現思路大致如下:

  1. 封裝個客戶端,對外只提供獲取方法,屏蔽掉初始化的細節代碼.
  2. 提供注冊監聽器(偽)的方法,可以注冊對某個表的監聽(重新定義一個監聽接口,所有注冊的監聽器實現這個就好).
  3. 真正的監聽器只有客戶端,他將此數據庫實例上的所有操作,全部監聽到并轉換成我們想要的格式LogItem放進阻塞隊列里面.
  4. 啟動多個線程,消費阻塞隊列,對某一個LogItem調用對應的數據表的監聽器,做一些業務邏輯.

初始化代碼:

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
public MysqlBinLogListener(Conf conf) {
BinaryLogClient client = new BinaryLogClient(conf.host, conf.port, conf.username, conf.passwd);
EventDeserializer eventDeserializer = new EventDeserializer();
eventDeserializer.setCompatibilityMode(
EventDeserializer.CompatibilityMode.DATE_AND_TIME_AS_LONG,
EventDeserializer.CompatibilityMode.CHAR_AND_BINARY_AS_BYTE_ARRAY
);
client.setEventDeserializer(eventDeserializer);
this.parseClient = client;
this.queue = new ArrayBlockingQueue<>(1024);
this.conf = conf;
listeners = new ConcurrentHashMap<>();
dbTableCols = new ConcurrentHashMap<>();
this.consumer = Executors.newFixedThreadPool(consumerThreads);
}

注冊代碼:

?
1
2
3
4
5
6
7
8
9
10
11
12
13
public void regListener(String db, String table, BinLogListener listener) throws Exception {
String dbTable = getdbTable(db, table);
Class.forName("com.mysql.jdbc.Driver");
// 保存當前注冊的表的colum信息
Connection connection = DriverManager.getConnection("jdbc:mysql://" + conf.host + ":" + conf.port, conf.username, conf.passwd);
Map<String, Colum> cols = getColMap(connection, db, table);
dbTableCols.put(dbTable, cols);
 
// 保存當前注冊的listener
List<BinLogListener> list = listeners.getOrDefault(dbTable, new ArrayList<>());
list.add(listener);
listeners.put(dbTable, list);
}

在這個步驟中,我們在注冊監聽者的同時,獲得了該表的schema信息,并保存到map里面去,方便后續對數據進行處理.

監聽代碼:

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
@Override
public void onEvent(Event event) {
EventType eventType = event.getHeader().getEventType();
 
if (eventType == EventType.TABLE_MAP) {
TableMapEventData tableData = event.getData();
String db = tableData.getDatabase();
String table = tableData.getTable();
dbTable = getdbTable(db, table);
}
 
// 只處理添加刪除更新三種操作
if (isWrite(eventType) || isUpdate(eventType) || isDelete(eventType)) {
if (isWrite(eventType)) {
WriteRowsEventData data = event.getData();
for (Serializable[] row : data.getRows()) {
 if (dbTableCols.containsKey(dbTable)) {
 LogItem e = LogItem.itemFromInsert(row, dbTableCols.get(dbTable));
 e.setDbTable(dbTable);
 queue.add(e);
 }
}
}
}
}

我偷懶了,,,這里面只實現了對添加操作的處理,其他操作沒有寫.

消費代碼:

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
public void parse() throws IOException {
parseClient.registerEventListener(this);
 
for (int i = 0; i < consumerThreads; i++) {
consumer.submit(() -> {
while (true) {
 if (queue.size() > 0) {
 try {
 LogItem item = queue.take();
 String dbtable = item.getDbTable();
 listeners.get(dbtable).forEach(l -> {
 l.onEvent(item);
 });
 
 } catch (InterruptedException e) {
 e.printStackTrace();
 }
 }
 Thread.sleep(1000);
}
});
}
parseClient.connect();
}

消費時,從隊列中獲取item,之后獲取對應的一個或者多個監聽者,分別消費這個item.

測試代碼:

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
public static void main(String[] args) throws Exception {
Conf conf = new Conf();
conf.host = "hostname";
conf.port = 3306;
conf.username = conf.passwd = "hhsgsb";
 
MysqlBinLogListener mysqlBinLogListener = new MysqlBinLogListener(conf);
mysqlBinLogListener.parseArgsAndRun(args);
mysqlBinLogListener.regListener("pf", "student", item -> {
System.out.println(new String((byte[])item.getAfter().get("name")));
logger.info("insert into {}, value = {}", item.getDbTable(), item.getAfter());
});
mysqlBinLogListener.regListener("pf", "teacher", item -> System.out.println("teacher ===="));
 
mysqlBinLogListener.parse();
}

在這段很少的代碼里,注冊了兩個監聽者,分別監聽student和teacher表,并分別進行打印處理,經測試,在teacher表插入數據時,可以獨立的運行定義的業務邏輯.

注意:這里的工具類并不能直接投入使用,因為里面有許多的異常處理沒有做,且功能僅監聽了插入語句,可以用來做實現的參考.

參考文章

總結

以上就是這篇文章的全部內容了,希望本文的內容對大家的學習或者工作具有一定的參考學習價值,謝謝大家對服務器之家的支持。

原文鏈接:https://juejin.im/post/5d2c2464e51d45106b15ffc3

延伸 · 閱讀

精彩推薦
主站蜘蛛池模板: 亚洲色图第四色 | 欧美视频久久 | 国内精品久久久久久野外 | 羞羞污视频 | 无码AV毛片色欲欧洲美洲 | yellow视频免费观看播放 | 国产99区 | 欧美视频黑鬼大战白妞 | 特黄特黄aaaa级毛片免费看 | 97自拍视频在线观看 | 欧美日韩国产手机在线观看视频 | 狠狠做五月深爱婷婷天天综合 | 国偷盗摄自产福利一区在线 | 国产情侣偷国语对白 | 欧美最猛性xxxxx短视频 | bt天堂在线最新版在线 | 日本xxxx在线视频免费 | 娇妻与公陈峰姚瑶小说在线阅读 | 成人黄色免费网站 | 美女下面揉出水免费视频 | 秋霞网毛片 | caoporn超碰最新地址进入 | 欧美久在线观看在线观看 | 亚洲国内精品 | 欧美人鲁交大全 | 狠狠综合网 | 午夜影院和视费x看 | 9久re在线观看视频精品 | 日韩首页| 白丝超短裙被输出娇喘不停小说 | 糖心在线观看网 | 四虎永久免费在线观看 | 农夫69小说小雨与农村老太 | 国产精品1页 | 国产成人精选免费视频 | 香蕉eeww99国产精品 | 欧美日韩国产一区二区三区不卡 | 欧美亚洲国产精品久久第一页 | 国产第9页 | 动漫人物差差插曲漫画 | 国产乱码在线精品可播放 |