Hadoop 基础

集群的最主要瓶颈?

磁盘 IO。

Hadoop 运行模式?

  • 独立(本地)运行模式:无需任何守护进程,所有的程序都运行在同一个 JVM 上执行。在独立模式下调试 MR 程序非常高效方便。所以一般该模式主要是在学习或者开发阶段调试使用。
  • 伪分布式模式:Hadoop 守护进程运行在本地机器上,模拟一个小规模的集群,换句话说,可以配置一台机器的 Hadoop 集群,伪分布式是完全分布式的一个特例。
  • 完全分布式模式:Hadoop 守护进程运行在一个集群上。

注意:所谓分布式要启动守护进程,即:使用分布式 Hadoop 时,要先启动一些准备程序进程,然后才能使用比如 start-dfs.shstart-yarn.sh。而本地模式不需要启动这些守护进程。

三种模式的集群必须配置信息:

下面详细分析配置三种模式的“集群”所需要的必须配置。可以配置完,体验一把,就可以主观地感受三种之间的区别。

组件名称 属性名称 本地模式 伪分布式 完全分布式
Common fs.defaultFs file:///(默认) hdfs://localhost/ hdfs://namenode
HDFS dfs.replication N/A 1 3(默认)
MapReduce mapreduce.framework.name local(默认) yarn yarn
Yarn yarn.resoucemanager.hostname
yarn.nodemanager.auxservice
N/A
N/A
localhost
mapreduce_shuffle
resoucemanager
maperduce_shuffle

注意:在本地模式下,将使用本地文件系统和本地 MapReduce 运行器。在分布式模式下,将启动 HDFS 和 YARN 守护进程。

Hadoop 生态圈的组件并做简要描述?

  • Zookeeper:是一个开源的分布式应用程序协调服务,基于 Zookeeper 可以实现同步服务,配置维护,命名服务。
  • Flume:一个高可用的,高可靠的,分布式的海量日志采集、聚合和传输的系统。
  • Hbase:是一个分布式的、面向列的开源数据库,利用 Hadoop HDFS 作为其存储系统。
  • Hive:基于 Hadoop 的一个数据仓库工具,可以将结构化的数据档映射为一张数据库表,并提供简单的 SQL 查询功能,可以将 SQL 语句转换为 MapReduce 任务进行运行。
  • Sqoop:将一个关系型数据库中的数据导进到 Hadoop 的 HDFS 中,也可以将 HDFS 的数据导进到关系型数据库中。

解释“Hadoop”和“Hadoop 生态系统”两个概念?

Hadoop 是指 Hadoop 框架本身;Hadoop 生态系统,不仅包含 Hadoop,还包括保证 Hadoop 框架正常高效运行其他框架,比如 Zookeeper、Flume、Hbase、Hive、Sqoop 等辅助框架。

请列出正常工作的 Hadoop 集群中 Hadoop 都分别需要启动哪些进程,它们的作用分别是什么?

  • NameNode:它是 Hadoop 中的主服务器,管理文件系统名称空间和对集群中存储的文件的访问,保存有 metadate。
  • SecondaryNameNode:它不是 NameNode 的冗余守护进程,而是提供周期检查点和清理任务。帮助 NameNode 合并 editslog,减少 NameNode 启动时间。
  • DataNode:它负责管理连接到节点的存储(一个集群中可以有多个节点)。每个存储数据的节点运行一个 DataNode 守护进程。
  • ResourceManager(JobTracker):JobTracker 负责调度 DataNode 上的工作。每个 DataNode 有一个 TaskTracker,它们执行实际工作。
  • NodeManager(TaskTracker):执行任务。
  • DFSZKFailoverController:高可用时它负责监控 NameNode 的状态,并及时的把状态信息写入 Zookeeper。它通过一个独立线程周期性的调用 NameNode 上的一个特定接口来获取 NameNode 的健康状态。FC 也有选择谁作为 Active NameNode 的权利,因为最多只有两个节点,目前选择策略还比较简单(先到先得,轮换)。
  • JournalNode:高可用情况下存放 NameNode 的 EditLog 文件。

NameNode 在启动的时候会做哪些操作?

NameNode 数据存储在内存和本地磁盘,本地磁盘数据存储在 fsimage 镜像文件和 edits 编辑日志文件。

  • 首次启动 NameNode

    1. 格式化文件系统,为了生成 fsimage 镜像文件;
    2. 启动 NameNode;
      1. 读取 fsimage 文件,将文件内容加载进内存;
      2. 等待 DataNode 注册与发送 block report。
    3. 启动 DataNode;
      1. 向 NameNode 注册;
      2. 发送 block report;
      3. 检查 fsimage 中记录的块的数量和 block report 中的块的总数是否相同。
    4. 对文件系统进行操作(创建目录,上传文件,删除文件等)。
      1. 此时内存中已经有文件系统改变的信息,但是磁盘中没有文件系统改变的信息,此时会将这些改变信息写入 edits 文件中,edits 文件中存储的是文件系统元数据改变的信息。
  • 第二次启动 NameNode

    1. 读取 fsimage 和 edits 文件;
    2. 将 fsimage 和 edits 文件合并成新的 fsimage 文件;
    3. 创建新的 edits 文件,内容为空;
    4. 启动 DataNode。

