日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

柯南君:看大数据时代下的IT架构(4)消息队列之RabbitMQ--案例(Helloword起航)...

發布時間:2024/4/13 编程问答 40 豆豆
生活随笔 收集整理的這篇文章主要介紹了 柯南君:看大数据时代下的IT架构(4)消息队列之RabbitMQ--案例(Helloword起航)... 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

柯南君:看大數據時代下的IT架構(4)消息隊列之RabbitMQ--案例(Helloword起航)

二、起航


? ? ? ?本章節,柯南君將從幾個層面,用官網例子講解一下RabbitMQ的實操經典程序案例,讓大家重新回到經典“Hello world!”(The simplest thing that does something )時代,RabbitMQ 支持N多種客戶端(client),這里無法一一講解,暫定java client,有時間的情況下,在彌補一下。

事先,先普及一下圖標(我們會在下面的事例中,會大量用到,所以先普及一下,便于識別,最終更好理解事例的含義)

1、圖標概念

① producting(生產者):在程序中 發送消息的一端,我們暫且稱之為 生產者,在這里用“p”表示

②?queue(隊列):隊列是一個郵箱的名字。它住在RabbitMQ。盡管消息流經RabbitMQ和您的應用程序,他們只可以存儲在一個隊列中。隊列是不受任何限制,它可以儲存盡可能多的信息(按你興趣來了),它本質上是一個無限緩沖區。許多生產商可以發送消息到一個隊列,許多消費者可以嘗試接收數據從一個隊列。

③?consuming(消費者):消費者和生產者是對應的,較為相似的意思;在這里,我用“C”表示

?

2、The Java client library

RabbitMQ 中AMQP這是一個開放的、通用的協議消息。有許多客戶AMQP在許多不同的語言。我們將使用提供的Java客戶機RabbitMQ。?

我們需要下載(Download) client library package,并要核實每個jar包,解壓到相應位置,如下圖所示:

第一步:點擊?http://www.rabbitmq.com/java-client.html,然后找到相應的lib下載位置

第二步:選擇合適的下載,比如我下載了zip包,如圖所示:

第三步:Unzip it(解壓它) 到你的working directory(工作目錄)中 and grab (并且獲得)你的jar包文件

