RabbitMQ 使用指南 1 MQ 簡介 消息中間件利用高效可靠的消息傳遞機制進行平臺無關的數據交流,並基於數據通信來 進行分布式系統的集成。通過提供消息傳遞和消息排隊模型,它可以在分布式環境下擴展進 程間的通信。對於消息中間件,常見 角色大致也就有 Producer (生產者)、 Consumer (消 費者) 。 常見的消息中間件產品 :1). ActiveMQ ActiveMQ 是 Apache 出品,最流行的,能力強勁的開源消息總線。 ActiveMQ 是 一個完全支持 JMS1.1 和 J2EE 1.4 規範的 JMS Provider 實現。我們在本次課程中 介紹 ActiveMQ 的使用。 2). RabbitMQ AMQP 協議的領導實現,支持多種場景。淘寶的 MySQL 集群內部有使用它進行通訊,OpenStack 開源雲平臺的通信組件,最先在金融行業得到運用。 3). ZeroMQ史上最快的消息隊列系統 4). Kafka Apache 下的一個子項目。特點:高吞吐,在一臺普通的伺服器上既可以達到 10W/s的吞吐速率;完全的分布式系統。適合處理海量數據 2 MQ 作用 1). 解耦 :中間件中的生產者只管發送消息 , 消費者只要從隊列當中獲取消息進行消費就可以 , 從而來實現業務的解耦。 2). 冗餘存儲 : 有些情況下,處理數據的過程會失敗。消息中間件可以把數據進行持久化直到它們已經被完全處理,通過這一方式規避了數據丟失風險。在把一個消息從消息中間 件中刪除之前,需要你的處理系統明確地指出該消息己經被處理完成,從而確保你的數據 被安全地保存直到 你使用完畢。 3). 可恢復性:當系統一部分組件失效時,不會影響到整個系統。消息中間件降低了進程間的稿合度,所以即使一個處理消息的進程掛掉,加入消息中間件中的消息仍然可以在系統恢復後進行處理 。
4). 順序保證:在大多數使用場景下,數據處理的順序很重要,大部分消息中間件支持一 定程度上的順序性。 5). 緩衝: 在任何重要的系統中,都會存在需要不同處理時間的元素。消息中間件通過一 個緩衝層來幫助任務最高效率地執行,寫入消息中間件的處理會儘可能快速 。
6). 異步通信:在很多時候應用不想也不 需要立即處理消息。消息中間件提供了異步處 理機制,允許應用把一些消息放入消息中間件中,但並不立即處理它,在之後需要的時候再慢慢處理 。 3 RabbitMQ 安裝及啟動 3.1 安裝依賴環境 rpm -ivh erlang -20.3.8.6 -1.el6.x86_64.rpm yum -y install epel -release yum -y install socat 3.2 安裝 rabbitMQ rpm -ivh rabbitmq -server -3.7.7 -1.el6.noarch.rpm 3.3 添加用戶 默認情況下管理 界面只能在 Linux 系統本機可以訪問 , 如果想其他的主機也能訪問需要 配置:rabbitmq -plugins enable rabbitmq_management 添加訪問用戶:rabbitmqctl add_user admin admin 3.4 RabbitMQ 啟動 /停止 啟動 : service rabbitmq -server start 停止 : service rabbitmq -server stop 查看狀態 : service rabbitmq -server status
4 Rabbit MQ 管理界面訪問 4.1 Overview 概要 該 欄 目 主 要 展 示 的 是 MQ 的概要信息, 如消息的數量,Connection , Channel , Exchange , Queue , Consumer 的數量 .
4.2 Exchange 交換器 該欄目主要展示的是當前虛擬主機下的交換器,也可以在此添加一個新的交換器, 並且配置對應的交換器的規則屬性 。
4.3 Queues 隊列 該欄目 展示的是消息隊列的信息,裡面有各個隊列的概要信息, 也可以在此欄目添加隊列 Queue
4.4 Admin 系統管理 該欄目展示的是用戶管理的信息, 包含用戶列表的展示 ,添加用戶,添加虛擬主機等信息
5 RabbitMQ 的相關概念 5.1 生產者與消費者 5.1.1 生產者 Producer: 生產者,就是投遞消息的一方。 生產者創建消息,然後發布到 RabbitMQ 中。消息一般可以包含 2 個部分 :消息體和標籤 (Label) 。消息體也可以稱之為 payload ,在實際應用中,消 息體一般是一個帶有業務邏輯結構 的數據,比如一個 JSON 字符串。當然可以進一步對這個消息體進行序列 化操作。消息的標籤用來表述這條消息, 比如一個交換器的名稱和一個路由鍵。生產者把消息交由 RabbitMQ,RabbitMQ 之後會根據標籤把消息發送給感興趣的消費者(Consumer ) 。 5.1.2 消費者 Consumer: 消費者,就是接收消息的一方。 消費者連接到 RabbitMQ 伺服器,並訂閱到隊列上 。當消費者消費一條消息時 , 只是消費消息的消息體 (payload ) 。在消息路由的過程中,消息的標籤會丟棄, 存入到隊列中的消息只有消息體,消費者也只會消費到消息體,也就不知道消息的生產者是誰,當然消費者也不需要知道 。 5.2 隊列 Queue: 隊列,是 Rabbi tMQ 的內部對象,用於存儲消息。
5.3 交換器 , 路由鍵 , 綁定 5.3.1 交換器 Exchange: 交換器。在上圖中我們暫時可以理解成生產者將消息投遞到隊列中,實際 上 這個在 RabbitMQ 中不會發生。真實情況是,生產者將消息發送到 Exchange ( 交換器 ),由交換器將消息路由到一個或者多個隊列中。如果路由不到,或 許會返回給生產者,或許直接丟棄。這裡可以將 RabbitMQ 中的交換器看作一個簡單的實體 。
RabbitMQ 中的 交換器有四種類型 , 四種類型分別是 fanout 、 direct 、 topic 、headers ,不同的類型有著不 同的路由策略。 5.3.2 路由鍵 RoutingKey : 路由鍵 。 生產者將消息發給交換器 的時候, 一般會指定一個 RoutingKey,用來指定這個 消息的路由規則,而這個 RoutingKey 需要與交換器類型和綁定鍵 (BindingKey) 聯和使用才能最終生效。 在交換器類型和綁定鍵 (BindingKey) 固定的情況下,生產者可以在發送消息給交 換器時, 通過指定 RoutingKey 來決定消息流向哪裡。 5.3.3 綁定 Binding: 綁定。 RabbitMQ 中通過綁定將交換器與隊列關聯起來,在綁定的時候一 般會指定一個綁定鍵 (BindingKey) ,這樣 RabbitMQ 就知道如何正確地將消息路由到隊列了。 5.4 交換器類型 1).fanout :它會把所有發送到該交換器的消息路由到所有與該交換器綁定的隊列 中。 2).direct: 該 類 型 的 交 換 器 路 由 規 則 也 很 簡 單 , 它 會 把 消 息 路 由 到 那 些 BindingKey 和 RoutingKey 完全匹配的隊列中。 3).topic : 前面講到 direct 類型的交換器路由規則是完全匹配 BindingK eytopic 類型的交換器在匹配規則上進行了擴展,它與 direct 類型的交換器相似,也是將消息路由到 BindingKey 和 RoutingKey 相匹配的隊 列中,但這裡的匹配規則有些不同,它約定 :
RoutingKey 為 一 個 點 號 "." 分割的字符串 , 如 : com.itcast.client , com.itheima.exam 。 BindingKey 與 RoutingKey 一樣也是點號 "." 分割 的字符串。 BindingKey 中可 以 存在 兩 種 特殊 的 字符 串 "*" 和 "#" , 用 於 模糊 匹配 ,其中 "#" 用於匹配一個單詞 , "*" 用於匹配多個單個 (可以是零個 )。 4). headers : 該類型的交換器不依賴於路由鍵的匹配規則來路由消息,而是根據 發送的消息內容中的headers屬性進行匹配。 6 生產者發送消息 6.1 隊列綁定 6.1.1 創建隊列 在 RabbitMQ 的後臺管理界面中創建一個隊列 , 指定隊列名稱。
6.1.2 創建交換器 Exchange 在 RabbitMQ 的後臺管理界面中創建一個交換器,指定交換器的名 稱, 並且指定交換器類 型。
6.1.3 綁定隊列與交換器在交換器列表點擊對應的交換器,進入到綁定界面,指定隊列名稱 queue ,指定RoutingKey ,通過該 RoutingKey 來綁定該隊列與交換器 Exchange 。 之後,在發送消息時,指定了 Exchange,及 RoutingKey,就可以將該消息路由 到該隊列 queue 中。
6.2 發送消息邏輯代碼
6.2.1 引入依賴
<dependency> <groupId>com.rabbitmq</groupId> <artifactId>amqp-client</artifactId> <version>5.2.0</version> </dependency>
6.2.2 發送消息
@Testpublic void test1() throws Exception{//指定往哪一個交換器中發送消息String exchangeName = "itcast.v0.topic";//指定消息的路由RoutingKeyString routingKey = "itcast.item.add";//創建一個連接工廠 , 指定 主機,埠, 訪問的虛擬主機, 用戶名, 密碼ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost("192.168.142.132");connectionFactory.setPort(5672); connectionFactory .setVirtualHost( "/" ); connectionFactory .setUsername( "admin" ); connectionFactory .setPassword( "admin" ); // 創建一個連結 Connection connection = connectionFactory .newConnection(); // 創建一個通道 Channel Channel channel = connection .createChannel(); // 調用 basicPublish 循環發送 50 條消息 , 每條消息之間 ,間隔 1秒 for (int i = 0; i < 50; i++) { channel .basicPublish( exchangeName , routingKey , MessageProperties. TEXT_PLAIN , ( "生產者生產的消息 083100" +i).getBytes()); TimeUnit. SECONDS .sleep(1); } }
6.3 發送消息平臺監測
7 消費者接受消息 7.1 引入依賴
<dependency><groupId>com.rabbitmq</groupId><artifactId>amqp-client</artifactId><version>5.2.0</version></dependency>
7.2接收消息
@Testpublic void test1() throws Exception{//指定隊列名稱String queueName = "itcast_item_add_queue";//獲取連接工廠 , 指定主機 , 埠, 虛擬主機 , 用戶名 , 密碼ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost("192.168.142.132");connectionFactory.setPort(5672);connectionFactory.setVirtualHost("/");connectionFactory.setUsername("admin");connectionFactory.setPassword("admin");//創建連接Connection connection = connectionFactory.newConnection();//創建通道ChannelChannel channel = connection.createChannel();//設置客戶端最多接收未被ack的消息的個數channel.basicQos(10);//構造消息者, 進行消息的消費Consumer consumer = new DefaultConsumer(channel){@Overridepublic void handleDelivery(String consumerTag, Envelope envelope,BasicProperties properties, byte[] body) throws IOException {System.out.println("consumerTag : " + consumerTag);System.out.println("properties : " +JSON.toJSONString(properties));System.out.println("envelope : " + JSON.toJSONString(envelope));System.out.println("receive Message : " + new String(body) );System.out.println("----------------------------------------");try {TimeUnit.SECONDS.sleep(1);} catch (InterruptedException e) {e.printStackTrace();}//通過消息已經接受到channel.basicAck(envelope.getDeliveryTag(), false);}};channel.basicConsume(queueName, consumer);System.in.read();//讓程序等待在這裡, 一直監聽該消息隊列channel.close();connection.close();}
7.3 結果輸出
其中: consumerTag : 消息消費者的標籤 properties : 消息內容的頭信息數據 envelope : 消息體的數據包 ,其中包含消息發送時指定的 exchange, routingKey 等信息 .