当前位置: 首页 > news >正文

6.824lab1总结

目录

      • 总体概要
      • 核心结构体
      • coordinator思路:
        • 任务池管理
        • RPC函数
      • worker思路:
      • 实现细节

总体概要

程序主要由mrcoordinator.go、mrworker.go为启动模块。

  • mrcoordinator.go: 启动rpc服务,循环等待m.Done()为true时退出。
  • mrwoker.go:调用mr.worker(mapf, reducef)函数,执行map/reduce任务。

核心结构体

Coordinator(协调者)持有任务池,能够查看任务的完成情况。任务的状态主要分为三种:

  • “working”:正在执行
  • “success”:执行成功
  • “offline”:任务未开始 或 任务掉线
//任务池保存目前所有任务状态
type TaskPool struct {MapTasks         []MapTaskMapSuccessNum    int //map任务完成数ReduceTasks      []ReduceTaskReduceSuccessNum int //reduce任务完成数mutex sync.Mutex
}type MapTask struct {id       intFileName stringstatus   string //任务状态:  "working"、"success" "offline"mutex    sync.Mutex
}type ReduceTask struct {id     intstatus string //任务状态:  "working" 、"success"、"offline"mutex  sync.Mutex
}

coordinator思路:

任务池管理

调用CreateTaskPool函数初始化任务池,将所有任务分成map0,map1…reduce0,reduce1…。

c.taskPool = CreateTaskPool(files, nReduce)

创建Add…Task()函数用于添加相应的任务,将任务的状态变成"working"。

//添加Map任务 如果成功返回(序号,文件名,true)。 失败返回(0,"",false)
func (p TaskPool) AddMapTask() (idx int, fileName string, ok bool)
//添加Reduce任务 成功返回(reduce任务序列号,map任务总数,true)
func (p *TaskPool) AddReduceTask() (idx int, mapTaskNum int, ok bool)

RPC函数

  1. 任务请求:由worker调用,args暂时没用,返回reply为worker被分配的任务。
//RPC请求任务
func (c *Coordinator) RequestTask(args *RequestTaskArgs, reply *RequestTaskReply) error
  1. 成功执行通知:worker在成功执行已分配的任务后,会通过rpc告诉coordinator
//RPC通知执行成功
func (c *Coordinator) SuccessExecuteInfo(args *SuccessExecuteArgs, reply *SuccessExecuteReply) error

worker思路:

worker不断重复一个for循环:

  1. CallRequestTask() //通过rpc获取任务
  2. HandleMapTask()/HandleReduceTask() //处理对应的任务
  3. CallSuccessExecute(task.Id, task.TaskType) //通知coordinator任务已经完成

实现细节

问题1:由于reduce任务必须要在map任务之后去执行,所以需要解决在所有map任务都属于working或success状态时(map任务没有全部完成,但是所有的map任务都有人在做或已经完成),新来一个worker该怎么办。

解决方案:在加入map任务时若发现处于上面状态,返回特殊的返回值,如任务的内容fileName为空,这样worker通过返回值就知道worker属于冗余状态,worker便会休眠两秒,在两秒之后再去请求任务。

问题2:worker在获取任务之后挂掉了怎么办?
解决方案:在coordinator分配任务给worker时,同时开启一个goroutine用来检测worker是否在10s内完成任务。如果没有完成(任务的标志还是"working"),将任务强行下线。

//添加任务的同时创建goroutine,检测10s是否完成任务go func(p TaskPool, id int) {time.Sleep(10 * time.Second)p.MapTasks[id].mutex.Lock()if p.MapTasks[id].status == "working" {p.MapTasks[id].status = "offline"}p.MapTasks[id].mutex.Unlock()}(p, id)

问题3:worker挂掉之后新的worker接手任务之前的任务怎么办?要保证任务的正确结果。
解决方案:检测任务文件是否存在,如果存在则删除,后面再重新创建。

//检测:上次任务的遗留。判断是否存在,如果存在则删除 mr_reply.id_[0...nReduce-1]for i := 0; i < reply.NReduce; i++ {writeFileName := fmt.Sprintf("mr_%s_%s", strconv.Itoa(reply.Id), strconv.Itoa(i))if FileIsExists(writeFileName) {err := os.Remove(writeFileName)if err != nil {panic(err)}}}

问题4:单个worker如何解决全部map、reduce任务。
解决方案:worker跑在一个for循环上,for循环是否执行由一个bool型的变量Continue来决定。Continue的值由rpc通知coordinator任务完成时返回。如果整个任务没有完成则返回true,否则返回false。

var Continue bool = truefor Continue {Continue = falsetask := CallRequestTask() //rpc请求任务if task.TaskType == "map" {//map任务HandleMapTask(mapf, task)Continue = CallSuccessExecute(task.Id, task.TaskType)} else if task.TaskType == "reduce" {//reduce任务HandleReduceTask(reducef, task)Continue = CallSuccessExecute(task.Id, task.TaskType)} else {//map or reduce存在working状态time.Sleep(2 * time.Second)Continue = true}}
http://www.lryc.cn/news/16667.html

相关文章:

  • NIO蔚来 面试——IP地址你了解多少?
  • Gluten 首次开源技术沙龙成功举办,更多新能力值得期待
  • springboot+redis+lua实现限流
  • 线段树总结
  • 龙芯GS232(MIPS 32)架构cache管理笔记
  • js去重
  • 小白都能看懂的C语言入门教程
  • leetcode 21~30 学习经历
  • 让ArcMap变得更加强大,用python执行地理处理以及编写自定义脚本工具箱
  • SAP 项目实施阶段全过程
  • idea中的Maven导包失败问题解决总结
  • REDIS中的缓存穿透,缓存击穿,缓存雪崩原因以及解决方案
  • 数据库及缓存之MySQL(一)
  • 项目管理中,项目经理需要具备哪些能力?
  • itk中的一些图像处理
  • Endless lseek导致的SQL异常
  • JUC-day01
  • Mind+Python+Mediapipe项目——AI健身之跳绳
  • 数据库概述
  • 【已解决】解决IDEA的maven刷新依赖时出现Connot reconnect错误
  • 动态链接库(.so)文件的变编译和引用、执行
  • linux(centos8)文件解压命令
  • 阅读笔记6——通道混洗
  • 上海亚商投顾:沪指失守3300点 卫星导航概念全天强势
  • 疯狂的SOVA:Android银行木马“新标杆”
  • 汽车零部件企业数字工厂管理系统建设方案
  • 【线程同步工具】Semaphore源码解析
  • 获取实时天气
  • 【数据库】redis数据持久化
  • 前端编译、JIT编译、AOT编译