一区二区三区在线-一区二区三区亚洲视频-一区二区三区亚洲-一区二区三区午夜-一区二区三区四区在线视频-一区二区三区四区在线免费观看

服務(wù)器之家:專注于服務(wù)器技術(shù)及軟件下載分享
分類導(dǎo)航

Mysql|Sql Server|Oracle|Redis|MongoDB|PostgreSQL|Sqlite|DB2|mariadb|Access|數(shù)據(jù)庫技術(shù)|

服務(wù)器之家 - 數(shù)據(jù)庫 - Mysql - MySQL特定表全量、增量數(shù)據(jù)同步到消息隊列-解決方案

MySQL特定表全量、增量數(shù)據(jù)同步到消息隊列-解決方案

2022-01-12 18:14李雷 Mysql

mysql要同步原始全量數(shù)據(jù),也要實時同步MySQL特定庫的特定表增量數(shù)據(jù),同時對應(yīng)的修改、刪除也要對應(yīng),下面就為大家分享一下

1、原始需求

既要同步原始全量數(shù)據(jù),也要實時同步mysql特定庫的特定表增量數(shù)據(jù),同時對應(yīng)的修改、刪除也要對應(yīng)。

數(shù)據(jù)同步不能有侵入性:不能更改業(yè)務(wù)程序,并且不能對業(yè)務(wù)側(cè)有太大性能壓力。

應(yīng)用場景:數(shù)據(jù)etl同步、降低業(yè)務(wù)服務(wù)器壓力。

2、解決方案

MySQL特定表全量、增量數(shù)據(jù)同步到消息隊列-解決方案

3、canal介紹、安裝

canal是阿里巴巴旗下的一款開源項目,純java開發(fā)。基于數(shù)據(jù)庫增量日志解析,提供增量數(shù)據(jù)訂閱&消費,目前主要支持了mysql(也支持mariadb)。

工作原理:mysql主備復(fù)制實現(xiàn)

MySQL特定表全量、增量數(shù)據(jù)同步到消息隊列-解決方案

從上層來看,復(fù)制分成三步:

  1. master將改變記錄到二進制日志(binary log)中(這些記錄叫做二進制日志事件,binary log events,可以通過show binlog events進行查看);
  2. slave將master的binary log events拷貝到它的中繼日志(relay log);
  3. slave重做中繼日志中的事件,將改變反映它自己的數(shù)據(jù)。

canal的工作原理

MySQL特定表全量、增量數(shù)據(jù)同步到消息隊列-解決方案

原理相對比較簡單:

  1. canal模擬mysql slave的交互協(xié)議,偽裝自己為mysql slave,向mysql master發(fā)送dump協(xié)議
  2. mysql master收到dump請求,開始推送binary log給slave(也就是canal)
  3. canal解析binary log對象(原始為byte流)

架構(gòu)

MySQL特定表全量、增量數(shù)據(jù)同步到消息隊列-解決方案

說明:

  • server代表一個canal運行實例,對應(yīng)于一個jvm
  • instance對應(yīng)于一個數(shù)據(jù)隊列 (1個server對應(yīng)1..n個instance)

instance模塊:

  • eventparser (數(shù)據(jù)源接入,模擬slave協(xié)議和master進行交互,協(xié)議解析)
  • eventsink (parser和store鏈接器,進行數(shù)據(jù)過濾,加工,分發(fā)的工作)
  • eventstore (數(shù)據(jù)存儲)
  • metamanager (增量訂閱&消費信息管理器)

安裝

1、mysql、kafka環(huán)境準備

2、canal下載:wget https://github.com/alibaba/canal/releases/download/canal-1.1.3/canal.deployer-1.1.3.tar.gz

3、解壓:tar -zxvf canal.deployer-1.1.3.tar.gz

4、對目錄conf里文件參數(shù)配置

對canal.properties配置:

MySQL特定表全量、增量數(shù)據(jù)同步到消息隊列-解決方案

MySQL特定表全量、增量數(shù)據(jù)同步到消息隊列-解決方案

進入conf/example里,對instance.properties配置:

MySQL特定表全量、增量數(shù)據(jù)同步到消息隊列-解決方案

MySQL特定表全量、增量數(shù)據(jù)同步到消息隊列-解決方案

5、啟動:bin/startup.sh

6、日志查看:

MySQL特定表全量、增量數(shù)據(jù)同步到消息隊列-解決方案

4、驗證

1、開發(fā)對應(yīng)的kafka消費者

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
package org.kafka;
 
import java.util.arrays;
import java.util.properties;
import org.apache.kafka.clients.consumer.consumerrecord;
import org.apache.kafka.clients.consumer.consumerrecords;
import org.apache.kafka.clients.consumer.kafkaconsumer;
import org.apache.kafka.common.serialization.stringdeserializer;
 
 
/**
 *
 * title: kafkaconsumertest
 * description:
 *  kafka消費者 demo
 * version:1.0.0
 * @author pancm
 * @date 2018年1月26日
 */
public class kafkaconsumertest implements runnable {
 
    private final kafkaconsumer<string, string> consumer;
    private consumerrecords<string, string> msglist;
    private final string topic;
    private static final string groupid = "groupa";
 
