MIT 6.824 Lab1:MapReduce

Paper: MapReduce(2004) 一种编程模式,屏蔽了分布式系统的复杂,由一个master来分配任务,同时也需要RPC调用

遇到问题:不知道如何分配给reduce worker需要读取的文件

理解与解决:对于每一个执行map的worker A来说,A需要读取一个确定的文件file1,将每一个不同的key-value对(即一个单词->1)写入Nreduce个中间文件之一,Nreduce由用户指出,至于写入哪一个文件,需要用hash(key)% Nreduce 来获得该文件索引。For example, 第x个map任务,对于值为key的键值对(ihash(key)%Nreduce = Y),需要写入mr-X-Y.txt的中间文件中。

对于执行reduce任务的worker B

  1. 他需要读取所有 mr-*-r.txt的文件,r为这个reduce任务的id
  2. sort,根据key值排序,相同的key值就被放在一起了
  3. 调用reduce函数
  4. 将结果写入mr-out-r.txt

关于Linux下的timeout,在MacOS中没有对应的命令,使用brew安装coreutils,再在lab1的测试脚本中使用alias timeout=gtimeout,作为替代

第一次进行测试后,发现测试脚本只启动了3个worker,所以我的代码还需修改,worker完成当前分配的任务后,继续接受master的assign指令。

经过修改,单个worker可以不断向master请求,完成所有的mapreduce任务,同时加入了任务的超时判断,即在master的数据结构中增加了一个map,从任务号到任务开始时间的映射。但是测试脚本发现出现了map concurrent write,原来是新增加的map结构未加锁。

再次测试遇到问题,没有通过reduce任务的并发测试。现象十分奇怪,单独测试reduce的并发没有问题,但是只要加了word count的测试脚本,reduce任务就timeout。

为了检测worker crash,使用heartbeat(参考大佬vtta)来清除宕机的worker,同时增加了worker的唯一id生成函数,使用tempfile写入reduce的任务

2021/5/12 update

重构了下代码,没有使用心跳来保证worker的正常运行,而是只在master中增加对注册worker的定时器。如果worker在规定时间内没有RPC请求,master会删除有关worker的数据结构,当worker发来迟到的RPC时,master不予处理。

main/mrworker.go中会调用Worker函数开启worker来请求任务(mapjob、reducejob),处理逻辑如下:

func Worker(mapf func(string, string) []KeyValue,
	reducef func(string, []string) string) {

	registReply := RegistResp{}
	if ok := call("Master.RegisterWorker", &RegistArgs{}, ®istReply); !ok {
		return
	}
	workerID := registReply.WorkerID

	// Your worker implementation here.
	// send call to request task
	tryAccessTimes := 0
	for {
		args := RequestArgs{-1, -1, workerID}
		reply := ResponseType{}

		if ok := call("Master.AcceptWorker", &args, &reply); !ok {
			fmt.Println("worker > request failed, sleep...")
			time.Sleep(100 * time.Millisecond)
			tryAccessTimes++
			if tryAccessTimes > 5 {
				fmt.Println("worker > cannot access master. Quit")
				return
			}
			continue
		}

		tryAccessTimes = 0
		if reply.NReduce == -1 {
			fmt.Println("worker > exit")
			return
		}

		// fmt.Println("apply job success")
		switch reply.JobType {
		case MAP: // map job
			mapJob(&reply, mapf)
		case REDUCE:
			reduceJob(&reply, reducef)
		}
	}
}

处理逻辑中定义了三种RPC(但是只使用了两种请求args)

const (
	MAP = 1
	REDUCE = 2
)

type RegistArgs struct {

}

type RegistResp struct {
	WorkerID int
}

// RequestArgs worker请求的类型
type RequestArgs struct {
	TaskNum int // 返回被分配的任务索引,初始请求时为空
	JobType int // 任务类型 1-map 2-reduce
	WorkerID int // 这个worker的ID
}

// master的应答
type ResponseType struct {
	NReduce int // master的用户参数,取模用
	JobType int
	BucketName string // 分配的任务名称 (map任务需要读取的文件名)
	TaskNum int 	// 任务号 
					// 1、对于map任务,master数据结构中的files的下标 + 1
					// 2、对于reduce任务,taskNum在 [0, NReduce) 区间内
}

这边使用了counter函数生成全局唯一的worker ID。

func counter() (f func() int) {
	i := 0
	return func() int {
		i += 1
		return i
	}
}

// generate a unique id for a worker or a Job
var uniqueID = counter()

mapjob的处理