DataNode 了解吗,它的工作机制是怎样的?

  1. 一个数据块在 DataNode 上以文件形式存储在磁盘上,包括两个文件,一个是数据本身,一个是元数据包括数据块的长度,块数据的校验和,时间戳。
  2. DataNode 启动后向 NameNode 进行注册,通过后,周期性(1 小时)地向 NameNode 上报所有的块信息;
  3. 心跳每 3 秒一次,心跳返回结果带有 NameNode 给该 DataNode 的命令,如复制块数据到另一台机器,或删除某个数据块。如果超过 10 分钟没有收到某个 DataNode 的心跳,则认为该节点不可用;
  4. 集群运行中可以安全加入和退出一些机器。

Secondary NameNode 了解吗,它的工作机制是怎样的?

Secondary NameNode 目的是合并 NameNode 的 edit logs 到 fsimage 文件中,是减少 NameNode 启动时间,它的具体工作机制:

  1. 第一阶段:NameNode 启动
    1. 第一次启动 NameNode 格式化后,创建 fsimage 和 edits 文件。如果不是第一次启动,直接加载编辑日志和镜像文件到内存;
    2. 客户端对元数据进行增删改的请求;
    3. NameNode 记录操作日志,更新滚动日志;
    4. NameNode 在内存中对数据进行增删改查。
  2. 第二阶段:Secondary NameNode 工作
    1. Secondary NameNode 询问 NameNode 是否需要 checkpoint。直接带回 NameNode 是否检查结果;
    2. Secondary NameNode 请求执行 checkpoint;
    3. NameNode 滚动正在写的 edits 日志
    4. 将滚动前的编辑日志和镜像文件拷贝到 Secondary NameNode;
    5. Secondary NameNode 加载编辑日志和镜像文件到内存,并合并;
    6. 生成新的镜像文件 fsimage.chkpoint;
    7. 拷贝 fsimage.chkpoint 到 NameNode;
    8. NameNode 将 fsimage.chkpoint 重新命名成 fsimage 所以如果 NameNode 中的元数据丢失,是可以从 Secondary NameNode 恢复一部分元数据信息的,但不是全部,因为 NameNode 正在写的 edits 日志还没有拷贝到 Secondary NameNode,这部分恢复不了。

Secondary NameNode 不能恢复 NameNode 的全部数据,那如何保证 NameNode 数据存储安全?

这个问题就要说 NameNode 的高可用了,即 NameNode HA 一个 NameNode 有单点故障的问题,那就配置双 NameNode,配置有两个关键点,一是必须要保证这两个 NameNode 的元数据信息必须要同步的,二是一个 NameNode 挂掉之后另一个要立马补上。

  • 元数据信息同步在 HA 方案中采用的是“共享存储”。每次写文件时,需要将日志同步写入共享存储,这个步骤成功才能认定写文件成功。然后备份节点定期从共享存储同步日志,以便进行主备切换。

  • 监控 NameNode 状态采用 Zookeeper,两个 NameNode 节点的状态存放在 Zookeeper 中,另外两个 NameNode 节点分别有一个进程监控程序,实施读取 Zookeeper 中有 NameNode 的状态,来判断当前的 NameNode 是不是已经 down 机。如果 standby 的 NameNode 节点的 ZKFC 发现主节点已经挂掉,那么就会强制给原本的 active NameNode 节点发送强制关闭请求,之后将备用的 NameNode 设置为 active。

如果面试官再问 HA 中的共享存储是怎么实现的知道吗?

可以进行解释下:NameNode 共享存储方案有很多,比如 Linux HA,VMware FT,QJM 等,目前社区已经把由 Cloudera 公司实现的基于 QJM(Quorum Journal Manager)的方案合并到 HDFS 的 trunk 之中并且作为默认的共享存储实现基于 QJM 的共享存储系统主要用于保存 EditLog,并不保存 FSImage 文件。FSImage 文件还是在 NameNode 的本地磁盘上。

QJM 共享存储的基本思想来自于 Paxos 算法,采用多个称为 JournalNode 的节点组成的 JournalNode 集群来存储 EditLog。每个 JournalNode 保存同样的 EditLog 副本。每次 NameNode 写 EditLog 的时候,除了向本地磁盘写入 EditLog 之外,也会并行地向 JournalNode 集群之中的每一个 JournalNode 发送写请求,只要大多数(majority)的 JournalNode 节点返回成功就认为向 JournalNode 集群写入 EditLog 成功。如果有 2N+1 台 JournalNode,那么根据大多数的原则,最多可以容忍有 N 台 JournalNode 节点挂掉。

在 NameNode HA 中,会出现脑裂问题吗?怎么解决脑裂?

假设 NameNode1 当前为 Active 状态,NameNode2 当前为 Standby 状态。如果某一时刻 NameNode1 对应的 ZKFailoverController 进程发生了“假死”现象,那么 Zookeeper 服务端会认为 NameNode1 挂掉了,根据前面的主备切换逻辑,NameNode2 会替代 NameNode1 进入 Active 状态。但是此时 NameNode1 可能仍然处于 Active 状态正常运行,这样 NameNode1 和 NameNode2 都处于 Active 状态,都可以对外提供服务。这种情况称为脑裂。

