谭浩的博客

Simple is beauty.

用go实现MapReduce部分功能

本文是MIT分布式课程6.824实验一的总结。该课程主要介绍分布式系统,课程主要内容为阅读经典论文,并使用golang来实现一些经典的分布式系统。通过该地址可以访问线上课程。

实验准备

该实验需要了解golang的基本用法,通过学习A tour of go可以快速的了解go的基本用法。这里有一个要点是要理解并使用go中的channel,channel可以用来在线程之间进行沟通,类似于管道。你可以从一个goroutine中将一个值发送给一个channel,并在另一个goroutine中获取channel中的值。

其读和写的基本操作如下:

1
2
3
ch := make(chan int) //创建一个chan
ch <- 1 //发送1到chan中
v := <- ch //从chan中读取值

下面是用channel实现的互斥锁:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
type Mutex struct{
l chan int
}

//创建一个锁
func NewMutex() *Mutex{
mutex := &Mutex{make(chan int, 1)}
mutex.UnLock()
return mutex
}

func (m *Mutex) Lock() {
<- m.l
}

func (m *Mutex) UnLock() {
m.l <- 1
}

doMap 和 doReduce函数

doMap 和 doReduce分别管理map和task任务。

doMap函数签名如下:
func doMap(jobName string, mapTask int, inFile string, nReduce int, mapF func(string, string) []KeyValue,)

该函数接受一个需要处理的文件,使用用户自定义的mapF函数对文件内容进行处理,并将处理结果写入nReduce个文件传给Reduce处理。

  • 读取文件并用用户自定义的map函数将内容处理为<interKey, interValue>格式的中间键值对
  • 对interKey哈希求余后分别写入nReduce个文件中
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
// 读取文件,并处理产生键值对
cont, _ := ioutil.ReadFile(inFile)
contents := string(cont)
interkeys := mapF(inFile, contents)

reduceFiles := make([]*os.File, nReduce)
for i := 0; i < nReduce; i++ {
file, _ := os.OpenFile(reduceName(jobName, mapTask, i), os.O_CREATE|os.O_WRONLY, 0666)
reduceFiles[i] = file
defer file.Close()
}

// 将键值对写入nReduce个文件中
for _, keyValue := range interkeys {
r := ihash(keyValue.Key) % nReduce
file := reduceFiles[r]
enc := json.NewEncoder(file)
enc.Encode(&keyValue)
}

doReduce函数签名如下:
func doReduce(jobName string, reduceTask int, outFile string, nMap int,reduceF func(key string, values []string) string,)
该函数执行Reduce任务,读取Map阶段产生的中间键值对,并将键按序排列好之后,将同一个键的值聚合起来形成<key, >格式,最后调用用户自定义的Reduce函数计算出最后的结果,将结果写入一个输出文件中。

  • 首先读取Map任务产生的文件
  • 将读取的中间键值对进行排序处理
  • 相同键的值聚合后传给用户自定义的函数执行,最终将结果写到输出文件中
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
// 读取Map阶段产生的文件
var keyValues []KeyValue
for i := 0; i < nMap; i++ {
interFile := reduceName(jobName, i, reduceTask)
file, ferr := os.Open(interFile)
if ferr != nil {
panic(ferr)
}
defer file.Close()
br := bufio.NewReader(file)
for {
line, _, next := br.ReadLine()
var tmp KeyValue
json.Unmarshal(line, &tmp)
if next == io.EOF {
break
}
keyValues = append(keyValues, tmp)
}
}
// 排序
sort.Slice(keyValues, func(i, j int) bool {
return keyValues[i].Key < keyValues[j].Key
})

oFile, _ := os.OpenFile(outFile, os.O_CREATE|os.O_WRONLY, 0666)
defer oFile.Close()
enc := json.NewEncoder(oFile)

//处理键值对
tmp := keyValues[0].Key
var values []string
for _, keyValue := range keyValues {
if keyValue.Key == tmp {
// 聚合操作
values = append(values, keyValue.Value)
} else {
// 写入文件
enc.Encode(KeyValue{tmp, reduceF(tmp, values)})
values = []string{keyValue.Value}
tmp = keyValue.Key
}
}
enc.Encode(KeyValue{tmp, reduceF(tmp, values)})

实现调度函数

schedule函数将Map和Reduce任务分配给空闲的worker执行。
函数签名为func schedule(jobName string, mapFiles []string, nReduce int, phase jobPhase, registerChan chan string)

该函数从chan中获取空闲的worker的地址,然后将任务非配给空闲的worker执行,每个任务都在goroutine中执行,实现并发执行,最后等待所有的goroutine任务执行结束。

  • 使用for循环启动需要执行的任务
  • 使用goroutine执行每个任务
  • 等待所有任务执行结束
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    var doTaskArgs = DoTaskArgs{JobName: jobName, Phase: phase, NumOtherPhase: n_other}

    var wg sync.WaitGroup
    wg.Add(ntasks)

    for i := 0; i < ntasks; i++ {
    if mapPhase == mapPhase {
    doTaskArgs.File = mapFiles[i]
    }
    doTaskArgs.TaskNumber = i
    doTaskArgs := doTaskArgs
    fmt.Println("------------" + phase + "--------------------")
    workerAddress := <-registerChan
    fmt.Printf("我得到地址了%v %s", i, workerAddress)
    go func() {
    //调度任务
    call(workerAddress, "Worker.DoTask", doTaskArgs, nil)
    wg.Done()
    registerChan <- workerAddress
    }()
    }
    wg.Wait()

调度函数容错处理

schedule调度的任务可能会发生错误,需要将发生错误的任务在一个空闲的worker上重新执行。这里可以对发生错误的任务使用递归的方式在一个新的worker执行。

1
2
3
4
5
6
7
8
9
10
11
12
var exec func(string, DoTaskArgs)
exec = func(workerAddress string, doTaskArgs DoTaskArgs) {
result := call(workerAddress, "Worker.DoTask", doTaskArgs, nil)
if result == false {
wg.Add(1)
// 在一个新的worker上重新执行该任务
go exec(<-registerChan, doTaskArgs)
}
wg.Done()
registerChan <- workerAddress
}
go exec(workerAddress, doTaskArgs)

总结

以上实验基本实现了MapReduce中的Map阶段和Reduce阶段,并实现了简易的任务调度功能。该实验还包含词频统计和倒排索引生成的MapReduce程序编写。可以访问我的github仓库获取该部分内容代码。