一区二区三区在线-一区二区三区亚洲视频-一区二区三区亚洲-一区二区三区午夜-一区二区三区四区在线视频-一区二区三区四区在线免费观看

服務(wù)器之家:專注于服務(wù)器技術(shù)及軟件下載分享
分類導(dǎo)航

云服務(wù)器|WEB服務(wù)器|FTP服務(wù)器|郵件服務(wù)器|虛擬主機(jī)|服務(wù)器安全|DNS服務(wù)器|服務(wù)器知識(shí)|Nginx|IIS|Tomcat|

服務(wù)器之家 - 服務(wù)器技術(shù) - 服務(wù)器知識(shí) - RPC 服務(wù)器之【多進(jìn)程描述符傳遞】高階模型

RPC 服務(wù)器之【多進(jìn)程描述符傳遞】高階模型

2019-07-13 09:54碼洞老錢(qián) 服務(wù)器知識(shí)

今天老師要給大家介紹一個(gè)比較特別的 RPC 服務(wù)器模型,這個(gè)模型不同于 Nginx、不同于 Redis、不同于 Apache、不同于 Tornado、不同于 Netty,它的原型是 Node Cluster 的多進(jìn)程并發(fā)模型。

今天老師要給大家介紹一個(gè)比較特別的 RPC 服務(wù)器模型,這個(gè)模型不同于 Nginx、不同于 Redis、不同于 Apache、不同于 Tornado、不同于 Netty,它的原型是 Node Cluster 的多進(jìn)程并發(fā)模型。

RPC 服務(wù)器之【多進(jìn)程描述符傳遞】高階模型

Nginx 并發(fā)模型

我們知道 Nginx 的并發(fā)模型是一個(gè)多進(jìn)程并發(fā)模型,它的 Master 進(jìn)程在綁定監(jiān)聽(tīng)地址端口后 fork 出了多個(gè) Slave 進(jìn)程共同競(jìng)爭(zhēng)處理這個(gè)服務(wù)端套接字接收到的很多客戶端連接。

RPC 服務(wù)器之【多進(jìn)程描述符傳遞】高階模型

這多個(gè) Slave 進(jìn)程會(huì)共享同一個(gè)處于操作系統(tǒng)內(nèi)核態(tài)的套接字隊(duì)列,操作系統(tǒng)的網(wǎng)絡(luò)模塊在處理完三次握手后就會(huì)將套接字塞進(jìn)這個(gè)隊(duì)列。這是一個(gè)生產(chǎn)者消費(fèi)者模型,生產(chǎn)者是操作系統(tǒng)的網(wǎng)絡(luò)模塊,消費(fèi)者是多個(gè) Slave 進(jìn)程,隊(duì)列中的對(duì)象是客戶端套接字。

這種模型在負(fù)載均衡上有一個(gè)缺點(diǎn),那就是套接字分配不均勻,形成了類似于貧富分化的局面,也就是「閑者愈閑,忙者愈忙」的狀態(tài)。這是因?yàn)楫?dāng)多個(gè)進(jìn)程競(jìng)爭(zhēng)同一個(gè)套接字隊(duì)列時(shí),操作系統(tǒng)采用了 LIFO 的策略,最后一個(gè)來(lái) accept 的進(jìn)程最優(yōu)先拿到 套接字。越是繁忙的進(jìn)程越是有更多的機(jī)會(huì)調(diào)用 accept,它能拿到的套接字也就越多。

RPC 服務(wù)器之【多進(jìn)程描述符傳遞】高階模型

Node Cluster 并發(fā)模型

Node Cluster 為了解決負(fù)載均衡問(wèn)題,它采用了不同的策略。它也是多進(jìn)程并發(fā)模型,Master 進(jìn)程會(huì) fork 出多個(gè)子進(jìn)程來(lái)處理客戶端套接字。但是不存在競(jìng)爭(zhēng)問(wèn)題,因?yàn)樨?fù)責(zé) accept 套接字的只能是 Master 進(jìn)程,Slave 進(jìn)程只負(fù)責(zé)處理客戶端套接字請(qǐng)求。那就存在一個(gè)問(wèn)題,Master 進(jìn)程拿到的客戶端套接字如何傳遞給 Slave 進(jìn)程。

