一、關于snakemq的官方介紹
SnakeMQ的GitHub項目頁:https://github.com/dsiroky/snakemq
1.純python實現,跨平臺
2.自動重連接
3.可靠發送--可配置的消息方式與消息超時方式
4.持久化/臨時 兩種隊列
5.支持異步 -- poll()
6.symmetrical -- 單個TCP連接可用于雙工通訊
7.多數據庫支持 -- SQLite、MongoDB……
8.brokerless - 類似ZeroMQ的實現原理
9.擴展模塊:RPC, bandwidth throttling
以上都是官話,需要自己驗證,動手封裝了一下,感覺萌萌噠。
二、幾個主要問題說明
1.支持自動重連,不需要自己動手寫心跳邏輯,你只需要關注發送和接收就行
2.支持數據持久化,如果開啟持久化,在重連之後會自動發送數據。
3.數據的接收,snakemq通過提供回調實現,你只需要寫個接收方法添加到回調列表里去。
4.數據的發送,在此發送的都是bytes類型(二進制),因此需要轉換。我在程序中測試的都是文本字符串,使用str.encode('utf-8')轉換成bytes,接收時再轉換回來。
5.術語解釋,Connector:類似於socket的TcpClient,Listener:類似於socket的TcpServer,每個connector或者listener都有一個ident標識,發送和接收數據時就知道是誰的數據了。
6.使用sqlite持久化時,需要修改源碼,sqlite3.connect(filename,check_same_thread = False),用于解決多線程訪問sqlite的問題。(會不會死鎖?)
7.啟動持久化時,如果重新連上,則會自動發送,保證可靠。
8.為了封裝的需要,數據接收以后,我通過callback方式傳送出去。
三、代碼
說明:代碼中使用了自定義的日誌模塊
1
2
3
|
from common import nxlogger import snakemqlogger as logger |
可替換成logging的。
回調類(callbacks.py):
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
|
# -*- coding:utf-8 -*-
'''synchronized callback'''


class Callback(object):
    """Ordered collection of callables that are invoked together.

    Calling the instance calls every registered function, in registration
    order, with the same positional and keyword arguments.
    """

    def __init__(self):
        # Registered callables, in the order they were added.
        self.callbacks = []

    def add(self, func):
        """Register ``func`` so that it is invoked on every dispatch."""
        self.callbacks.append(func)

    def remove(self, func):
        """Unregister ``func``.

        Raises:
            ValueError: if ``func`` is not currently registered.
        """
        self.callbacks.remove(func)

    def __call__(self, *args, **kwargs):
        """Invoke every registered callable with ``*args`` and ``**kwargs``."""
        # Iterate over a snapshot: a callback may add or remove entries
        # (including itself) while being dispatched, and mutating the live
        # list during iteration would silently skip the following callback.
        for callback in list(self.callbacks):
            callback(*args, **kwargs)
Connector類(snakemqConnector.py):
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
|
# -*- coding:utf-8 -*-
import threading
import snakemq
import snakemq.link
import snakemq.packeter
import snakemq.messaging
import snakemq.message
from snakemq.storage.sqlite import SqliteQueuesStorage
from snakemq.message import FLAG_PERSISTENT
from common.callbacks import Callback
from common import nxlogger
import snakemqlogger as logger


