日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程语言 > python >内容正文

python

python2处理耗时任务_RabbitMQ Go客户端教程2——任务队列/工作队列

發布時間:2023/12/20 python 36 豆豆
生活随笔 收集整理的這篇文章主要介紹了 python2处理耗时任务_RabbitMQ Go客户端教程2——任务队列/工作队列 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

本文翻譯自RabbitMQ官網的Go語言客戶端系列教程,本文首發于我的個人博客:liwenzhou.com,教程共分為六篇,本文是第二篇——任務隊列。

這些教程涵蓋了使用RabbitMQ創建消息傳遞應用程序的基礎知識。 你需要安裝RabbitMQ服務器才能完成這些教程,請參閱安裝指南或使用Docker鏡像。 這些教程的代碼是開源的,官方網站也是如此。

先決條件

本教程假設RabbitMQ已安裝并運行在本機上的標準端口(5672)。如果你使用不同的主機、端口或憑據,則需要調整連接設置。

任務隊列/工作隊列

(使用Go RabbitMQ客戶端)

在第一個教程中,我們編寫程序從命名的隊列發送和接收消息。在這一節中,我們將創建一個工作隊列,該隊列將用于在多個工人之間分配耗時的任務。

工作隊列(又稱任務隊列)的主要思想是避免立即執行某些資源密集型任務并且不得不等待這些任務完成。相反,我們安排任務異步地同時或在當前任務之后完成。我們將任務封裝為消息并將其發送到隊列,在后臺運行的工作進程將取出消息并最終執行任務。當你運行多個工作進程時,任務將在他們之間共享。

這個概念在Web應用中特別有用,因為在Web應用中不可能在較短的HTTP請求窗口內處理復雜的任務,(譯注:例如注冊時發送郵件或短信驗證碼等場景)。

準備工作

在本教程的上一部分,我們發送了一條包含“ Hello World!”的消息。現在,我們將發送代表復雜任務的字符串。我們沒有實際的任務,例如調整圖像大小或渲染pdf文件,所以我們通過借助time.Sleep函數模擬一些比較耗時的任務。我們會將一些包含.的字符串封裝為消息發送到隊列中,其中每有一個.就表示需要耗費1秒鐘的工作,例如,hello...表示一個將花費三秒鐘的假任務。

我們將稍微修改上一個示例中的send.go代碼,以允許從命令行發送任意消息。該程序會將任務安排到我們的工作隊列中,因此我們將其命名為new_task.go

body := bodyFrom(os.Args) // 從參數中獲取要發送的消息正文 err = ch.Publish("", // exchangeq.Name, // routing keyfalse, // mandatoryfalse,amqp.Publishing {DeliveryMode: amqp.Persistent,ContentType: "text/plain",Body: []byte(body),}) failOnError(err, "Failed to publish a message") log.Printf(" [x] Sent %s", body)

下面是bodyFrom函數:

func bodyFrom(args []string) string {var s stringif (len(args) < 2) || os.Args[1] == "" {s = "hello"} else {s = strings.Join(args[1:], " ")}return s }

我們以前的receive.go程序也需要進行一些更改:它需要為消息正文中出現的每個.偽造一秒鐘的工作。它將從隊列中彈出消息并執行任務,因此我們將其稱為worker.go:

msgs, err := ch.Consume(q.Name, // queue"", // consumertrue, // auto-ackfalse, // exclusivefalse, // no-localfalse, // no-waitnil, // args ) failOnError(err, "Failed to register a consumer")forever := make(chan bool)go func() {for d := range msgs {log.Printf("Received a message: %s", d.Body)dot_count := bytes.Count(d.Body, []byte(".")) // 數一下有幾個.t := time.Duration(dot_count)time.Sleep(t * time.Second) // 模擬耗時的任務log.Printf("Done")} }()log.Printf(" [*] Waiting for messages. To exit press CTRL+C") <-forever

請注意,我們的假任務模擬執行時間。

然后,我們就可以打開兩個終端,分別執行new_task.go和worker.go了。

# shell 1 go run worker.go# shell 2 go run new_task.go

循環調度

使用任務隊列的優點之一是能夠輕松并行化工作。如果我們的工作正在積壓,我們可以增加更多的工人,這樣就可以輕松擴展。

首先,讓我們嘗試同時運行兩個worker.go腳本。它們都將從隊列中獲取消息,但是究竟是怎樣呢?讓我們來看看。

你需要打開三個控制臺。其中兩個將運行worker.go腳本。這些控制臺將成為我們的兩個消費者——C1和C2。

# shell 1 go run worker.go # => [*] Waiting for messages. To exit press CTRL+C# shell 2 go run worker.go # => [*] Waiting for messages. To exit press CTRL+C

在第三個控制臺中,我們將發布新任務。啟動消費者之后,你可以發布一些消息:

# shell 3 go run new_task.go msg1. go run new_task.go msg2.. go run new_task.go msg3... go run new_task.go msg4.... go run new_task.go msg5.....

然后我們在shell1和 shell2 兩個窗口看到如下輸出結果了:

# shell 1 go run worker.go # => [*] Waiting for messages. To exit press CTRL+C # => [x] Received a message: msg1. # => [x] Received a message: msg3... # => [x] Received a message: msg5.....# shell 2 go run worker.go # => [*] Waiting for messages. To exit press CTRL+C # => [x] Received a message: msg2.. # => [x] Received a message: msg4....

默認情況下,RabbitMQ將按順序將每個消息發送給下一個消費者。平均而言,每個消費者都會收到相同數量的消息。這種分發消息的方式稱為輪詢。使用三個或者更多worker試一下。

消息確認

work 完成任務可能需要耗費幾秒鐘,如果一個worker在任務執行過程中宕機了該怎么辦呢?我們當前的代碼中,RabbitMQ一旦向消費者傳遞了一條消息,便立即將其標記為刪除。在這種情況下,如果你終止一個worker那么你就可能會丟失這個任務,我們還將丟失所有已經交付給這個worker的尚未處理的消息。

我們不想丟失任何任務,如果一個worker意外宕機了,那么我們希望將任務交付給其他worker來處理。

為了確保消息永不丟失,RabbitMQ支持 href="https://www.rabbitmq.com/confirms.html">消息確認。消費者發送回一個確認(acknowledgement),以告知RabbitMQ已經接收,處理了特定的消息,并且RabbitMQ可以自由刪除它。

如果使用者在不發送確認的情況下死亡(其通道已關閉,連接已關閉或TCP連接丟失),RabbitMQ將了解消息未完全處理,并將對其重新排隊。如果同時有其他消費者在線,它將很快將其重新分發給另一個消費者。這樣,您可以確保即使工人偶爾死亡也不會丟失任何消息。

沒有任何消息超時;RabbitMQ將在消費者死亡時重新傳遞消息。即使處理一條消息需要很長時間也沒關系。

在本教程中,我們將使用手動消息確認,方法是為“auto-ack”參數傳遞一個false,然后在完成任務后,使用d.Ack(false)從worker發送一個正確的確認(這將確認一次傳遞)。

msgs, err := ch.Consume(q.Name, // queue"", // consumerfalse, // 注意這里傳false,關閉自動消息確認false, // exclusivefalse, // no-localfalse, // no-waitnil, // args ) if err != nil {fmt.Printf("ch.Consume failed, err:%vn", err)return }// 開啟循環不斷地消費消息 forever := make(chan bool) go func() {for d := range msgs {log.Printf("Received a message: %s", d.Body)dotCount := bytes.Count(d.Body, []byte("."))t := time.Duration(dotCount)time.Sleep(t * time.Second)log.Printf("Done")d.Ack(false) // 手動傳遞消息確認} }()

使用這段代碼,我們可以確保即使你在處理消息時使用CTRL+C殺死一個worker,也不會丟失任何內容。在worker死后不久,所有未確認的消息都將被重新發送。

消息確認必須在接收消息的同一通道(Channel)上發送。嘗試使用不同的通道(Channel)進行消息確認將導致通道級協議異常。有關更多信息,請參閱確認的文檔指南。

忘記確認
忘記確認是一個常見的錯誤。這是一個簡單的錯誤,但后果是嚴重的。當你的客戶機退出時,消息將被重新傳遞(這看起來像隨機重新傳遞),但是RabbitMQ將消耗越來越多的內存,因為它無法釋放任何未確認的消息。
為了調試這種錯誤,可以使用rabbitmqctl打印messages_unacknowledged字段:
sudo rabbitmqctl list_queues name messages_ready messages_unacknowledged
在Windows平臺,去掉sudo
rabbitmqctl.bat list_queues name messages_ready messages_unacknowledged

消息持久化

我們已經學會了如何確保即使消費者死亡,任務也不會丟失。但是如果RabbitMQ服務器停止運行,我們的任務仍然會丟失。

當RabbitMQ退出或崩潰時,它將忘記隊列和消息,除非您告訴它不要這樣做。要確保消息不會丟失,需要做兩件事:我們需要將隊列和消息都標記為持久的。

首先,我們需要確保隊列能夠在RabbitMQ節點重新啟動后繼續運行。為此,我們需要聲明它是持久的:

q, err := ch.QueueDeclare("hello", // nametrue, // 聲明為持久隊列false, // delete when unusedfalse, // exclusivefalse, // no-waitnil, // arguments )

雖然這個命令本身是正確的,但它在我們當前的設置中不起作用。這是因為我們已經定義了一個名為hello的隊列,它不是持久的。RabbitMQ不允許你使用不同的參數重新定義現有隊列,并將向任何嘗試重新定義的程序返回錯誤。但是有一個快速的解決方法——讓我們聲明一個具有不同名稱的隊列,例如task_queue:

q, err := ch.QueueDeclare("task_queue", // nametrue, // 聲明為持久隊列false, // delete when unusedfalse, // exclusivefalse, // no-waitnil, // arguments )

這種持久的選項更改需要同時應用于生產者代碼和消費者代碼。

在這一點上,我們確信即使RabbitMQ重新啟動,任務隊列隊列也不會丟失。現在我們需要將消息標記為持久的——通過使用amqp.Publishing中的持久性選項amqp.Persistent。

err = ch.Publish("", // exchangeq.Name, // routing keyfalse, // 立即false, // 強制amqp.Publishing{DeliveryMode: amqp.Persistent, // 持久(交付模式:瞬態/持久)ContentType: "text/plain",Body: []byte(body),}) 有關消息持久性的說明
將消息標記為持久性并不能完全保證消息不會丟失。盡管它告訴RabbitMQ將消息保存到磁盤上,但是RabbitMQ接受了一條消息并且還沒有保存它時,仍然有一個很短的時間窗口。而且,RabbitMQ并不是對每個消息都執行fsync(2)——它可能只是保存到緩存中,而不是真正寫入磁盤。持久性保證不是很強,但是對于我們的簡單任務隊列來說已經足夠了。如果您需要更強有力的擔保,那么您可以使用publisher confirms。

公平分發

你可能已經注意到調度仍然不能完全按照我們的要求工作。例如,在一個有兩個worker的情況下,當所有的奇數消息都是重消息而偶數消息都是輕消息時,一個worker將持續忙碌,而另一個worker幾乎不做任何工作。嗯,RabbitMQ對此一無所知,仍然會均勻地發送消息。

這是因為RabbitMQ只是在消息進入隊列時發送消息。它不考慮消費者未確認消息的數量。只是盲目地向消費者發送信息。

為了避免這種情況,我們可以將預取計數設置為1。這告訴RabbitMQ不要一次向一個worker發出多個消息。或者,換句話說,在處理并確認前一條消息之前,不要向worker發送新消息。相反,它將把它發送給下一個不忙的worker。

err = ch.Qos(1, // prefetch count0, // prefetch sizefalse, // global ) 關于隊列大小的說明
如果所有的worker都很忙,你的queue隨時可能會滿。你會想繼續關注這一點,也許需要增加更多的worker,或者有一些其他的策略。

完整的代碼示例

我們的new_task.go的最終代碼代入如下:

package mainimport ("fmt""log""os""strings""github.com/streadway/amqp" )func main() {// 1. 嘗試連接RabbitMQ,建立連接// 該連接抽象了套接字連接,并為我們處理協議版本協商和認證等。conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")if err != nil {fmt.Printf("connect to RabbitMQ failed, err:%vn", err)return}defer conn.Close()// 2. 接下來,我們創建一個通道,大多數API都是用過該通道操作的。ch, err := conn.Channel()if err != nil {fmt.Printf("open a channel failed, err:%vn", err)return}defer ch.Close()// 3. 要發送,我們必須聲明要發送到的隊列。q, err := ch.QueueDeclare("task_queue", // nametrue, // 持久的false, // delete when unusedfalse, // 獨有的false, // no-waitnil, // arguments)if err != nil {fmt.Printf("declare a queue failed, err:%vn", err)return}// 4. 然后我們可以將消息發布到聲明的隊列body := bodyFrom(os.Args)err = ch.Publish("", // exchangeq.Name, // routing keyfalse, // 立即false, // 強制amqp.Publishing{DeliveryMode: amqp.Persistent, // 持久ContentType: "text/plain",Body: []byte(body),})if err != nil {fmt.Printf("publish a message failed, err:%vn", err)return}log.Printf(" [x] Sent %s", body) }// bodyFrom 從命令行獲取將要發送的消息內容 func bodyFrom(args []string) string {var s stringif (len(args) < 2) || os.Args[1] == "" {s = "hello"} else {s = strings.Join(args[1:], " ")}return s }

work.go內容如下:

package mainimport ("bytes""fmt""log""time""github.com/streadway/amqp" )func main() {conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")if err != nil {fmt.Printf("connect to RabbitMQ failed, err:%vn", err)return}defer conn.Close()ch, err := conn.Channel()if err != nil {fmt.Printf("open a channel failed, err:%vn", err)return}defer ch.Close()// 聲明一個queueq, err := ch.QueueDeclare("task_queue", // nametrue, // 聲明為持久隊列false, // delete when unusedfalse, // exclusivefalse, // no-waitnil, // arguments)err = ch.Qos(1, // prefetch count0, // prefetch sizefalse, // global)if err != nil {fmt.Printf("ch.Qos() failed, err:%vn", err)return}// 立即返回一個Delivery的通道msgs, err := ch.Consume(q.Name, // queue"", // consumerfalse, // 注意這里傳false,關閉自動消息確認false, // exclusivefalse, // no-localfalse, // no-waitnil, // args)if err != nil {fmt.Printf("ch.Consume failed, err:%vn", err)return}// 開啟循環不斷地消費消息forever := make(chan bool)go func() {for d := range msgs {log.Printf("Received a message: %s", d.Body)dotCount := bytes.Count(d.Body, []byte("."))t := time.Duration(dotCount)time.Sleep(t * time.Second)log.Printf("Done")d.Ack(false) // 手動傳遞消息確認}}()log.Printf(" [*] Waiting for messages. To exit press CTRL+C")<-forever }

使用消息確認和預取計數,可以設置工作隊列(work queue)。即使RabbitMQ重新啟動,持久性選項也可以讓任務繼續存在。

有關amqp.Channel方法和消息屬性的內容,可以瀏覽amqp API文檔。

創作挑戰賽新人創作獎勵來咯,堅持創作打卡瓜分現金大獎

總結

以上是生活随笔為你收集整理的python2处理耗时任务_RabbitMQ Go客户端教程2——任务队列/工作队列的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。