RPC 服務(wù)器之【多進(jìn)程描述符傳遞】高階模型

這時(shí),神奇的 sendmsg 登場(chǎng)了。它是操作系統(tǒng)提供的系統(tǒng)調(diào)用,可以在不同的進(jìn)程之間傳遞文件描述符。sendmsg 會(huì)搭乘一個(gè)特殊的「管道」將 Master 進(jìn)程的套接字描述符傳遞到 Slave 進(jìn)程,Slave 進(jìn)程通過(guò) recvmsg 系統(tǒng)調(diào)用從這個(gè)「管道」中將描述符取出來(lái)。這個(gè)「管道」比較特殊,它是 Unix 域套接字。普通的套接字可以跨機(jī)器傳輸消息,Unix 域套接字只能在同一個(gè)機(jī)器的不同進(jìn)程之間傳遞消息。同管道一樣,Unix 域套接字也分為有名套接字和無(wú)名套接字,有名套接字會(huì)在文件系統(tǒng)指定一個(gè)路徑名,無(wú)關(guān)進(jìn)程之間都可以通過(guò)這個(gè)路徑來(lái)訪問(wèn) Unix 域套接字。而無(wú)名套接字一般用于父子進(jìn)程之間,父進(jìn)程會(huì)通過(guò) socketpair 調(diào)用來(lái)創(chuàng)建套接字,然后 fork 出來(lái)子進(jìn)程,這樣子進(jìn)程也會(huì)同時(shí)持有這個(gè)套接字的引用。后續(xù)父子進(jìn)程就可以通過(guò)這個(gè)套接字互相通信。

RPC 服務(wù)器之【多進(jìn)程描述符傳遞】高階模型

注意這里的傳遞描述符,本質(zhì)上不是傳遞,而是復(fù)制。父進(jìn)程的描述符并不會(huì)在 sendmsg 自動(dòng)關(guān)閉自動(dòng)消失,子進(jìn)程收到的描述符和父進(jìn)程的描述符也不是同一個(gè)整數(shù)值。但是父子進(jìn)程的描述符都會(huì)指向同一個(gè)內(nèi)核套接字對(duì)象。

有了描述符的傳遞能力,父進(jìn)程就可以將 accept 到的客戶端套接字輪流傳遞給多個(gè) Slave 進(jìn)程,負(fù)載均衡的目標(biāo)就可以順利實(shí)現(xiàn)了。

接下來(lái)我們就是用 Python 代碼來(lái)擼一遍 Node Cluster 的并發(fā)模型。因?yàn)?sendmsg 和 recvmsg 方法到了 Python3.5 才內(nèi)置進(jìn)來(lái),所以下面的代碼需要使用 Python3.5+才可以運(yùn)行。

我們看 sendmsg 方法的定義


  1. socket.sendmsg(buffers[, ancdata[, flags[, address]]]) 

我們只需要關(guān)心第二個(gè)參數(shù) ancdata,描述符是通過(guò)ancdata 參數(shù)傳遞的,它的意思是 「輔助數(shù)據(jù)」,而 buffers 表示需要傳遞的消息內(nèi)容,因?yàn)橄?nèi)容這里沒(méi)有意義,所以這個(gè)字段可以任意填寫(xiě),但是必須要有內(nèi)容,如果沒(méi)有內(nèi)容,sendmsg 方法就是一個(gè)空調(diào)用。


  1. import socket, structdef send_fds(sock, fd): return sock.sendmsg([b'x'], [(socket.SOL_SOCKET, socket.SCM_RIGHTS, struct.pack("i", fd))])# ancdata 參數(shù)是一個(gè)三元組的列表,三元組的第一個(gè)參數(shù)表示網(wǎng)絡(luò)協(xié)議棧級(jí)別 level,第二個(gè)參數(shù)表示輔助數(shù)據(jù)的類型 type,第三個(gè)參數(shù)才是攜帶的數(shù)據(jù),level=SOL_SOCKET 表示傳遞的數(shù)據(jù)處于 TCP 協(xié)議層級(jí),type=SCM_RIGHTS 就表示攜帶的數(shù)據(jù)是文件描述符。我們傳遞的描述符 fd 是一個(gè)整數(shù),需要使用 struct 包將它序列化成二進(jìn)制。 

