RabbitMQ 消費端限流、TTL、死信隊列

2020-12-25 酷扯兒

本文轉載自【微信公眾號:java進階架構師,ID:java_jiagoushi】經微信公眾號授權轉載,如需轉載與原文作者聯繫

作者:向海www.cnblogs.com/haixiang/p/10905189.html

前幾天,師長已經帶大家了解了:RabbitMQ 簡介以及使用場景,以及帶大家進行了:手把手帶你Springboot整合RabbitMq ,一篇講完,然後還帶大家用

Hyperf+RabbitMQ+WebSocket實現大屏幕消息推送(藍字點擊可跳轉),今天這篇,為大家講講RabbitMQ的消費端限流、TTL、死信隊列相關內容。

消費端限流

1. 為什麼要對消費端限流

假設一個場景,首先,我們 Rabbitmq 伺服器積壓了有上萬條未處理的消息,我們隨便打開一個消費者客戶端,會出現這樣情況: 巨量的消息瞬間全部推送過來,但是我們單個客戶端無法同時處理這麼多數據!

當數據量特別大的時候,我們對生產端限流肯定是不科學的,因為有時候並發量就是特別大,有時候並發量又特別少,我們無法約束生產端,這是用戶的行為。所以我們應該對消費端限流,用於保持消費端的穩定,當消息數量激增的時候很有可能造成資源耗盡,以及影響服務的性能,導致系統的卡頓甚至直接崩潰。

2.限流的 api 講解

RabbitMQ 提供了一種 qos (服務質量保證)功能,即在非自動確認消息的前提下,如果一定數目的消息(通過基於 consume 或者 channel 設置 Qos 的值)未被確認前,不進行消費新的消息。

/**

* Request specific "quality of service" settings.

* These settings impose limits on the amount of data the server

* will deliver to consumers before requiring acknowledgements.

* Thus they provide a means of consumer-initiated flow control.

* @param prefetchSize maximum amount of content (measured in

* octets) that the server will deliver, 0 if unlimited

* @param prefetchCount maximum number of messages that the server

* will deliver, 0 if unlimited

* @param global true if the settings should be applied to the

* entire channel rather than each consumer

* @throws java.io.IOException if an error is encountered

*/

void basicQos(int prefetchSize, int prefetchCount, boolean global) throws IOException;

prefetchSize:0,單條消息大小限制,0代表不限制prefetchCount:一次性消費的消息數量。會告訴 RabbitMQ 不要同時給一個消費者推送多於 N 個消息,即一旦有 N 個消息還沒有 ack,則該 consumer 將 block 掉,直到有消息 ack。global:true、false 是否將上面設置應用於 channel,簡單點說,就是上面限制是 channel 級別的還是 consumer 級別。當我們設置為 false 的時候生效,設置為 true 的時候沒有了限流功能,因為 channel 級別尚未實現。注意:prefetchSize 和 global 這兩項,rabbitmq 沒有實現,暫且不研究。特別注意一點,prefetchCount 在 no_ask=false 的情況下才生效,即在自動應答的情況下這兩個值是不生效的。

3.如何對消費端進行限流

首先第一步,我們既然要使用消費端限流,我們需要關閉自動 ack,將 autoAck 設置為 falsechannel.basicConsume(queueName, false, consumer);第二步我們來設置具體的限流大小以及數量。channel.basicQos(0, 15, false);第三步在消費者的 handleDelivery 消費方法中手動 ack,並且設置批量處理 ack 回應為 truechannel.basicAck(envelope.getDeliveryTag(), true);

這是生產端代碼,與前幾章的生產端代碼沒有做任何改變,主要的操作集中在消費端。

import com.rabbitmq.client.Channel;

import com.rabbitmq.client.Connection;

import com.rabbitmq.client.ConnectionFactory;