    public kafkaconsumertest(string topicname) {
        properties props = new properties();
        props.put("bootstrap.servers", "192.168.7.193:9092");
        props.put("group.id", groupid);
        props.put("enable.auto.commit", "true");
        props.put("auto.commit.interval.ms", "1000");
        props.put("session.timeout.ms", "30000");
        props.put("auto.offset.reset", "latest");
        props.put("key.deserializer", stringdeserializer.class.getname());
        props.put("value.deserializer", stringdeserializer.class.getname());
        this.consumer = new kafkaconsumer<string, string>(props);
        this.topic = topicname;
        this.consumer.subscribe(arrays.aslist(topic));
    }
 
    @override
    public void run() {
        int messageno = 1;
        system.out.println("---------開始消費---------");
        try {
            for (; ; ) {
                msglist = consumer.poll(1000);
                if (null != msglist && msglist.count() > 0) {
                    for (consumerrecord<string, string> record : msglist) {
                        //消費100條就打印 ,但打印的數(shù)據(jù)不一定是這個規(guī)律的
 
                            system.out.println(messageno + "=======receive: key = " + record.key() + ", value = " + record.value() + " offset===" + record.offset());
 
 
//                            string v = decodeunicode(record.value());
 
//                            system.out.println(v);
 
                        //當消費了1000條就退出
                        if (messageno % 1000 == 0) {
                            break;
                        }
                        messageno++;
                    }
                } else {
                    thread.sleep(11);
                }
            }
        } catch (interruptedexception e) {
            e.printstacktrace();
        } finally {
            consumer.close();
        }
    }
 
    public static void main(string args[]) {
        kafkaconsumertest test1 = new kafkaconsumertest("sample-data");
        thread thread1 = new thread(test1);
        thread1.start();
    }
 
 
    /*
     * 中文轉(zhuǎn)unicode編碼
     */
    public static string gbencoding(final string gbstring) {
        char[] utfbytes = gbstring.tochararray();
        string unicodebytes = "";
        for (int i = 0; i < utfbytes.length; i++) {
            string hexb = integer.tohexstring(utfbytes[i]);
            if (hexb.length() <= 2) {
                hexb = "00" + hexb;
            }
            unicodebytes = unicodebytes + "\\u" + hexb;
        }
        return unicodebytes;
    }
 
    /*
     * unicode編碼轉(zhuǎn)中文
     */
    public static string decodeunicode(final string datastr) {
        int start = 0;
        int end = 0;
        final stringbuffer buffer = new stringbuffer();
        while (start > -1) {
            end = datastr.indexof("\\u", start + 2);
            string charstr = "";
            if (end == -1) {
                charstr = datastr.substring(start + 2, datastr.length());
            } else {
                charstr = datastr.substring(start + 2, end);
            }
            char letter = (char) integer.parseint(charstr, 16); // 16進制parse整形字符串。
            buffer.append(new character(letter).tostring());
            start = end;
        }
        return buffer.tostring();
 
    }
}

2、對表bak1進行增加數(shù)據(jù)

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
create table `bak1` (
  `vin` varchar(20) not null,
  `p1` double default null,
  `p2` double default null,
  `p3` double default null,
  `p4` double default null,
  `p5` double default null,
  `p6` double default null,
  `p7` double default null,
  `p8` double default null,
  `p9` double default null,
  `p0` double default null
) engine=innodb default charset=utf8mb4
 
show create table bak1;
 
insert into bak1 select '李雷abcv',
  `p1` ,
  `p2` ,
  `p3` ,
  `p4` ,
  `p5` ,
  `p6` ,
  `p7` ,
  `p8` ,
  `p9` ,
  `p0`  from moci limit 10

3、查看輸出結(jié)果:

MySQL特定表全量、增量數(shù)據(jù)同步到消息隊列-解決方案

到此這篇關(guān)于mysql特定表全量、增量數(shù)據(jù)同步到消息隊列-解決方案的文章就介紹到這了,更多相關(guān)mysql特定表數(shù)據(jù)同步內(nèi)容請搜索服務(wù)器之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持服務(wù)器之家!

原文鏈接:https://www.cnblogs.com/lilei2blog/p/15608206.html

延伸 · 閱讀

精彩推薦
主站蜘蛛池模板: 四虎黄色影视 | 草莓香蕉榴莲丝瓜秋葵绿巨人在线看 | 深夜啪啪网站 | 2020年最新国产精品视频免费 | 国产成人福利美女观看视频 | 视频一区在线观看 | 亚洲精品一二三四 | 成人涩涩屋福利视频 | miaa076深田咏美在线 | 国色天香社区视频在线观看免费完整版 | 免费观看毛片视频 | 大乳孕妇一级毛片 | 欧美在线视频 一区二区 | 青青草原国产一区二区 | 国产成人激烈叫床视频 | 忘忧草在线社区WWW日本直播 | 国产伦精品一区二区三区免 | 人人澡 人人澡碰人人看软件 | 黄 色 大 片 网站 | 国产一区二区三区日韩 | 国产高清在线精品一区 | 久久久高清国产999尤物 | 久久视频这有精品63在线国产 | 亚洲区精品 | 扒开双腿猛进入爽爽视频ai | 秋葵丝瓜茄子草莓榴莲樱桃 | 视频高清在线观看 | 91免费永久在线地址 | 极品丝袜老师h系列全文阅读 | 奇米影视77777 | 国产清纯91天堂在线观看 | bnb99八度免费影院 | 特黄aa级毛片免费视频播放 | 国产短视频精品一区二区三区 | 成人在线视频播放 | 韩剧在线观看 | 毛片免费全部免费观看 | 婷婷综合七月激情啪啪 | 99精品视频免费 | 亚洲AV国产国产久青草 | 亚洲一区二区三区久久精品 |