class SnakemqConnector(threading.Thread):
    """Thread that runs a snakemq link with one outgoing connector.

    Received messages are decoded as UTF-8 and forwarded to the functions
    registered on ``on_recv`` as ``func(ident, text)``.
    """

    def __init__(self, snakemqident=None, remoteIp="localhost",
                 remotePort=9090, persistent=False):
        """Store the settings and build the snakemq stack.

        Args:
            snakemqident: identifier passed to ``Messaging`` for this peer.
            remoteIp: address handed to ``Link.add_connector``.
            remotePort: port handed to ``Link.add_connector``.
            persistent: when true, messages are sent with
                ``FLAG_PERSISTENT`` and a SQLite queue storage is used.
        """
        super(SnakemqConnector, self).__init__()
        self.messaging = None
        self.link = None
        self.snakemqident = snakemqident
        self.pktr = None
        self.remoteIp = remoteIp
        self.remotePort = remotePort
        self.persistent = persistent
        self.on_recv = Callback()
        self._initConnector()

    def run(self):
        """Thread body: run the link loop until it is stopped."""
        logger.info("connector start...")
        if self.link is not None:
            self.link.loop()
        logger.info("connector end...")

    def terminate(self):
        """Stop the link loop and release the link's resources."""
        logger.info("connetor terminating...")
        if self.link is not None:
            self.link.stop()
            # Wait for run() to leave link.loop() before tearing the link
            # down, so cleanup() does not race with the loop thread.
            # NOTE(review): snakemq is believed to document Link.cleanup()
            # as not thread-safe - confirm against the library docs.
            # Skipped when called from the worker thread itself (joining
            # the current thread raises) or before start().
            if self.is_alive() and threading.current_thread() is not self:
                self.join()
            self.link.cleanup()
        logger.info("connetor terminated")

    def on_recv_message(self, conn, ident, message):
        """Decode an incoming message and dispatch it to ``on_recv``."""
        try:
            # dispatch received data
            self.on_recv(ident, message.data.decode('utf-8'))
        except Exception as e:
            # Errors are logged and printed here rather than re-raised.
            logger.error("connector recv:{0}".format(e))
            print(e)

    def sendMsg(self, destIdent, byteseq):
        """Send ``byteseq`` to the peer named ``destIdent``.

        The message is created with ``ttl=60``; ``FLAG_PERSISTENT`` is set
        when this connector was created with ``persistent=True``. If the
        messaging layer failed to initialise, an error is logged and
        nothing is sent.
        """
        if self.messaging is None:
            logger.error("connector:messaging is not initialized, send message failed")
            return
        if self.persistent:
            msg = snakemq.message.Message(byteseq, ttl=60, flags=FLAG_PERSISTENT)
        else:
            msg = snakemq.message.Message(byteseq, ttl=60)
        self.messaging.send_message(destIdent, msg)

    def _initConnector(self):
        """Create the link, packeter and messaging objects.

        Any exception is logged and swallowed; attributes that were not
        reached stay ``None``, which ``run``/``terminate``/``sendMsg``
        check for.
        """
        try:
            self.link = snakemq.link.Link()
            self.link.add_connector((self.remoteIp, self.remotePort))
            self.pktr = snakemq.packeter.Packeter(self.link)
            if self.persistent:
                storage = SqliteQueuesStorage("SnakemqStorage.db")
                self.messaging = snakemq.messaging.Messaging(
                    self.snakemqident, "", self.pktr, storage)
            else:
                self.messaging = snakemq.messaging.Messaging(
                    self.snakemqident, "", self.pktr)
            self.messaging.on_message_recv.add(self.on_recv_message)
        except Exception as e:
            logger.error("connector:{0}".format(e))
        finally:
            # Emitted when initialisation finishes (successfully or not),
            # despite the wording of the message.
            logger.info("connector[{0}] loop ended...".format(self.snakemqident))
Listener類(snakemqListener.py):
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
|
# -*- coding:utf-8 -*-
import threading
import snakemq
import snakemq.link
import snakemq.packeter
import snakemq.messaging
import snakemq.message
# Both names are used below when persistent=True; without these imports the
# persistent code paths fail with NameError.
from snakemq.storage.sqlite import SqliteQueuesStorage
from snakemq.message import FLAG_PERSISTENT
from common import nxlogger
import snakemqlogger as logger
from common.callbacks import Callback