public class QosProducer {

public static void main(String[] args) throws Exception {

//1. 創建一個 ConnectionFactory 並進行設置

ConnectionFactory factory = new ConnectionFactory();

factory.setHost("localhost");

factory.setVirtualHost("/");

factory.setUsername("guest");

factory.setPassword("guest");

//2. 通過連接工廠來創建連接

Connection connection = factory.newConnection();

//3. 通過 Connection 來創建 Channel

Channel channel = connection.createChannel();

//4. 聲明

String exchangeName = "test_qos_exchange";

String routingKey = "item.add";

//5. 發送

String msg = "this is qos msg";

for (int i = 0; i < 10; i++) {

String tem = msg + " : " + i;

channel.basicPublish(exchangeName, routingKey, null, tem.getBytes());

System.out.println("Send message : " + tem);

}

//6. 關閉連接

channel.close();

connection.close();

}

}

這裡我們創建一個消費者,通過以下代碼來驗證限流效果以及 global 參數設置為 true 時不起作用.。我們通過Thread.sleep(5000); 來讓 ack 即處理消息的過程慢一些,這樣我們就可以從後臺管理工具中清晰觀察到限流情況。

import com.rabbitmq.client.*;

import java.io.IOException;

public class QosConsumer {

public static void main(String[] args) throws Exception {

//1. 創建一個 ConnectionFactory 並進行設置

ConnectionFactory factory = new ConnectionFactory();

factory.setHost("localhost");

factory.setVirtualHost("/");

factory.setUsername("guest");

factory.setPassword("guest");

factory.setAutomaticRecoveryEnabled(true);

factory.setNetworkRecoveryInterval(3000);

//2. 通過連接工廠來創建連接

Connection connection = factory.newConnection();

//3. 通過 Connection 來創建 Channel

final Channel channel = connection.createChannel();

//4. 聲明

String exchangeName = "test_qos_exchange";

String queueName = "test_qos_queue";

String routingKey = "item.#";

channel.exchangeDeclare(exchangeName, "topic", true, false, null);

channel.queueDeclare(queueName, true, false, false, null);

channel.basicQos(0, 3, false);

//一般不用代碼綁定,在管理界面手動綁定

channel.queueBind(queueName, exchangeName, routingKey);

//5. 創建消費者並接收消息

Consumer consumer = new DefaultConsumer(channel) {

@Override

public void handleDelivery(String consumerTag, Envelope envelope,

AMQP.BasicProperties properties, byte[] body)

throws IOException {

try {

Thread.sleep(5000);

} catch (InterruptedException e) {

e.printStackTrace();

}

String message = new String(body, "UTF-8");

System.out.println("[x] Received '" + message + "'");

channel.basicAck(envelope.getDeliveryTag(), true);

}

};

//6. 設置 Channel 消費者綁定隊列

channel.basicConsume(queueName, false, consumer);

channel.basicConsume(queueName, false, consumer1);

}

}

我們從下圖中發現 Unacked值一直都是 3 ,每過 5 秒 消費一條消息即 Ready 和 Total 都減少 3,而 Unacked的值在這裡代表消費者正在處理的消息,通過我們的實驗發現了消費者一次性最多處理 3 條消息,達到了消費者限流的預期功能。

當我們將void basicQos(int prefetchSize, int prefetchCount, boolean global)中的 global 設置為 true的時候我們發現並沒有了限流的作用。

TTL

TTL是Time To Live的縮寫,也就是生存時間。RabbitMQ支持消息的過期時間,在消息發送時可以進行指定。

RabbitMQ支持隊列的過期時間,從消息入隊列開始計算,只要超過了隊列的超時時間配置,那麼消息會自動的清除。

這與 Redis 中的過期時間概念類似。我們應該合理使用 TTL 技術,可以有效的處理過期垃圾消息,從而降低伺服器的負載,最大化的發揮伺服器的性能。

RabbitMQ allows you to set TTL (time to live) for both messages and queues. This can be done using optional queue arguments or policies (the latter option is recommended). Message TTL can be enforced for a single queue, a group of queues or applied for individual messages.

RabbitMQ允許您為消息和隊列設置TTL(生存時間)。 這可以使用可選的隊列參數或策略來完成(建議使用後一個選項)。 可以對單個隊列,一組隊列強制執行消息TTL,也可以為單個消息應用消息TTL。

——摘自 RabbitMQ 官方文檔

1.消息的 TTL

我們在生產端發送消息的時候可以在 properties 中指定 expiration屬性來對消息過期時間進行設置,單位為毫秒(ms)。

