這個Lab並不是真的讓我們去實現MapReduce的分布式並行框架,而是通過本地開啟不同的進程(線程),來模擬集群執行MapReduce程序。
課程相關文獻建議優先使用官方的相關資料,進行完成課程,本文章僅供參考。
•MIT 6.824 分布式系統課程官網mit 6.824 Distributed Systems[1]•MIT6.824 分布式系統實驗一 MapReduce mit 6.824 Lab1 MapReduce[2]•谷歌MapReduce論文MapReduce Paper[3]
原始碼•作者代碼倉庫連結🔗[4]
https://gitee.com/eddievim/eddievim_-mit6.824_lab
定義角色•
Master(調度)
•
Worker(進行具體的Map、Reduce)(默認初始化3個)
流程•通過Master角色接收到用戶傳進來的文件路徑,找到需要進行MapReduce處理的原始文件。•我們這邊簡單對輸入進行任務切分(一個文件對應一個MapTask)。•若輸入文件有M個,則開啟M個MapTask,Master調度Worker來領取任務並執行Map;•Map執行後的結果需要進行排序,並寫到磁碟中去;•Shuffle :將MapTask的輸出,改造成ReduceTask的輸入。•直至所有MapTask全部執行完畢後,開啟ReduceTask,ReduceTask的任務是到讀取MapTask產生的磁碟文件中,抓取出對應分區的Key出來,再將相同的Key與List(value)執行Reduce方法,得出結果後,最終再將結果寫出到結果文件中。
初始化工作定義各種數據結構,及數據的交換格式。
狀態的定義•-1: TaskWait• 0: JobOver•1: MapPhrase•2: ReducePhrase
const TaskWait, JobOver, MapPhrase, ReducePhrase = -1, 0, 1, 2
Mastertype Master struct {
Phrase int // 用於定義當前階段
NReduce int // ReduceTask並行度
NMap int // MapTask並行度
MapTaskQueue chan MapTask // 用於存儲MapTask的隊列,線程安全
ReduceTaskQueue chan ReduceTask // 用於存儲ReduceTask的隊列,線程安全
MapOutFilePaths [][]string // MapTask輸出的路徑
ReduceOutFilePaths []string // ReduceTask輸出的路徑
mutex sync.Mutex // 鎖,用於防止並發問題的產生
}
MapTasktype MapTask struct {
No int // 編號
Filepath string
}
ReduceTasktype ReduceTask struct {
No int // 編號
Filepaths []string
}
用於RPC通信的實體類type RPCArgs struct {
// 狀態
// 1: MapPhrase
// 2: ReducePhrase
Phrase int
OutPaths []string
}
type RPCReply struct {
// 狀態
// 1: MapPhrase
// 2: ReducePhrase
// 3:JobOver
Phrase int
HoldTask bool
MapTask MapTask
ReduceTask ReduceTask
}
任務調度Worker獲取任務在Worker處於空閒狀態(沒有任務正在執行時)進行調用。
Worker通過RPC調用Master的方法SendTask,獲取任務,如果獲取則進行執行,若為reply.HoldTask為false,則本次RPC調用,並沒有獲取到對應的任務。
// send task to worker
func (m *Master) SendTask(args *RPCArgs, reply *RPCReply) error {
m.mutex.Lock()
reply.Phrase = m.Phrase
m.mutex.Unlock()
reply.HoldTask = true
switch reply.Phrase {
case MapPhrase:
reply.MapTask = <-m.MapTaskQueue
case ReducePhrase:
reply.ReduceTask = <-m.ReduceTaskQueue
case TaskWait:
reply.HoldTask = false
default:
reply.HoldTask = false
}
return nil
}
Worker匯報任務在Worker完成一次任務的時候進行調用。
Worker通過RPC調用Master的ReportTask,匯報任務。
•若在MapTask階段,所有任務被處理完畢,則進入shuffle狀態。•若在ReduceTask階段,所有任務被處理完畢後,將狀態舍為JobOver,這將會通知Worker,使其結束進程。
// worker report task to master
func (m *Master) ReportTask(args *RPCArgs, reply *RPCReply) error {
m.mutex.Lock()
switch m.Phrase {
case MapPhrase:
m.MapOutFilePaths = append(m.MapOutFilePaths, args.OutPaths)
// Map階段完成了所有任務,進入Reduce階段
if len(m.MapOutFilePaths) == m.NMap {
m.Phrase = TaskWait
m.shuffle()
m.Phrase = ReducePhrase
}
case ReducePhrase:
m.ReduceOutFilePaths = append(m.ReduceOutFilePaths, args.OutPaths[0])
// Reduce階段結束
if len(m.ReduceOutFilePaths) == m.NReduce {
m.Phrase = JobOver
fmt.Println(m.Phrase)
}
}
m.mutex.Unlock()
return nil
}
階段的過渡Shuffle(MapPhrase -> ReducePhrase)將MapTask的輸出,將其「改造」為ReduceTask的輸入。將MapTask輸出的結果,根據不同的組,將同組的數據組合在一起。
// MapPhrase -> ReducePhrase
func (m *Master) shuffle() {
for i := 0; i < m.NReduce; i++ {
task := ReduceTask{
No: i,
}
for j := 0; j < m.NMap; j++ {
task.Filepaths = append(task.Filepaths, m.MapOutFilePaths[j][i])
}
m.ReduceTaskQueue <- task
}
}
Done用於判斷所有任務流程是否結束,只需要判斷Phrase是否是JobOver即可。
func (m *Master) Done() bool {
return m.Phrase == JobOver
}
Worker的工作模式持續對Master進行輪詢,從而持續地獲取任務及狀態,當到達JobOver階段時,則結束。
func Worker(mapf func(string, string) []KeyValue,
reducef func(string, []string) string) {
for {
reply := getTask()
switch reply.Phrase {
case MapPhrase:
opaths, ok := MapProcess(mapf, reply)
if ok {
fmt.Printf("mapProcess %v OK.\n", reply.MapTask.No)
call("Master.ReportTask", &RPCArgs{MapPhrase, opaths}, reply)
} else {
fmt.Printf("mapProcess %v failed.\n", reply.MapTask.No)
}
case ReducePhrase:
path, ok := ReduceProcess(reducef, reply)
if ok {
fmt.Printf("ReduceProcess %v OK.\n", reply.ReduceTask.No)
call("Master.ReportTask", &RPCArgs{ReducePhrase, []string{path}}, reply)
} else {
fmt.Printf("ReduceProcess %v Fail.\n", reply.ReduceTask.No)
}
case TaskWait:
time.Sleep(time.Millisecond)
case JobOver:
return
}
}
}
其餘的操作剩下的步驟都是Worker獲取文件路徑後,對文件內容進行讀取,然後進行相應操作Mapf/Reducef,然後再輸出,持久化到磁碟中,這裡不過多闡述,需要的同學可以到我的Gitee代碼倉庫進行查看。連結🔗[5]
References[1] mit 6.824 Distributed Systems: https://pdos.csail.mit.edu/6.824/
[2] mit 6.824 Lab1 MapReduce: https://pdos.csail.mit.edu/6.824/labs/lab-mr.html
[3] MapReduce Paper: http://static.googleusercontent.com/media/research.google.com/zh-CN//archive/mapreduce-osdi04.pdf
[4] 作者代碼倉庫連結🔗: https://gitee.com/eddievim/eddievim_-mit6.824_lab
[5] 連結🔗: https://gitee.com/eddievim/eddievim_-mit6.824_lab