脑裂对于 NameNode 这类对数据一致性要求非常高的系统来说是灾难性的,数据会发生错乱且无法恢复。Zookeeper 社区对这种问题的解决方法叫做 fencing,中文翻译为隔离,也就是想办法把旧的 Active NameNode 隔离起来,使它不能正常对外提供服务。在进行 fencing 的时候,会执行以下的操作:

  1. 首先尝试调用这个旧 Active NameNode 的 HAServiceProtocol RPC 接口的 transitionToStandby 方法,看能不能把它转换为 Standby 状态。
  2. 如果 transitionToStandby 方法调用失败,那么就执行 Hadoop 配置文件之中预定义的隔离措施,Hadoop 目前主要提供两种隔离措施,通常会选择 sshfence:
    • sshfence:通过 SSH 登录到目标机器上,执行命令 fuser 将对应的进程杀死;
    • shellfence:执行一个用户自定义的 shell 脚本来将对应的进程隔离。

补充:ZKFailoverController 主要职责

  • 健康监测:周期性的向它监控的 NameNode 发送健康探测命令,从而来确定某 个 NameNode 是否处于健康状态,如果机器宕机,心跳失败,那么 ZKFC 就会标记它处于一个不健康的状态;
  • 会话管理:如果 NameNode 是健康的,ZKFC 就会在 Zookeeper 中保持一个打开的会话,如果 NameNode 同时还是 Active 状态的,那么 ZKFC 还会在 Zookeeper 中占有一个类型为短暂类型的 znode,当这个 NameNode 挂掉时,这个 znode 将会被删除,然后备用的 NameNode ,将会得到这把锁,升级为主 NameNode ,同时标记状态为 Active;
  • 当宕机的 NameNode 新启动时,它会再次注册 Zookeper,发现已经有 znode 锁了,便会自动变为 Standby 状态,如此往复循环,保证高可靠,需要注意,目前仅仅支持最多配置 2 个 NameNode;
  • master 选举:如上所述,通过在 Zookeeper 中维持一个短暂类型的 znode,来实现抢占式的锁机制,从而判断那个 NameNode 为 Active 状态。

Hadoop HDFS

HDFS 中的 block 默认保存几份?

默认保存 3 份。

HDFS 默认 BlockSize 是多大?

Hadoop1.x 是 64MB,hadoop2.x 开始是 128MB。

负责 HDFS 数据存储的是哪一部分?

DataNode 负责数据存储

HDFS 组成架构?

架构主要由四个部分组成,分别为 HDFS Client、NameNode、DataNode 和 Secondary NameNode。

  • Client:就是客户端。
    • 文件切分。文件上传 HDFS 的时候,Client 将文件切分成一个一个的 Block,然后进行存储;
    • 与 NameNode 交互,获取文件的位置信息;
    • 与 DataNode 交互,读取或者写入数据;
    • Client 提供一些命令来管理 HDFS,比如启动或者关闭 HDFS;
    • Client 可以通过一些命令来访问 HDFS。
  • NameNode:就是 Master,它是一个主管、管理者。
    • 管理 HDFS 的名称空间;
    • 管理数据块(Block)映射信息;
    • 配置副本策略;
    • 处理客户端读写请求。
  • DataNode:就是 Slave。NameNode 下达命令,DataNode 执行实际的操作。
    • 存储实际的数据块;
    • 执行数据块的读/写操作。
  • Secondary NameNode:并非 NameNode 的热备。当 NameNode 挂掉的时候,它并不能马上替换 NameNode 并提供服务。
    • 辅助 NameNode,分担其工作量;
    • 定期合并 fsimage 和 edits,并推送给 NameNode;
    • 在紧急情况下,可辅助恢复 NameNode。

文件大小设置,增大有什么影响?

HDFS 中的文件在物理上是分块存储(block),块的大小可以通过配置参数 dfs.blocksize 来规定,默认大小在 hadoop2.x 版本中是 128MB,老版本中是 64MB。

思考:为什么块的大小不能设置的太小,也不能设置的太大?

HDFS 的块比磁盘的块大,其目的是为了最小化寻址开销。如果块设置得足够大,从磁盘传输数据的时间会明显大于定位这个块开始位置所需的时间。因而,传输一个由多个块组成的文件的时间取决于磁盘传输速率。

如果寻址时间约为 10ms,而传输速率为 100MB/s,为了使寻址时间仅占传输时间的 1%,我们要将块大小设置约为 10ms×100×100M/s = 100MB。如图:

默认的块大小 128MB,增加文件块大小,需要增加磁盘的传输速率。

HDFS 的压缩算法?

企业开发用的比较多的是 snappy。

