分布式消息隊列 RocketMQ 源碼分析 —— Message 拉取與消費(下)

2020-12-13 CSDN

摘要: 原創出處 http://www.iocoder.cn/RocketMQ/message-pull-and-consume-second/?csdn&2017-10-24 「芋道源碼」歡迎轉載,保留摘要,謝謝! 本文主要基於 RocketMQ 4.0.x 正式版 1、概述 2、Consumer 3、PushConsumer 一覽 4、PushConsumer 訂閱 DefaultMQPushConsumerImpl#subscribe(…) FilterAPI.buildSubscriptionData(…) DefaultMQPushConsumer#registerMessageListener(…) 5、PushConsumer 消息隊列分配 RebalanceService MQClientInstance#doRebalance(…) DefaultMQPushConsumerImpl#doRebalance(…) RebalanceImpl#doRebalance(…) RebalanceImpl#rebalanceByTopic(…) RebalanceImpl#removeUnnecessaryMessageQueue(…) RebalancePushImpl#removeUnnecessaryMessageQueue(…) [PullConsumer] RebalancePullImpl#removeUnnecessaryMessageQueue(…) RebalancePushImpl#dispatchPullRequest(…) DefaultMQPushConsumerImpl#executePullRequestImmediately(…) AllocateMessageQueueStrategy AllocateMessageQueueAveragely AllocateMessageQueueByMachineRoom AllocateMessageQueueAveragelyByCircle AllocateMessageQueueByConfig 5、PushConsumer 消費進度讀取 RebalancePushImpl#computePullFromWhere(…) [PullConsumer] RebalancePullImpl#computePullFromWhere(…) 6、PushConsumer 拉取消息 PullMessageService DefaultMQPushConsumerImpl#pullMessage(…) PullAPIWrapper#pullKernelImpl(…) PullAPIWrapper#recalculatePullFromWhichNode(…) MQClientInstance#findBrokerAddressInSubscribe(…) PullAPIWrapper#processPullResult(…) ProcessQueue#putMessage(…) 總結 6、PushConsumer 消費消息 ConsumeMessageConcurrentlyService 提交消費請求 ConsumeMessageConcurrentlyService#submitConsumeRequest(…) ConsumeMessageConcurrentlyService#submitConsumeRequestLater ConsumeRequest ConsumeMessageConcurrentlyService#processConsumeResult(…) ProcessQueue#removeMessage(…) ConsumeMessageConcurrentlyService#cleanExpireMsg(…) ProcessQueue#cleanExpiredMsg(…) 7、PushConsumer 發回消費失敗消息 DefaultMQPushConsumerImpl#sendMessageBack(…) MQClientAPIImpl#consumerSendMessageBack(…) 8、Consumer 消費進度 OffsetStore OffsetStore#load(…) LocalFileOffsetStore#load(…) OffsetSerializeWrapper RemoteBrokerOffsetStore#load(…) OffsetStore#readOffset(…) LocalFileOffsetStore#readOffset(…) RemoteBrokerOffsetStore#readOffset(…) OffsetStore#updateOffset(…) OffsetStore#persistAll(…) LocalFileOffsetStore#persistAll(…) RemoteBrokerOffsetStore#persistAll(…) MQClientInstance#persistAllConsumerOffset(…) 9、結尾

