MapReduce
简介
MapReduce 是一个分布式计算的抽象。
抽象
MapReduce 的思想来自函数式编程。
对于这样的分布式计算过程(输入一组 KV 对,输出一组 KV 对),抽象为多个 Map 和 Reduce 过程,整个过程称为 Job ,每次 Map 或 Reduce 过程称为 Task, Map 和 Reduce 可以级联组合。用户实现整个计算过程,就要实现多个 Job 并组合成 Task 。
Map 和 Reduce 的形式化定义:
$\begin{equation}\begin{array}{lll}\operatorname{map} & (k 1, v 1) & \rightarrow \operatorname{list}(k 2, v 2) \\\text { reduce } & (k 2, \operatorname{list}(v 2)) & \rightarrow \operatorname{list}\left(v_{2}\right)\end{array}\end{equation}$
Map: (k1,v1) -> list(k2,v2)
Reduce: (k2,list(v2)) -> list(v2)
- Map
- 输入一个 KV 对,输出一组 KV 对作为中间结果,框架会将相同 Key 的 Value 组合起来一起传给 Reduce
- Reduce
- 输入一个 Key 和一组 Value ,输出一组可能更少的 Value
例子
map(String key, String value): / key: document name / value: document contents for each word w in value: EmitIntermediate(w, “1”);
reduce(String key, Iterator values): / key: a word / values: a list of counts int result = 0; for each v in values: result += ParseInt(v); Emit(AsString(result));
过程
- 输入文件会被分为 M 个 Split,每个 Split 的大小通常在 16~64 MB 之间
- Job 包含 M 个Map Task 和 N 个 Reduce Task ,Master 分派这些 Task 给 Worker
- 收到 Map Task 的 Worker读入 Split 并执行 Map Task ,由 Map Task 产生的中间结果暂存在缓冲内存区
- 执行 Map Task 的同时,不断检查内存中的中间结果并将中间结果分到 R 个 Range 中,并将中间结果写入硬盘,然后将中间结果在硬盘的位置上报给 Master 。
- Master 将中间结果位置转发给 Reducer , Reducer 读取对应 Partition 的中间结果。在读取完毕后,Reducer 会对读取到的数据进行排序,令拥有相同键的 KV 对连续。
- Reducer 执行,Reduce 结果会被放入到对应的 Reduce Partition 结果文件
- Job 完成,输出 R 个文件。
M 和 R 的选取影响负载均衡。
容错
Worker 故障
Master 会周期地 Ping 每一个 Worker ,如果某个 Worker 在一段时间内没有响应,Master 就会认为这个 Worker 已经不可用。
任何分配给该 Worker 的 Map 任务,无论是正在运行还是已经完成,都需要由 Master 重新分配给其他 Worker,因为该 Worker 不可用也意味着存储在该 Worker 本地磁盘上的中间结果也不可用了。Master 也会将这次重试通知给所有 Reducer,没能从原本的 Mapper 上完整获取中间结果的 Reducer 便会开始从新的 Mapper 上获取数据。
如果有 Reduce 任务分配给该 Worker,Master 则会选取其中尚未完成的 Reduce 任务分配给其他 Worker。鉴于 Google MapReduce 的结果是存储在 Google File System 上的,已完成的 Reduce 任务的结果的可用性由 Google File System 提供,因此 MapReduce Master 只需要处理未完成的 Reduce 任务即可。
Master 故障
整个 MapReduce 集群中只会有一个 Master 结点,因此 Master 失效的情况并不多见。
Master 结点在运行时会周期性地将集群的当前状态作为保存点(Checkpoint)写入到磁盘中。Master 进程终止后,重新启动的 Master 进程即可利用存储在磁盘中的数据恢复到上一次保存点的状态。
故障与语义
当 Map 和 Reduce 是纯函数时,Job 的执行结果应该等同于串行执行结果。
一个 Reduce 可能被多个机器执行,Reduce 完成时 Worker 将临时文件重命名,需要由底层文件系统保证重命名是原子性的。
当 Map 和 Reduce 不是纯函数时。。。(略过,能理解但是没必要细说,TOOD)
性能优化
局部性
在 Google 内部所使用的计算环境中,机器间的网络带宽是比较稀缺的资源,需要尽量减少在机器间过多地进行不必要的数据传输。
Google MapReduce 采用 Google File System 来保存输入和结果数据,因此 Master 在分配 Map 任务时会从 Google File System 中读取各个 Block 的位置信息,并尽量将对应的 Map 任务分配到持有该 Block 的 Replica 的机器上;如果无法将任务分配至该机器,Master 也会利用 Google File System 提供的机架拓扑信息将任务分配到较近的机器上。
M 和 R
master 需要进行 $O(M+R)$ 次调度和 O(M*R) 的空间存储状态。
Combiner
在某些情形下,用户定义的 Map 任务可能会产生大量重复的中间结果。Google MapReduce 允许用户在Map上的Combiner函数:Mapper会对中间结果自动执行Combiner进行局部合并,减少产生的临时数据量,以计算换存储。
比如 WordCount 中大量 KV 对相同。
划分函数
用户可能需要自定义划分函数
任务冗余
如果集群中有某个 Worker 花了特别长的时间来完成最后的几个 Map 或 Reduce 任务,整个 MapReduce 计算任务的耗时就会因此被拖长,这样的 Worker 也就成了落后者(Straggler)。
MapReduce 在整个计算完成到一定程度时就会将剩余的任务进行冗余,即同时将其分配给其他空闲 Worker 来执行,并在其中一个 Worker 完成后将该任务视作已完成。
掉队的原因可能是不可用,也可能是 Worker 上的 Task 对同种资源争用过于激烈。