压缩算法 优点 缺点 应用场景
Gzip - 压缩比例比较高,而且压缩、解压速度比较快;
- hadoop 本身支持,在应用中处理 gzip 格式的文件就和直接处理文本一样;
- 大部分 linux 系统都自带 gzip 命令,使用方便。
不支持 split。 当每个文件压缩之后在 130M 以内的(1 个块大小内),都可以考虑用 gzip 压缩格式。
Bzip2 - 支持 split;
- 具有很高的压缩率,比 gzip 压缩率都高;
- hadoop 本身支持,但不支持 native;
- 在 linux 系统下自带 bzip2 命令,使用方便。
- 压缩/解压速度慢;
- 不支持 native。
- 适合对速度要求不高,但需要较高的压缩率的时候,可以作为 MapReduce 作业的输出格式;
- 或者输出之后的数据比较大,处理之后的数据需要压缩存档减少磁盘空间并且以后数据用得比较少的情况;
- 或者对单个很大的文本文件想压缩减少存储空间,同时又需要支持 split,而且兼容之前的应用程序(即应用程序不需要修改)的情况。
Lzo - 压缩/解压速度也比较快,合理的压缩率;
- 支持 split,是 hadoop 中最流行的压缩格式;
- 可以在 linux 系统下安装 lzop 命令,使用方便。
- 压缩率比 gzip 要低一些;
- hadoop 本身不支持,需要安装;
- 在应用中对 lzo 格式的文件需要做一些特殊处理(为了支持 split 需要建索引,还需要指定 inputFormat 为 lzo 格式)。
一个很大的文本文件,压缩之后还大于 200MB 以上的可以考虑,而且单个文件越大,lzo 优点越越明显。
Snappy 高速压缩速度和合理的压缩率。 - 不支持 split;
- 压缩率比 gzip 要低;
- hadoop 本身不支持,需要安装。
- 当 MapReduce 作业的 Map 输出的数据比较大的时候,作为 Map 到 Reduce 的中间数据的压缩格式;
- 或者作为一个 MapReduce 作业的输出和另外一个 MapReduce 作业的输入。

HDFS 的存储机制?

HDFS 存储机制,包括 HDFS 的写入数据过程和读取数据过程两部分

HDFS 写数据过程

  1. 客户端通过 Distributed FileSystem 模块向 NameNode 请求上传文件,NameNode 检查目标文件是否已存在,父目录是否存在;
  2. NameNode 返回是否可以上传;
  3. 客户端请求第一个 block 上传到哪几个 DataNode 服务器上;
  4. NameNode 返回 3 个 DataNode 节点,分别为 dn1、dn2、dn3;
  5. 客户端通过 FSDataOutputStream 模块请求 dn1 上传数据,dn1 收到请求会继续调用 dn2,然后 dn2 调用 dn3,将这个通信管道建立完成;
  6. dn1、dn2、dn3 逐级应答客户端;
  7. 客户端开始往 dn1 上传第一个 block(先从磁盘读取数据放到一个本地内存缓存),以 packet 为单位,dn1 收到一个 packet 就会传给 dn2,dn2 传给 dn3;dn1 每传一个 packet 会放入一个应答队列等待应答;
  8. 当一个 block 传输完成之后,客户端再次请求 NameNode 上传第二个 block 的服务器。(重复执行 3~7 步)。

HDFS 读数据过程

  1. 客户端通过 Distributed FileSystem 向 NameNode 请求下载文件,NameNode 通过查询元数据,找到文件块所在的 DataNode 地址;
  2. 挑选一台 DataNode(就近原则,然后随机)服务器,请求读取数据;
  3. DataNode 开始传输数据给客户端(从磁盘里面读取数据输入流,以 packet 为单位来做校验);
  4. 客户端以 packet 为单位接收,先在本地缓存,然后写入目标文件。

可以参考下面的漫画进行理解:

  • HDFS 写数据

  • HDFS 读数据

  • 容错

  • 副本存放策略

Hadoop MapReduce

谈谈 Hadoop 序列化和反序列化及自定义 bean 对象实现序列化?

  • 序列化和反序列化

    • 序列化就是把内存中的对象,转换成字节序列(或其他数据传输协议)以便于存储(持久化)和网络传输。
    • 反序列化就是将收到字节序列(或其他数据传输协议)或者是硬盘的持久化数据,转换成内存中的对象。
    • Java 的序列化是一个重量级序列化框架(Serializable),一个对象被序列化后,会附带很多额外的信息(各种校验信息,header,继承体系等),不便于在网络中高效传输。所以,Hadoop 自己开发了一套序列化机制(Writable),精简、高效。
  • 自定义 bean 对象要想序列化传输步骤及注意事项:

    1. 必须实现 Writable 接口;
    2. 反序列化时,需要反射调用空参构造函数,所以必须有空参构造;
    3. 重写序列化方法;
    4. 重写反序列化方法;
    5. 注意反序列化的顺序和序列化的顺序完全一致;
    6. 要想把结果显示在文件中,需要重写 toString(),且用 \t 分开,方便后续用;
    7. 如果需要将自定义的 bean 放在 key 中传输,则还需要实现 comparable 接口,因为 mapreduce 框中的 shuffle 过程一定会对 key 进行排序。

MapTask 和 ReduceTask 工作机制(MapReduce 工作原理)?

MapTask 工作机制

  1. Read 阶段:Map Task 通过用户编写的 RecordReader,从输入 InputSplit 中解析出一个个 key/value;
  2. Map 阶段:该节点主要是将解析出的 key/value 交给用户编写 map() 函数处理,并产生一系列新的 key/value;
  3. Collect 收集阶段:在用户编写 map() 函数中,当数据处理完成后,一般会调用 OutputCollector.collect() 输出结果。在该函数内部,它会将生成的 key/value 分区(调用 Partitioner),并写入一个环形内存缓冲区中;
  4. Spill 阶段:即“溢写”,当环形缓冲区满后,MapReduce 会将数据写到本地磁盘上,生成一个临时文件。需要注意的是,将数据写入本地磁盘之前,先要对数据进行一次本地排序,并在必要时对数据进行合并、压缩等操作;
  5. Combine 阶段:当所有数据处理完成后,MapTask 对所有临时文件进行一次合并,以确保最终只会生成一个数据文件。