/**

* deliverMode 設置為 2 的時候代表持久化消息

* expiration 意思是設置消息的有效期,超過10秒沒有被消費者接收後會被自動刪除

* headers 自定義的一些屬性

* */

//5. 發送

Map<String, Object> headers = new HashMap<String, Object>();

headers.put("myhead1", "111");

headers.put("myhead2", "222");

AMQP.BasicProperties properties = new AMQP.BasicProperties().builder()

.deliveryMode(2)

.contentEncoding("UTF-8")

.expiration("100000")

.headers(headers)

.build();

String msg = "test message";

channel.basicPublish("", queueName, properties, msg.getBytes());

我們也可以後臺管理頁面中進入 Exchange 發送消息指定expiration

2.隊列的 TTL

我們也可以在後臺管理界面中新增一個 queue,創建時可以設置 ttl,對於隊列中超過該時間的消息將會被移除。

死信隊列

死信隊列:沒有被及時消費的消息存放的隊列

消息沒有被及時消費的原因:

a.消息被拒絕(basic.reject/ basic.nack)並且不再重新投遞 requeue=falseb.TTL(time-to-live) 消息超時未消費c.達到最大隊列長度

實現死信隊列步驟

首先需要設置死信隊列的 exchange 和 queue,然後進行綁定:

Exchange: dlx.exchange

Queue: dlx.queue

RoutingKey: # 代表接收所有路由 key

然後我們進行正常聲明交換機、隊列、綁定,只不過我們需要在普通隊列加上一個參數即可: arguments.put("x-dead-letter-exchange",' dlx.exchange' )這樣消息在過期、requeue失敗、 隊列在達到最大長度時,消息就可以直接路由到死信隊列!

import com.rabbitmq.client.AMQP;

import com.rabbitmq.client.Channel;

import com.rabbitmq.client.Connection;

import com.rabbitmq.client.ConnectionFactory;

public class DlxProducer {

public static void main(String[] args) throws Exception {

//設置連接以及創建 channel 湖綠

String exchangeName = "test_dlx_exchange";

String routingKey = "item.update";

String msg = "this is dlx msg";

//我們設置消息過期時間,10秒後再消費 讓消息進入死信隊列

AMQP.BasicProperties properties = new AMQP.BasicProperties().builder()

.deliveryMode(2)

.expiration("10000")

.build();

channel.basicPublish(exchangeName, routingKey, true, properties, msg.getBytes());

System.out.println("Send message : " + msg);

channel.close();

connection.close();

}

}

import com.rabbitmq.client.*;

import java.io.IOException;

import java.util.HashMap;

import java.util.Map;

public class DlxConsumer {

public static void main(String[] args) throws Exception {

//創建連接、創建channel忽略 內容可以在上面代碼中獲取

String exchangeName = "test_dlx_exchange";

String queueName = "test_dlx_queue";

String routingKey = "item.#";

//必須設置參數到 arguments 中

Map<String, Object> arguments = new HashMap<String, Object>();

arguments.put("x-dead-letter-exchange", "dlx.exchange");

channel.exchangeDeclare(exchangeName, "topic", true, false, null);

//將 arguments 放入隊列的聲明中

channel.queueDeclare(queueName, true, false, false, arguments);

//一般不用代碼綁定,在管理界面手動綁定

channel.queueBind(queueName, exchangeName, routingKey);

//聲明死信隊列

channel.exchangeDeclare("dlx.exchange", "topic", true, false, null);

channel.queueDeclare("dlx.queue", true, false, false, null);

//路由鍵為 # 代表可以路由到所有消息

channel.queueBind("dlx.queue", "dlx.exchange", "#");

Consumer consumer = new DefaultConsumer(channel) {

@Override

public void handleDelivery(String consumerTag, Envelope envelope,

AMQP.BasicProperties properties, byte[] body)

throws IOException {

String message = new String(body, "UTF-8");

System.out.println(" [x] Received '" + message + "'");

}

};

//6. 設置 Channel 消費者綁定隊列

channel.basicConsume(queueName, true, consumer);

}

}

總結