?

  • $ unzip rabbitmq-java-client-bin-*.zip
  • $ cp rabbitmq-java-client-bin-*/*.jar ./

?

3、程序案例

1)?"Hello World"?

在這部分教程中我們將用Java寫兩個程序,發送一個消息的生產者,消費者接收信息并打印出來。我們會掩蓋一些細節的Java API,集中在這個非常簡單的東西開始。這是一個“Hello World”的消息。在下面的圖中,“P”是我們的生產和“C”是我們的消費者。中間的框是一個隊列,消息緩沖區RabbitMQ保持代表消費者。
?① sending (發送)

首先?讓The sender(消息發送者)發送消息并且讓我們的receiver (消息接收者)接收消息,The sender(消息發送者)將會connect to(連接)RabbitMQ,發送一個single message(單一的信息),然后exit(退出)。
  • 在send.java 中,我們需要import一些class ,如下所示:
import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.Connection; import com.rabbitmq.client.Channel;
  • set up(設置)類和queue的name
public class Send {private final static String QUEUE_NAME = "hello"; public static void main(String[] argv) throws java.io.IOException { ... } }
  • then 我們create 一個connection (連接)到server(服務端)
onnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); 備注
  • ?這個connection 是抽象的socket connection鏈接;
  • ?負責協議版本(protocol version negotigation)和身份認證(authentication?);
  • 我們在本地機器上連接到一個代理即 localhost ,如果我們想要連接到代理不同機器上我們簡單的指定其名稱或者IP地址即可;
接下來,我們創建一個channel(通道),這個通道匯集了大多數的API服務! 為了發送,我們必須先聲明一個為我們發送queue(隊列),然后,往queue里發送一個message channel.queueDeclare(QUEUE_NAME, false, false, false, null); String message = "Hello World!"; channel.basicPublish("", QUEUE_NAME, null, message.getBytes()); System.out.println(" [x] Sent '" + message + "'"); 消息內容是一個字節數組,所以你可以編碼任何你喜歡的。最后,我們關閉通道和連接; channel.close();connection.close(); 問題 ?如果 sending doesn‘t work! 我們將怎么辦?why? 如果這是你第一次使用RabbitMQ并且你看不到“發送的”消息,那么你可能抓耳撓腮沒有足夠的空閑磁盤空間(默認情況下它需要至少1 gb免費),因此拒絕接受消息。檢查代理日志文件確認,如果有必要減少限制。配置文件的文檔將向您展示如何設置disk_free_limit。 接下來的是send.java所有源代碼:
import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.Connection; import com.rabbitmq.client.Channel;public class Send {private final static String QUEUE_NAME = "hello";public static void main(String[] argv) throws Exception {ConnectionFactory factory = new ConnectionFactory();factory.setHost("localhost");Connection connection = factory.newConnection();Channel channel = connection.createChannel();channel.queueDeclare(QUEUE_NAME, false, false, false, null);String message = "Hello World!";channel.basicPublish("", QUEUE_NAME, null, message.getBytes());System.out.println(" [x] Sent '" + message + "'");channel.close();connection.close();} }

?

?? ?② Receiving (接收)
這就是我們的發送者。我們的接收器是將消息從RabbitMQ,所以不像發送方發布一個消息,我們將保持運行監聽消息并打印出來
  • The code (in?Recv.java) has almost the same imports as?Send:
import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.Connection; import com.rabbitmq.client.Channel; import com.rabbitmq.client.QueueingConsumer; 額外的QueueingConsumer是一個類,我們將使用緩沖消息推到我們的服務器。設置發送者一樣,我們打開一個連接和一個通道,并宣布我們將使用的隊列。注意這與隊列,發送發布。 public class Recv {private final static String QUEUE_NAME = "hello"; public static void main(String[] argv) throws java.io.IOException, java.lang.InterruptedException { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.queueDeclare(QUEUE_NAME, false, false, false, null); System.out.println(" [*] Waiting for messages. To exit press CTRL+C"); ... } } 注意,我們在這里聲明隊列。因為我們可能會在發送方之前開始啟動接收方,我們要確保隊列存在之前我們嘗試使用消息。我們要告訴服務器提供我們從隊列的消息。因為它將異步消息,我們提供一個回調對象的形式,將緩沖的消息,直到我們準備使用它們。 QueueingConsumer要做什么呢? QueueingConsumer consumer = new QueueingConsumer(channel); channel.basicConsume(QUEUE_NAME, true, consumer); while (true) { QueueingConsumer.Delivery delivery = consumer.nextDelivery(); String message = new String(delivery.getBody()); System.out.println(" [x] Received '" + message + "'"); } QueueingConsumer.nextDelivery()塊,直到另一個來自服務器的消息交付。??
下面是Recv.java 源代碼: import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.Connection; import com.rabbitmq.client.Channel; import com.rabbitmq.client.QueueingConsumer; public class Recv { private final static String QUEUE_NAME = "hello"; public static void main(String[] argv) throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.queueDeclare(QUEUE_NAME, false, false, false, null); System.out.println(" [*] Waiting for messages. To exit press CTRL+C"); QueueingConsumer consumer = new QueueingConsumer(channel); channel.basicConsume(QUEUE_NAME, true, consumer); while (true) { QueueingConsumer.Delivery delivery = consumer.nextDelivery(); String message = new String(delivery.getBody()); System.out.println(" [x] Received '" + message + "'"); } } }

總結

以上是生活随笔為你收集整理的柯南君:看大数据时代下的IT架构(4)消息队列之RabbitMQ--案例(Helloword起航)...的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。