ReduceTask 工作机制

  1. Copy 阶段:ReduceTask 从各个 MapTask 上远程拷贝一片数据,并针对某一片数据,如果其大小超过一定阈值,则写到磁盘上,否则直接放到内存中;
  2. Merge 阶段:在远程拷贝数据的同时,ReduceTask 启动了两个后台线程对内存和磁盘上的文件进行合并,以防止内存使用过多或磁盘上文件过多;
  3. Sort 阶段:按照 MapReduce 语义,用户编写 reduce() 函数输入数据是按 key 进行聚集的一组数据。为了将 key 相同的数据聚在一起,Hadoop 采用了基于排序的策略。由于各个 MapTask 已经实现对自己的处理结果进行了局部排序,因此, ReduceTask 只需对所有数据进行一次归并排序即可;
  4. Reduce 阶段:reduce() 函数将计算结果写到 HDFS 上。

如何判定一个 job 的 map 和 reduce 的数量?

  • map 的数量

    map 个数的计算公式为:splitSize = max(minSize, min(maxSize, blockSize)),如果没有设置 minSize 和 maxSize,splitSize 的大小默认等于 blockSize,

    在 map 阶段读取数据前,FileInputFormat 会将输入文件分割成 split。split 的个数决定了 map 的个数。影响 map 个数,即 split 个数的因素主要有:

    • HDFS 块的大小,即 HDFS 中 dfs.block.size 的值。如果有一个输入文件为 1024MB,当块为 256MB 时,会被划分为 4 个 split;当块为 128MB 时,会被划分为 8 个 split。

    • 文件的大小。当块为 128MB 时,如果输入文件为 128MB,会被划分为 1 个 split;当块为 256MB,会被划分为 2 个 split。

    • 文件的个数。FileInputFormat 按照文件分割 split,并且只会分割大文件,即那些大小超过 HDFS 块的大小的文件。如果 HDFS 中 dfs.block.size 设置为 64MB,而输入的目录中文件有 100 个,则划分后的 split 个数至少为 100 个。

    • splitSize 的大小。分片是按照 splitSize 的大小进行分割的,一个 split 的大小在没有设置的情况下,默认等于 HDFS Block 的大小。但应用程序可以通过两个参数来对 splitSize 进行调节。

      • maxSize:split 的最大值,默认是 Long.MAX_VALUE。可以通过mapreduce.input.fileinputformat.split.maxsize 进行设置。
      • minSize:split 的最小值,该值可由两个途径设置:(1)通过子类重写方法 protected void setMinSplitSize(long minSplitSize) 进行设置。一般情况为 1,特殊情况除外;(2)通过配置文件中的 mapreduce.input.fileinputformat.split.minsize 进行设置。
  • reduce 数量

    reduce 的数量 job.setNumReduceTasks(x); x 为 reduce 的数量。不设置的话默认为 1。

在一个运行的 Hadoop 任务中,什么是 InputSplit?

FileInputFormat 源码解析 input.getSplits(job)

  1. 找到你数据存储的目录;
  2. 开始遍历处理(规划切片)目录下的每一个文件;
  3. 遍历第一个文件 ss.txt;
    1. 获取文件大小 fs.sizeOf(ss.txt);
    2. 计算切片大小 computeSliteSize(Math.max(minSize, Math.min(maxSize, blocksize))) = blocksize = 128MB
    3. 默认情况下,切片大小 = blocksize;
    4. 开始切,形成第 1 个切片:ss.txt—0:128MB,第 2 个切片 ss.txt—128:256MB,第 3 个切片 ss.txt—256MB:300MB(每次切片时,都要判断切完剩下的部分是否大于块的 1.1 倍,不大于 1.1 倍就划分为一块切片);
    5. 将切片信息写到一个切片规划文件中;
    6. 整个切片的核心过程在 getSplit() 方法中完成;
    7. 数据切片只是在逻辑上对输入数据进行分片,并不会再磁盘上将其切分成分片进行存储。InputSplit 只记录了分片的元数据信息,比如起始位置、长度以及所在的节点列表等;
    8. 注意:block 是 HDFS 上物理上存储的存储的数据,切片是对数据逻辑上的划分。
  4. 提交切片规划文件到 YARN 上,YARN 上的 MRAppMaster 就可以根据切片规划文件计算开启 MapTask 个数,一个 job 的 map 阶段 MapTask 并行度(个数),也是由客户端提交 job 时的切片(split)个数决定。

描述 MapReduce 有几种排序及排序发生的阶段?