func mapJob(reply *ResponseType,
	mapf func(string, string) []KeyValue) {
	// 打开原始文件
	file, err := os.Open(reply.BucketName)
	if err != nil {
		log.Fatalf("cannot open %v", reply.BucketName)
	}
	content, err := ioutil.ReadAll(file)
	if err != nil {
		log.Fatalf("cannot read %v", reply.BucketName)
	}
	file.Close()

	// 调用用户的map函数
	kva := mapf(reply.BucketName, string(content))

	// 对于每个key值,划分为nReduce个组
	var groups = make([][]KeyValue, reply.NReduce)
	for _, kv := range kva {
		gid := ihash(kv.Key) % reply.NReduce
		groups[gid] = append(groups[gid], kv)
	}

	// 将中间文件写入disk,注意写入的是NReduce个不同文件
	for index, kvs := range groups {
		// 创建或打开intermediate文件
		filename := "mr-" + strconv.Itoa(reply.TaskNum) + "-" + strconv.Itoa(index)
		// file, _ = os.OpenFile(filename, os.O_RDWR|os.O_APPEND|os.O_CREATE, 0644)
		file, err := ioutil.TempFile(".", "mr-")
		if err != nil {
			log.Fatalln("cannot create temporary file")
		}

		enc := json.NewEncoder(file)
		for _, kv := range kvs {
			err := enc.Encode(&kv)
			if err != nil {
				os.Remove(file.Name())
				// call for failure
				log.Fatal("map write file error")
			}
		}
		os.Rename(file.Name(), filename)
		if err != nil {
			log.Fatalln("cannot rename to", filename)
		}
		log.Printf("%s created.", filename)
	}

	rpArgs := RequestArgs{}
	rpArgs.JobType = MAP
	rpArgs.TaskNum = reply.TaskNum

	rpReply := ResponseType{}

	call("Master.WorkerFinished", &rpArgs, &rpReply) // TODO: 考虑失败的worker

}

reducejob的处理

func reduceJob(reply *ResponseType,
	reducef func(string, []string) string) {
	// 读取所有属于taskNum的mr-X-taskNum.txt文件 到 intermediate
	var intermediate []KeyValue
	for i := 0; ; i++ { // i对应map任务号
		filename := "mr-" + strconv.Itoa(i) + "-" + strconv.Itoa(reply.TaskNum)
		file, err := os.Open(filename)
		if err != nil {
			break
		}
		dec := json.NewDecoder(file)
		for {
			var kv KeyValue
			if err := dec.Decode(&kv); err != nil {
				break
			}
			intermediate = append(intermediate, kv)
		}
		file.Close()
	}

	// sort
	sort.Sort(ByKey(intermediate))

	// 注意!使用临时文件,防止残缺文件被写入!
	oname := fmt.Sprintf("mr-out-%v", reply.TaskNum)
	ofile, err := ioutil.TempFile(".", "mr-")
	// oname := "mr-out-" + strconv.Itoa(reply.TaskNum)
	// ofile, err := os.Create(oname)
	if err != nil {
		log.Fatalln("cannot create temporary file")
	}

	fmt.Printf("worker > inter len : %v\n", len(intermediate))
	// call the user define reduce function
	i := 0
	for i < len(intermediate) {
		j := i + 1
		for j < len(intermediate) && intermediate[j].Key == intermediate[i].Key { // the same key has been sorted to be together
			j++
		}
		values := []string{}
		for k := i; k < j; k++ {
			values = append(values, intermediate[k].Value)
		}
		output := reducef(intermediate[i].Key, values)

		// this is the correct format for each line of Reduce output.
		fmt.Fprintf(ofile, "%v %v\n", intermediate[i].Key, output)

		i = j
	}
	err = ofile.Close()
	if err != nil {
		log.Fatalln("cannot close", oname)
	}
	err = os.Rename(ofile.Name(), oname)
	if err != nil {
		log.Fatalln("cannot rename to", oname)
	}

	rpArgs := RequestArgs{}
	rpArgs.JobType = REDUCE
	rpArgs.TaskNum = reply.TaskNum

	rpReply := ResponseType{}

	call("Master.WorkerFinished", &rpArgs, &rpReply) // 目前,未考虑失败的worker
}

master中保存的数据结构

var (
	mu sync.Mutex
)

// State 为job的状态
type State int
const (
	IDLE = 0
	INPROGRESS = 1
	COMPLETE = 2
)

type Master struct {
	// Your definitions here.
	mapState map[int]State  // 序号 -》状态
	fileNames []string // 序号 -> 文件名
	mapWorker map[int]int  // 序号 -》workerid
	mapDone bool

	nReduce int // 需要启动的reduce worker数量,也是每个map任务需要写入的文件数量
	reduceState map[int]State // 一个nReduce大小的切片,记录reduce任务完成的情况, 0-未分配,1-已分配,2-已完成
	reduceWorker map[int]int // 任务号 -》 worker
	reduceDone bool

	timers map[int]*time.Timer  // 计时器,映射为workerID到timer
}