再看 recvmsg 方法的定義


  1. msg, ancdata, flags, addr = socket.recvmsg(bufsize[, ancbufsize[, flags]]) 

同樣,我們只需要關(guān)心返回的 ancdata 數(shù)據(jù),它里面包含了我們需要的文件描述符。但是需要提供消息體的長(zhǎng)度和輔助數(shù)據(jù)的長(zhǎng)度參數(shù)。輔助數(shù)據(jù)的長(zhǎng)度比較特殊,需要使用 CMSG_LEN 方法來(lái)計(jì)算,因?yàn)檩o助數(shù)據(jù)里面還有我們看不到的額外的頭部信息。


  1. bufsize = 1  # 消息內(nèi)容的長(zhǎng)度 
  2. ancbufsize = socket.CMSG_LEN(struct.calcsize('i'))  # 輔助數(shù)據(jù)的長(zhǎng)度 
  3. msg, ancdata, flags, addr = socket.recvmsg(bufsize, ancbufsize) # 收取消息 
  4. level, type, fd_bytes = ancdata[0] # 取第一個(gè)元祖,注意發(fā)送消息時(shí)我們傳遞的是一個(gè)三元組的列表 
  5. fd = struct.unpack('i', fd_bytes) # 反序列化 

代碼實(shí)現(xiàn)

