github.com/nsq/apps/nsqd/main.go
代碼邏輯如下:
1. 獲取配置,並從metadata 的持久化文件中讀取topic、channel 信息。meta 信息格式:
Topics []struct {
Name string `json:"name"`
Paused bool `json:"paused"`
Channels []struct {
Name string `json:"name"`
Paused bool `json:"paused"`
} `json:"channels"`
} `json:"topics"
2. 啟動nsqd.Main 程序, 埠監聽TCP 服務 和 HTTP 服務(支持HTTPS)。
3. 啟動事件循環
queueScanLoop 處理 in-flight 消息 和 deferred 消息隊列事件的協程
lookupLoop 處理與 nsqlookup 交互的協程。包括消息的廣播,lookup 節點的更新等。
如果配置了狀態監聽的地址,則會啟動 statsdLoop 協程,用於定時發送(UDP)當前服務的各類狀態
Topic 處理
// 64bit atomic vars need to be first for proper alignment
//on 32bit platforms
messageCount uint64 // 消息數量
messageBytes uint64 // 消息字節數
sync.RWMutex // 結構體讀寫鎖
name string
channelMap map[string]*Channel // 保存topic 下所有channel
backend BackendQueue // 落地的消息隊列
memoryMsgChan chan *Message // 內存中的消息
startChan chan int // topic 被訂閱了,可以啟動消費了
exitChan chan int // 協程退出channel
channelUpdateChan chan int // channel 更新的消息
waitGroup util.WaitGroupWrapper
exitFlag int32 // 退出標記
idFactory *guidFactory // uuid 生成器
ephemeral bool // 是否為臨時topic
deleteCallback func(*Topic) // 臨時topic,自動刪除相關channel
deleter sync.Once
paused int32
pauseChan chan int // 暫停的信號
ctx *context // 上下文,保存nsqd
}
2. topic 的創建
for {
select {
// 消息可從二者中隨機獲取,所以topic 中消息是不保序的
case msg = <-memoryMsgChan:
// 內存消息
case buf = <-backendChan:
// 持久化文件中推送的消息
...
case <-t.channelUpdateChan:
// 更新channel,則會增加topic 下發的列表
...
continue
case <-t.pauseChan:
// 暫停topic,則所有chan 都暫停
...
continue
case <-t.exitChan:
goto exit
}
for i, channel := range chans {
// 將topic 收到的消息廣播到 topic 下所有的channel 中
chanMsg := msg
// 考慮比較周全的是,減少一次message 的創建
if i > 0 {
chanMsg = NewMessage(msg.ID, msg.Body)
chanMsg.Timestamp = msg.Timestamp
chanMsg.deferred = msg.deferred
}
if chanMsg.deferred != 0 {
// 如果是defer 的消息,會添加到channel 的defer 隊列中
channel.PutMessageDeferred(chanMsg, chanMsg.deferred)
continue
}
// 正常消息,直接添加到channel 中
err := channel.PutMessage(chanMsg)
if err != nil {
// log
}
}
}
3. 值得關注的topic 操作
func (t *Topic) put(m *Message) error {
select {
case t.memoryMsgChan <- m:
default:
b := bufferPoolGet()
err := writeMessageToBackend(b, m, t.backend)
bufferPoolPut(b)
t.ctx.nsqd.SetHealth(err)
if err != nil {
// log
return err
}
}
return nil
}
利用了golang chan 阻塞的原理,當 memoryMsgChan 滿了之後,case t.memoryMsgChan <- m 無法執行,會執行 default 操作,自動添加消息到硬碟中。
Channel 處理
channel 沒有自己的事件操作,都是通過被動執行相關操作。
1. 數據結構
type Channel struct {
// 64bit atomic vars need to be first for proper alignment on 32bit platforms
requeueCount uint64 // 重新消費的message 個數
messageCount uint64 // 消息總數
timeoutCount uint64 // 消費超時的message 個數
sync.RWMutex
topicName string
name string
ctx *context
backend BackendQueue // 落地的隊列
memoryMsgChan chan *Message // 內存中的消息
exitFlag int32 // 退出標識
exitMutex sync.RWMutex
// state tracking
clients map[int64]Consumer // 支持多個client 消費,但是一條消息僅能被某一個client 消費
paused int32 // 暫停標識
ephemeral bool // 臨時 channel 標識
deleteCallback func(*Channel) // 刪除的回調函數
deleter sync.Once
e2eProcessingLatencyStream *quantile.Quantile
deferredMessages map[MessageID]*pqueue.Item // defer 消息保存的map
deferredPQ pqueue.PriorityQueue // defer 隊列 (優先隊列保存)
deferredMutex sync.Mutex // 相關的互斥鎖
inFlightMessages map[MessageID]*Message // 正在消費的消息保存的map
inFlightPQ inFlightPqueue // 正在消費的消息保存在優先隊列 (優先隊列保存)
inFlightMutex sync.Mutex // 相關的互斥鎖
}
2. 事件循環處理
在啟動nsqd 時,會啟動一些事件循環的處理。
2.1 channel 隊列處理
channel 有兩個重要隊列:defer隊列和inflight 隊列, 事件處理主要是對兩個隊列的消息數據做處理
2.2 lookup 事件響應
此處的事件循環,是用於和lookupd 交戶使用的事件處理模塊。例如Topic 增加或者刪除, channel 增加或者刪除 需要對所有 nslookupd 模塊做消息廣播等處理邏輯,均在此處實現。
主要的事件:
定時心跳操作 每隔 15s 發送 PING 到 所有 nslookupd 的節點上
topic,channel新增刪除操作 發送消息到所有 nslookupd 的節點上
配置修改的操作 如果配置修改,會重新從配置中刷新一次 nslookupd 節點
消費協程事件處理
當一個客戶端與nsqd 通過TCP建立連接後,將啟動protocolV2.messagePump 協程,用於處理消息的交互,主協程用於做事件的響應。
messagePump:
func (p *protocolV2) messagePump(client *clientV2, startedChan chan bool) {
// 變量初始化
...
for {
if subChannel == nil || !client.IsReadyForMessages() {
// 如果沒有channel,則清空
} else if flushed {
memoryMsgChan = subChannel.memoryMsgChan
backendMsgChan = subChannel.backend.ReadChan()
flusherChan = nil
} else {
memoryMsgChan = subChannel.memoryMsgChan
backendMsgChan = subChannel.backend.ReadChan()
// 如果動態設置了flusher 的定時器,則使用這個定時器刷新
flusherChan = outputBufferTicker.C
}
select {
case <-flusherChan:
// 寫刷新消息
...
case <-client.ReadyStateChan:
case subChannel = <-subEventChan:
// 一個consumer 同一個tcp 連接,只能訂閱一個topic
subEventChan = nil
case identifyData := <-identifyEventChan:
// 客戶端認證, 心跳之類的
// ...
case <-heartbeatChan:
// 心跳消息
err = p.Send(client, frameTypeResponse, heartbeatBytes)
if err != nil {
goto exit
}
case b := <-backendMsgChan:
// 硬碟消息推送到consumer 中
if sampleRate > 0 && rand.Int31n(100) > sampleRate {
continue
}
// 硬碟消息保存為二進位,需要解碼
msg, err := decodeMessage(b)
if err != nil {
p.ctx.nsqd.logf(LOG_ERROR, "failed to decode message - %s", err)
continue
}
msg.Attempts++
// 設置超時事件,並將消息放入flight 隊列中
subChannel.StartInFlightTimeout(msg, client.ID, msgTimeout)
client.SendingMessage()
err = p.SendMessage(client, msg)
if err != nil {
goto exit
}
flushed = false
case msg := <-memoryMsgChan:
// 內存消息推送到consumer
if sampleRate > 0 && rand.Int31n(100) > sampleRate {
continue
}
msg.Attempts++
// 設置超時事件,並將消息放入flight 隊列中
subChannel.StartInFlightTimeout(msg, client.ID, msgTimeout)
client.SendingMessage()
err = p.SendMessage(client, msg)
if err != nil {
goto exit
}
flushed = false
case <-client.ExitChan:
goto exit
}
}
exit:
// 服務退出,則關閉計時器
// ...
}
TCP 傳輸協議
nsqd 消息 id 生成方法採用的uuid 生成算法 snowflake 算法
in_flight_queue 和 delay_queue 實現都是使用堆排序實現的優先隊列
從M 個channel 中隨機篩選N個channel 做隊列隊列掃描, 每次獲取的概率相同
func UniqRands(quantity int, maxval int) []int {
if maxval < quantity {
quantity = maxval
}
intSlice := make([]int, maxval)
for i := 0; i < maxval; i++ {
intSlice[i] = i
}
// 每次從[i, maxval] 中篩選 1 個元素,放到位置 i 中
for i := 0; i < quantity; i++ {
j := rand.Int()%maxval + i
// swap
intSlice[i], intSlice[j] = intSlice[j], intSlice[i]
maxval--
}
return intSlice[0:quantity]
}
推薦閱讀
喜歡本文的朋友,歡迎關注「Go語言中文網」:
Go語言中文網啟用微信學習交流群,歡迎加微信:274768166,投稿亦歡迎