master对三种RPC的处理

用于worker注册的

// RegisterWorker worker注册,生成定时器goroutine
func (m *Master) RegisterWorker(args *RegistArgs, reply *RegistResp) error {
	mu.Lock()
	defer mu.Unlock()

	workerID := uniqueID()
	reply.WorkerID = workerID
	m.timers[workerID] = time.NewTimer(time.Second * 10)
	log.Printf("Master > new worker %v register", workerID)

	// 为每个worker分配一个定时器线程
	go func(worker int, timer <-chan time.Time) {
		<- timer // 如果定时器超时了
		mu.Lock()
		defer mu.Unlock()
		delete(m.timers, worker) // 删除定时器
		for jobid, wid := range m.mapWorker {
			if wid == worker {
				m.mapState[jobid] = IDLE
				delete(m.mapWorker, jobid)
				log.Printf("Master > map worker %v time out", worker)
			}
		}
		for jobid, wid := range m.reduceWorker {
			if wid == worker {
				m.reduceState[jobid] = IDLE
				delete(m.reduceWorker, jobid)
				log.Printf("Master > reduce worker %v time out", worker)
			}
		}
	}(workerID, m.timers[workerID].C)

	return nil
}

用于worker请求任务的:

// AcceptWorker PRC响应 接受worker的请求任务请求
func (m *Master) AcceptWorker(args *RequestArgs, reply *ResponseType) error {
	mu.Lock()
	defer mu.Unlock()
	
	reply.NReduce = m.nReduce
	if !m.mapDone { // map未完成
		mapjobid := m.assignMapJob(args.WorkerID)

		if mapjobid == -1 {
			return fmt.Errorf("please apply job again")
		}
		reply.JobType = MAP
		reply.BucketName = m.fileNames[mapjobid]
		reply.TaskNum = mapjobid
		
		m.mapWorker[mapjobid] = args.WorkerID

	} else if !m.reduceDone{
		rdTaskNum := m.assignReduceJob(args.WorkerID)

		if rdTaskNum == -1 {
			reply.NReduce = -1 // 告诉worker不要再申请任务了
			return fmt.Errorf("no job available")
		}
		reply.JobType = REDUCE
		reply.TaskNum = rdTaskNum
	}
	return nil // success assigned
}

用与worker告知结束任务的

// WorkerFinished 回应worker完成工作
// 对于timeout的worker,即使worker完成了任务,由于在master保存的数据结构中找不到对应的worker
func (m *Master) WorkerFinished(args *RequestArgs, reply *ResponseType) error {
	mu.Lock()
	defer mu.Unlock()

	if args.JobType == MAP {
		_, ok := m.mapWorker[args.TaskNum]
		if !ok {
			return fmt.Errorf("Map Worker timeout, job : %v", m.fileNames[args.TaskNum])
		}
		m.mapState[args.TaskNum] = COMPLETE
		delete(m.mapWorker, args.TaskNum)

		fmt.Printf("Map job" + m.fileNames[args.TaskNum] + " finish\n")
	} else if args.JobType == REDUCE {
		_, ok := m.reduceWorker[args.TaskNum]
		if !ok {
			return fmt.Errorf("Reduce worker timeout, job : %v", args.TaskNum)
		}
		m.reduceState[args.TaskNum] = COMPLETE
		delete(m.reduceWorker, args.TaskNum)

		fmt.Printf("Reduce job" + strconv.Itoa(args.TaskNum) + " finish\n")
	}
	return nil
}

master分配任务使用的函数

// 根据job类型来选择文件名
func (m *Master) assignMapJob(worker int) (job int){
	mapComplete := true
	for jobid, state := range m.mapState {
		if state == IDLE {
			job = jobid
			m.mapState[jobid] = INPROGRESS
			m.mapWorker[jobid] = worker
			return
		}
		if state != COMPLETE {
			mapComplete = false
		}
	}
	if mapComplete {
		m.mapDone = true
		log.Println("map phase compelet")
	}
	return -1
}

func (m *Master) assignReduceJob(worker int) (reduceNum int) {
	reduceComplete := true
	for jobid, state := range m.reduceState {
		if state == IDLE {
			reduceNum = jobid  // 返回该reduce任务的编号,即这个reduce worker要读取mr-X-reduceNum.txt的中间文件
			m.reduceState[jobid] = INPROGRESS
			m.reduceWorker[jobid] = worker
			return
		}
		if state != COMPLETE {
			reduceComplete = false
		}
	}
	if reduceComplete {
		m.reduceDone = true
		log.Println("reduce phase complete")
	}
	return -1
}