消息隊列 NSQ 源碼學習筆記 (三)

2021-03-03 Go語言中文網




github.com/nsq/apps/nsqd/main.go

代碼邏輯如下:

1.  獲取配置,並從metadata 的持久化文件中讀取topic、channel 信息。meta 信息格式:


Topics []struct {
  Name string `json:"name"`
  Paused bool   `json:"paused"`
  Channels []struct {
    Name string `json:"name"`
    Paused bool   `json:"paused"`
  } `json:"channels"`
} `json:"topics"


2.  啟動nsqd.Main 程序, 埠監聽TCP 服務 和 HTTP 服務(支持HTTPS)。

3.  啟動事件循環

queueScanLoop 處理 in-flight 消息 和 deferred 消息隊列事件的協程

lookupLoop 處理與 nsqlookup 交互的協程。包括消息的廣播,lookup 節點的更新等。

如果配置了狀態監聽的地址,則會啟動 statsdLoop 協程,用於定時發送(UDP)當前服務的各類狀態


Topic 處理


  // 64bit atomic vars need to be first for proper alignment
  //on 32bit platforms
  messageCount uint64  // 消息數量
  messageBytes uint64  // 消息字節數

  sync.RWMutex // 結構體讀寫鎖

  name string
  channelMap map[string]*Channel // 保存topic 下所有channel
  backend BackendQueue // 落地的消息隊列
  memoryMsgChan chan *Message // 內存中的消息
  startChan chan int  // topic 被訂閱了,可以啟動消費了
  exitChan chan int   // 協程退出channel
  channelUpdateChan chan int // channel 更新的消息
  waitGroup util.WaitGroupWrapper
  exitFlag int32          // 退出標記
  idFactory *guidFactory // uuid 生成器

  ephemeral bool // 是否為臨時topic
  deleteCallback func(*Topic)  // 臨時topic,自動刪除相關channel
  deleter        sync.Once

  paused    int32
  pauseChan chan int     // 暫停的信號
  ctx *context  // 上下文,保存nsqd 
}


2. topic 的創建



  

for {
  select {
  // 消息可從二者中隨機獲取,所以topic 中消息是不保序的
  case msg = <-memoryMsgChan:
  // 內存消息
  case buf = <-backendChan:
  // 持久化文件中推送的消息
  ...
  case <-t.channelUpdateChan:
  // 更新channel,則會增加topic 下發的列表
    ...
    continue
  case <-t.pauseChan:
  // 暫停topic,則所有chan 都暫停
    ...
    continue
  case <-t.exitChan:
    goto exit
  }
  for i, channel := range chans {
  // 將topic 收到的消息廣播到 topic 下所有的channel 中
    chanMsg := msg


    // 考慮比較周全的是,減少一次message 的創建
    if i > 0 {
      chanMsg = NewMessage(msg.ID, msg.Body)
      chanMsg.Timestamp = msg.Timestamp
      chanMsg.deferred = msg.deferred
    }
    if chanMsg.deferred != 0 {
    // 如果是defer 的消息,會添加到channel 的defer 隊列中
      channel.PutMessageDeferred(chanMsg, chanMsg.deferred)
      continue
    }
  // 正常消息,直接添加到channel 中
    err := channel.PutMessage(chanMsg)
    if err != nil {
      // log
    }
  }
}



3. 值得關注的topic 操作



func (t *Topic) put(m *Message) error {
  select {
  case t.memoryMsgChan <- m:
  default:
    b := bufferPoolGet()
    err := writeMessageToBackend(b, m, t.backend)
    bufferPoolPut(b)
    t.ctx.nsqd.SetHealth(err)
    if err != nil {
      // log
      return err
    }
  }
  return nil
}



  利用了golang chan 阻塞的原理,當 memoryMsgChan 滿了之後,case t.memoryMsgChan <- m 無法執行,會執行 default 操作,自動添加消息到硬碟中。



Channel 處理


channel 沒有自己的事件操作,都是通過被動執行相關操作。


1. 數據結構


type Channel struct {
  // 64bit atomic vars need to be first for proper alignment on 32bit platforms
  requeueCount uint64  // 重新消費的message 個數
  messageCount uint64  // 消息總數
  timeoutCount uint64  // 消費超時的message 個數

  sync.RWMutex

  topicName string
  name string
  ctx *context

  backend BackendQueue // 落地的隊列

  memoryMsgChan chan *Message // 內存中的消息
  exitFlag int32     // 退出標識
  exitMutex sync.RWMutex

  // state tracking
  clients map[int64]Consumer // 支持多個client 消費,但是一條消息僅能被某一個client 消費
  paused int32        // 暫停標識
  ephemeral bool         // 臨時 channel 標識
  deleteCallback func(*Channel)   // 刪除的回調函數
  deleter    sync.Once
  e2eProcessingLatencyStream *quantile.Quantile

  deferredMessages map[MessageID]*pqueue.Item  // defer 消息保存的map
  deferredPQ     pqueue.PriorityQueue      // defer 隊列 (優先隊列保存)
  deferredMutex  sync.Mutex          // 相關的互斥鎖

  inFlightMessages map[MessageID]*Message    // 正在消費的消息保存的map
  inFlightPQ     inFlightPqueue        // 正在消費的消息保存在優先隊列 (優先隊列保存)
  inFlightMutex  sync.Mutex          // 相關的互斥鎖
}

2. 事件循環處理


在啟動nsqd 時,會啟動一些事件循環的處理。


2.1 channel 隊列處理

  channel 有兩個重要隊列:defer隊列和inflight 隊列, 事件處理主要是對兩個隊列的消息數據做處理



2.2 lookup 事件響應


  此處的事件循環,是用於和lookupd 交戶使用的事件處理模塊。例如Topic 增加或者刪除, channel 增加或者刪除 需要對所有 nslookupd 模塊做消息廣播等處理邏輯,均在此處實現。

  主要的事件:


定時心跳操作 每隔 15s 發送 PING 到 所有 nslookupd 的節點上

topic,channel新增刪除操作 發送消息到所有 nslookupd 的節點上

配置修改的操作 如果配置修改,會重新從配置中刷新一次 nslookupd 節點


消費協程事件處理


       當一個客戶端與nsqd 通過TCP建立連接後,將啟動protocolV2.messagePump 協程,用於處理消息的交互,主協程用於做事件的響應。


  messagePump:


func (p *protocolV2) messagePump(client *clientV2, startedChan chan bool) {
  // 變量初始化
  ...

  for {
    if subChannel == nil || !client.IsReadyForMessages() {
      // 如果沒有channel,則清空
    } else if flushed {
      memoryMsgChan = subChannel.memoryMsgChan
      backendMsgChan = subChannel.backend.ReadChan()
      flusherChan = nil          
    } else {
      memoryMsgChan = subChannel.memoryMsgChan
      backendMsgChan = subChannel.backend.ReadChan()
      // 如果動態設置了flusher 的定時器,則使用這個定時器刷新
      flusherChan = outputBufferTicker.C
    }

    select {
    case <-flusherChan:
    // 寫刷新消息
      ...
    case <-client.ReadyStateChan:
    case subChannel = <-subEventChan:
    // 一個consumer 同一個tcp 連接,只能訂閱一個topic
      subEventChan = nil
    case identifyData := <-identifyEventChan:
    // 客戶端認證, 心跳之類的
    // ...
    case <-heartbeatChan:
    // 心跳消息
      err = p.Send(client, frameTypeResponse, heartbeatBytes)
      if err != nil {
        goto exit
      }
    case b := <-backendMsgChan:
    // 硬碟消息推送到consumer 中
      if sampleRate > 0 && rand.Int31n(100) > sampleRate {
        continue
      }

      // 硬碟消息保存為二進位,需要解碼
      msg, err := decodeMessage(b)
      if err != nil {
        p.ctx.nsqd.logf(LOG_ERROR, "failed to decode message - %s", err)
        continue
      }
      msg.Attempts++

      // 設置超時事件,並將消息放入flight 隊列中
      subChannel.StartInFlightTimeout(msg, client.ID, msgTimeout)
      client.SendingMessage()
      err = p.SendMessage(client, msg)
      if err != nil {
        goto exit
      }
      flushed = false
    case msg := <-memoryMsgChan:
    // 內存消息推送到consumer
      if sampleRate > 0 && rand.Int31n(100) > sampleRate {
        continue
      }
      msg.Attempts++

      // 設置超時事件,並將消息放入flight 隊列中
      subChannel.StartInFlightTimeout(msg, client.ID, msgTimeout)
      client.SendingMessage()
      err = p.SendMessage(client, msg)
      if err != nil {
        goto exit
      }
      flushed = false
    case <-client.ExitChan:
      goto exit
    }
  }
exit:
  // 服務退出,則關閉計時器
  // ...
}

TCP 傳輸協議


nsqd 消息 id 生成方法採用的uuid 生成算法 snowflake 算法

in_flight_queue 和 delay_queue 實現都是使用堆排序實現的優先隊列

Mchannel 中隨機篩選Nchannel 做隊列隊列掃描, 每次獲取的概率相同


func UniqRands(quantity int, maxval int) []int {
  if maxval < quantity {
    quantity = maxval
  }


  intSlice := make([]int, maxval)
  for i := 0; i < maxval; i++ {
    intSlice[i] = i
  }


  // 每次從[i, maxval] 中篩選 1 個元素,放到位置 i 中
  for i := 0; i < quantity; i++ {
    j := rand.Int()%maxval + i
    // swap
    intSlice[i], intSlice[j] = intSlice[j], intSlice[i]
    maxval--


  }
  return intSlice[0:quantity]
}



推薦閱讀

喜歡本文的朋友,歡迎關注「Go語言中文網」:

Go語言中文網啟用微信學習交流群,歡迎加微信:274768166,投稿亦歡迎

相關焦點

  • 消息隊列 NSQ 源碼學習筆記 (一)
    NSQ 是Golang 實現的一個輕型消息隊列
  • Java中常用的七個阻塞隊列第二篇DelayQueue源碼介紹
    Java中常用的七個阻塞隊列第二篇DelayQueue源碼介紹通過前面兩篇文章,我們對隊列有了了解及已經認識了常用阻塞隊列中的三個了。本篇我們繼續介紹剩下的幾個隊列。本文主要內容:通過源碼學習Delayqueue及理解Dqueue並用代碼簡單演示使用場景。
  • client-go 源碼學習總結
    前言目前在雲原生社區的 Kubernetes 源碼研習社中和廣大學友們共同學習鄭東旭大佬的 Kubernetes 源碼剖析[1]這本書。當前正在開展第一期學習活動,第五章節 client-go 的學習。之所以從這一章節開始學習,主要是考慮到 client-go 在源碼中相對比較獨立,可以單獨閱讀。
  • Android Handler 由淺入深源碼全解析
    源碼流程分析        大王,且先隨我看小的從網上盜來的一張圖。handler發送Message(消息)至MessageQueue(模擬隊列),由Looper(循環器)不斷循環取出。然後通知Handler處理。這便是整個的消息機制。沒有多複雜。
  • Spring全家桶、Dubbo、分布式、消息隊列後端必備全套開源項目
    《Spring Boot 消息隊列 RocketMQ 入門》 對應 lab-31《Spring Boot 消息隊列 Kafka 入門》 對應 lab-03-kafka《Spring Boot 消息隊列 RabbitMQ 入門》 對應
  • Python 進階:queue 隊列源碼分析
    源碼分析先從初始化的函數來看:class Queue: def __init__(self, maxsize=0): # 設置隊列的最大容量 self.maxsize = maxsize self.
  • Java並發包源碼學習系列:CLH同步隊列及同步資源獲取與釋放
    目錄本篇學習目標CLH隊列的結構資源獲取 入隊Node addWaiter(Node mode)不斷嘗試Node enq(final Node node)boolean acquireQueued(Node, int)出隊void setHead(Node)boolean shouldParkAfterFailedAcquire(Node,Node)boolean
  • 輕量級消息隊列RedisQueue
    消息隊列(Message Queue)是分布式系統必不可少的中間件,大部分消息隊列產品(如RocketMQ/RabbitMQ/Kafka等)要求團隊有比較強的技術實力,不適用於中小團隊,並且對.NET技術的支持力度不夠。而Redis實現的輕量級消息隊列很簡單,僅有Redis常規操作,幾乎不需要開發團隊掌握額外的知識!
  • Python 源碼分析:queue 隊列模塊
    源碼分析先從初始化的函數來看:class Queue:    def __init__(self, maxsize=0):                self.maxsize = maxsize        self.
  • Java並發包下鎖學習第三篇-鎖是怎麼維護內部隊列的
    從源碼學習Java並發的鎖是怎麼維護內部線程隊列的在上一篇文章中,凱哥對同步組件基礎框架- AbstractQueuedSynchronizer(AQS)做了大概的介紹。我們知道AQS能夠通過內置的FIFO隊列來完成資源獲取線程的排隊工作。那麼AQS是怎麼來維護這個排隊工作的呢?今天我們就來扒一扒AQS源碼。從源碼中來看看是怎麼維護對了的。
  • 消息隊列:Rabbitmq如何保證不丟消息
    於是便整理了這篇文章來跟大家分享下,自己的理解,如有不準確的地方或者不同的意見,還請各位能夠給出反饋,我們可以討論,相互學習,相互成長。>生產者將信道設置成confirm模式,一旦信道進入confirm模式,所有在該信道上面發布的消息都會被指派一個唯一的ID(從1開始),一旦消息被投遞到所有匹配的隊列之後,broker就會發送一個確認給生產者(包含消息的唯一ID),這就使得生產者知道消息已經正確到達目的隊列了,如果消息和隊列是可持久化的,那麼確認消息會將消息寫入磁碟之後發出,broker回傳給生產者的確認消息中deliver-tag
  • 淺入淺出消息隊列
    相信在學生時代大家都遇到過上面的這種情況,如果我們將在學校上課抽象成一個系統,那這種情況就是一個很常見的消息隊列的使用場景。在上述實例中,要提的問題就是**「消息」,提問題的學生是「生產者」,回答問題的老師是「消費者」,收集問題的課代表是「消息隊列」**。
  • Kubernetes 學習筆記之 ServiceAccount TokensController 源碼解析
    在 Kubernetes 學習筆記之 ServiceAccount AdmissionController 源碼解析 文章中,知道一個 ServiceAccount 對象都會引用一個type="kubernetes.io/service-account-token" 的 secret 對象,這個 secret
  • 並發工具類Condition介紹與源碼解析
    從源碼中可以得出,在調用await方法之前一定要先獲取到鎖,否則會拋出異常。signal方法:讓線程跳出循環知道了await方法的源碼流程,就可以猜測signal方法的源碼了,先來分析下await方法如何才能執行完,先不考慮中斷線程的情況,要讓線程跳出while循環需要兩個條件:節點狀態不能是-2、讓節點在同步隊列中;記住這裡的同步隊列是AQS維護的同步隊列。
  • 五分鐘學後端技術:如何學習後端工程師必學的消息隊列
    什麼是消息隊列「RabbitMQ?」「Kafka?」「RocketMQ?」...在日常學習與開發過程中,我們常常聽到消息隊列這個關鍵詞,可能你是熟練使用消息隊列的老手,又或者你是不懂消息隊列的新手,不論你了不了解消息隊列,本文都將帶你搞懂消息隊列的一些基本理論。
  • Android多線程:手把手帶你深入Handler源碼分析(下)
    2.2 使用方式    Handler使用方式 因發送消息到消息隊列的方式不同而不同,共分為2種:使用Handler.sendMessage()、使用Handler.post()。下面的源碼分析將依據使用步驟講解。
  • 分布式消息隊列 RocketMQ 源碼分析 —— Message 拉取與消費(下)
    4.0.x 正式版 1、概述 2、Consumer 3、PushConsumer 一覽 4、PushConsumer 訂閱 DefaultMQPushConsumerImpl#subscribe(…) FilterAPI.buildSubscriptionData(…) DefaultMQPushConsumer#registerMessageListener(…) 5、PushConsumer 消息隊列分配
  • Android多線程:手把手帶你深入Handler源碼分析(上)
    2.2 使用方式  Handler使用方式 因發送消息到消息隊列的方式不同而不同,共分為2種:使用Handler.sendMessage()、使用Handler.post()。下面的源碼分析將依據使用步驟講解。
  • Kubernetes學習筆記之LRU算法源碼解析
    Kubernetes學習筆記之LRU算法源碼解析Overview本文章基於k8s release-1.17分支代碼。之前一篇文章學習 Kubernetes學習筆記之ServiceAccount TokensController源碼解析 ,主要學習ServiceAccount有關知識,發現其中使用了LRU Cache,代碼在 L106 。