RabbitMQ安装|使用|概念|Golang开发
手冊:http://www.rabbitmq.com/getstarted.html
安裝:http://www.rabbitmq.com/download.html
參考:http://blog.csdn.net/whycold/article/details/41119807
一.介紹
AMQP,即Advanced Message Queuing Protocol,高級消息隊列協議,是應用層協議的一個開放標準,為面向消息的中間件設計。消息中間件主要用于組件之間的解耦,消息的發送者無需知道消息使用者的存在,反之亦然。
AMQP的主要特征是面向消息、隊列、路由(包括點對點和發布/訂閱)、可靠性、安全。
RabbitMQ是一個開源的AMQP實現,服務器端用Erlang語言編寫,支持多種客戶端,如:Go、Python、Ruby。用于在分布式系統中存儲轉發消息。
二.安裝
ubuntu直接下載deb文件安裝,默認已經啟動,sudo敲入:
sudo rabbitmq-server start sudo lsof -i:5672啟用插件,進入UI:
sudo rabbitmq-plugins enable rabbitmq_management登錄http://127.0.0.1:15672(默認guest只能localhost訪問,要遠程訪問,需要使用可遠程訪問的管理員賬號)
用戶名:密碼=guest:guest
三.使用
# 敲入查看幫助 sudo rabbitmqctl# 創建用戶 sudo rabbitmqctl add_user 登錄用戶名 密碼 # 可以創建管理員用戶,負責整個MQ的運維 sudo rabbitmqctl set_user_tags 登錄用戶名 administrator # 可以創建RabbitMQ監控用戶,負責整個MQ的監控 sudo rabbitmqctl set_user_tags 登錄用戶名 monitoring # 可以創建某個項目的專用用戶,只能訪問項目自己的virtual hosts sudo rabbitmqctl set_user_tags 登錄用戶名 management # 查看用戶 sudo rabbitmqctl list_users # 授權 # 該命令使用戶具有/這個virtual host中所有資源的配置、寫、讀權限以便管理其中的資源 # set_permissions [-p <vhostpath>] <user> <conf> <write> <read> # 其中,<conf> <write> <read>的位置分別用正則表達式來匹配特定的資源,如'^(amq\.gen.*|amq\.default)$'可以匹配server生成的和默認的exchange,'^$'不匹配任何資源 sudo rabbitmqctl set_permissions -p / 登錄用戶名 '.*' '.*' '.*'四.概念入門
1.ConnectionFactory、Connection、Channel
ConnectionFactory、Connection、Channel都是RabbitMQ對外提供的API中最基本的對象。
Connection是RabbitMQ的socket鏈接,它封裝了socket協議相關部分邏輯。
ConnectionFactory為Connection的制造工廠。
Channel是我們與RabbitMQ打交道的最重要的一個接口,我們大部分的業務操作是在Channel這個接口中完成的,包括定義Queue、定義Exchange、綁定Queue與Exchange、發布消息等。
2.Queue
Queue(隊列)是RabbitMQ的內部對象,用于存儲消息,用下圖表示。
RabbitMQ中的消息都只能存儲在Queue中,生產者(下圖中的P)生產消息并最終投遞到Queue中,消費者(下圖中的C)可以從Queue中獲取消息并消費。
多個消費者可以訂閱同一個Queue,這時Queue中的消息會被平均分攤給多個消費者進行處理,而不是每個消費者都收到所有的消息并處理。
3.消息的一些機制
3.1.消息確認Message acknowledgment
在實際應用中,可能會發生消費者收到Queue中的消息,但沒有處理完成就宕機(或出現其他意外)的情況,這種情況下就可能會導致消息丟失。為了避免這種情況發生,我們可以要求消費者在消費完消息后發送一個回執給RabbitMQ,RabbitMQ收到消息回執(Message acknowledgment)后才將該消息從Queue中移除;如果RabbitMQ沒有收到回執并檢測到消費者的RabbitMQ連接斷開,則RabbitMQ會將該消息發送給其他消費者(如果存在多個消費者)進行處理。這里不存在timeout概念,一個消費者處理消息時間再長也不會導致該消息被發送給其他消費者,除非它的RabbitMQ連接斷開。
這里會產生另外一個問題,如果我們的開發人員在處理完業務邏輯后,忘記發送回執給RabbitMQ,這將會導致嚴重的bug——Queue中堆積的消息會越來越多;消費者重啟后會重復消費這些消息并重復執行業務邏輯…
另外pub message是沒有ack的。(??)
3.2.消息持久Message durability
如果我們希望即使在RabbitMQ服務重啟的情況下,也不會丟失消息,我們可以將Queue與Message都設置為可持久化的(durable),這樣可以保證絕大部分情況下我們的RabbitMQ消息不會丟失。但依然解決不了小概率丟失事件的發生(比如RabbitMQ服務器已經接收到生產者的消息,但還沒來得及持久化該消息時RabbitMQ服務器就斷電了),如果我們需要對這種小概率事件也要管理起來,那么我們要用到事務。由于這里僅為RabbitMQ的簡單介紹,所以這里將不講解RabbitMQ相關的事務。
3.3.提前取機制Prefetch count
前面我們講到如果有多個消費者同時訂閱同一個Queue中的消息,Queue中的消息會被平攤給多個消費者。這時如果每個消息的處理時間不同,就有可能會導致某些消費者一直在忙,而另外一些消費者很快就處理完手頭工作并一直空閑的情況。我們可以通過設置prefetchCount來限制Queue每次發送給每個消費者的消息數,比如我們設置prefetchCount=1,則Queue每次給每個消費者發送一條消息;消費者處理完這條消息后Queue會再給該消費者發送一條消息。就是變慢而已。訂閱模式如何平攤?這種模式是一個消費者一次性拿很多條消息?
4.Exchange
在上一節我們看到生產者將消息投遞到Queue中,實際上這在RabbitMQ中這種事情永遠都不會發生。實際的情況是,生產者將消息發送到Exchange(交換器,下圖中的X),由Exchange將消息路由到一個或多個Queue中(或者丟棄)。
Exchange是按照什么邏輯將消息路由到Queue的?這個將在Binding一節介紹。
RabbitMQ中的Exchange有四種類型,不同的類型有著不同的路由策略,這將在Exchange Types一節介紹。
5.Routing key
生產者在將消息發送給Exchange的時候,一般會指定一個routing key,來指定這個消息的路由規則,而這個routing key需要與Exchange Type及binding key聯合使用才能最終生效。
在Exchange Type與binding key固定的情況下(在正常使用時一般這些內容都是固定配置好的),我們的生產者就可以在發送消息給Exchange時,通過指定routing key來決定消息流向哪里。
RabbitMQ為routing key設定的長度限制為255 bytes。
6.Binding
RabbitMQ中通過Binding將Exchange與Queue關聯起來,這樣RabbitMQ就知道如何正確地將消息路由到指定的Queue了。
6.1.Binding key
在綁定(Binding)Exchange與Queue的同時,一般會指定一個binding key;消費者將消息發送給Exchange時,一般會指定一個routing key;當binding key與routing key相匹配時,消息將會被路由到對應的Queue中。這個將在Exchange Types章節會列舉實際的例子加以說明。
在綁定多個Queue到同一個Exchange的時候,這些Binding允許使用相同的binding key。 binding key 并不是在所有情況下都生效,它依賴于Exchange Type,比如fanout類型(廣播)的Exchange就會無視binding key,而是將消息路由到所有綁定到該Exchange的Queue。
7.Exchange Types
RabbitMQ常用的Exchange Type有fanout、direct、topic、headers這四種(AMQP規范里還提到兩種Exchange Type,分別為system與自定義,這里不予以描述),下面分別進行介紹。
7.1.fanout
fanout類型的Exchange路由規則非常簡單,它會把所有發送到該Exchange的消息路由到所有與它綁定的Queue中。
7.2.direct
direct類型的Exchange路由規則也很簡單,它會把消息路由到那些binding key與routing key完全匹配的Queue中。
以上圖的配置為例,我們以routingKey=”error”發送消息到Exchange,則消息會路由到Queue1(amqp.gen-S9b…,這是由RabbitMQ自動生成的Queue名稱)和Queue2(amqp.gen-Agl…);如果我們以routingKey=”info”或routingKey=”warning”來發送消息,則消息只會路由到Queue2。如果我們以其他routingKey發送消息,則消息不會路由到這兩個Queue中。
7.3.topic
前面講到direct類型的Exchange路由規則是完全匹配binding key與routing key,但這種嚴格的匹配方式在很多情況下不能滿足實際業務需求。topic類型的Exchange在匹配規則上進行了擴展,它與direct類型的Exchage相似,也是將消息路由到binding key與routing key相匹配的Queue中,但這里的匹配規則有些不同,它約定:
1. routing key為一個句點號“. ”分隔的字符串(我們將被句點號“. ”分隔開的每一段獨立的字符串稱為一個單詞),如“stock.usd.nyse”、“nyse.vmw”、“quick.orange.rabbit” 2. binding key與routing key一樣也是句點號“. ”分隔的字符串 3. binding key中可以存在兩種特殊字符“*”與“#”,用于做模糊匹配,其中“*”用于匹配一個單詞,“#”用于匹配多個單詞(可以是零個)以上圖中的配置為例,routingKey=”quick.orange.rabbit”的消息會同時路由到Q1與Q2,routingKey=”lazy.orange.fox”的消息會路由到Q1與Q2,routingKey=”lazy.brown.fox”的消息會路由到Q2,routingKey=”lazy.pink.rabbit”的消息會路由到Q2(只會投遞給Q2一次,雖然這個routingKey與Q2的兩個bindingKey都匹配);routingKey=”quick.brown.fox”、routingKey=”orange”、routingKey=”quick.orange.male.rabbit”的消息將會被丟棄,因為它們沒有匹配任何bindingKey
7.4.headers
headers類型的Exchange不依賴于routing key與binding key的匹配規則來路由消息,而是根據發送的消息內容中的headers屬性進行匹配。
在綁定Queue與Exchange時指定一組鍵值對;當消息發送到Exchange時,RabbitMQ會取到該消息的headers(也是一個鍵值對的形式),對比其中的鍵值對是否完全匹配Queue與Exchange綁定時指定的鍵值對;如果完全匹配則消息會路由到該Queue,否則不會路由到該Queue。
該類型的Exchange沒有用到過(不過也應該很有用武之地),所以不做介紹。
8.RPC
MQ本身是基于異步的消息處理,前面的示例中所有的生產者(P)將消息發送到RabbitMQ后不會知道消費者(C)處理成功或者失敗(甚至連有沒有消費者來處理這條消息都不知道)。
但實際的應用場景中,我們很可能需要一些同步處理,需要同步等待服務端將我的消息處理完成后再進行下一步處理。這相當于RPC(Remote Procedure Call,遠程過程調用)。在RabbitMQ中也支持RPC。
RabbitMQ中實現RPC的機制是:
五.Go 接口
http://www.rabbitmq.com/tutorials/tutorial-one-go.html
請看官方示例:
https://github.com/rabbitmq/rabbitmq-tutorials/blob/master/go/send.go
上面的例子仔細看,有必要看源碼!
GO接口庫
六.實例解釋
四種模式
工作隊列:默認點對點模式
發布方,一個!
package mainimport ("fmt""log" "os" "strings" "github.com/streadway/amqp" ) func failOnError(err error, msg string) { if err != nil { log.Fatalf("%s: %s", msg, err) panic(fmt.Sprintf("%s: %s", msg, err)) } } func main() { // 撥號,下面例子都一樣 conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/") failOnError(err, "Failed to connect to RabbitMQ") defer conn.Close() // 這個是最重要的 ch, err := conn.Channel() failOnError(err, "Failed to open a channel") defer ch.Close() // 申明一個隊列 // https://godoc.org/github.com/streadway/amqp#Channel.QueueDeclare q, err := ch.QueueDeclare( "task_queue", // name 有名字! true, // durable 持久性的,如果事前已經聲明了該隊列,不能重復聲明 false, // delete when unused false, // exclusive 如果是真,連接一斷開,隊列刪除 false, // no-wait nil, // arguments ) failOnError(err, "Failed to declare a queue") body := bodyFrom(os.Args) // 發布 err = ch.Publish( "", // exchange 默認模式,exchange為空 q.Name, // routing key 默認模式路由到同名隊列,即是task_queue false, // mandatory false, amqp.Publishing{ // 持久性的發布,因為隊列被聲明為持久的,發布消息必須加上這個(可能不用),但消息還是可能會丟,如消息到緩存但MQ掛了來不及持久化。 DeliveryMode: amqp.Persistent, ContentType: "text/plain", Body: []byte(body), }) failOnError(err, "Failed to publish a message") log.Printf(" [x] Sent %s", body) } func bodyFrom(args []string) string { var s string if (len(args) < 2) || os.Args[1] == "" { s = "hello" } else { s = strings.Join(args[1:], " ") } return s }工作方,多個,拿發布方的消息
package mainimport ("bytes""fmt" "github.com/streadway/amqp" "log" "time" ) func failOnError(err error, msg string) { if err != nil { log.Fatalf("%s: %s", msg, err) panic(fmt.Sprintf("%s: %s", msg, err)) } } func main() { conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/") failOnError(err, "Failed to connect to RabbitMQ") defer conn.Close() ch, err := conn.Channel() failOnError(err, "Failed to open a channel") defer ch.Close() // 指定隊列! q, err := ch.QueueDeclare( "task_queue", // name true, // durable false, // delete when unused false, // exclusive false, // no-wait nil, // arguments ) failOnError(err, "Failed to declare a queue") // Fair dispatch 預取,每個工作方每次拿一個消息,確認后才拿下一次,緩解壓力 err = ch.Qos( 1, // prefetch count // 待解釋 0, // prefetch size false, // global ) failOnError(err, "Failed to set QoS") // 消費根據隊列名 msgs, err := ch.Consume( q.Name, // queue "", // consumer false, // auto-ack 設置為真自動確認消息 false, // exclusive false, // no-local false, // no-wait nil, // args ) failOnError(err, "Failed to register a consumer") forever := make(chan bool) go func() { for d := range msgs { log.Printf("Received a message: %s", d.Body) dot_count := bytes.Count(d.Body, []byte(".")) t := time.Duration(dot_count) time.Sleep(t * time.Second) log.Printf("Done") // 確認消息被收到!!如果為真的,那么同在一個channel,在該消息之前未確認的消息都會確認,適合批量處理 // 真時場景:每十條消息確認一次,類似 d.Ack(false) } }() log.Printf(" [*] Waiting for messages. To exit press CTRL+C") <-forever }發布-訂閱:廣播模式
發布方
package mainimport ("fmt""log" "os" "strings" "github.com/streadway/amqp" ) func failOnError(err error, msg string) { if err != nil { log.Fatalf("%s: %s", msg, err) panic(fmt.Sprintf("%s: %s", msg, err)) } } func main() { conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/") failOnError(err, "Failed to connect to RabbitMQ") defer conn.Close() ch, err := conn.Channel() failOnError(err, "Failed to open a channel") defer ch.Close() // 默認模式有默認交換機,廣播自己定義一個交換機,交換機可與隊列進行綁定 err = ch.ExchangeDeclare( "logs", // name "fanout", // type 廣播模式 true, // durable false, // auto-deleted false, // internal false, // no-wait nil, // arguments ) failOnError(err, "Failed to declare an exchange") body := bodyFrom(os.Args) // 發布 err = ch.Publish( "logs", // exchange 消息發送到交換機,這個時候沒隊列綁定交換機,消息會丟棄 "", // routing key 廣播模式不需要這個,它會把所有消息路由到綁定的所有隊列 false, // mandatory false, // immediate amqp.Publishing{ ContentType: "text/plain", Body: []byte(body), }) failOnError(err, "Failed to publish a message") log.Printf(" [x] Sent %s", body) } func bodyFrom(args []string) string { var s string if (len(args) < 2) || os.Args[1] == "" { s = "hello" } else { s = strings.Join(args[1:], " ") } return s }訂閱方
package mainimport ("fmt""log" "github.com/streadway/amqp" ) func failOnError(err error, msg string) { if err != nil { log.Fatalf("%s: %s", msg, err) panic(fmt.Sprintf("%s: %s", msg, err)) } } func main() { conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/") failOnError(err, "Failed to connect to RabbitMQ") defer conn.Close() ch, err := conn.Channel() failOnError(err, "Failed to open a channel") defer ch.Close() // 同樣要申明交換機 err = ch.ExchangeDeclare( "logs", // name "fanout", // type true, // durable false, // auto-deleted false, // internal false, // no-wait nil, // arguments ) failOnError(err, "Failed to declare an exchange") // 新建隊列,這個隊列沒名字,隨機生成一個名字 q, err := ch.QueueDeclare( "", // name false, // durable false, // delete when usused true, // exclusive 表示連接一斷開,這個隊列自動刪除 false, // no-wait nil, // arguments ) failOnError(err, "Failed to declare a queue") // 隊列和交換機綁定,即是隊列訂閱了發到這個交換機的消息 err = ch.QueueBind( q.Name, // queue name 隊列的名字 "", // routing key 廣播模式不需要這個 "logs", // exchange 交換機名字 false, nil) failOnError(err, "Failed to bind a queue") // 開始消費消息,可開多個訂閱方,因為隊列是臨時生成的,所有每個訂閱方都能收到同樣的消息 msgs, err := ch.Consume( q.Name, // queue 隊列名字 "", // consumer true, // auto-ack 自動確認 false, // exclusive false, // no-local false, // no-wait nil, // args ) failOnError(err, "Failed to register a consumer") forever := make(chan bool) go func() { for d := range msgs { log.Printf(" [x] %s", d.Body) } }() log.Printf(" [*] Waiting for logs. To exit press CTRL+C") <-forever }高級路由 發布-訂閱 新版:默認點對點模式
發布-訂閱每個綁定的隊列都收到一樣的消息,現在不想!使用路由功能,隊列綁定進行分發。
發布方
package mainimport ("fmt""log" "os" "strings" "github.com/streadway/amqp" ) func failOnError(err error, msg string) { if err != nil { log.Fatalf("%s: %s", msg, err) panic(fmt.Sprintf("%s: %s", msg, err)) } } func main() { conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/") failOnError(err, "Failed to connect to RabbitMQ") defer conn.Close() ch, err := conn.Channel() failOnError(err, "Failed to open a channel") defer ch.Close() // 交換機申明,且類型為點對點默認 err = ch.ExchangeDeclare( "logs_direct", // name "direct", // type true, // durable false, // auto-deleted false, // internal false, // no-wait nil, // arguments ) failOnError(err, "Failed to declare an exchange") body := bodyFrom(os.Args) // 發布 err = ch.Publish( "logs_direct", // exchange 發到這個交換機 severityFrom(os.Args), // routing key 且路由key是由命令行指定,如下方,指定了error false, // mandatory false, // immediate amqp.Publishing{ ContentType: "text/plain", Body: []byte(body), }) failOnError(err, "Failed to publish a message") log.Printf(" [x] Sent %s", body) } func bodyFrom(args []string) string { var s string if (len(args) < 3) || os.Args[2] == "" { s = "hello" } else { s = strings.Join(args[2:], " ") } return s } func severityFrom(args []string) string { var s string if (len(args) < 2) || os.Args[1] == "" { s = "info" } else { s = os.Args[1] } return s }發消息到交換機,路由key為error
go run *.go error "Run. Run. Or it will explode."消費方
package mainimport ("fmt""log" "os" "github.com/streadway/amqp" ) func failOnError(err error, msg string) { if err != nil { log.Fatalf("%s: %s", msg, err) panic(fmt.Sprintf("%s: %s", msg, err)) } } func main() { conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/") failOnError(err, "Failed to connect to RabbitMQ") defer conn.Close() ch, err := conn.Channel() failOnError(err, "Failed to open a channel") defer ch.Close() // 慣例 err = ch.ExchangeDeclare( "logs_direct", // name "direct", // type true, // durable false, // auto-deleted false, // internal false, // no-wait nil, // arguments ) failOnError(err, "Failed to declare an exchange") // 申明臨時隊列 q, err := ch.QueueDeclare( "", // name false, // durable false, // delete when usused true, // exclusive false, // no-wait nil, // arguments ) failOnError(err, "Failed to declare a queue") if len(os.Args) < 2 { log.Printf("Usage: %s [info] [warning] [error]", os.Args[0]) os.Exit(0) } # 綁定隊列和交換機,綁定多個路由key,見下方 for _, s := range os.Args[1:] { log.Printf("Binding queue %s to exchange %s with routing key %s", q.Name, "logs_direct", s) // 下面同個隊列可以收到不同路由key的消息 ,廣播模式除外! err = ch.QueueBind( q.Name, // queue name s, // routing key "logs_direct", // exchange false, nil) failOnError(err, "Failed to bind a queue") } // 消費隊列 msgs, err := ch.Consume( q.Name, // queue "", // consumer true, // auto ack false, // exclusive false, // no local false, // no wait nil, // args ) failOnError(err, "Failed to register a consumer") forever := make(chan bool) go func() { for d := range msgs { log.Printf(" [x] %s", d.Body) } }() log.Printf(" [*] Waiting for logs. To exit press CTRL+C") <forever }消費這些key:info warning error
go run *.go info warning error話題 發布-訂閱 新新版:話題模式
上面的路由都是標準的,就是固定字符串名字,話題模式可以使用類正則的路由,這樣模糊匹配更棒!!
路由類似于這樣?*.love.*
* (star) can substitute for exactly one word. # (hash) can substitute for zero or more words.發布方
package mainimport ("fmt""log" "os" "strings" "github.com/streadway/amqp" ) func failOnError(err error, msg string) { if err != nil { log.Fatalf("%s: %s", msg, err) panic(fmt.Sprintf("%s: %s", msg, err)) } } func main() { conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/") failOnError(err, "Failed to connect to RabbitMQ") defer conn.Close() ch, err := conn.Channel() failOnError(err, "Failed to open a channel") defer ch.Close() // 交換機,話題模式 err = ch.ExchangeDeclare( "logs_topic", // name "topic", // type true, // durable false, // auto-deleted false, // internal false, // no-wait nil, // arguments ) failOnError(err, "Failed to declare an exchange") body := bodyFrom(os.Args) // 類似上面 err = ch.Publish( "logs_topic", // exchange severityFrom(os.Args), // routing key 路由可以不標準了 false, // mandatory false, // immediate amqp.Publishing{ ContentType: "text/plain", Body: []byte(body), }) failOnError(err, "Failed to publish a message") log.Printf(" [x] Sent %s", body) } func bodyFrom(args []string) string { var s string if (len(args) < 3) || os.Args[2] == "" { s = "hello" } else { s = strings.Join(args[2:], " ") } return s } func severityFrom(args []string) string { var s string if (len(args) < 2) || os.Args[1] == "" { s = "anonymous.info" } else { s = os.Args[1] } return s }命令運行
To receive all the logs:
go run *.go "#"To receive all logs from the facility “kern”:
go run *.go "kern.*"Or if you want to hear only about “critical” logs:
go run *.go "*.critical"You can create multiple bindings:
go run *.go "kern.*" "*.critical"消費方
package mainimport ("fmt""log" "os" "github.com/streadway/amqp" ) func failOnError(err error, msg string) { if err != nil { log.Fatalf("%s: %s", msg, err) panic(fmt.Sprintf("%s: %s", msg, err)) } } func main() { conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/") failOnError(err, "Failed to connect to RabbitMQ") defer conn.Close() ch, err := conn.Channel() failOnError(err, "Failed to open a channel") defer ch.Close() // 同理 err = ch.ExchangeDeclare( "logs_topic", // name "topic", // type true, // durable false, // auto-deleted false, // internal false, // no-wait nil, // arguments ) failOnError(err, "Failed to declare an exchange") // 臨時隊列 q, err := ch.QueueDeclare( "", // name false, // durable false, // delete when usused true, // exclusive false, // no-wait nil, // arguments ) failOnError(err, "Failed to declare a queue") if len(os.Args) < 2 { log.Printf("Usage: %s [binding_key]...", os.Args[0]) os.Exit(0) } for _, s := range os.Args[1:] { log.Printf("Binding queue %s to exchange %s with routing key %s", q.Name, "logs_topic", s) // 綁定也是類似的 err = ch.QueueBind( q.Name, // queue name s, // routing key "logs_topic", // exchange false, nil) failOnError(err, "Failed to bind a queue") } msgs, err := ch.Consume( q.Name, // queue "", // consumer true, // auto ack false, // exclusive false, // no local false, // no wait nil, // args ) failOnError(err, "Failed to register a consumer") forever := make(chan bool) go func() { for d := range msgs { log.Printf(" [x] %s", d.Body) } }() log.Printf(" [*] Waiting for logs. To exit press CTRL+C") <forever }命令運行
go run *.go "kern.critical" "A critical kernel error"rpc:RPC模式
應答方
package mainimport ("fmt""log" "strconv" "github.com/streadway/amqp" ) func failOnError(err error, msg string) { if err != nil { log.Fatalf("%s: %s", msg, err) panic(fmt.Sprintf("%s: %s", msg, err)) } } func fib(n int) int { if n == 0 { return 0 } else if n == 1 { return 1 } else { return fib(n-1) + fib(n-2) } } func main() { // 撥號 conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/") failOnError(err, "Failed to connect to RabbitMQ") defer conn.Close() // channel ch, err := conn.Channel() failOnError(err, "Failed to open a channel") defer ch.Close() // 聲明匿名隊列 q, err := ch.QueueDeclare( "rpc_queue", // name false, // durable false, // delete when usused false, // exclusive false, // no-wait nil, // arguments ) failOnError(err, "Failed to declare a queue") // 公平分發 沒有這個則round-robbin:https://segmentfault.com/a/1190000004492447 err = ch.Qos( 1, // prefetch count 0, // prefetch size false, // global ) failOnError(err, "Failed to set QoS") // 消費,等待請求 msgs, err := ch.Consume( q.Name, // queue "", // consumer false, // auto-ack false, // exclusive false, // no-local false, // no-wait nil, // args ) failOnError(err, "Failed to register a consumer") forever := make(chan bool) go func() { //請求來了 for d := range msgs { n, err := strconv.Atoi(string(d.Body)) failOnError(err, "Failed to convert body to integer") log.Printf(" [.] fib(%d)", n) // 計算 response := fib(n) // 回答 err = ch.Publish( "", // exchange d.ReplyTo, // routing key 回答隊列 false, // mandatory false, // immediate amqp.Publishing{ ContentType: "text/plain", CorrelationId: d.CorrelationId, 序列號 Body: []byte(strconv.Itoa(response)), }) failOnError(err, "Failed to publish a message") // 確認回答完畢 d.Ack(false) } }() log.Printf(" [*] Awaiting RPC requests") <forever }請教方
package mainimport ("fmt""log" "math/rand" "os" "strconv" "strings" "time" "github.com/streadway/amqp" ) func failOnError(err error, msg string) { if err != nil { log.Fatalf("%s: %s", msg, err) panic(fmt.Sprintf("%s: %s", msg, err)) } } func randomString(l int) string { bytes := make([]byte, l) for i := 0; i < l; i++ { bytes[i] = byte(randInt(65, 90)) } return string(bytes) } func randInt(min int, max int) int { return min + rand.Intn(max-min) } func fibonacciRPC(n int) (res int, err error) { // 撥號 conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/") failOnError(err, "Failed to connect to RabbitMQ") defer conn.Close() ch, err := conn.Channel() failOnError(err, "Failed to open a channel") defer ch.Close() // 隊列聲明 q, err := ch.QueueDeclare( "", // name false, // durable false, // delete when usused true, // exclusive 為真即連接斷開就刪除 false, // noWait nil, // arguments ) failOnError(err, "Failed to declare a queue") // 消費 msgs, err := ch.Consume( q.Name, // queue "", // consumer true, // auto-ack false, // exclusive 這個為真,服務器會認為這是該隊列唯一的消費者 false, // no-local false, // no-wait nil, // args ) failOnError(err, "Failed to register a consumer") corrId := randomString(32) // 請教! err = ch.Publish( "", // exchange "rpc_queue", // routing key 問題發到這里 false, // mandatory false, // immediate amqp.Publishing{ ContentType: "text/plain", CorrelationId: corrId, ReplyTo: q.Name, 希望回答被發到這里 Body: []byte(strconv.Itoa(n)), }) failOnError(err, "Failed to publish a message") // 取答案 for d := range msgs { if corrId == d.CorrelationId { res, err = strconv.Atoi(string(d.Body)) failOnError(err, "Failed to convert body to integer") break } } return } func main() { rand.Seed(time.Now().UTC().UnixNano()) n := bodyFrom(os.Args) log.Printf(" [x] Requesting fib(%d)", n) res, err := fibonacciRPC(n) failOnError(err, "Failed to handle RPC request") log.Printf(" [.] Got %d", res) } func bodyFrom(args []string) int { var s string if (len(args) < 2) || os.Args[1] == "" { s = "30" } else { s = strings.Join(args[1:], " ") } n, err := strconv.Atoi(s) failOnError(err, "Failed to convert arg to integer") return n }http://www.rabbitmq.com/tutorials/tutorial-six-go.html
七.屬性詳解,測試!
package mainimport ( "fmt" "github.com/streadway/amqp" "log" "time" ) func failOnError(err error, msg string) { if err != nil { log.Fatalf("%s: %s", msg, err) panic(fmt.Sprintf("%s: %s", msg, err)) } } // Queue屬性測試 // // durable屬性和auto-delete屬性可以同時生效; // durable屬性和exclusive屬性會有性質上的沖突,兩者同時設置時,僅exclusive屬性生效; // auto_delete屬性和exclusive屬性可以同時生效; // // auto_delete如果有連接存在消費者訂閱該http://www.lenggirl.com/tool/RabbitMQ.htmlqueue,正常,如果消費者全部消失,自動刪除隊列 // 可以在沒有創建consumer的情況下,創建出具有auto-delete屬性的queue。 // // exclusive,如果聲明該隊列的連接斷開,自動刪除隊列 // queue的存在條件是在聲明該隊列的連接上存在某個consumer訂閱了該queue。 // func main() { conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/") failOnError(err, "Failed to connect to RabbitMQ") defer conn.Close() ch, err := conn.Channel() failOnError(err, "Failed to open a channel") defer ch.Close() // 默認模式有默認交換機,廣播自己定義一個交換機,交換機可與隊列進行綁定 /* ExchangeDeclare declares an exchange on the server. If the exchange does not already exist, the server will create it. If the exchange exists, the server verifies that it is of the provided type, durability and auto-delete flags. ExchangeDeclare方法在服務器聲明一個exchange。如果不存在,新建一個,存在的話則確認type和durability和auto-delete的標志是否一致。 Errors returned from this method will close the channel. 如果方法返回錯誤,channel會被關閉。 Exchange names starting with "amq." are reserved for pre-declared and standardized exchanges. The client MAY declare an exchange starting with "amq." if the passive option is set, or the exchange already exists. Names can consists of a non-empty sequence of letters, digits, hyphen, underscore, period, or colon. 名字以"amq."開頭的Exchange是為之前已經聲明和標準化的exchange們保留的, 在exchange已經存在的情況下,或者passive選項設置為真,客戶端才有可能聲明一個這樣的exchange。 exchange的名字是一個非空序列,僅能包含字母,數字,連字符-,下劃線_,句號.,冒號: 另外的方法ExchangeDeclarePassive主要用來檢測exchange是否已經存在。 Each exchange belongs to one of a set of exchange kinds/types implemented by the server. The exchange types define the functionality of the exchange - i.e. how messages are routed through it. Once an exchange is declared, its type cannot be changed. The common types are "direct", "fanout", "topic" and "headers". Durable and Non-Auto-Deleted exchanges will survive server restarts and remain declared when there are no remaining bindings. This is the best lifetime for long-lived exchange configurations like stable routes and default exchanges. Non-Durable and Auto-Deleted exchanges will be deleted when there are no remaining bindings and not restored on server restart. This lifetime is useful for temporary topologies that should not pollute the virtual host on failure or after the consumers have completed. Non-Durable and Non-Auto-deleted exchanges will remain as long as the server is running including when there are no remaining bindings. This is useful for temporary topologies that may have long delays between bindings. Durable and Auto-Deleted exchanges will survive server restarts and will be removed before and after server restarts when there are no remaining bindings. These exchanges are useful for robust temporary topologies or when you require binding durable queues to auto-deleted exchanges. Note: RabbitMQ declares the default exchange types like 'amq.fanout' as durable, so queues that bind to these pre-declared exchanges must also be durable. Exchanges declared as `internal` do not accept accept publishings. Internal exchanges are useful for when you wish to implement inter-exchange topologies that should not be exposed to users of the broker. When noWait is true, declare without waiting for a confirmation from the server. The channel may be closed as a result of an error. Add a NotifyClose listener to respond to any exceptions. Optional amqp.Table of arguments that are specific to the server's implementation of the exchange can be sent for exchange types that require extra parameters. func (me *Channel) ExchangeDeclare(name, kind string, durable, autoDelete, internal, noWait bool, args Table) error { */ err = ch.ExchangeDeclare("logs", "fanout", true, false, false, false, nil) failOnError(err總結
以上是生活随笔為你收集整理的RabbitMQ安装|使用|概念|Golang开发的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: Qt for Android 开发大坑
- 下一篇: NFS部署及优化(一)