本文介紹了詳解Python實現多進程異步事件驅動引擎,分享給大家,具體如下:
多進程異步事件驅動邏輯
邏輯
code
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
|
# -*- coding: utf-8 -*- ''' author: Jimmy contact: [email protected] file: eventEngine.py time: 2017/8/25 上午10:06 description: 多進程異步事件驅動引擎 ''' __author__ = 'Jimmy' from multiprocessing import Process, Queue class EventEngine( object ): # 初始化事件事件驅動引擎 def __init__( self ): #保存事件列表 self .__eventQueue = Queue() #引擎開關 self .__active = False #事件處理字典{'event1': [handler1,handler2] , 'event2':[handler3, ...,handler4]} self .__handlers = {} #保存事件處理進程池 self .__processPool = [] #事件引擎主進程 self .__mainProcess = Process(target = self .__run) #執行事件循環 def __run( self ): while self .__active: #事件隊列非空 if not self .__eventQueue.empty(): #獲取隊列中的事件 超時1秒 event = self .__eventQueue.get(block = True ,timeout = 1 ) #執行事件 self .__process(event) else : # print('無任何事件') pass #執行事件 def __process( self , event): if event. type in self .__handlers: for handler in self .__handlers[event. type ]: #開一個進程去異步處理 p = Process(target = handler, args = (event, )) #保存到進程池 self .__processPool.append(p) p.start() #開啟事件引擎 def start( self ): self .__active = True self .__mainProcess.start() #暫停事件引擎 def stop( self ): """停止""" # 將事件管理器設為停止 self .__active = False # 等待事件處理進程退出 for p in self .__processPool: p.join() self .__mainProcess.join() #終止事件引擎 def terminate( self ): self .__active = False #終止所有事件處理進程 for p in self .__processPool: p.terminate() self .__mainProcess.join() #注冊事件 def register( self , type , handler): """注冊事件處理函數監聽""" # 嘗試獲取該事件類型對應的處理函數列表,若無則創建 try : handlerList = self .__handlers[ type ] except KeyError: handlerList = [] self .__handlers[ type ] = handlerList # 若要注冊的處理器不在該事件的處理器列表中,則注冊該事件 if handler not in handlerList: handlerList.append(handler) def unregister( self , type , handler): """注銷事件處理函數監聽""" # 嘗試獲取該事件類型對應的處理函數列表,若無則忽略該次注銷請求 try : handlerList = self .__handlers[ type ] # 如果該函數存在于列表中,則移除 if handler in handlerList: handlerList.remove(handler) # 如果函數列表為空,則從引擎中移除該事件類型 if not handlerList: del self .__handlers[ type ] except KeyError: pass def sendEvent( self , event): #發送事件 像隊列里存入事件 self .__eventQueue.put(event) class Event( object ): #事件對象 def __init__( self , type = None ): self . type = type self . dict = {} #測試 if __name__ = = '__main__' : import time EVENT_ARTICAL = "Event_Artical" # 事件源 公眾號 class PublicAccounts: def __init__( self , eventManager): self .__eventManager = eventManager def writeNewArtical( self ): # 事件對象,寫了新文章 event = Event(EVENT_ARTICAL) event. dict [ "artical" ] = u '如何寫出更優雅的代碼\n' # 發送事件 self .__eventManager.sendEvent(event) print (u '公眾號發送新文章\n' ) # 監聽器 訂閱者 class ListenerTypeOne: def __init__( self , username): self .__username = username # 監聽器的處理函數 讀文章 def ReadArtical( self , event): print (u '%s 收到新文章' % self .__username) print (u '%s 正在閱讀新文章內容:%s' % ( self .__username, event. dict [ "artical" ])) class ListenerTypeTwo: def __init__( self , username): self .__username = username # 監聽器的處理函數 讀文章 def ReadArtical( self , event): print (u '%s 收到新文章 睡3秒再看' % self .__username) time.sleep( 3 ) print (u '%s 正在閱讀新文章內容:%s' % ( self .__username, event. dict [ "artical" ])) def test(): listner1 = ListenerTypeOne( "thinkroom" ) # 訂閱者1 listner2 = ListenerTypeTwo( "steve" ) # 訂閱者2 ee = EventEngine() # 綁定事件和監聽器響應函數(新文章) ee.register(EVENT_ARTICAL, listner1.ReadArtical) ee.register(EVENT_ARTICAL, listner2.ReadArtical) for i in range ( 0 , 20 ): listner3 = ListenerTypeOne( "Jimmy" ) # 訂閱者X ee.register(EVENT_ARTICAL, listner3.ReadArtical) ee.start() #發送事件 publicAcc = PublicAccounts(ee) publicAcc.writeNewArtical() test() |
以上就是本文的全部內容,希望對大家的學習有所幫助,也希望大家多多支持服務器之家。
原文鏈接:http://www.jianshu.com/p/5e7786166157?utm_source=tuicool&utm_medium=referral