下面我來(lái)獻(xiàn)上完整的服務(wù)器代碼,為了簡(jiǎn)單起見(jiàn),我們?cè)?Slave 進(jìn)程中處理 RPC 請(qǐng)求使用同步模型。


  1. # coding: utf 
  2. # sendmsg recvmsg python3.5+才可以支持 
  3.  
  4. import os 
  5. import json 
  6. import struct 
  7. import socket 
  8.  
  9.  
  10. def handle_conn(conn, addr, handlers): 
  11.     print(addr, "comes"
  12.     while True
  13.         # 簡(jiǎn)單起見(jiàn),這里就沒(méi)有使用循環(huán)讀取了 
  14.         length_prefix = conn.recv(4) 
  15.         if not length_prefix: 
  16.             print(addr, "bye"
  17.             conn.close() 
  18.             break  # 關(guān)閉連接,繼續(xù)處理下一個(gè)連接 
  19.         length, = struct.unpack("I", length_prefix) 
  20.         body = conn.recv(length) 
  21.         request = json.loads(body) 
  22.         in_ = request['in'
  23.         params = request['params'
  24.         print(in_, params) 
  25.         handler = handlers[in_] 
  26.         handler(conn, params) 
  27.  
  28.  
  29. def loop_slave(pr, handlers): 
  30.     while True
  31.         bufsize = 1 
  32.         ancsize = socket.CMSG_LEN(struct.calcsize('i')) 
  33.         msg, ancdata, flags, addr = pr.recvmsg(bufsize, ancsize) 
  34.         cmsg_level, cmsg_type, cmsg_data = ancdata[0] 
  35.         fd = struct.unpack('i', cmsg_data)[0] 
  36.         sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM, fileno=fd) 
  37.         handle_conn(sock, sock.getpeername(), handlers) 
  38.  
  39.  
  40. def ping(conn, params): 
  41.     send_result(conn, "pong", params) 
  42.  
  43.  
  44. def send_result(conn, out, result): 
  45.     response = json.dumps({"out"out"result": result}).encode('utf-8'
  46.     length_prefix = struct.pack("I", len(response)) 
  47.     conn.sendall(length_prefix) 
  48.     conn.sendall(response) 
  49.  
  50.  
  51. def loop_master(serv_sock, pws): 
  52.     idx = 0 
  53.     while True
  54.         sock, addr = serv_sock.accept() 
  55.         pw = pws[idx % len(pws)] 
  56.         # 消息數(shù)據(jù),whatever 
  57.         msg = [b'x'
  58.         # 輔助數(shù)據(jù),攜帶描述符 
  59.         ancdata = [( 
  60.             socket.SOL_SOCKET, 
  61.             socket.SCM_RIGHTS, 
  62.             struct.pack('i', sock.fileno()))] 
  63.         pw.sendmsg(msg, ancdata) 
  64.         sock.close()  # 關(guān)閉引用 
  65.         idx += 1 
  66.  
  67.  
  68. def prefork(serv_sock, n): 
  69.     pws = [] 
  70.     for i in range(n): 
  71.         # 開(kāi)辟父子進(jìn)程通信「管道」 
  72.         pr, pw = socket.socketpair() 
  73.         pid = os.fork() 
  74.         if pid < 0:  # fork error 
  75.             return pws 
  76.         if pid > 0: 
  77.             # 父進(jìn)程 
  78.             pr.close()  # 父進(jìn)程不用讀 
  79.             pws.append(pw) 
  80.             continue 
  81.         if pid == 0: 
  82.             # 子進(jìn)程 
  83.             serv_sock.close()  # 關(guān)閉引用 
  84.             pw.close()  # 子進(jìn)程不用寫(xiě) 
  85.             return pr 
  86.     return pws 
  87.  
  88.  
  89. if __name__ == '__main__'
  90.     serv_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) 
  91.     serv_sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) 
  92.     serv_sock.bind(("localhost", 8080)) 
  93.     serv_sock.listen(1) 
  94.     pws_or_pr = prefork(serv_sock, 10) 
  95.     if hasattr(pws_or_pr, '__len__'): 
  96.         if pws_or_pr: 
  97.             loop_master(serv_sock, pws_or_pr) 
  98.         else
  99.             # fork 全部失敗,沒(méi)有子進(jìn)程,Game Over 
  100.             serv_sock.close() 
  101.     else
  102.         handlers = { 
  103.             "ping": ping 
  104.         } 
  105.         loop_slave(pws_or_pr, handlers) 

父進(jìn)程使用 fork 調(diào)用創(chuàng)建了多個(gè)子進(jìn)程,然后又使用 socketpair 調(diào)用為每一個(gè)子進(jìn)程都創(chuàng)建一個(gè)無(wú)名套接字用來(lái)傳遞描述符。父進(jìn)程使用 roundrobin 策略平均分配接收到的客戶端套接字。子進(jìn)程接收到的是一個(gè)描述符整數(shù),需要將描述符包裝成套接字對(duì)象后方可讀寫(xiě)。打印對(duì)比發(fā)送和接收到的描述符,你會(huì)發(fā)現(xiàn)它們倆的值并不相同,這是因?yàn)?sendmsg 將描述符發(fā)送到內(nèi)核后,內(nèi)核給描述符指向的內(nèi)核套接字又重新分配了一個(gè)新的描述符對(duì)象。

延伸 · 閱讀

精彩推薦
主站蜘蛛池模板: 我半夜摸妺妺的奶C了她软件 | 非洲黑女人性xxxx | 国产午夜精品久久久久 | 欧美成人一区二区三区 | 国产成人小视频 | 狠狠久久久久综合网 | 色老板在线视频 | 午夜精品区 | 网友自拍咪咪爱 | 精品一区二区三区免费毛片 | 免费午夜网站 | 精品福利一区二区免费视频 | 国产乱人乱精一区二区视频密 | 日韩视频免费一区二区三区 | 久久这里只有精品视频9 | 色橹橹 | 大香焦在线观看 | 色琪琪原网站亚洲香蕉 | 亚洲激情在线 | 亚洲第一区二区快射影院 | 1314酒色| 女人和拘做受全过程免费 | 国产成人 免费观看 | 日韩欧美综合在线二区三区 | 91制片厂制作传媒网站破解 | 欧美大片一级片 | 午夜国产在线观看 | 色综合久久综精品 | 性xxxxⅹhd成人 | 亚洲天堂视频在线观看 | 国产亚洲成归v人片在线观看 | 欧美操屁股 | 日韩欧美一区二区三区免费观看 | 91入口免费网站大全 | 欧美一级欧美三级 | 国产精品一在线观看 | adc 我们的永久网址 | 欧美日韩国产在线人成dvd | 色戒西瓜 | 日韩福利网 | 国产在线影院 |