MIT 6.824 Lab1:MapReduce
Paper: MapReduce(2004) 一种编程模式,屏蔽了分布式系统的复杂,由一个master来分配任务,同时也需要RPC调用
遇到问题:不知道如何分配给reduce worker需要读取的文件
理解与解决:对于每一个执行map的worker A来说,A需要读取一个确定的文件file1,将每一个不同的key-value对(即一个单词->1)写入Nreduce个中间文件之一,Nreduce由用户指出,至于写入哪一个文件,需要用hash(key)% Nreduce 来获得该文件索引。For example, 第x个map任务,对于值为key的键值对(ihash(key)%Nreduce = Y),需要写入mr-X-Y.txt的中间文件中。
对于执行reduce任务的worker B
- 他需要读取所有 mr-*-r.txt的文件,r为这个reduce任务的id
- sort,根据key值排序,相同的key值就被放在一起了
- 调用reduce函数
- 将结果写入mr-out-r.txt
关于Linux下的timeout,在MacOS中没有对应的命令,使用brew安装coreutils,再在lab1的测试脚本中使用alias timeout=gtimeout,作为替代
第一次进行测试后,发现测试脚本只启动了3个worker,所以我的代码还需修改,worker完成当前分配的任务后,继续接受master的assign指令。
经过修改,单个worker可以不断向master请求,完成所有的mapreduce任务,同时加入了任务的超时判断,即在master的数据结构中增加了一个map,从任务号到任务开始时间的映射。但是测试脚本发现出现了map concurrent write,原来是新增加的map结构未加锁。
再次测试遇到问题,没有通过reduce任务的并发测试。现象十分奇怪,单独测试reduce的并发没有问题,但是只要加了word count的测试脚本,reduce任务就timeout。
为了检测worker crash,使用heartbeat(参考大佬vtta)来清除宕机的worker,同时增加了worker的唯一id生成函数,使用tempfile写入reduce的任务
2021/5/12 update
重构了下代码,没有使用心跳来保证worker的正常运行,而是只在master中增加对注册worker的定时器。如果worker在规定时间内没有RPC请求,master会删除有关worker的数据结构,当worker发来迟到的RPC时,master不予处理。
main/mrworker.go中会调用Worker函数开启worker来请求任务(mapjob、reducejob),处理逻辑如下:
func Worker(mapf func(string, string) []KeyValue,
reducef func(string, []string) string) {
registReply := RegistResp{}
if ok := call("Master.RegisterWorker", &RegistArgs{}, ®istReply); !ok {
return
}
workerID := registReply.WorkerID
// Your worker implementation here.
// send call to request task
tryAccessTimes := 0
for {
args := RequestArgs{-1, -1, workerID}
reply := ResponseType{}
if ok := call("Master.AcceptWorker", &args, &reply); !ok {
fmt.Println("worker > request failed, sleep...")
time.Sleep(100 * time.Millisecond)
tryAccessTimes++
if tryAccessTimes > 5 {
fmt.Println("worker > cannot access master. Quit")
return
}
continue
}
tryAccessTimes = 0
if reply.NReduce == -1 {
fmt.Println("worker > exit")
return
}
// fmt.Println("apply job success")
switch reply.JobType {
case MAP: // map job
mapJob(&reply, mapf)
case REDUCE:
reduceJob(&reply, reducef)
}
}
}
处理逻辑中定义了三种RPC(但是只使用了两种请求args)
const (
MAP = 1
REDUCE = 2
)
type RegistArgs struct {
}
type RegistResp struct {
WorkerID int
}
// RequestArgs worker请求的类型
type RequestArgs struct {
TaskNum int // 返回被分配的任务索引,初始请求时为空
JobType int // 任务类型 1-map 2-reduce
WorkerID int // 这个worker的ID
}
// master的应答
type ResponseType struct {
NReduce int // master的用户参数,取模用
JobType int
BucketName string // 分配的任务名称 (map任务需要读取的文件名)
TaskNum int // 任务号
// 1、对于map任务,master数据结构中的files的下标 + 1
// 2、对于reduce任务,taskNum在 [0, NReduce) 区间内
}
这边使用了counter函数生成全局唯一的worker ID。
func counter() (f func() int) {
i := 0
return func() int {
i += 1
return i
}
}
// generate a unique id for a worker or a Job
var uniqueID = counter()
mapjob的处理
func mapJob(reply *ResponseType,
mapf func(string, string) []KeyValue) {
// 打开原始文件
file, err := os.Open(reply.BucketName)
if err != nil {
log.Fatalf("cannot open %v", reply.BucketName)
}
content, err := ioutil.ReadAll(file)
if err != nil {
log.Fatalf("cannot read %v", reply.BucketName)
}
file.Close()
// 调用用户的map函数
kva := mapf(reply.BucketName, string(content))
// 对于每个key值,划分为nReduce个组
var groups = make([][]KeyValue, reply.NReduce)
for _, kv := range kva {
gid := ihash(kv.Key) % reply.NReduce
groups[gid] = append(groups[gid], kv)
}
// 将中间文件写入disk,注意写入的是NReduce个不同文件
for index, kvs := range groups {
// 创建或打开intermediate文件
filename := "mr-" + strconv.Itoa(reply.TaskNum) + "-" + strconv.Itoa(index)
// file, _ = os.OpenFile(filename, os.O_RDWR|os.O_APPEND|os.O_CREATE, 0644)
file, err := ioutil.TempFile(".", "mr-")
if err != nil {
log.Fatalln("cannot create temporary file")
}
enc := json.NewEncoder(file)
for _, kv := range kvs {
err := enc.Encode(&kv)
if err != nil {
os.Remove(file.Name())
// call for failure
log.Fatal("map write file error")
}
}
os.Rename(file.Name(), filename)
if err != nil {
log.Fatalln("cannot rename to", filename)
}
log.Printf("%s created.", filename)
}
rpArgs := RequestArgs{}
rpArgs.JobType = MAP
rpArgs.TaskNum = reply.TaskNum
rpReply := ResponseType{}
call("Master.WorkerFinished", &rpArgs, &rpReply) // TODO: 考虑失败的worker
}
reducejob的处理
func reduceJob(reply *ResponseType,
reducef func(string, []string) string) {
// 读取所有属于taskNum的mr-X-taskNum.txt文件 到 intermediate
var intermediate []KeyValue
for i := 0; ; i++ { // i对应map任务号
filename := "mr-" + strconv.Itoa(i) + "-" + strconv.Itoa(reply.TaskNum)
file, err := os.Open(filename)
if err != nil {
break
}
dec := json.NewDecoder(file)
for {
var kv KeyValue
if err := dec.Decode(&kv); err != nil {
break
}
intermediate = append(intermediate, kv)
}
file.Close()
}
// sort
sort.Sort(ByKey(intermediate))
// 注意!使用临时文件,防止残缺文件被写入!
oname := fmt.Sprintf("mr-out-%v", reply.TaskNum)
ofile, err := ioutil.TempFile(".", "mr-")
// oname := "mr-out-" + strconv.Itoa(reply.TaskNum)
// ofile, err := os.Create(oname)
if err != nil {
log.Fatalln("cannot create temporary file")
}
fmt.Printf("worker > inter len : %v\n", len(intermediate))
// call the user define reduce function
i := 0
for i < len(intermediate) {
j := i + 1
for j < len(intermediate) && intermediate[j].Key == intermediate[i].Key { // the same key has been sorted to be together
j++
}
values := []string{}
for k := i; k < j; k++ {
values = append(values, intermediate[k].Value)
}
output := reducef(intermediate[i].Key, values)
// this is the correct format for each line of Reduce output.
fmt.Fprintf(ofile, "%v %v\n", intermediate[i].Key, output)
i = j
}
err = ofile.Close()
if err != nil {
log.Fatalln("cannot close", oname)
}
err = os.Rename(ofile.Name(), oname)
if err != nil {
log.Fatalln("cannot rename to", oname)
}
rpArgs := RequestArgs{}
rpArgs.JobType = REDUCE
rpArgs.TaskNum = reply.TaskNum
rpReply := ResponseType{}
call("Master.WorkerFinished", &rpArgs, &rpReply) // 目前,未考虑失败的worker
}
master中保存的数据结构
var (
mu sync.Mutex
)
// State 为job的状态
type State int
const (
IDLE = 0
INPROGRESS = 1
COMPLETE = 2
)
type Master struct {
// Your definitions here.
mapState map[int]State // 序号 -》状态
fileNames []string // 序号 -> 文件名
mapWorker map[int]int // 序号 -》workerid
mapDone bool
nReduce int // 需要启动的reduce worker数量,也是每个map任务需要写入的文件数量
reduceState map[int]State // 一个nReduce大小的切片,记录reduce任务完成的情况, 0-未分配,1-已分配,2-已完成
reduceWorker map[int]int // 任务号 -》 worker
reduceDone bool
timers map[int]*time.Timer // 计时器,映射为workerID到timer
}
master对三种RPC的处理
用于worker注册的
// RegisterWorker worker注册,生成定时器goroutine
func (m *Master) RegisterWorker(args *RegistArgs, reply *RegistResp) error {
mu.Lock()
defer mu.Unlock()
workerID := uniqueID()
reply.WorkerID = workerID
m.timers[workerID] = time.NewTimer(time.Second * 10)
log.Printf("Master > new worker %v register", workerID)
// 为每个worker分配一个定时器线程
go func(worker int, timer <-chan time.Time) {
<- timer // 如果定时器超时了
mu.Lock()
defer mu.Unlock()
delete(m.timers, worker) // 删除定时器
for jobid, wid := range m.mapWorker {
if wid == worker {
m.mapState[jobid] = IDLE
delete(m.mapWorker, jobid)
log.Printf("Master > map worker %v time out", worker)
}
}
for jobid, wid := range m.reduceWorker {
if wid == worker {
m.reduceState[jobid] = IDLE
delete(m.reduceWorker, jobid)
log.Printf("Master > reduce worker %v time out", worker)
}
}
}(workerID, m.timers[workerID].C)
return nil
}
用于worker请求任务的:
// AcceptWorker PRC响应 接受worker的请求任务请求
func (m *Master) AcceptWorker(args *RequestArgs, reply *ResponseType) error {
mu.Lock()
defer mu.Unlock()
reply.NReduce = m.nReduce
if !m.mapDone { // map未完成
mapjobid := m.assignMapJob(args.WorkerID)
if mapjobid == -1 {
return fmt.Errorf("please apply job again")
}
reply.JobType = MAP
reply.BucketName = m.fileNames[mapjobid]
reply.TaskNum = mapjobid
m.mapWorker[mapjobid] = args.WorkerID
} else if !m.reduceDone{
rdTaskNum := m.assignReduceJob(args.WorkerID)
if rdTaskNum == -1 {
reply.NReduce = -1 // 告诉worker不要再申请任务了
return fmt.Errorf("no job available")
}
reply.JobType = REDUCE
reply.TaskNum = rdTaskNum
}
return nil // success assigned
}
用与worker告知结束任务的
// WorkerFinished 回应worker完成工作
// 对于timeout的worker,即使worker完成了任务,由于在master保存的数据结构中找不到对应的worker
func (m *Master) WorkerFinished(args *RequestArgs, reply *ResponseType) error {
mu.Lock()
defer mu.Unlock()
if args.JobType == MAP {
_, ok := m.mapWorker[args.TaskNum]
if !ok {
return fmt.Errorf("Map Worker timeout, job : %v", m.fileNames[args.TaskNum])
}
m.mapState[args.TaskNum] = COMPLETE
delete(m.mapWorker, args.TaskNum)
fmt.Printf("Map job" + m.fileNames[args.TaskNum] + " finish\n")
} else if args.JobType == REDUCE {
_, ok := m.reduceWorker[args.TaskNum]
if !ok {
return fmt.Errorf("Reduce worker timeout, job : %v", args.TaskNum)
}
m.reduceState[args.TaskNum] = COMPLETE
delete(m.reduceWorker, args.TaskNum)
fmt.Printf("Reduce job" + strconv.Itoa(args.TaskNum) + " finish\n")
}
return nil
}
master分配任务使用的函数
// 根据job类型来选择文件名
func (m *Master) assignMapJob(worker int) (job int){
mapComplete := true
for jobid, state := range m.mapState {
if state == IDLE {
job = jobid
m.mapState[jobid] = INPROGRESS
m.mapWorker[jobid] = worker
return
}
if state != COMPLETE {
mapComplete = false
}
}
if mapComplete {
m.mapDone = true
log.Println("map phase compelet")
}
return -1
}
func (m *Master) assignReduceJob(worker int) (reduceNum int) {
reduceComplete := true
for jobid, state := range m.reduceState {
if state == IDLE {
reduceNum = jobid // 返回该reduce任务的编号,即这个reduce worker要读取mr-X-reduceNum.txt的中间文件
m.reduceState[jobid] = INPROGRESS
m.reduceWorker[jobid] = worker
return
}
if state != COMPLETE {
reduceComplete = false
}
}
if reduceComplete {
m.reduceDone = true
log.Println("reduce phase complete")
}
return -1
}