排序的分类:

  • 部分排序:MapReduce 根据输入记录的键对数据集排序。保证输出的每个文件内部排序;

  • 全排序:如何用 Hadoop 产生一个全局排序的文件?最简单的方法是使用一个分区。但该方法在处理大型文件时效率极低,因为一台机器必须处理所有输出文件,从而完全丧失了 MapReduce 所提供的并行架构;

    替代方案:首先创建一系列排好序的文件;其次,串联这些文件;最后,生成一个全局排序的文件。主要思路是使用一个分区来描述输出的全局排序。例如:可以为待分析文件创建 3 个分区,在第一分区中,记录的单词首字母 a-g,第二分区记录单词首字母 h-n, 第三分区记录单词首字母 o-z;

  • 辅助排序:(GroupingComparator 分组):MapReduce 框架在记录到达 reducer 之前按键对记录排序,但键所对应的值并没有被排序。甚至在不同的执行轮次中,这些值的排序也不固定,因为它们来自不同的 map 任务且这些 map 任务在不同轮次中完成时间各不相同。一般来说,大多数 MapReduce 程序会避免让 reduce 函数依赖于值的排序。但是,有时也需要通过特定的方法对键进行排序和分组等以实现对值的排序;

  • 二次排序:在自定义排序过程中,如果 compareTo 中的判断条件为两个即为二次排序。

自定义排序 WritableComparable:

bean 对象实现 WritableComparable 接口重写 compareTo 方法,就可以实现排序。

1
2
3
4
5
@Override
public int compareTo(FlowBean o) {
// 倒序排列,从大到小
return this.sumFlow > o.getSumFlow() ? -1 : 1;
}
排序发生的阶段:

  • 一个是在 map side 发生在 spill 后 partition 前;
  • 一个是在 reduce side 发生在 copy 后 reduce 前。

MapReduce 的 shuffle 过程?

Shuffle 机制:MapReduce 保证每个 reducer 的输入都是按键有序排列的,系统执行排序的过程(即将 map 输出作为输入传给 reducer)称为 shuffle。

分区,排序,溢写,拷贝到对应 reduce 机器上,增加 combiner,压缩溢写的文件。

如何优化 shuffle?

Map 端 shuffle

  • kvbuffer,默认是 100MB,可以通过参数 mapreduce.task.io.sort.mb 来修改,一般不修改;
  • 缓冲区阈值,一般是 80%,可以通过 mapreduce.map.sort.spill.percent 来修改;
  • 合并 spill 文件,mapreduce.task.io.sort.factor 属性配置每次最多合并多少个文件,默认为 10,即一次最多合并 10 个 spill 文件.如果 spill 文件数量大于 mapreduce.map.combiner.minspills 配置的数,则在合并文件写入之前,会再次运行 combiner。如果 spill 文件数量太少,运行 combiner 的收益可能小于调用的代价;
  • 对 map 输出进行压缩,在数据量大的时候,可以对 map 输出进行压缩,要启用压缩,将 mapreduce.map.output.compress 设为 true,并使用 mapreduce.map.output.compress.codec 设置使用的压缩算法。

Reduce 端 shuffle

  • copy 线程数量:copy 是用来从 map 任务中提取数据的,默认为 5 个 copy 线程,可以通是 mapreduce.reduce.shuffle.parallelcopies 配置。
  • 内存分配:如果能够让所有数据都保存在内存中,可以达到最佳的性能。通常情况下,内存都保留给 reduce 函数,但是如果 reduce 函数对内存需求不是很高,将 mapreduce.reduce.merge.inmem.threshold(触发合并的 map 输出文件数)设为 0,mapreduce.reduce.input.buffer.percent(用于保存 map 输出文件的堆内存比例)设为1.0。

Partion 分区

Partion 作用主要是对 map 处理的数据进行分区,可以解决数据倾斜的问题。

  • 分区是根据 MR 的输出 <key,value> 进行分区的。默认情况下,MR 的输出只有一个分区,一个分区就是一个文件。
  • 自定义分区:继承 Partitioner,重写 getPartition 这个方法。
  • 如果没有定义 Partitioner,则使用默认的 partition 算法,即根据每一条数据的 key 的 hashcode 值摸运算(%)reduce 的数量,得到的数字就是“分区号”。

描述 MapReduce 中 Combiner 的作用是什么,一般使用情景,哪些情况不需要,以及和 Reduce 的区别?

  • Combiner 是一种特殊的 Reducer,它的意义就是对每一个 MapTask 的输出进行局部汇总,以减小网络传输量;
  • Combiner 在 Mapper 端执行一次合并,用于减少 Mapper 输出到 Reducer 的数量,可以调高效率;
  • Combiner 能够应用的前提是不能影响最终的业务逻辑,而且,Combiner 的输出 kv 应该跟 Reducer 的输入 kv 类型要对应起来;
  • Combiner 和 Reducer 的区别在于运行的位置;
    • Combiner 是在每一个 MapTask 所在的节点运行;
    • Reducer 是接收全局所有 Mapper 的输出结果。

Hadoop YARN

简述 hadoop1.x 与 hadoop2.x 的架构异同

  • 加入了 yarn 解决了资源调度的问题;
  • 加入了对 zookeeper 的支持实现比较可靠的高可用。

为什么会产生 yarn,它解决了什么问题,有什么优势?

  • Yarn 最主要的功能就是解决运行的用户程序与 yarn 框架完全解耦。
  • Yarn 上可以运行各种类型的分布式运算程序(mapreduce 只是其中的一种),比如 mapreduce、storm 程序,spark 程序等等。

yarn 包括什么组件,各自的作用是什么?