相關焦點

  • Spring全家桶、Dubbo、分布式、消息隊列後端必備全套開源項目
    在帶你快速學會 MQ 消息的發送與消費的同時,我還想告訴你 MQ 還有集群消費、廣播消費、順序消息、定時消息、事務消息、消費重試等等特性。在帶你快速學會 Job 任務的編寫的同時,我還想告訴你還有 Quartz 單體、Quartz 集群、XXL-JOB 等等企業使用更多的調度平臺。
  • RocketMQ消息軌跡-設計篇
    RocketMQ 消息軌跡主要包含兩篇文章:設計篇與源碼分析篇,本節將詳細介紹RocketMQ消息軌跡-設計相關。1、消息軌跡數據格式RocketMQ4.5版本消息軌跡主要記錄如下信息:traceType跟蹤類型,可選值:Pub(消息發送)、SubBefore(消息拉取到客戶端,執行業務定義的消費邏輯之前)、SubAfter(消費後)。timeStamp當前時間戳。regionIdbroker所在的區域ID,取自BrokerConfig#regionId。
  • 想了解Kafka,RabbitMQ,ZeroMQ,RocketMQ,ActiveMQ之間的差異?這一篇文章就夠了!
    rocketmq隊列的概念和kafka的分區概念是基本一致的,kafka同一個topic的分區儘可能地分布在不同的broker上,分區副本也會分布在不同的broker上。rocketmq集群的slave會從master拉取數據備份,master分布在不同的broker上。activemq:支持簡單集群模式,比如'主-備',對高級集群模式支持不好。
  • 想了解 Kafka, RabbitMQ, ZeroMQ, RocketMQ, ActiveMQ 之間的差異?這一篇文章就夠了!
    rocketmq隊列的概念和kafka的分區概念是基本一致的,kafka同一個topic的分區儘可能地分布在不同的broker上,分區副本也會分布在不同的broker上。rocketmq集群的slave會從master拉取數據備份,master分布在不同的broker上。activemq:支持簡單集群模式,比如'主-備',對高級集群模式支持不好。
  • 消息隊列 NSQ 源碼學習筆記 (三)
    name string  channelMap map[string]*Channel // 保存topic 下所有channel  backend BackendQueue // 落地的消息隊列  memoryMsgChan chan *Message //
  • message-pipe v1.0.1 發布,支持 Nacos 服務發現
    Message Pipe基於Redis實現的分布式消息順序消費管道。
  • Android多線程:手把手帶你深入Handler源碼分析(下)
    今天,我將繼續介紹Handler的源碼分析,希望你們會喜歡。2.2 使用方式    Handler使用方式 因發送消息到消息隊列的方式不同而不同,共分為2種:使用Handler.sendMessage()、使用Handler.post()。下面的源碼分析將依據使用步驟講解。
  • 這些MQ概念你都懂嗎:死信隊列、重試隊列、消息回溯等
    04.重試隊列重試隊列其實可以看成是一種回退隊列,具體指消費端消費消息失敗時,為防止消息無故丟失而重新將消息回滾到Broker中。與回退隊列不同的是重試隊列一般分成多個重試等級,每個重試等級一般也會設置重新投遞延時,重試次數越多投遞延時就越大。
  • Mongodb源碼分析--消息(message)
    具體的代碼(位於message.cpp):           struct  MSGHEADER {        int32 messageLength;  //  消息長度(字節),包括它自身         int32 requestID;      //  消息標識符,用於在請求響應過程中唯一標識該消息
  • RabbitMQ的5種核心消息模式都不懂,也敢說會用消息隊列
    簡介RabbitMQ是最受歡迎的開源消息中間件之一,在全球範圍內被廣泛應用。RabbitMQ是輕量級且易於部署的,能支持多種消息協議。RabbitMQ可以部署在分布式系統中,以滿足大規模、高可用的要求。相關概念我們先來了解下RabbitMQ中的相關概念,這裡以5種消息模式中的路由模式為例。
  • 如何使用Spring Boot與RabbitMQ結合實現延遲隊列
    顧名思義,延遲隊列就是進入該隊列的消息會被延遲消費的隊列。而一般的隊列,消息一旦入隊了之後就會被消費者馬上消費。延遲隊列能做什麼?延遲隊列多用於需要延遲工作的場景。最常見的是以下兩種場景:延遲消費。比如:用戶生成訂單之後,需要過一段時間校驗訂單的支付狀態,如果訂單仍未支付則需要及時地關閉訂單。
  • 雲原生時代 RocketMQ 運維管控的利器 - RocketMQ Operator
    本文主要分為三個部分:首先簡單介紹一下 RocketMQ Operator 的相關知識;然後結合案例詳細介紹 RocketMQ Operator 提供的自定義資源及使用方法;最後介紹 Operator 社區目前的情況並展望 RocketMQ Operator 下一步的發展方向。相關背景知識1.
  • IM開發基礎知識補課(五):通俗易懂,正確理解並用好MQ消息隊列
    MQ消息隊列中間件已被廣泛用於電商、即時通訊、社交等各種中大型分布式應用系統。:訂閱並消費kafka隊列中的日誌數據。每個消息都被發送到一個特定的隊列,接收者從隊列中獲取消息。隊列保留著消息,直到他們被消費或超時。
  • Android消息機制之Looper、Handler、MessageQueen
    消息機制的相關概念Android消息機制的通信流程Looper源碼分析MessageQueen源碼分析Handler源碼分析面試題結語前言Android消息機制可以說是我們Android工程師面試題中的必考題,弄懂它的原理是我們避不開的任務,所以長痛不如短痛,花點時間幹掉他
  • 淺入淺出消息隊列
    於是老師決定讓同學們把需要提的問題寫在紙上,下自習後交給課代表,然後老師再從課代表那取出要問的問題,然後再一個個解決。相信在學生時代大家都遇到過上面的這種情況,如果我們將在學校上課抽象成一個系統,那這種情況就是一個很常見的消息隊列的使用場景。
  • 五分鐘學後端技術:如何學習後端工程師必學的消息隊列
    常用的消息隊列我們可以把消息隊列比作是一個存放消息的容器,當我們需要使用消息的時候可以取出消息供自己使用。消息隊列是分布式系統中重要的組件,使用消息隊列主要是為了通過異步處理提高系統性能和削峰、降低系統耦合性。目前使用較多的消息隊列有ActiveMQ,RabbitMQ,Kafka,RocketMQ。
  • 輕量級消息隊列RedisQueue
    消息隊列(Message Queue)是分布式系統必不可少的中間件,大部分消息隊列產品(如RocketMQ/RabbitMQ/Kafka等)要求團隊有比較強的技術實力,不適用於中小團隊,並且對.NET技術的支持力度不夠。而Redis實現的輕量級消息隊列很簡單,僅有Redis常規操作,幾乎不需要開發團隊掌握額外的知識!
  • SpringBoot+RabbitMQ (保證消息100%投遞成功並被消費)
    二、實現思路簡略介紹163郵箱授權碼的獲取編寫發送郵件工具類編寫RabbitMQ配置文件生產者發起調用消費者發送郵件定時任務定時拉取投遞失敗的消息, 重新投遞各種異常情況的測試驗證拓展: 使用動態代理實現消費端冪等性驗證和消息確認