一区二区三区在线-一区二区三区亚洲视频-一区二区三区亚洲-一区二区三区午夜-一区二区三区四区在线视频-一区二区三区四区在线免费观看

腳本之家,腳本語言編程技術及教程分享平臺!
分類導航

Python|VBS|Ruby|Lua|perl|VBA|Golang|PowerShell|Erlang|autoit|Dos|bat|

服務器之家 - 腳本之家 - Python - Python+Pika+RabbitMQ環境部署及實現工作隊列的實例教程

Python+Pika+RabbitMQ環境部署及實現工作隊列的實例教程

2020-08-30 10:08陳杰斌 Python

RabbitMQ是一個消息隊列服務器,在本文中我們將學習到Python+Pika+RabbitMQ環境部署及實現工作隊列的實例教程,需要的朋友可以參考下

rabbitmq中文翻譯的話,主要還是mq字母上:Message Queue,即消息隊列的意思。前面還有個rabbit單詞,就是兔子的意思,和python語言叫python一樣,老外還是蠻幽默的。rabbitmq服務類似于mysql、apache服務,只是提供的功能不一樣。rabbimq是用來提供發送消息的服務,可以用在不同的應用程序之間進行通信。

安裝rabbitmq
先來安裝下rabbitmq,在ubuntu 12.04下可以直接通過apt-get安裝:

sudo apt-get install rabbitmq-server

安裝好后,rabbitmq服務就已經啟動好了。接下來看下python編寫Hello World!的實例。實例的內容就是從send.py發送“Hello World!”到rabbitmq,receive.py從rabbitmq接收send.py發送的信息。

Python+Pika+RabbitMQ環境部署及實現工作隊列的實例教程

其中P表示produce,生產者的意思,也可以稱為發送者,實例中表現為send.py;C表示consumer,消費者的意思,也可以稱為接收者,實例中表現為receive.py;中間紅色的表示隊列的意思,實例中表現為hello隊列。

python使用rabbitmq服務,可以使用現成的類庫pika、txAMQP或者py-amqplib,這里選擇了pika。

安裝pika

安裝pika可以使用pip來進行安裝,pip是python的軟件管理包,如果沒有安裝,可以通過apt-get安裝

sudo apt-get install python-pip

通過pip安裝pika:

sudo pip install pika

send.py代碼

連接到rabbitmq服務器,因為是在本地測試,所以就用localhost就可以了。

connection = pika.BlockingConnection(pika.ConnectionParameters(
        'localhost'))
channel = connection.channel()

聲明消息隊列,消息將在這個隊列中進行傳遞。如果將消息發送到不存在的隊列,rabbitmq將會自動清除這些消息。

channel.queue_declare(queue='hello')

發送消息到上面聲明的hello隊列,其中exchange表示交換器,能精確指定消息應該發送到哪個隊列,routing_key設置為隊列的名稱,body就是發送的內容,具體發送細節暫時先不關注。

channel.basic_publish(exchange='', routing_key='hello', body='Hello World!')

關閉連接

connection.close()

完整代碼

#!/usr/bin/env python
#coding=utf8
import pika
 
connection = pika.BlockingConnection(pika.ConnectionParameters(
        'localhost'))
channel = connection.channel()
 
channel.queue_declare(queue='hello')
 
channel.basic_publish(exchange='', routing_key='hello', body='Hello World!')
print " [x] Sent 'Hello World!'"
connection.close()

先來執行下這個程序,執行成功的話,rabbitmqctl應該成功增加了hello隊列,并且隊列里應該有一條信息,用rabbitmqctl命令來查看下

rabbitmqctl list_queues

在筆者的電腦上輸出如下信息:

Python+Pika+RabbitMQ環境部署及實現工作隊列的實例教程

 

確實有一個hello隊列,并且隊列里有一條信息。接下來用receive.py來獲取隊列里的信息。

receive.py代碼

和send.py的前面兩個步驟一樣,都是要先連接服務器,然后聲明消息的隊列,這里就不再貼同樣代碼了。

接收消息更為復雜一些,需要定義一個回調函數來處理,這邊的回調函數就是將信息打印出來。

def callback(ch, method, properties, body):
  print "Received %r" % (body,)

告訴rabbitmq使用callback來接收信息

channel.basic_consume(callback, queue='hello', no_ack=True)

開始接收信息,并進入阻塞狀態,隊列里有信息才會調用callback進行處理。按ctrl+c退出。

channel.start_consuming()

完整代碼

#!/usr/bin/env python
#coding=utf8
import pika
 
connection = pika.BlockingConnection(pika.ConnectionParameters(
        'localhost'))
channel = connection.channel()
 
channel.queue_declare(queue='hello')
 
def callback(ch, method, properties, body):
  print " [x] Received %r" % (body,)
 
channel.basic_consume(callback, queue='hello', no_ack=True)
 
print ' [*] Waiting for messages. To exit press CTRL+C'
channel.start_consuming()

執行程序,就能夠接收到隊列hello里的消息Hello World!,然后打印在屏幕上。換一個終端,再次執行send.py,可以看到receive.py這邊會再次接收到信息。

工作隊列示例

1.準備工作(Preparation)

在實例程序中,用new_task.py來模擬任務分配者, worker.py來模擬工作者。

修改send.py,從命令行參數里接收信息,并發送

import sys
 
message = ' '.join(sys.argv[1:]) or "Hello World!"
channel.basic_publish(exchange='',
           routing_key='hello',
           body=message)
print " [x] Sent %r" % (message,)

修改receive.py的回調函數。

import time
 
def callback(ch, method, properties, body):
  print " [x] Received %r" % (body,)
  time.sleep( body.count('.') )
  print " [x] Done"