yarn 是一个资源管理、任务调度的框架。主要包含三个模块:ResourceManger、NodeManger、ApplicationMater。

  • ResourceManger:负责所有资源的监控、分配和管理;
  • ApplicationMater:负责每一个具体应用程序的调度和协调,用户提交的每个应用程序均包含一个 ApplicationMater,它可以运行在 ResourceManger 以外的机器上;
  • NodeManger:负责每一个节点的维护。

Hadoop 有哪些调度器?

  • 默认的调度器 FIFO:Hadoop 中默认的调度器,它先按照作业的优先级高低,再按照到达时间的先后选择被执行的作业。
  • 计算能力调度器 Capacity Scheduler:支持多个队列,每个队列可配置一定的资源量,每个队列采用 FIFO 调度策略,为了防止同一个用户的作业独占队列中的资源,该调度器会对同一用户提交的作业所占资源量进行限定。调度时,首先按以下策略选择一个合适队列:计算每个队列中正在运行的任务数与其应该分得的计算资源之间的比值,选择一个该比值最小的队列;然后按以下策略选择该队列中一个作业:按照作业优先级和提交时间顺序选择,同时考虑用户资源量限制和内存限制。
  • 公平调度器 Fair Scheduler:同计算能力调度器类似,支持多队列多用户,每个队列中的资源量可以配置,同一队列中的作业公平共享队列中所有资源。实际上,Hadoop 的调度器远不止以上三种,最近,出现了很多针对新型应用的 Hadoop 调度器。

什么是 Container?

Container 是一个抽象概念,称之为容器,包含任务运行时所需的资源(包括内存、硬盘、cpu 等)和环境(包含启动命令、环境变量等)。

yarn 的执行流程?

  1. 客户端向集群提交一个任务,该任务首先到 RM 中的 AM;
  2. AM 收到任务后,会在集群中找一个 NodeManger,在该 NodeManger 上启动一个 APPMaster 进程。该进程用于执行任务划分和任务监控;
  3. AppMaster 启动起来之后,会向 RM 中的 AM 注册信息,APPMaster 向 RM 下的 ResourceSchedule 申请计算任务所需的资源;
  4. AppMaster 申请到资源之后,会与所有 NodeManger 通信要求他们启动所有计算任务(Map 和 Reduce);
  5. 各个 NM 启动对应的容器 Container 用来执行 Map 和 Reduce 任务;
  6. 各个任务会向 APPMaster 汇报自己的执行进度和执行状况,以便让 AppMaster 随时掌握各个任务的运行状态,在某个任务出了问题之后重启执行该任务;
  7. 在执行完之后,APPMaster 会向 AM 汇报,以便让 ApplicationManger 注销并关闭自己,使得资源得以回收。

MapReduce 容错性?

1)MRAppMaster 容错性   一旦运行失败,由 YARN 的 ResourceManager 负责重新启动,最多重启次数可由用户设置,默认是 2 次。一旦超过最高重启次数,则作业运行失败。 2)Map/Reduce Task   Task Task 周期性向 MRAppMaster 汇报心跳;一旦 Task 挂掉,则 MRAppMaster 将为之重新申请资源,并运行之。最多重新运行次数可由用户设置,默认 4 次。

MapReduce 2.x 推测执行算法及原理?

  • 作业完成时间取决于最慢的任务完成时间
    • 一个作业由若干个 Map 任务和 Reduce 任务构成。因硬件老化、软件 Bug 等,某些任务可能运行非常慢。
    • 典型案例:系统中有 99% 的 Map 任务都完成了,只有少数几个 Map 老是进度很慢,完不成,怎么办?
  • 推测执行机制
    • 发现拖后腿的任务,比如某个任务运行速度远慢于任务平均速度。为拖后腿任务启动一个备份任务,同时运行。谁先运行完,则采用谁的结果。
  • 不能启用推测执行机制情况
    • 任务间存在严重的负载倾斜;
    • 特殊任务,比如任务向数据库中写数据。
  • 算法原理

    假设某一时刻,任务 T 的执行进度为 progress,则可通过一定的算法推测出该任务的最终完成时刻 estimateEndTime。另一方面,如果此刻为该任务启动一个备份任务,则可推断出它可能的完成时刻 estimateEndTime,于是可得出以下几个公式:

    • estimateEndTime = estimatedRunTime + taskStartTime
    • estimatedRunTime = (currentTimestamp - taskStartTime)/progress
    • estimateEndTime = currentTimestamp + averageRunTime

    其中,currentTimestamp 为当前时刻;taskStartTime 为该任务的启动时刻;averageRunTime 为已经成功运行完成的任务的平均运行时间。这样,MRv2 总是选择 estimateEndTime- estimateEndTime 差值最大的任务,并为之启动备份任务。为了防止大量任务同时启动备份任务造成的资源浪费,MRv2 为每个作业设置了同时启动的备份任务数目上限。

    推测执行机制实际上采用了经典的算法优化方法:以空间换时间,它同时启动多个相同任务处理相同的数据,并让这些任务竞争以缩短数据处理时间。显然,这种方法需要占用更多的计算资源。在集群资源紧缺的情况下,应合理使用该机制,争取在多用少量资源的情况下,减少作业的计算时间。

Hadoop 调优

MapReduce 跑得慢的原因?