DLX也是一個正常的 Exchange,和一般的 Exchange 沒有區別,它能在任何的隊列上被指定,實際上就是設置某個隊列的屬性。當這個隊列中有死信時,RabbitMQ 就會自動的將這個消息重新發布到設置的 Exchange 上去,進而被路由到另一個隊列。可以監聽這個隊列中消息做相應的處理。

相關焦點

  • 小兔子 RabbitMQ 養成攻略!
    這樣在秒殺持續的這個時間段內,會有幾十萬或者更多的請求都放在消息隊列裡。因為畢竟秒殺只是會在短暫的那一段時間,等它過去之後,每秒可能就只有幾十,幾百個請求進入消息隊列。但是系統還會按照每秒 2 千個請求的速度去處理。所以,秒殺結束,系統會把那些剩下的消息都消費掉。
  • 史上最透徹的 RabbitMQ 可靠消息傳輸實戰
    4、死信隊列DLX,Dead Letter Exchange 的縮寫,又死信郵箱、死信交換機。DLX就是一個普通的交換機,和一般的交換機沒有任何區別。 當消息在一個隊列中變成死信(dead message)時,通過這個交換機將死信發送到死信隊列中(指定好相關參數,rabbitmq會自動發送)。什麼是死信呢?什麼樣的消息會變成死信呢?
  • 用拼多多商城展示如何應對流量暴增,RabbitMQ竟完美詮釋
    1.訂單存儲的解耦 為了進行流量削峰,我們引入 rabbitmq 消息隊列,當購物系統產生訂單後,可以把訂單數據發送到消息隊列;而訂單消費者應用從消息隊列接收訂單消息
  • 輕量級消息隊列RedisQueue
    這裡我們需要支持消費確認的可信隊列 RedisReliableQueue。消費之後,除非程序主動確認消費,否則Redis不許刪除消息。 GetReliableQueue獲取隊列實例後,Add發布消息,TakeOneAsync異步消費一條消息,並指定10秒阻塞超時,處理完成後再通過Acknowledge確認。
  • 小豬佩奇第91期:Rebecca Rabbit
    英文旁白Rebecca RabbitRebecca rabbitIt's the end of another lovely day.I wish I was a rabbit.I know.Should I teach you bothhow to be a rabbit?Yes, please.First, you have to twitchyour nose and squeak, like this.
  • Rabbit!》
    Are you kidding me? It's totally a duck.QUACK QUACK"I distinctly heard rabbit sounds."SNIFF SNIFF "Now the duck is wading through the swamp.No, the rabbit is hiding in the grass.
  • ttl就是兩個攻在一起?關於T你必須知道是事全在這兒!
    ttl就是兩個攻在一起?關於T你必須知道是事全在這兒!文:她小魚雖然現在主流趨勢是拉拉不分tph,可是實際上,拉拉的分類卻是無比複雜的好麼! 像最多見的tpl,然後還有ttl、ppl不勝枚舉。 t最為複雜,還分娘t和鐵t。 h最百搭,好像哪種女人都能愛。
  • Rabbit!》
    See,there's his bill.看,這是它的嘴。It's a rabbit. And he's about to eat a carrot.它是一隻兔子。正準備吃胡蘿蔔。That's funny.I distinctly heard rabbit sounds."SNIFF SNIFF "你可真搞笑,我聽到的絕對是兔子發出的聲音:嗅嗅!
  • 超連結構築的奇妙仙境 | the Internet rabbit hole
    現在這個時代,到處都是吸引人注意力的東西,稍不注意時間就會溜走。等反應過來,已經忘了自己最初是要幹什麼了。英語裡專門有一個短語來形容這種情況:(go/fall) down the rabbit hole。
  • 看了一部反戰電影Jojo rabbit
    (註:有討論情節,雖然我覺得這種電影沒什麼好劇透的,但是介意別看)昨天早上我七點多就醒了,困得要死。在地鐵上打盹兒不成,本來想去電影院睡會兒,結果電影從開始就很抓人,醍醐灌頂。說起來這還是電影院復工以後我第一次去看電影,之前本來想看《風聲》,後來也跟我的film mate馮天天在小米盒子上看完了。
  • 高性能內存隊列Disruptor原理分析
    性能很厲害比JDK的ArrayBlockingQueue性能高近一個數量級單線程每秒能處理超600W的數據(處理600W並非是消費者消費完600W的數據,而是說Disruptor能在1秒內將600W數據發送給消費者,換句話說
  • Rabbit! 中文名《鴨子,還是兔子》
    That's not a duck.That's a rabbit!它不是一隻鴨子,是一隻兔子!【故事導讀】今天的繪本故事從一個既像鴨子又像兔子 動物身上開始,兩個小朋友對它展開了激烈的討論,所以它到底是鴨子還是兔子呢?小朋友A說:「看,它是一隻鴨子!」小朋友B說:「不是,它是一隻兔子!」
  • Bora普拉提工作室,英國 / 32mq design studio
    主要活動空間  ©Alan Williams, alanwilliamsphotography.com感謝 32mq design studio 對gooood的分享項目位於Date: 2018Gross floor area: 160 m2Client: Bora StudioArchitectural design: 32mq design studioGeneral
  • 《三國志14》龜甲隊列使用心得分享 龜甲隊列強度評測
    導 讀 三國志14威力加強版龜甲隊列使用心得 龜甲隊列強度分析 三國志14目前已經推出威力加強版DLC,
  • 《隊列之末》隊列之末
    C的英劇總是那麼精良,畫一般的鏡頭,妥帖優美的配樂,緩緩道來,如同Christopher和Valentine之間的愛情,壓抑在世俗下,隱忍了那麼久,卻這樣緩緩而深刻地到來,他們是真正的soul mate~然而Parade's End絕不僅僅是一部愛情小說,「隊列之末」,隊列,不是指軍隊,而是Tietjens一直以來所堅守的一種信念——parade,英國騎士時代的傳統道德,它不僅僅是面子
  • 「分布式架構」最終一致性:暗示的切換隊列
    為了理解最終的一致性,我們需要知道兩個概念:暗示切換隊列和反熵,這兩個概念都需要特別注意。 第一部分 什麼是暗示的切換隊列? 儘管有一個很酷的名字,暗示切換(HH)隊列並沒有得到很多關注。HH隊列有一項非常重要的工作,但是除非您是系統管理員,否則很少直接與它交互。
  • Spotify新版蘋果CarPlay應用曝光:支持隊列訪問、全新視覺UI
    IT之家1月11日消息 外媒 9to5 Mac 報導,在去年 11 月終於增加了對獨立 Apple Watch 流媒體的支持後,Spotify 現在正在測試其 CarPlay 應用的重大更新。近期向 TestFlight 測試版用戶分發的最新版本 Spotify for iOS 帶來了新的隊列系統以及界面變化。
  • 練隊列 促養成——富陽消防救援大隊組織開展隊列訓練
    一年之計在於春為進一步加強指戰員作風養成,鞏固日常生活制度的落實,牢固樹立隊員的作風紀律自身養成意識,3月24日,富陽消防救援大隊組織全體幹部開展了隊列訓練。「藍朋友」的隊列訓練時間Tuesday此次隊列訓練主要內容有:稍息、立正、跨立、停止間轉法、蹲下、起立、行進與立定、敬禮、禮畢、脫帽戴帽等科目訓練。訓練中,指揮員口令準確清楚,聲音洪亮,著重對隊員的稍息與立正、正步擺臂、整理著裝等順序、動作要領進行重複講解示範,嚴抓每一個細節、每一個動作。
  • 街舞女神Sukirabbit:做不一樣的港女!
    如果每個人都有機會變身動物的話,那麼香港舞者Sukirabbit該化成頭獅子——不是連喘氣也令人生畏的那種野獸,而是Disney出品的Simba,會在熱帶雨林裡與彭彭丁滿盡情搖擺,一尾甩去,甩走邪惡世俗,腳下帶風,逐愛與自由
  • 《隊列之末》格魯比的雪松
    鑑於沒發過什麼文章,就先po上來好了-----------------------------------------------------------------迷之分割線----------------------------------------------------------------花了一整個下午,磕磕絆絆地看完了《隊列之末》。