一区二区三区在线-一区二区三区亚洲视频-一区二区三区亚洲-一区二区三区午夜-一区二区三区四区在线视频-一区二区三区四区在线免费观看

腳本之家,腳本語言編程技術及教程分享平臺!
分類導航

Python|VBS|Ruby|Lua|perl|VBA|Golang|PowerShell|Erlang|autoit|Dos|bat|

服務器之家 - 腳本之家 - Python - Python的消息隊列包SnakeMQ使用初探

Python的消息隊列包SnakeMQ使用初探

2020-08-30 10:04Python教程網 Python

使用消息隊列在數據的通信中擁有很多優點,SnakeMQ是一個開源的用Python實現的跨平臺MQ庫,well,Python的消息隊列包SnakeMQ使用初探,here we go:

一、關于snakemq的官方介紹
SnakeMQ的GitHub項目頁:https://github.com/dsiroky/snakemq
1.純python實現,跨平臺

2.自動重連接

3.可靠發送--可配置的消息方式與消息超時方式

4.持久化/臨時 兩種隊列

5.支持異步 -- poll()

6.symmetrical -- 單個TCP連接可用于雙工通訊

7.多數據庫支持 -- SQLite、MongoDB……

8.brokerless - 類似ZeroMQ的實現原理

9.擴展模塊:RPC, bandwidth throttling

以上都是官話,需要自己驗證,動手封裝了一下,感覺萌萌噠。

 

二、幾個主要問題說明

1.支持自動重連,不需要自己動手寫心跳邏輯,你只需要關注發送和接收就行

2.支持數據持久化,如果開始持久化,在重連之后會自動發送數據。

3.數據的接收,snakemq通過提供回調實現,你只需要寫個接收方法添加到回調列表里去。