MapReduce 程序效率的瓶颈在于两点:

  • 计算机性能
    • CPU、内存、磁盘健康、网络
  • I/O 操作优化
    • 数据倾斜
    • map 和 reduce 数设置不合理
    • reduce 等待过久
    • 小文件过多
    • 大量的不可分块的超大文件
    • spill 次数过多
    • merge 次数过多等

MapReduce 优化方法?

  1. 数据输入

    • 合并小文件:在执行 mr 任务前将小文件进行合并,大量的小文件会产生大量的 map 任务,增大 map 任务装载次数,而任务的装载比较耗时,从而导致 mr 运行较慢。
    • 采用 CombineFileInputFormat 来作为输入,解决输入端大量小文件场景。
  2. map 阶段

    • 减少 spill 次数:通过调整 io.sort.mbsort.spill.percent参数值,增大触发 spill 的内存上限,减少 spill 次数,从而减少磁盘 IO。
    • 减少 merge 次数:通过调整 io.sort.factor 参数,增大 merge 的文件数目,减少 merge 的次数,从而缩短 mr 处理时间。
    • 在 map 之后先进行 combine 处理,减少 I/O。
  3. reduce 阶段

    • 合理设置 map 和 reduce 数:两个都不能设置太少,也不能设置太多。太少,会导致 task 等待,延长处理时间;太多,会导致 map、reduce 任务间竞争资源,造成处理超时等错误。
    • 设置 map、reduce 共存:调整 slowstart.completedmaps参数,使 map 运行到一定程度后,reduce 也开始运行,减少 reduce 的等待时间。
    • 规避使用 reduce,因为 reduce 在用于连接数据集的时候将会产生大量的网络消耗。
    • 合理设置 reduce 端的 buffer,默认情况下,数据达到一个阈值的时候,buffer 中的数据就会写入磁盘,然后 reduce 会从磁盘中获得所有的数据。也就是说,buffer 和 reduce 是没有直接关联的,中间多个一个写磁盘 -> 读磁盘的过程,既然有这个弊端,那么就可以通过参数来配置,使得 buffer 中的一部分数据可以直接输送到 reduce,从而减少 IO 开销:mapred.job.reduce.input.buffer.percent,默认为 0.0。当值大于 0 的时候,会保留指定比例的内存读 buffer 中的数据直接拿给 reduce 使用。这样一来,设置 buffer 需要内存,读取数据需要内存,reduce 计算也要内存,所以要根据作业的运行情况进行调整。
  4. IO 传输

    • 采用数据压缩的方式,减少网络 IO 的时间。安装 Snappy 和 LZOP 压缩编码器。
    • 使用 SequenceFile 二进制文件
  5. 数据倾斜问题

    1. 数据倾斜现象:
      • 数据频率倾斜:某一个区域的数据量要远远大于其他区域。
      • 数据大小倾斜:部分记录的大小远远大于平均值。
    2. 如何收集倾斜数据

      • 在 reduce 方法中加入记录 map 输出键的详细情况的功能。

      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      16
      17
      18
      19
      20
      21
      public static final String MAX_VALUES = "skew.maxvalues";
      private int maxValueThreshold;

      @Override
      public void configure(JobConf job) {
      maxValueThreshold = job.getInt(MAX_VALUES, 100);
      }

      @Override
      public void reduce(Text key, Iterator<Text> values,
      OutputCollector<Text, Text> output,
      Reporter reporter) throws IOException {
      int i = 0;
      while (values.hasNext()) {
      values.next();
      i++;
      }
      if (++i > maxValueThreshold) {
      log.info("Received " + i + " values for key " + key);
      }
      }

    3. 减少数据倾斜的方法
      • 抽样和范围分区:可以通过对原始数据进行抽样得到的结果集来预设分区边界值。
      • 自定义分区:另一个抽样和范围分区的替代方案是基于输出键的背景知识进行自定义分区。例如,如果 map 输出键的单词来源于一本书。其中大部分必然是省略词(stopword)。那么就可以将自定义分区将这部分省略词发送给固定的一部分 reduce 实例。而将其他的都发送给剩余的 reduce 实例。
      • Combine:使用 Combine 可以大量地减小数据频率倾斜和数据大小倾斜。在可能的情况下,Combine 的目的就是聚合并精简数据。

HDFS 小文件优化方法?

  1. HDFS 小文件弊端:

    HDFS 上每个文件都要在 NameNode 上建立一个索引,这个索引的大小约为 150byte,这样当小文件比较多的时候,就会产生很多的索引文件,一方面会大量占用 NameNode 的内存空间,另一方面就是索引文件过大是的索引速度变慢。

  2. 解决的方式:

    • Hadoop 本身提供了一些文件压缩的方案。
    • 从系统层面改变现有 HDFS 存在的问题,其实主要还是小文件的合并,然后建立比较快速的索引。
  3. Hadoop 自带小文件解决方案

    • Hadoop Archive:是一个高效地将小文件放入 HDFS 块中的文件存档工具,它能够将多个小文件打包成一个 HAR 文件,这样在减少 NameNode 内存使用的同时。
    • Sequence file:Sequence file 由一系列的二进制 key/value 组成,如果为 key 小文件名,value 为文件内容,则可以将大批小文件合并成一个大文件。
    • CombineFileInputFormat:CombineFileInputFormat 是一种新的 inputFormat,用于将多个文件合并成一个单独的 split,另外,它会考虑数据的存储位置。