日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

RabbitMQ消息队列(七):适用于云计算集群的远程调用(RPC)

發布時間:2024/4/14 编程问答 32 豆豆
生活随笔 收集整理的這篇文章主要介紹了 RabbitMQ消息队列(七):适用于云计算集群的远程调用(RPC) 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

在云計算環境中,很多時候需要用它其他機器的計算資源,我們有可能會在接收到Message進行處理時,會把一部分計算任務分配到其他節點來完成。那么,RabbitMQ如何使用RPC呢?在本篇文章中,我們將會通過其它節點求來斐波納契完成示例。

1. 客戶端接口 Client interface

為了展示一個RPC服務是如何使用的,我們將創建一段很簡單的客戶端class。 它將會向外提供名字為call的函數,這個call會發送RPC請求并且阻塞知道收到RPC運算的結果。代碼如下:

fibonacci_rpc = FibonacciRpcClient() result = fibonacci_rpc.call(4) print "fib(4) is %r" % (result,)復制代碼

2. 回調函數隊列 Callback queue

總體來說,在RabbitMQ進行RPC遠程調用是比較容易的。client發送請求的Message然后server返回響應結果。為了收到響應client在publish message時需要提供一個”callback“(回調)的queue地址。code如下:

result = channel.queue_declare(exclusive=True) callback_queue = result.method.queuechannel.basic_publish(exchange='',routing_key='rpc_queue',properties=pika.BasicProperties(reply_to = callback_queue,),body=request)# ... and some code to read a response message from the callback_queue ...復制代碼

2.1 Message properties

AMQP 預定義了14個屬性。它們中的絕大多很少會用到。以下幾個是平時用的比較多的:

  • delivery_mode: 持久化一個Message(通過設定值為2)。其他任意值都是非持久化。請移步RabbitMQ消息隊列(三):任務分發機制
  • content_type: 描述mime-type 的encoding。比如設置為JSON編碼:設置該property為application/json。
  • reply_to: 一般用來指明用于回調的queue(Commonly used to name a callback queue)。
  • correlation_id: 在請求中關聯處理RPC響應(correlate RPC responses with requests)。

3. 相關id Correlation id

在上個小節里,實現方法是對每個RPC請求都會創建一個callback queue。這是不高效的。幸運的是,在這里有一個解決方法:為每個client創建唯一的callback queue。

這又有其他問題了:收到響應后它無法確定是否是它的,因為所有的響應都寫到同一個queue了。上一小節的correlation_id在這種情況下就派上用場了:對于每個request,都設置唯一的一個值,在收到響應后,通過這個值就可以判斷是否是自己的響應。如果不是自己的響應,就不去處理。

4. 總結

工作流程:

  • 當客戶端啟動時,它創建了匿名的exclusive callback queue.
  • 客戶端的RPC請求時將同時設置兩個properties: reply_to設置為callback queue;correlation_id設置為每個request一個獨一無二的值.
  • 請求將被發送到an rpc_queue queue.
  • RPC端或者說server一直在等待那個queue的請求。當請求到達時,它將通過在reply_to指定的queue回復一個message給client。
  • client一直等待callback queue的數據。當message到達時,它將檢查correlation_id的值,如果值和它request發送時的一致那么就將返回響應。

5. 最終實現

The code for rpc_server.py:

#!/usr/bin/env python import pikaconnection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))channel = connection.channel()channel.queue_declare(queue='rpc_queue')def fib(n):if n == 0:return 0elif n == 1:return 1else:return fib(n-1) + fib(n-2)def on_request(ch, method, props, body):n = int(body)print " [.] fib(%s)" % (n,)response = fib(n)ch.basic_publish(exchange='',routing_key=props.reply_to,properties=pika.BasicProperties(correlation_id = /props.correlation_id),body=str(response))ch.basic_ack(delivery_tag = method.delivery_tag)channel.basic_qos(prefetch_count=1) channel.basic_consume(on_request, queue='rpc_queue')print " [x] Awaiting RPC requests" channel.start_consuming() 復制代碼

The server code is rather straightforward:

  • (4) As usual we start by establishing the connection and declaring the queue.
  • (11) We declare our fibonacci function. It assumes only valid positive integer input. (Don’t expect this one to work for big numbers, it’s probably the slowest recursive implementation possible).
  • (19) We declare a callback for basic_consume, the core of the RPC server. It’s executed when the request is received. It does the work and sends the response back.
  • (32) We might want to run more than one server process. In order to spread the load equally over multiple servers we need to set theprefetch_count setting.

The code for rpc_client.py:

#!/usr/bin/env python import pika import uuidclass FibonacciRpcClient(object):def __init__(self):self.connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))self.channel = self.connection.channel()result = self.channel.queue_declare(exclusive=True)self.callback_queue = result.method.queueself.channel.basic_consume(self.on_response, no_ack=True,queue=self.callback_queue)def on_response(self, ch, method, props, body):if self.corr_id == props.correlation_id:self.response = bodydef call(self, n):self.response = Noneself.corr_id = str(uuid.uuid4())self.channel.basic_publish(exchange='',routing_key='rpc_queue',properties=pika.BasicProperties(reply_to = self.callback_queue,correlation_id = self.corr_id,),body=str(n))while self.response is None:self.connection.process_data_events()return int(self.response)fibonacci_rpc = FibonacciRpcClient()print " [x] Requesting fib(30)" response = fibonacci_rpc.call(30) print " [.] Got %r" % (response,)復制代碼

The client code is slightly more involved:

  • (7) We establish a connection, channel and declare an exclusive ‘callback’ queue for replies.
  • (16) We subscribe to the ‘callback’ queue, so that we can receive RPC responses.
  • (18) The ‘on_response’ callback executed on every response is doing a very simple job, for every response message it checks if thecorrelation_id is the one we’re looking for. If so, it saves the response inself.response
    and breaks the consuming loop.
  • (23) Next, we define our main call method – it does the actual RPC request.
  • (24) In this method, first we generate a unique correlation_id number and save it – the ‘on_response’ callback function will use this value to catch the appropriate response.
  • (25) Next, we publish the request message, with two properties:
    reply_to and correlation_id.
  • (32) At this point we can sit back and wait until the proper response arrives.
  • (33) And finally we return the response back to the user.

開始rpc_server.py:

$ python rpc_server.py[x] Awaiting RPC requests復制代碼

通過client來請求fibonacci數:

$ python rpc_client.py[x] Requesting fib(30)復制代碼

現在這個設計并不是唯一的,但是這個實現有以下優勢:

  • 如何RPC server太慢,你可以擴展它:啟動另外一個RPC server。
  • 在client端, 無所進行加鎖能同步操作,他所作的就是發送請求等待響應。

我們的code還是挺簡單的,并沒有嘗試去解決更復雜和重要的問題,比如:

  • 如果沒有server在運行,client需要怎么做?
  • RPC應該設置超時機制嗎?
  • 如果server運行出錯并且拋出了異常,需要將這個問題轉發到client嗎?
  • 需要邊界檢查嗎?

參考資料:

1. http://www.rabbitmq.com/tutorials/tutorial-six-python.html

2. http://blog.csdn.net/anzhsoft/article/details/19633107



總結

以上是生活随笔為你收集整理的RabbitMQ消息队列(七):适用于云计算集群的远程调用(RPC)的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。