這邊先打開兩個終端,都運行worker.py,處于監聽狀態,這邊就相當于兩個工作者。打開第三個終端,運行new_task.py

$ python new_task.py First message.
$ python new_task.py Second message..
$ python new_task.py Third message...
$ python new_task.py Fourth message....
$ python new_task.py Fifth message.....

觀察worker.py接收到任務,其中一個工作者接收到3個任務 :

$ python worker.py
 [*] Waiting for messages. To exit press CTRL+C
 [x] Received 'First message.'
 [x] Received 'Third message...'
 [x] Received 'Fifth message.....'

另外一個工作者接收到2個任務 :

$ python worker.py
 [*] Waiting for messages. To exit press CTRL+C
 [x] Received 'Second message..'
 [x] Received 'Fourth message....'

從上面來看,每個工作者,都會依次分配到任務。那么如果一個工作者,在處理任務的時候掛掉,這個任務就沒有完成,應當交由其他工作者處理。所以應當有一種機制,當一個工作者完成任務時,會反饋消息。

2.消息確認(Message acknowledgment)

消息確認就是當工作者完成任務后,會反饋給rabbitmq。修改worker.py中的回調函數:

def callback(ch, method, properties, body):
  print " [x] Received %r" % (body,)
  time.sleep(5)
  print " [x] Done"
  ch.basic_ack(delivery_tag = method.delivery_tag)

這邊停頓5秒,可以方便ctrl+c退出。

去除no_ack=True參數或者設置為False也可以。

channel.basic_consume(callback, queue='hello', no_ack=False)

用這個代碼運行,即使其中一個工作者ctrl+c退出后,正在執行的任務也不會丟失,rabbitmq會將任務重新分配給其他工作者。

3.消息持久化存儲(Message durability)

雖然有了消息反饋機制,但是如果rabbitmq自身掛掉的話,那么任務還是會丟失。所以需要將任務持久化存儲起來。聲明持久化存儲:

channel.queue_declare(queue='hello', durable=True)

但是這個程序會執行錯誤,因為hello這個隊列已經存在,并且是非持久化的,rabbitmq不允許使用不同的參數來重新定義存在的隊列。重新定義一個隊列:

channel.queue_declare(queue='task_queue', durable=True)

在發送任務的時候,用delivery_mode=2來標記任務為持久化存儲:

channel.basic_publish(exchange='',
           routing_key="task_queue",
           body=message,
           properties=pika.BasicProperties(
             delivery_mode = 2, # make message persistent
           ))

4.公平調度(Fair dispatch)

上面實例中,雖然每個工作者是依次分配到任務,但是每個任務不一定一樣。可能有的任務比較重,執行時間比較久;有的任務比較輕,執行時間比較短。如果能公平調度就最好了,使用basic_qos設置prefetch_count=1,使得rabbitmq不會在同一時間給工作者分配多個任務,即只有工作者完成任務之后,才會再次接收到任務。

channel.basic_qos(prefetch_count=1)

new_task.py完整代碼

#!/usr/bin/env python
import pika
import sys
 
connection = pika.BlockingConnection(pika.ConnectionParameters(
    host='localhost'))
channel = connection.channel()
 
channel.queue_declare(queue='task_queue', durable=True)
 
message = ' '.join(sys.argv[1:]) or "Hello World!"
channel.basic_publish(exchange='',
           routing_key='task_queue',
           body=message,
           properties=pika.BasicProperties(
             delivery_mode = 2, # make message persistent
           ))
print " [x] Sent %r" % (message,)
connection.close()
worker.py完整代碼

#!/usr/bin/env python
import pika
import time
 
connection = pika.BlockingConnection(pika.ConnectionParameters(
    host='localhost'))
channel = connection.channel()
 
channel.queue_declare(queue='task_queue', durable=True)
print ' [*] Waiting for messages. To exit press CTRL+C'
 
def callback(ch, method, properties, body):
  print " [x] Received %r" % (body,)
  time.sleep( body.count('.') )
  print " [x] Done"
  ch.basic_ack(delivery_tag = method.delivery_tag)
 
channel.basic_qos(prefetch_count=1)
channel.basic_consume(callback,
           queue='task_queue')
 
channel.start_consuming()

延伸 · 閱讀

精彩推薦
主站蜘蛛池模板: 肉色欧美久久久久久久蜜桃 | 第一次破学生处破 | 亚洲欧美午夜 | 国模一区二区三区视频一 | 天天爽天天干天天操 | 欧美视频一区二区三区四区 | 爱色综合v | 无人区在线观看免费完整版免费 | 亚洲精品成人A8198A片漫画 | 蜜桃破解版免费看nba | 欧美男女交配 | 日韩成人在线免费视频 | 动漫美女隐私尿口图片 | 精品无人区乱码1区2区3区在线 | 国产色资源| 亚洲免费视频一区 | 9420高清视频在线观看网百度 | 色中色破解版 | 青草娱乐极品免费视频 | 十六一下岁女子毛片免费 | 好男人在线观看免费高清2019韩剧 | 色综合久久天天综合观看 | 精品小视频在线 | 国产麻豆传媒在线观看 | 公园暴露娇妻小说 | 热色综合| 亚洲欧美日韩高清 | 99午夜高清在线视频在观看 | 国产精品2 | 欧美一区二区三区精品国产 | 国产精品亚洲片在线不卡 | 亚洲精品短视频 | 成年人视频在线免费观看 | 午夜视频一区 | 日本艳鉧动漫1~6完整版在 | 成年视频在线观看 | 高h辣文小说网 烧书阁 | 美女秘密网站 | 国产资源中文字幕 | 国产麻豆精品原创 | 香蕉视频在线观看网站 |