4.數據的發送,在此發送的都是bytes類型(二進制),因此需要轉換。我在程序中測試的都是文本字符串,使用str.encode(‘utf-8')轉換成bytes,接收時再轉換回來。

5.術語解釋,Connector:類似于socket的TcpClient,Lisenter:類似于socket的TcpServer,每個connector或者listener都一個一個ident標識,發送和接收數據時就知道是誰的數據了。

6.使用sqlite持久化時,需要修改源碼,sqlite3.connect(filename,check_same_thread = False),用于解決多線程訪問sqlite的問題。(會不會死鎖?)

7.啟動持久化時,如果重新連上,則會自動發送,保證可靠。

8.為了封裝的需要,數據接收以后,我通過callback方式傳送出去。

 

三、代碼

說明代碼中使用了自定義的日志模塊

?
1
2
3
from common import nxlogger
 
import snakemqlogger as logger

可替換成logging的。

回調類(callbacks.py):

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
# -*- coding:utf-8 -*-
 
'''synchronized callback'''
 
class Callback(object):
 
  def __init__(self):
 
    self.callbacks = []
 
 
 
  def add(self, func):
 
    self.callbacks.append(func)
 
 
 
  def remove(self, func):
 
    self.callbacks.remove(func)
 
 
 
  def __call__(self, *args, **kwargs):
 
    for callback in self.callbacks:
 
      callback(*args, **kwargs)

Connector類(snakemqConnector.py):

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
# -*- coding:utf-8 -*-
 
import threading
 
import snakemq
 
import snakemq.link
 
import snakemq.packeter
 
import snakemq.messaging
 
import snakemq.message
 
from snakemq.storage.sqlite import SqliteQueuesStorage
 
from snakemq.message import FLAG_PERSISTENT
 
from common.callbacks import Callback
 
 
 
from common import nxlogger
 
import snakemqlogger as logger
 
 
 
class SnakemqConnector(threading.Thread):
 
     def __init__(self, snakemqident = None, remoteIp = "localhost", remotePort = 9090, persistent = False):
 
         super(SnakemqConnector,self).__init__()
 
         self.messaging = None
 
         self.link = None
 
         self.snakemqident = snakemqident
 
         self.pktr = None
 
         self.remoteIp = remoteIp
 
         self.remotePort = remotePort
 
         self.persistent = persistent
 
         self.on_recv = Callback()
 
         self._initConnector()
 
 
 
     def run(self):
 
         logger.info("connector start...")
 
         
 
         if self.link != None:
 
              self.link.loop()
 
 
 
         logger.info("connector end...")
 
    
 
     def terminate(self):
 
         logger.info("connetor terminating...")
 
         if self.link != None:
 
              self.link.stop()
 
              self.link.cleanup()
 
         logger.info("connetor terminated")
 
 
 
     def on_recv_message(self, conn, ident, message):
 
         try:
 
              self.on_recv(ident, message.data.decode('utf-8'))#dispatch received data
 
         except Exception as e:
 
              logger.error("connector recv:{0}".format(e))
 
              print(e)
 
 
 
     '''send message to dest host named destIdent'''
 
     def sendMsg(self, destIdent, byteseq):
 
         msg = None
 
         if self.persistent:
 
              msg = snakemq.message.Message(byteseq, ttl=60, flags=FLAG_PERSISTENT)
 
         else:
 
              msg = snakemq.message.Message(byteseq, ttl=60)
 
         if self.messaging == None:
 
              logger.error("connector:messaging is not initialized, send message failed")
 
              return
 
         self.messaging.send_message(destIdent, msg)
 
 
 
     '''
 
    
 
     '''
 
     def _initConnector(self):
 
         try:
 
              self.link = snakemq.link.Link()
 
              self.link.add_connector((self.remoteIp, self.remotePort))
 
 
 
              self.pktr = snakemq.packeter.Packeter(self.link)
 
 
 
              if self.persistent:
 
                  storage = SqliteQueuesStorage("SnakemqStorage.db")
 
                  self.messaging = snakemq.messaging.Messaging(self.snakemqident, "", self.pktr, storage)
 
              else:
 
                  self.messaging = snakemq.messaging.Messaging(self.snakemqident, "", self.pktr)
 
             
 
              self.messaging.on_message_recv.add(self.on_recv_message)
 
             
 
         except Exception as e:
 
              logger.error("connector:{0}".format(e))
 
         finally:
 
              logger.info("connector[{0}] loop ended...".format(self.snakemqident))

 Listener類(snakemqListener.py):

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
# -*- coding:utf-8 -*-
 
import threading
 
import snakemq
 
import snakemq.link
 
import snakemq.packeter
 
import snakemq.messaging
 
import snakemq.message
 
from common import nxlogger
 
import snakemqlogger as logger
 
from common.callbacks import Callback
 
class SnakemqListener(threading.Thread):
 
     def __init__(self, snakemqident = None, ip = "localhost", port = 9090, persistent = False):
 
         super(SnakemqListener,self).__init__()
 
         self.messaging = None
 
         self.link = None
 
         self.pktr = None
 
         self.snakemqident = snakemqident
 
         self.ip = ip;
 
         self.port = port
 
         self.connectors = {}
 
         self.on_recv = Callback()
 
         self.persistent = persistent
 
         self._initlistener()
 
 
 
     '''
 
     thread run
 
     '''
 
     def run(self):
 
         logger.info("listener start...")
 
         
 
         if self.link != None:
 
              self.link.loop()
 
 
 
         logger.info("listener end...")
 
 
 
     '''
 
     terminate snakemq listener thread
 
     '''
 
     def terminate(self):
 
         logger.info("listener terminating...")
 
         if self.link != None:
 
              self.link.stop()
 
              self.link.cleanup()
 
         logger.info("listener terminated")
 
 
 
     '''
 
     receive message from host named ident
 
     '''
 
     def on_recv_message(self, conn, ident, message):
 
         try:
 
              self.on_recv(ident, message.data.decode('utf-8'))#dispatch received data
 
              self.sendMsg('bob','hello,{0}'.format(ident).encode('utf-8'))
 
         except Exception as e:
 
              logger.error("listener recv:{0}".format(e))
 
              print(e)
 
 
 
     def on_drop_message(self, ident, message):
 
         print("message dropped", ident, message)
 
         logger.debug("listener:message dropped,ident:{0},message:{1}".format(ident, message))
 
 
 
     '''client connect'''
 
     def on_connect(self, ident):
 
         logger.debug("listener:{0} connected".format(ident))
 
         self.connectors[ident] = ident
 
         self.sendMsg(ident, "hello".encode('utf-8'))
 
 
 
     '''client disconnect'''
 
     def on_disconnect(self, ident):
 
         logger.debug("listener:{0} disconnected".format(ident))
 
         if ident in self.connectors:
 
              self.connectors.pop(ident)
 
 
 
     '''
 
     listen start loop
 
     '''
 
     def _initlistener(self):
 
         try:
 
              self.link = snakemq.link.Link()
 
              self.link.add_listener((self.ip, self.port))
 
 
 
              self.pktr = snakemq.packeter.Packeter(self.link)
 
              self.pktr.on_connect.add(self.on_connect)
 
              self.pktr.on_disconnect.add(self.on_disconnect)
 
 
 
              if self.persistent:
 
                  storage = SqliteQueuesStorage("SnakemqStorage.db")
 
                  self.messaging = snakemq.messaging.Messaging(self.snakemqident, "", self.pktr, storage)
 
              else:
 
                  self.messaging = snakemq.messaging.Messaging(self.snakemqident, "", self.pktr)
 
             
 
              self.messaging.on_message_recv.add(self.on_recv_message)
 
              self.messaging.on_message_drop.add(self.on_drop_message)
 
 
 
         except Exception as e:
 
              logger.error("listener:{0}".format(e))
 
         finally:
 
              logger.info("listener:loop ended...")
 
     '''send message to dest host named destIdent'''
 
     def sendMsg(self, destIdent, byteseq):
 
         msg = None
 
         if self.persistent:
 
              msg = snakemq.message.Message(byteseq, ttl=60, flags=FLAG_PERSISTENT)
 
         else:
 
              msg = snakemq.message.Message(byteseq, ttl=60)
 
         if self.messaging == None:
 
              logger.error("listener:messaging is not initialized, send message failed")
 
              return
 
         self.messaging.send_message(destIdent, msg)

測試代碼connector(testSnakeConnector.py):

讀取本地一個1M的文件,然后發送給listener,然后listener發回一個hello的信息。

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
from netComm.snakemq import snakemqConnector
 
import time
 
import sys
 
import os
 
def received(ident, data):
 
     print(data)
 
 
 
if __name__ == "__main__":
 
     bob = snakemqConnector.SnakemqConnector('bob',"10.16.5.45",4002,True)
 
     bob.on_recv.add(received)
 
     bob.start()
 
     try:
 
         with open("testfile.txt",encoding='utf-8') as f:
 
              txt = f.read()
 
              for i in range(100):
 
                  bob.sendMsg("niess",txt.encode('utf-8'))
 
                  time.sleep(0.1)
 
     except Exception as e:
 
         print(e)
 
     time.sleep(5)
 
     bob.terminate()  
 
 
 
測試代碼listener(testSnakeListener.py):
 
from netComm.snakemq import snakemqListener
 
import time
 
 
 
def received(ident, data):
 
     filename = "log/recFile{0}.txt".format(time.strftime('%S',time.localtime()))
 
     file = open(filename,'w')
 
     file.writelines(data)
 
     file.close()
 
 
 
if __name__ == "__main__":
 
     niess = snakemqListener.SnakemqListener("niess","10.16.5.45",4002)
 
     niess.on_recv.add(received)
 
     niess.start()
 
     print("niess start...")
 
     time.sleep(60)
 
     niess.terminate() 
 
     print("niess end...")

延伸 · 閱讀

精彩推薦
主站蜘蛛池模板: 大学生特黄特色大片免费播放 | 1377大但人文艺术包子铺 | 四虎黄色影视 | 亚洲欧美日韩久久一区 | 女子张腿让男人桶免费 | 美女模特被c免费视频 | 日韩在线一区二区三区免费视频 | 国产亚洲精品九九久在线观看 | 99精品网 | 97社区| 青青国产成人久久激情91麻豆 | 日本免费高清在线观看播放 | 好男人在线观看免费高清2019韩剧 | 97香蕉超级碰碰碰久久兔费 | 国产精品久久久久久久免费大片 | 2021小妲己永久回家地址 | 欧美精品一区二区在线观看播放 | 国产伊人网 | 国产成人精品日本亚洲网址 | 天天操天天草 | 亚洲国产免费观看视频 | 国产卡一卡二卡三乱码手机 | 免费一级特黄特色大片在线 | 男生和女生搞逼逼 | 成人午夜视频一区二区国语 | 国产福利在线免费观看 | 久久内在线视频精品mp4 | 日本暖暖视频在线观看 | 婷婷激情综合五月天 | 美女脱了内裤让男桶爽 | 99久久久久国产精品免费 | 91韩国女主播 | 婷婷婷色 | 亚洲人成网站在线观看妞妞网 | 精品卡1卡2卡三卡免费网站 | 91色在线观看国产 | 国产亚洲精aa在线观看不卡 | 变形金刚第一部 | 26uuu老色哥| 日韩一区二区三区四区五区 | 国产欧美在线播放 |