今天老師要給大家介紹一個(gè)比較特別的 RPC 服務(wù)器模型,這個(gè)模型不同于 Nginx、不同于 Redis、不同于 Apache、不同于 Tornado、不同于 Netty,它的原型是 Node Cluster 的多進(jìn)程并發(fā)模型。
Nginx 并發(fā)模型
我們知道 Nginx 的并發(fā)模型是一個(gè)多進(jìn)程并發(fā)模型,它的 Master 進(jìn)程在綁定監(jiān)聽(tīng)地址端口后 fork 出了多個(gè) Slave 進(jìn)程共同競(jìng)爭(zhēng)處理這個(gè)服務(wù)端套接字接收到的很多客戶端連接。
這多個(gè) Slave 進(jìn)程會(huì)共享同一個(gè)處于操作系統(tǒng)內(nèi)核態(tài)的套接字隊(duì)列,操作系統(tǒng)的網(wǎng)絡(luò)模塊在處理完三次握手后就會(huì)將套接字塞進(jìn)這個(gè)隊(duì)列。這是一個(gè)生產(chǎn)者消費(fèi)者模型,生產(chǎn)者是操作系統(tǒng)的網(wǎng)絡(luò)模塊,消費(fèi)者是多個(gè) Slave 進(jìn)程,隊(duì)列中的對(duì)象是客戶端套接字。
這種模型在負(fù)載均衡上有一個(gè)缺點(diǎn),那就是套接字分配不均勻,形成了類似于貧富分化的局面,也就是「閑者愈閑,忙者愈忙」的狀態(tài)。這是因?yàn)楫?dāng)多個(gè)進(jìn)程競(jìng)爭(zhēng)同一個(gè)套接字隊(duì)列時(shí),操作系統(tǒng)采用了 LIFO 的策略,最后一個(gè)來(lái) accept 的進(jìn)程最優(yōu)先拿到 套接字。越是繁忙的進(jìn)程越是有更多的機(jī)會(huì)調(diào)用 accept,它能拿到的套接字也就越多。
Node Cluster 并發(fā)模型
Node Cluster 為了解決負(fù)載均衡問(wèn)題,它采用了不同的策略。它也是多進(jìn)程并發(fā)模型,Master 進(jìn)程會(huì) fork 出多個(gè)子進(jìn)程來(lái)處理客戶端套接字。但是不存在競(jìng)爭(zhēng)問(wèn)題,因?yàn)樨?fù)責(zé) accept 套接字的只能是 Master 進(jìn)程,Slave 進(jìn)程只負(fù)責(zé)處理客戶端套接字請(qǐng)求。那就存在一個(gè)問(wèn)題,Master 進(jìn)程拿到的客戶端套接字如何傳遞給 Slave 進(jìn)程。
這時(shí),神奇的 sendmsg 登場(chǎng)了。它是操作系統(tǒng)提供的系統(tǒng)調(diào)用,可以在不同的進(jìn)程之間傳遞文件描述符。sendmsg 會(huì)搭乘一個(gè)特殊的「管道」將 Master 進(jìn)程的套接字描述符傳遞到 Slave 進(jìn)程,Slave 進(jìn)程通過(guò) recvmsg 系統(tǒng)調(diào)用從這個(gè)「管道」中將描述符取出來(lái)。這個(gè)「管道」比較特殊,它是 Unix 域套接字。普通的套接字可以跨機(jī)器傳輸消息,Unix 域套接字只能在同一個(gè)機(jī)器的不同進(jìn)程之間傳遞消息。同管道一樣,Unix 域套接字也分為有名套接字和無(wú)名套接字,有名套接字會(huì)在文件系統(tǒng)指定一個(gè)路徑名,無(wú)關(guān)進(jìn)程之間都可以通過(guò)這個(gè)路徑來(lái)訪問(wèn) Unix 域套接字。而無(wú)名套接字一般用于父子進(jìn)程之間,父進(jìn)程會(huì)通過(guò) socketpair 調(diào)用來(lái)創(chuàng)建套接字,然后 fork 出來(lái)子進(jìn)程,這樣子進(jìn)程也會(huì)同時(shí)持有這個(gè)套接字的引用。后續(xù)父子進(jìn)程就可以通過(guò)這個(gè)套接字互相通信。
注意這里的傳遞描述符,本質(zhì)上不是傳遞,而是復(fù)制。父進(jìn)程的描述符并不會(huì)在 sendmsg 自動(dòng)關(guān)閉自動(dòng)消失,子進(jìn)程收到的描述符和父進(jìn)程的描述符也不是同一個(gè)整數(shù)值。但是父子進(jìn)程的描述符都會(huì)指向同一個(gè)內(nèi)核套接字對(duì)象。
有了描述符的傳遞能力,父進(jìn)程就可以將 accept 到的客戶端套接字輪流傳遞給多個(gè) Slave 進(jìn)程,負(fù)載均衡的目標(biāo)就可以順利實(shí)現(xiàn)了。
接下來(lái)我們就是用 Python 代碼來(lái)擼一遍 Node Cluster 的并發(fā)模型。因?yàn)?sendmsg 和 recvmsg 方法到了 Python3.5 才內(nèi)置進(jìn)來(lái),所以下面的代碼需要使用 Python3.5+才可以運(yùn)行。
我們看 sendmsg 方法的定義
- socket.sendmsg(buffers[, ancdata[, flags[, address]]])
我們只需要關(guān)心第二個(gè)參數(shù) ancdata,描述符是通過(guò)ancdata 參數(shù)傳遞的,它的意思是 「輔助數(shù)據(jù)」,而 buffers 表示需要傳遞的消息內(nèi)容,因?yàn)橄?nèi)容這里沒(méi)有意義,所以這個(gè)字段可以任意填寫(xiě),但是必須要有內(nèi)容,如果沒(méi)有內(nèi)容,sendmsg 方法就是一個(gè)空調(diào)用。
- import socket, structdef send_fds(sock, fd): return sock.sendmsg([b'x'], [(socket.SOL_SOCKET, socket.SCM_RIGHTS, struct.pack("i", fd))])# ancdata 參數(shù)是一個(gè)三元組的列表,三元組的第一個(gè)參數(shù)表示網(wǎng)絡(luò)協(xié)議棧級(jí)別 level,第二個(gè)參數(shù)表示輔助數(shù)據(jù)的類型 type,第三個(gè)參數(shù)才是攜帶的數(shù)據(jù),level=SOL_SOCKET 表示傳遞的數(shù)據(jù)處于 TCP 協(xié)議層級(jí),type=SCM_RIGHTS 就表示攜帶的數(shù)據(jù)是文件描述符。我們傳遞的描述符 fd 是一個(gè)整數(shù),需要使用 struct 包將它序列化成二進(jìn)制。
再看 recvmsg 方法的定義
- msg, ancdata, flags, addr = socket.recvmsg(bufsize[, ancbufsize[, flags]])
同樣,我們只需要關(guān)心返回的 ancdata 數(shù)據(jù),它里面包含了我們需要的文件描述符。但是需要提供消息體的長(zhǎng)度和輔助數(shù)據(jù)的長(zhǎng)度參數(shù)。輔助數(shù)據(jù)的長(zhǎng)度比較特殊,需要使用 CMSG_LEN 方法來(lái)計(jì)算,因?yàn)檩o助數(shù)據(jù)里面還有我們看不到的額外的頭部信息。
- bufsize = 1 # 消息內(nèi)容的長(zhǎng)度
- ancbufsize = socket.CMSG_LEN(struct.calcsize('i')) # 輔助數(shù)據(jù)的長(zhǎng)度
- msg, ancdata, flags, addr = socket.recvmsg(bufsize, ancbufsize) # 收取消息
- level, type, fd_bytes = ancdata[0] # 取第一個(gè)元祖,注意發(fā)送消息時(shí)我們傳遞的是一個(gè)三元組的列表
- fd = struct.unpack('i', fd_bytes) # 反序列化
代碼實(shí)現(xiàn)
下面我來(lái)獻(xiàn)上完整的服務(wù)器代碼,為了簡(jiǎn)單起見(jiàn),我們?cè)?Slave 進(jìn)程中處理 RPC 請(qǐng)求使用同步模型。
- # coding: utf
- # sendmsg recvmsg python3.5+才可以支持
- import os
- import json
- import struct
- import socket
- def handle_conn(conn, addr, handlers):
- print(addr, "comes")
- while True:
- # 簡(jiǎn)單起見(jiàn),這里就沒(méi)有使用循環(huán)讀取了
- length_prefix = conn.recv(4)
- if not length_prefix:
- print(addr, "bye")
- conn.close()
- break # 關(guān)閉連接,繼續(xù)處理下一個(gè)連接
- length, = struct.unpack("I", length_prefix)
- body = conn.recv(length)
- request = json.loads(body)
- in_ = request['in']
- params = request['params']
- print(in_, params)
- handler = handlers[in_]
- handler(conn, params)
- def loop_slave(pr, handlers):
- while True:
- bufsize = 1
- ancsize = socket.CMSG_LEN(struct.calcsize('i'))
- msg, ancdata, flags, addr = pr.recvmsg(bufsize, ancsize)
- cmsg_level, cmsg_type, cmsg_data = ancdata[0]
- fd = struct.unpack('i', cmsg_data)[0]
- sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM, fileno=fd)
- handle_conn(sock, sock.getpeername(), handlers)
- def ping(conn, params):
- send_result(conn, "pong", params)
- def send_result(conn, out, result):
- response = json.dumps({"out": out, "result": result}).encode('utf-8')
- length_prefix = struct.pack("I", len(response))
- conn.sendall(length_prefix)
- conn.sendall(response)
- def loop_master(serv_sock, pws):
- idx = 0
- while True:
- sock, addr = serv_sock.accept()
- pw = pws[idx % len(pws)]
- # 消息數(shù)據(jù),whatever
- msg = [b'x']
- # 輔助數(shù)據(jù),攜帶描述符
- ancdata = [(
- socket.SOL_SOCKET,
- socket.SCM_RIGHTS,
- struct.pack('i', sock.fileno()))]
- pw.sendmsg(msg, ancdata)
- sock.close() # 關(guān)閉引用
- idx += 1
- def prefork(serv_sock, n):
- pws = []
- for i in range(n):
- # 開(kāi)辟父子進(jìn)程通信「管道」
- pr, pw = socket.socketpair()
- pid = os.fork()
- if pid < 0: # fork error
- return pws
- if pid > 0:
- # 父進(jìn)程
- pr.close() # 父進(jìn)程不用讀
- pws.append(pw)
- continue
- if pid == 0:
- # 子進(jìn)程
- serv_sock.close() # 關(guān)閉引用
- pw.close() # 子進(jìn)程不用寫(xiě)
- return pr
- return pws
- if __name__ == '__main__':
- serv_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
- serv_sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
- serv_sock.bind(("localhost", 8080))
- serv_sock.listen(1)
- pws_or_pr = prefork(serv_sock, 10)
- if hasattr(pws_or_pr, '__len__'):
- if pws_or_pr:
- loop_master(serv_sock, pws_or_pr)
- else:
- # fork 全部失敗,沒(méi)有子進(jìn)程,Game Over
- serv_sock.close()
- else:
- handlers = {
- "ping": ping
- }
- loop_slave(pws_or_pr, handlers)
父進(jìn)程使用 fork 調(diào)用創(chuàng)建了多個(gè)子進(jìn)程,然后又使用 socketpair 調(diào)用為每一個(gè)子進(jìn)程都創(chuàng)建一個(gè)無(wú)名套接字用來(lái)傳遞描述符。父進(jìn)程使用 roundrobin 策略平均分配接收到的客戶端套接字。子進(jìn)程接收到的是一個(gè)描述符整數(shù),需要將描述符包裝成套接字對(duì)象后方可讀寫(xiě)。打印對(duì)比發(fā)送和接收到的描述符,你會(huì)發(fā)現(xiàn)它們倆的值并不相同,這是因?yàn)?sendmsg 將描述符發(fā)送到內(nèi)核后,內(nèi)核給描述符指向的內(nèi)核套接字又重新分配了一個(gè)新的描述符對(duì)象。