class SnakemqListener(threading.Thread):
    """Thread that runs a snakemq link with one listening endpoint.

    Received messages are decoded as UTF-8 and forwarded to the functions
    registered on ``on_recv`` as ``func(ident, text)``.
    """

    def __init__(self, snakemqident=None, ip="localhost", port=9090,
                 persistent=False):
        """Store the settings and build the snakemq stack.

        Args:
            snakemqident: identifier passed to ``Messaging`` for this peer.
            ip: address handed to ``Link.add_listener``.
            port: port handed to ``Link.add_listener``.
            persistent: when true, messages are sent with
                ``FLAG_PERSISTENT`` and a SQLite queue storage is used.
        """
        super(SnakemqListener, self).__init__()
        self.messaging = None
        self.link = None
        self.pktr = None
        self.snakemqident = snakemqident
        self.ip = ip
        self.port = port
        # Currently connected peers, keyed by the value given to on_connect.
        self.connectors = {}
        self.on_recv = Callback()
        self.persistent = persistent
        self._initlistener()

    def run(self):
        """Thread body: run the link loop until it is stopped."""
        logger.info("listener start...")
        if self.link is not None:
            self.link.loop()
        logger.info("listener end...")

    def terminate(self):
        """Stop the link loop and release the link's resources."""
        logger.info("listener terminating...")
        if self.link is not None:
            self.link.stop()
            # Wait for run() to leave link.loop() before tearing the link
            # down, so cleanup() does not race with the loop thread.
            # NOTE(review): snakemq is believed to document Link.cleanup()
            # as not thread-safe - confirm against the library docs.
            # Skipped when called from the worker thread itself (joining
            # the current thread raises) or before start().
            if self.is_alive() and threading.current_thread() is not self:
                self.join()
            self.link.cleanup()
        logger.info("listener terminated")

    def on_recv_message(self, conn, ident, message):
        """Dispatch a message received from ``ident`` and greet the sender."""
        try:
            # dispatch received data
            self.on_recv(ident, message.data.decode('utf-8'))
            # Reply to whoever sent the message; the destination used to be
            # hard-coded to 'bob', which only worked for that one peer.
            self.sendMsg(ident, 'hello,{0}'.format(ident).encode('utf-8'))
        except Exception as e:
            # Errors are logged and printed here rather than re-raised.
            logger.error("listener recv:{0}".format(e))
            print(e)

    def on_drop_message(self, ident, message):
        """Report a message that the messaging layer dropped."""
        print("message dropped", ident, message)
        logger.debug("listener:message dropped,ident:{0},message:{1}".format(ident, message))

    def on_connect(self, ident):
        """Record a newly connected peer and send it a greeting."""
        # NOTE(review): this handler is registered on the packeter's
        # on_connect, which may pass a connection id rather than a peer
        # ident - confirm against the snakemq documentation.
        logger.debug("listener:{0} connected".format(ident))
        self.connectors[ident] = ident
        self.sendMsg(ident, "hello".encode('utf-8'))

    def on_disconnect(self, ident):
        """Forget a peer that has disconnected."""
        logger.debug("listener:{0} disconnected".format(ident))
        self.connectors.pop(ident, None)

    def _initlistener(self):
        """Create the link, packeter and messaging objects.

        Any exception is logged and swallowed; attributes that were not
        reached stay ``None``, which ``run``/``terminate``/``sendMsg``
        check for.
        """
        try:
            self.link = snakemq.link.Link()
            self.link.add_listener((self.ip, self.port))
            self.pktr = snakemq.packeter.Packeter(self.link)
            self.pktr.on_connect.add(self.on_connect)
            self.pktr.on_disconnect.add(self.on_disconnect)
            if self.persistent:
                storage = SqliteQueuesStorage("SnakemqStorage.db")
                self.messaging = snakemq.messaging.Messaging(
                    self.snakemqident, "", self.pktr, storage)
            else:
                self.messaging = snakemq.messaging.Messaging(
                    self.snakemqident, "", self.pktr)
            self.messaging.on_message_recv.add(self.on_recv_message)
            self.messaging.on_message_drop.add(self.on_drop_message)
        except Exception as e:
            logger.error("listener:{0}".format(e))
        finally:
            # Emitted when initialisation finishes (successfully or not),
            # despite the wording of the message.
            logger.info("listener:loop ended...")

    def sendMsg(self, destIdent, byteseq):
        """Send ``byteseq`` to the peer named ``destIdent``.

        The message is created with ``ttl=60``; ``FLAG_PERSISTENT`` is set
        when this listener was created with ``persistent=True``. If the
        messaging layer failed to initialise, an error is logged and
        nothing is sent.
        """
        if self.messaging is None:
            logger.error("listener:messaging is not initialized, send message failed")
            return
        if self.persistent:
            msg = snakemq.message.Message(byteseq, ttl=60, flags=FLAG_PERSISTENT)
        else:
            msg = snakemq.message.Message(byteseq, ttl=60)
        self.messaging.send_message(destIdent, msg)
測試代碼connector(testSnakeConnector.py):
讀取本地一個1M的文件,然後發送給listener,然後listener發回一個hello的信息。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
|
# ---- testSnakeConnector.py ----
from netComm.snakemq import snakemqConnector
import time
import sys
import os


def received(ident, data):
    """Print every text message delivered to the connector."""
    print(data)


if __name__ == "__main__":
    bob = snakemqConnector.SnakemqConnector('bob', "10.16.5.45", 4002, True)
    bob.on_recv.add(received)
    bob.start()
    try:
        with open("testfile.txt", encoding='utf-8') as f:
            txt = f.read()
        # Encode once; the same payload is sent on every iteration.
        payload = txt.encode('utf-8')
        for i in range(100):
            bob.sendMsg("niess", payload)
            time.sleep(0.1)
    except Exception as e:
        print(e)
    finally:
        # Always stop the worker thread, even on Ctrl-C, so the process
        # can exit. The pause before terminating is kept from the original.
        time.sleep(5)
        bob.terminate()


# ---- testSnakeListener.py (listener test script, a separate file) ----
from netComm.snakemq import snakemqListener
import os
import time


def received(ident, data):
    """Write a received text message to ``log/recFile<SS>.txt``.

    ``<SS>`` is the seconds field of the current local time, so a later
    message arriving at the same seconds value overwrites the earlier file.
    """
    filename = "log/recFile{0}.txt".format(time.strftime('%S', time.localtime()))
    # 'with' closes the file even if the write fails; the explicit encoding
    # keeps the write independent of the platform's default codec.
    with open(filename, 'w', encoding='utf-8') as out:
        out.write(data)


if __name__ == "__main__":
    # received() writes into this directory; create it up front.
    os.makedirs("log", exist_ok=True)
    niess = snakemqListener.SnakemqListener("niess", "10.16.5.45", 4002)
    niess.on_recv.add(received)
    niess.start()
    print("niess start...")
    try:
        time.sleep(60)
    finally:
        niess.terminate()
    print("niess end...")