Spark 内核

Spark 的有几种部署模式,每种模式特点?

  • 本地模式:Spark 不一定非要跑在 Hadoop 集群,可以在本地,起多个线程的方式来指定。将 Spark 应用以多线程的方式直接运行在本地,一般都是为了方便调试,本地模式分三类:
    • local:只启动一个 Executor
    • local[k]:启动 k 个 Executor
    • local[*]:启动跟 CPU 数目相同的 Executor
  • standalone 模式:分布式部署集群,自带完整的服务,资源管理和任务监控是 Spark 自己监控,这个模式也是其他模式的基础。
  • Spark on Yarn 模式:分布式部署集群,资源和任务监控交给 YARN 管理,但是目前仅支持粗粒度资源分配方式,包含 cluster 和 client 运行模式,cluster 适合生产,Driver 运行在集群子节点,具有容错功能,client 适合调试,Driver 运行在客户端。
  • Spark On Mesos 模式:官方推荐这种模式(当然,原因之一是血缘关系)。正是由于 Spark 开发之初就考虑到支持 Mesos,因此,目前而言,Spark 运行在 Mesos 上会比运行在 YARN 上更加灵活,更加自然。用户可选择两种调度模式之一运行自己的应用程序:
    • 粗粒度模式(Coarse-grained Mode):每个应用程序的运行环境由一个 Driver 和若干个 Executor 组成,其中,每个 Executor 占用若干资源,内部可运行多个 Task(对应多少个“slot”)。应用程序的各个任务正式运行之前,需要将运行环境中的资源全部申请好,且运行过程中要一直占用这些资源,即使不用,最后程序运行结束后,回收这些资源。
    • 细粒度模式(Fine-grained Mode):鉴于粗粒度模式会造成大量资源浪费,Spark On Mesos 还提供了另外一种调度模式:细粒度模式,这种模式类似于现在的云计算,思想是按需分配。

Spark 为什么比 MapReduce 快?

  • 基于内存计算,减少低效的磁盘交互;
  • 高效的调度算法,基于 DAG;
  • 容错机制 Lineage,精华部分就是 DAG 和 Lineage。

简单说一下 Hadoop 和 Spark 的 shuffle 相同和差异?

  • 从 high-level 的角度来看,两者并没有大的差别。都是将 mapper(Spark 里是 ShuffleMapTask)的输出进行 partition,不同的 partition 送到不同的 reducer(Spark 里 reducer 可能是下一个 stage 里的 ShuffleMapTask,也可能是 ResultTask)。Reducer 以内存作缓冲区,边 shuffle 边 aggregate 数据,等到数据 aggregate 好以后进行 reduce(Spark 里可能是后续的一系列操作)。
  • 从 low-level 的角度来看,两者差别不小。Hadoop MapReduce 是 sort-based,进入 combine 和 reduce 的 records 必须先 sort。这样的好处在于 combine/reduce 可以处理大规模的数据,因为其输入数据可以通过外排得到(mapper 对每段数据先做排序,reducer 的 shuffle 对排好序的每段数据做归并)。目前的 Spark 默认选择的是 hash-based,通常使用 HashMap 来对 shuffle 来的数据进行 aggregate,不会对数据进行提前排序。如果用户需要经过排序的数据,那么需要自己调用类似 sortByKey 的操作;如果你是 Spark 1.1 的用户,可以将 spark.shuffle.manager 设置为 sort,则会对数据进行排序。在 Spark 1.2 中,sort 将作为默认的 Shuffle 实现。
  • 从实现角度来看,两者也有不少差别。Hadoop MapReduce 将处理流程划分出明显的几个阶段:map,spill,merge,shuffle,sort,reduce 等。每个阶段各司其职,可以按照过程式的编程思想来逐一实现每个阶段的功能。在 Spark 中,没有这样功能明确的阶段,只有不同的 stage 和一系列的 transformation,所以 spill,merge,aggregate 等操作需要蕴含在 transformation 中。

如果我们将 map 端划分数据、持久化数据的过程称为 shuffle write,而将 reducer 读入数据、aggregate 数据的过程称为 shuffle read。那么在 Spark 中,问题就变为怎么在 job 的逻辑或者物理执行图中加入 shuffle write 和 shuffle read 的处理逻辑?以及两个处理逻辑应该怎么高效实现?

shuffle write 由于不要求数据有序,shuffle write 的任务很简单:将数据 partition 好,并持久化。之所以要持久化,一方面是要减少内存存储空间压力,另一方面也是为了 fault-tolerance。

Spark 工作机制?Spark 应用程序的执行过程?

  1. 构建 Application 的运行环境,Driver 创建一个 SparkContext;
  2. SparkContext 向资源管理器(Standalone、Mesos、Yarn)申请 Executor 资源,资源管理器启动 StandaloneExecutorBackend(Executor);
  3. Executor 向 SparkContext 申请 Task;
  4. SparkContext 将应用程序分发给 Executor;
  5. SparkContext 就建成 DAG 图,DAG Scheduler 将 DAG 图解析成 Stage,每个 Stage 有多个 Task,形成 TaskSet 发送给 Task Scheduler,由 Task Scheduler 将 Task 发送给 Executor 运行;
  6. Task 在 Executor 上运行,运行完释放所有资源。

Spark 的优化怎么做?

Spark 调优比较复杂,但是大体可以分为三个方面来进行

  • 平台层面的调优:防止不必要的 jar 包分发,提高数据的本地性,选择高效的存储格式如 parquet;
  • 应用程序层面的调优:过滤操作符的优化降低过多小任务,降低单条记录的资源开销,处理数据倾斜,复用 RDD 进行缓存,作业并行化执行等等;
  • JVM 层面的调优:设置合适的资源量,设置合理的 JVM,启用高效的序列化方法如 Kyro,增大 off-heap 内存等等;

数据本地性是在哪个环节确定的?

具体的 Task 运行的那台机器上,DAG 划分 Stage 的时候确定的。

RDD 的弹性表现在哪几点?

  • 自动的进行内存和磁盘的存储切换;
  • 基于 Lineage 的高效容错;
  • Task 如果失败会自动进行特定次数的重试;
  • Stage 如果失败会自动进行特定次数的重试,而且只会计算失败的分片;
  • checkpoint 和 persist,数据计算之后持久化缓存;
  • 数据调度弹性,DAG TASK 调度和资源无关;
  • 数据分片的高度弹性。

RDD 有哪些缺陷?

  • 不支持细粒度的写和更新操作,Spark 写数据是粗粒度的。所谓粗粒度,就是批量写入数据,为了提高效率。但是读数据是细粒度的也就是说可以一条条的读。
  • 不支持增量迭代计算,Flink 支持

Spark 的 shuffle 过程?

从下面三点去展开

  • shuffle 过程的划分
  • shuffle 的中间结果如何存储
  • shuffle 的数据如何拉取过来

Spark 的数据本地性有哪几种?

Spark 中的数据本地性有三种:

  • PROCESS_LOCAL 是指读取缓存在本地节点的数据
  • NODE_LOCAL 是指读取本地节点硬盘数据
  • ANY 是指读取非本地节点数据

通常读取数据 PROCESS_LOCAL > NODE_LOCAL > ANY,尽量使数据以 PROCESS_LOCAL 或 NODE_LOCAL 方式读取。其中 PROCESS_LOCAL 还和 cache 有关,如果 RDD 经常用的话将该 RDD cache 到内存中,注意,由于 cache 是 lazy 的,所以必须通过一个 action 的触发,才能真正的将该 RDD cache 到内存中。

Spark 为什么要持久化,一般什么场景下要进行 persist 操作?

Spark 所有复杂一点的算法都会有 persist 身影,Spark 默认数据放在内存,Spark 很多内容都是放在内存的,非常适合高速迭代,1000 个步骤只有第一个输入数据,中间不产生临时数据,但分布式系统风险很高,所以容易出错,就要容错,RDD 出错或者分片可以根据血统算出来,如果没有对父 RDD 进行 persist 或者 cache 的话,就需要重头做。以下场景会使用 persist:

  • 某个步骤计算非常耗时,需要进行 persist 持久化;
  • 计算链条非常长,重新恢复要算很多步骤;
  • checkpoint 所在的 RDD 要持久化 persist。checkpoint 前要持久化,写个 rdd.cache 或者 rdd.persist,将结果保存起来,再写 checkpoint 操作,这样执行起来会非常快,不需要重新计算 RDD 链条了。checkpoint 之前一定会进行 persist;
  • shuffle 之后要 persist,shuffle 要进行网络传输,风险很大,数据丢失重来,恢复代价很大;
  • shuffle 之前进行 persist,框架默认将数据持久化到磁盘,这个是框架自动做的。

介绍一下 join 操作优化经验?

join 其实常见的就分为两类:map-side joinreduce-side join。当大表和小表 join 时,用 map-side join 能显著提高效率。将多份数据进行关联是数据处理过程中非常普遍的用法,不过在分布式计算系统中,这个问题往往会变的非常麻烦,因为框架提供的 join 操作一般会将所有数据根据 key 发送到所有的 reduce 分区中去,也就是 shuffle 的过程。造成大量的网络以及磁盘 IO 消耗,运行效率极其低下,这个过程一般被称为 reduce-side-join。如果其中有张表较小的话,我们则可以自己实现在 map 端实现数据关联,跳过大量数据进行 shuffle 的过程,运行时间得到大量缩短,根据不同数据可能会有几倍到数十倍的性能提升。

描述 Yarn 执行一个任务的过程?

Yarn 工作流

  1. 客户端 client 向 ResourceManager 提交 Application,ResourceManager 接受 Application 并根据集群资源状况选取一个 node 来启动 Application 的任务调度器 Driver(ApplicationMaster)。
  2. ResourceManager 找到那个 node,命令其该 node 上的 nodeManager 来启动一个新的 JVM 进程运行程序的 Driver(ApplicationMaster)部分,Driver(ApplicationMaster)启动时会首先向 ResourceManager 注册,说明由自己来负责当前程序的运行。
  3. Driver(ApplicationMaster)开始下载相关 jar 包等各种资源,基于下载的 jar 等信息决定向 ResourceManager 申请具体的资源内容。
  4. ResourceManager 接受到 Driver(ApplicationMaster)提出的申请后,会最大化的满足资源分配请求,并发送资源的元数据信息给 Driver(ApplicationMaster)。
  5. Driver(ApplicationMaster)收到发过来的资源元数据信息后会根据元数据信息发指令给具体机器上的 NodeManager,让其启动具体的 Container。
  6. NodeManager 收到 Driver 发来的指令,启动 Container,Container 启动后必须向 Driver(ApplicationMaster)注册。
  7. Driver(ApplicationMaster)收到 Container 的注册,开始进行任务的调度和计算,直到任务完成。

注意:如果 ResourceManager 第一次没有能够满足 Driver(ApplicationMaster)的资源请求 ,后续发现有空闲的资源,会主动向 Driver(ApplicationMaster)发送可用资源的元数据信息以提供更多的资源用于当前程序的运行。

Spark on Yarn 模式有哪些优点?

  • 与其他计算框架共享集群资源(Spark 框架与 MapReduce 框架同时运行,如果不用 Yarn 进行资源分配,MapReduce 分到的内存资源会很少,效率低下);资源按需分配,进而提高集群资源利用等。
  • 相较于 Spark 自带的 Standalone 模式,Yarn 的资源分配更加细致。
  • Application 部署简化,例如 Spark,Storm 等多种框架的应用由客户端提交后,由 Yarn 负责资源的管理和调度,利用 Container 作为资源隔离的单位,以它为单位去使用内存、CPU 等。
  • Yarn 通过队列的方式,管理同时运行在 Yarn 集群中的多个服务,可根据不同类型的应用程序负载情况,调整对应的资源使用量,实现资源弹性管理。

谈谈你对 Container 的理解?

  • Container 作为资源分配和调度的基本单位,其中封装了的资源如内存,CPU,磁盘,网络带宽等。目前 YARN 仅仅封装内存和 CPU;
  • Container 由 ApplicationMaster 向 ResourceManager 申请的,由 ResourceManager 中的资源调度器异步分配给 ApplicationMaster;
  • Container 的运行是由 ApplicationMaster 向资源所在的 NodeManager 发起的,Container 运行时需提供内部执行的任务命令。

Spark 使用 parquet 文件存储格式能带来哪些好处?

  • 如果说 HDFS 是大数据时代分布式文件系统首选标准,那么 parquet 则是整个大数据时代文件存储格式实时首选标准。
  • 速度更快:从使用 Spark SQL 操作普通文件 CSV 和 parquet 文件速度对比上看,绝大多数情况会比使用 csv 等普通文件速度提升 10 倍左右,在一些普通文件系统无法在 Spark 上成功运行的情况下,使用 parquet 很多时候可以成功运行。
  • parquet 的压缩技术非常稳定出色,在 Spark SQL 中对压缩技术的处理可能无法正常的完成工作(例如会导致 lost task,lost executor)但是此时如果使用 parquet 就可以正常的完成。
  • 极大的减少磁盘 IO,通常情况下能够减少 75% 的存储空间,由此可以极大的减少 Spark SQL 处理数据的时候的数据输入内容,尤其是在 Spark1.6x 中有个下推过滤器在一些情况下可以极大的减少磁盘的 IO 和内存的占用。
  • Spark 1.6x parquet 方式极大的提升了扫描的吞吐量,极大提高了数据的查找速度 Spark1.6x 和 Spark1.5x 相比而言,提升了大约 1 倍的速度,在 Spark1.6x 中,操作 parquet 时候 CPU 也进行了极大的优化,有效的降低了 CPU 消耗。
  • 采用 parquet 可以极大的优化 Spark 的调度和执行。我们测试 Spark 如果用 parquet 可以有效的减少 Stage 的执行消耗,同时可以优化执行路径。

介绍 partition 和 block 有什么关联关系?

  • HDFS 中的 block 是分布式存储的最小单元,等分,可设置冗余,这样设计有一部分磁盘空间的浪费,但是整齐的 block 大小,便于快速找到、读取对应的内容;
  • Spark 中的 partition 是弹性分布式数据集 RDD 的最小单元,RDD 是由分布在各个节点上的 partition 组成的。partition 是指的 Spark 在计算过程中,生成的数据在计算空间内最小单元,同一份数据(RDD)的 partition 大小不一,数量不定,是根据 application 里的算子和最初读入的数据分块数量决定;
  • block 位于存储空间、partition 位于计算空间,block 的大小是固定的、partition 大小是不固定的,是从 2 个不同的角度去看数据。

不需要排序的 hash shuffle 是否一定比需要排序的 sort shuffle 速度快?

不一定,当数据规模小,hash shuffle 快于 sorted shuffle,数据规模大的时候;当数据量大,sorted shuffle 会比 hash shuffle 快很多,因为数量大的有很多小文件,不均匀,甚至出现数据倾斜,消耗内存大,1.x 之前 Spark 使用 hash,适合处理中小规模,1.x 之后,增加了 sorted shuffle,Spark 更能胜任大规模处理了。

Sort-based shuffle 的缺陷?

  • 如果 mapper 中 task 的数量过大,依旧会产生很多小文件,此时在 shuffle 传递数据到 reducer 的过程中,reduce 会需要同时大量的记录进行反序列化,导致大量的内存消耗和 GC 的巨大负担,造成系统缓慢甚至崩溃。
  • 如果需要在分片内也进行排序,此时需要进行 mapper 和 reducer 的两次排序。

spark.storage.memoryFraction 参数的含义,实际生产中如何调优?

  • 用于设置 RDD 持久化数据在 Executor 内存中能占的比例,默认是 0.6,默认 Executor 60% 的内存,可以用来保存持久化的 RDD 数据。根据你选择的不同的持久化策略,如果内存不够时,可能数据就不会持久化,或者数据会写入磁盘;

  • 如果持久化操作比较多,可以提高 spark.storage.memoryFraction 参数,使得更多的持久化数据保存在内存中,提高数据的读取性能,如果 shuffle 的操作比较多,有很多的数据读写操作到 JVM 中,那么应该调小一点,节约出更多的内存给 JVM,避免过多的 JVM GC 发生。在 Web UI 中观察如果发现 GC 时间很长,可以设置 spark.storage.memoryFraction 更小一点。

介绍一下你对 Unified Memory Management 内存管理模型的理解?

Spark 中的内存使用分为两部分:执行(execution)与存储(storage)。执行内存主要用于 shuffles、joins、sorts 和 aggregations,存储内存则用于缓存或者跨节点的内部数据传输。1.6 之前,对于一个 Executor,内存都由以下部分构成:

  • ExecutionMemory:这片内存区域是为了解决 shuffles,joins,sorts 和 aggregations 过程中为了避免频繁 IO 需要的 buffer。通过 spark.shuffle.memoryFraction(默认 0.2)配置。
  • StorageMemory:这片内存区域是为了解决 block cache(就是你显示调用 rdd.cache,rdd.persist 等方法),还有就是 broadcasts 以及 task results 的存储。可以通过参数 spark.storage.memoryFraction(默认 0.6)设置。
  • OtherMemory:给系统预留的,因为程序本身运行也是需要内存的(默认为 0.2)。

传统内存管理的不足:

  • Shuffle 占用内存 0.2*0.8,内存分配这么少,可能会将数据 spill 到磁盘,频繁的磁盘 IO 是很大的负担,Storage 内存占用 0.6,主要是为了迭代处理。传统的 Spark 内存分配对操作人的要求非常高。(Shuffle 分配内存:ShuffleMemoryManager,TaskMemoryManager,ExecutorMemoryManager)一个 Task 获得全部的 Execution 的 Memory,其他 Task 过来就没有内存了,只能等待;
  • 默认情况下,Task 在线程中可能会占满整个内存,分片数据特别大的情况下就会出现这种情况,其他 Task 没有内存了,剩下的 cores 就空闲了,这是巨大的浪费。这也是人为操作的不当造成的;
  • MEMORY_AND_DISK_SER 的 storage 方式,获得 RDD 的数据是一条条获取,iterator 的方式。如果内存不够(spark.storage.unrollFraction),unroll 的读取数据过程,就是看内存是否足够,如果足够,就下一条。unroll 的 space 是从 Storage 的内存空间中获得的。unroll 的方式失败,就会直接放磁盘;
  • 默认情况下,Task 在 spill 到磁盘之前,会将部分数据存放到内存上,如果获取不到内存,就不会执行。永无止境的等待,消耗 CPU 和内存;

​在此基础上,Spark 提出了 UnifiedMemoryManager,不再分 ExecutionMemory 和 Storage Memory,实际上还是分的,只不过是 Execution Memory 访问 Storage Memory,Storage Memory 也可以访问 Execution Memory,如果内存不够,就会去借。

Spark 有哪两种算子?

​Transformation(转化)算子和 Action(执行)算子。

Spark 有哪些聚合类的算子,我们应该尽量避免什么类型的算子?

​在我们的开发过程中,能避免则尽可能避免使用 reduceByKey、join、distinct、repartition 等会进行 shuffle 的算子,尽量使用 map 类的非 shuffle 算子。这样的话,没有 shuffle 操作或者仅有较少 shuffle 操作的 Spark 作业,可以大大减少性能开销。

如何从 Kafka 中获取数据?

  • 基于 Receiver 的方式

    这种方式使用 Receiver 来获取数据。Receiver 是使用 Kafka 的高层次 Consumer API 来实现的。receiver 从 Kafka 中获取的数据都是存储在 Spark Executor 的内存中的,然后 Spark Streaming 启动的 job 会去处理那些数据。

  • 基于 Direct 的方式

    ​这种新的不基于 Receiver 的直接方式,是在 Spark 1.3 中引入的,从而能够确保更加健壮的机制。替代掉使用 Receiver 来接收数据后,这种方式会周期性地查询 Kafka,来获得每个 topic + partition 的最新的 offset,从而定义每个 batch 的 offset 的范围。当处理数据的 job 启动时,就会使用 Kafka 的简单 Consumer API 来获取 Kafka 指定 offset 范围的数据。

RDD 创建有哪几种方式?

  • 使用程序中的集合创建 RDD
  • 使用本地文件系统创建 RDD
  • 使用 HDFS 创建 RDD
  • 基于数据库 db 创建 RDD
  • 基于 NoSQL 创建 RDD,如 HBase
  • 基于 s3 创建 RDD
  • 基于数据流,如 socket 创建 RDD

Spark 并行度怎么设置比较合适?

​Spark 并行度,每个 core 承载 2~4 个 partition,如 32 个 core,那么 64~128 之间的并行度,也就是设置 64~128 个 partition,并行读和数据规模无关,只和内存使用量和 CPU 使用时间有关。

Spark 如何处理不能被序列化的对象?

​将不能序列化的内容封装成 object。

collect 功能是什么,其底层是怎么实现的?

​Driver 通过 collect 把集群中各个节点的内容收集过来汇总成结果,collect 返回结果是 Array 类型的,collect 把各个节点上的数据抓过来,抓过来数据是 Array 型,collect 对 Array 抓过来的结果进行合并,合并后 Array 中只有一个元素,是 tuple 类型(K-V 类型)的。

为什么 Spark Application 在没有获得足够的资源,job 就开始执行了,可能会导致什么什么问题发生?

​会导致执行该 job 的时候集群资源不足,导致执行 job 结束也没有分配足够的资源,分配了部分 Executor,该 job 就开始执行 task,应该是 task 的调度线程和 Executor 资源申请是异步的;如果想等待申请完所有的资源再执行 job 的,​需要将 ​spark.scheduler.maxRegisteredResourcesWaitingTime 设置的很大;​spark.scheduler.minRegisteredResourcesRatio 设置为 1,但是应该结合实际考虑,​否则很容易出现长时间分配不到资源,job 一直不能运行的情况。

map 与 flatMap 的区别?

  • map:对 RDD 每个元素转换,文件中的每一行数据返回一个数组对象。
  • flatMap:对 RDD 每个元素转换,然后再扁平化。

​将所有的对象合并为一个对象,文件中的所有行数据仅返回一个数组对象,会抛弃值为 null 的值。

Spark on Mesos 中,什么是的粗粒度分配,什么是细粒度分配,各自的优点和缺点是什么?

  • 粗粒度:启动时就分配好资源,程序启动,后续具体使用就使用分配好的资源,不需要再分配资源;好处:作业特别多时,资源复用率高,适合粗粒度;不好:容易资源浪费,假如一个 job 有 1000 个 Task,完成了 999 个,还有一个没完成,那么使用粗粒度,999 个资源就会闲置在那里,资源浪费。
  • 细粒度分配:用资源的时候分配,用完了就立即回收资源,启动会麻烦一点,启动一次分配一次,会比较麻烦。

Driver 的功能是什么?

  • 一个 Spark 作业运行时包括一个 Driver 进程,也是作业的主进程,具有 main 函数,并且有 SparkContext 的实例,是程序的入口点;
  • 功能:负责向集群申请资源,向 master 注册信息,负责了作业的调度,负责作业的解析、生成 Stage 并调度 Task 到 Executor 上。包括 DAG Scheduler,Task Scheduler。

Spark 技术栈有哪些组件,每个组件都有什么功能,适合什么应用场景?

​可以画一个技术栈图先,然后分别解释下每个组件的功能和场景

  • Spark Core:是其它组件的基础,Spark 的内核,主要包含:DAG、RDD、Lineage、Cache、broadcast 等,并封装了底层通讯框架,是 Spark 的基础。
  • Spark Streaming:是一个对实时数据流进行高通量、容错处理的流式处理系统,可以对多种数据源(如 Kafka、Flume、Twitter、Zero 和 TCP Socket)进行类似 Map、Reduce 和 Join 等复杂操作,将流式计算分解成一系列短小的批处理作业。
  • Spark SQL:Shark 是 SparkSQL 的前身,Spark SQL 的一个重要特点是其能够统一处理关系表和 RDD,使得开发人员可以轻松地使用 SQL 命令进行外部查询,同时进行更复杂的数据分析。
  • BlinkDB:是一个用于在海量数据上运行交互式 SQL 查询的大规模并行查询引擎,它允许用户通过权衡数据精度来提升查询响应时间,其数据的精度被控制在允许的误差范围内。
  • MLBase:是 Spark 生态圈的一部分专注于机器学习,让机器学习的门槛更低,让一些可能并不了解机器学习的用户也能方便地使用 MLBase。MLBase 分为四部分:MLlib、MLI、ML Optimizer 和 MLRuntime。
  • GraphX:是 Spark 中用于图和图并行计算。

Spark 中 Worker 的主要工作是什么?

​主要功能:管理当前节点内存,CPU 的使用状况,接收 master 分配过来的资源指令,通过 ExecutorRunner 启动程序分配任务,Worker 就类似于包工头,管理分配新进程,做计算的服务,相当于 process 服务。

​需要注意的是:

  • Worker 不会汇报当前信息给 master,worker 心跳给 master 主要只有 workid,它不会发送资源信息以心跳的方式给 mater,master 分配的时候就知道 Worker,只有出现故障的时候才会发送资源。
  • Worker 不会运行代码,具体运行的是 Executor 是可以运行具体 Application 写的业务逻辑代码,操作代码的节点,它不会运行程序的代码的。

MapReduce 和 Spark 的都是并行计算,那么他们有什么相同和区别?

​两者都是用 MR 模型来进行并行计算::

  • Hadoop 的一个作业称为 job,job 里面分为 map task 和 reduce task,每个 task 都是在自己的进程中运行的,当 task 结束时,进程也会结束。
  • Spark 用户提交的任务成为 Application,一个 Application 对应一个 SparkContext,Application 中存在多个 job,每触发一次 action 操作就会产生一个 job。这些 job 可以并行或串行执行,每个 job 中有多个 Stage,Stage 是 shuffle 过程中 DAG Scheduler 通过 RDD 之间的依赖关系划分 job 而来的,每个 Stage 里面有多个 Task,组成 TaskSet 由 Task Scheduler 分发到各个 Executor 中执行,Executor 的生命周期是和 Application 一样的,即使没有 job 运行也是存在的,所以 Task 可以快速启动读取内存进行计算。
  • Hadoop 的 job 只有 map 和 reduce 操作,表达能力比较欠缺而且在 MR 过程中会重复的读写 HDFS,造成大量的 IO 操作,多个 job 需要自己管理关系。
  • Spark 的迭代计算都是在内存中进行的,API 中提供了大量的 RDD 操作如 join,groupBy 等,而且通过 DAG 图可以实现良好的容错。

RDD 机制?

​RDD 分布式弹性数据集,简单的理解成一种数据结构,是 Spark 框架上的通用货币。所有算子都是基于 RDD 来执行的,不同的场景会有不同的 RDD 实现类,但是都可以进行互相转换。RDD 执行过程中会形成 DAG 图,然后形成 Lineage 保证容错性等。从物理的角度来看 RDD 存储的是 block 和 node 之间的映射。

什么是 RDD 宽依赖和窄依赖?

​RDD 和它依赖的 parent RDD(s) 的关系有两种不同的类型,即窄依赖(narrow dependency)和宽依赖(wide dependency)。

  • 窄依赖指的是每一个 parent RDD 的 Partition 最多被子 RDD 的一个 Partition 使用
  • 宽依赖指的是多个子 RDD 的 Partition 会依赖同一个 parent RDD 的 Partition。

cache 和 persist 的区别?

​cache 和 persist 都是用于将一个 RDD 进行缓存的,这样在之后使用的过程中就不需要重新计算了,可以大大节省程序运行时间

  • cache 只有一个默认的缓存级别 MEMORY_ONLY,cache 调用了 persist,而 persist 可以根据情况设置其它的缓存级别;
  • Executor 执行的时候,默认 60% 做 cache,40% 做 task 操作,persist 是最根本的函数,最底层的函数。

cache 后面能不能接其他算子,它是不是 action 操作?

  • cache 可以接其他算子,但是接了算子之后,起不到缓存应有的效果,因为会重新触发 cache。
  • cache 不是 action 操作

reduceByKey 是不是 action?

​不是,很多人都会以为是 action,reduce 是 action。

RDD 通过 Lineage(记录数据更新)的方式为何很高效?

  • lazy 记录了数据的来源,RDD 是不可变的,且是 lazy 级别的,且 RDD 之间构成了链条,lazy 是弹性的基石。由于 RDD 不可变,所以每次操作就产生新的 RDD,不存在全局修改的问题,控制难度下降,所有有计算链条将复杂计算链条存储下来,计算的时候从后往前回溯是上一个 Stage 的结束,要么就 checkpoint。
  • 记录原数据,是每次修改都记录,代价很大如果修改一个集合,代价就很小,官方说 RDD 是粗粒度的操作,是为了效率,为了简化,每次都是操作数据集合,写或者修改操作,都是基于集合的 RDD 的写操作是粗粒度的,RDD 的读操作既可以是粗粒度的也可以是细粒度,读可以读其中的一条条的记录。
  • 简化复杂度,是高效率的一方面,写的粗粒度限制了使用场景如网络爬虫,现实世界中,大多数写是粗粒度的场景。

为什么要进行序列化序列化?

​可以减少数据的体积,减少存储空间,高效存储和传输数据,不好的是在使用的时候要反序列化,非常消耗 CPU。

Yarn 中的 Container 是由谁负责销毁的,在 Hadoop MapReduce 中 Container 可以复用么?

​ApplicationMaster 负责销毁,在 Hadoop MapReduce 不可以复用,在 Spark on Yarn 程序 Container 可以复用。

提交任务时,如何指定 Spark Application 的运行模式?

  • cluster 模式:./spark-submit --class xx.xx.xx --master yarn --deploy-mode cluster xx.jar
  • client 模式:./spark-submit --class xx.xx.xx --master yarn --deploy-mode client xx.jar

不启动 Spark 集群 Master 和 Worker 服务,可不可以运行 Spark 程序?

​可以,只要资源管理器第三方管理就可以,如由 Yarn 管理,Spark 集群不启动也可以使用 Spark;Spark 集群启动的是 Worker 和 Master,这个其实就是资源管理框架,Yarn 中的 ResourceManager 相当于 Master,NodeManager 相当于 Worker,做计算是 Executor,和 Spark 集群的 Worker 和 Manager 可以没关系,归根接底还是 JVM 的运行,只要所在的 JVM 上安装了 Spark 就可以。

Spark on Yarn Cluster 模式下,ApplicationMaster 和 Driver 是在同一个进程么?

​是,Driver 位于 ApplicationMaster 进程中。该进程负责申请资源,还负责监控程序、资源的动态情况。

运行在 Yarn 中 Application 有几种类型的 Container?

  • 运行 ApplicationMaster 的 Container:这是由 ResourceManager(向内部的资源调度器)申请和启动的,用户提交应用程序时,可指定唯一的 ApplicationMaster 所需的资源;
  • 运行各类任务的 Container:这是由 ApplicationMaster 向 ResourceManager 申请的,并由 ApplicationMaster 与 NodeManager 通信以启动之。

Executor 启动时,资源通过哪几个参数指定?

  • num-executors:Executor 的数量;
  • executor-memory:每个 Executor 使用的内存;
  • executor-cores:每个 Executor 分配的 CPU。

列出你所知道的调度器,说明其工作原理

  • FIFO Schedular:默认的调度器,先进先出;
  • Capacity Schedular:计算能力调度器,选择占用内存小,优先级高的;
  • Fair Schedular:公平调度器,所有 job 占用相同资源。

导致 Executor 产生 FULL GC 的原因,可能导致什么问题?

​可能导致 Executor 僵死问题,海量数据的 shuffle 和数据倾斜等都可能导致 FULL GC。以 shuffle 为例,伴随着大量的 shuffle 写操作,JVM 的新生代不断 GC,Eden Space 写满了就往 Survivor Space 写,同时超过一定大小的数据会直接写到老生代,当新生代写满了之后,也会把老的数据搞到老生代,如果老生代空间不足了,就触发 FULL GC,还是空间不够,那就 OOM 错误了,此时线程被 Blocked,导致整个 Executor 处理数据的进程被卡住。

Spark 累加器有哪些特点?

  • 累加器在全局唯一的,只增不减,记录全局集群的唯一状态;
  • 在 Executor 中修改它,在 Driver 读取;
  • Executor 级别共享的,广播变量是 task 级别的共享,两个 Application 不可以共享累加器,但是同一个 Application 不同的 job 可以共享。

HashPartitioner 的弊端是什么?

​HashPartitioner 分区的原理很简单,对于给定的 key,计算其 HashCode,并除于分区的个数取余,如果余数小于 0,则用余数 + 分区的个数,最后返回的值就是这个 key 所属的分区 ID;弊端是数据不均匀,容易导致数据倾斜,极端情况下某几个分区会拥有 RDD 的所有数据。

RangePartitioner 分区的原理?

​RangePartitioner 分区则尽量保证每个分区中数据量的均匀,而且分区与分区之间是有序的,也就是说一个分区中的元素肯定都是比另一个分区内的元素小或者大;但是分区内的元素是不能保证顺序的。简单的说就是将一定范围内的数映射到某一个分区内。其原理是水塘抽样。RangePartitioner 作用:将一定范围内的数映射到某一个分区内,在实现中,分界的算法尤为重要。算法对应的函数是 rangeBounds。

如何理解 Standalone 模式下,Spark 资源分配是粗粒度的?

Spark 默认情况下资源分配是粗粒度的,也就是说程序在提交时就分配好资源,后面执行的时候使用分配好的资源,除非资源出现了故障才会重新分配。比如 Spark shell 启动,已提交,一注册,哪怕没有任务,Worker 都会分配资源给 Executor。

union 操作是产生宽依赖还是窄依赖?

​产生窄依赖。

窄依赖父 RDD 的 partition 和子 RDD 的 partition 是不是都是一对一的关系?

​不一定,除了一对一的窄依赖,还包含一对固定个数的窄依赖(就是对父 RDD 的依赖的 Partition 的数量不会随着 RDD 数量规模的改变而改变),比如 join 操作的每个 partition 仅仅和已知的 partition 进行 join,这个 join 操作是窄依赖,依赖固定数量的父 RDD,因为是确定的 partition 关系。

Hadoop 中,MapReduce 操作的 mapper 和 reducer 阶段相当于 Spark 中的哪几个算子?

​相当于 Spark 中的 map 算子和 reduceByKey 算子,当然还是有点区别的,MR 会自动进行排序的,Spark 要看你用的是什么 Partitioner。

什么是 shuffle,以及为什么需要 shuffle?

​shuffle 中文翻译为洗牌,需要 shuffle 的原因是:某种具有共同特征的数据汇聚到一个计算节点上进行计算。

Spark 中的 HashShuffle 的有哪些不足?

  • shuffle 产生海量的小文件在磁盘上,此时会产生大量耗时的、低效的 IO 操作;
  • 容易导致内存不够用,由于内存需要保存海量的文件操作句柄和临时缓存信息,如果数据处理规模比较大的话,容易出现 OOM;
  • 容易出现数据倾斜,导致 OOM。

consolidate 是如何优化 Hash shuffle 时在 map 端产生的小文件?

  • consolidate 为了解决 Hash Shuffle 同时打开过多文件导致 Writer handler 内存使用过大以及产生过多文件导致大量的随机读写带来的低效磁盘 IO;
  • consolidate 根据 CPU 的个数来决定每个 task shuffle map 端产生多少个文件,假设原来有 10 个 task,100 个 reduce,每个节点有 10 个 CPU,那么使用 hash shuffle 会产生 10*100=1000 个文件,consolidate 产生 10*10=100 个文件

​注意:consolidate 部分减少了文件和文件句柄,并行读很高的情况下(task 很多时)还是会很多文件。

spark.default.parallelism 这个参数有什么意义,实际生产中如何设置?

  • 参数用于设置每个 Stage 的默认 Task 数量。这个参数极为重要,如果不设置可能会直接影响你的 Spark 作业性能;
  • 很多人都不会设置这个参数,会使得集群非常低效,你的 CPU,内存再多,如果 Task 始终为 1,那也是浪费,Spark 官网建议 Task 个数为 CPU 的核数 * Executor 的个数的 2~3 倍

spark.shuffle.memoryFraction 参数的含义,以及优化经验?

  • spark.shuffle.memoryFraction 是 shuffle 调优中 重要参数,shuffle 从上一个 task 拉去数据过来,要在 Executor 进行聚合操作,聚合操作时使用 Executor 内存的比例由该参数决定,默认是 20%,如果聚合时数据超过了该大小,那么就会 spill 到磁盘,极大降低性能;
  • 如果 Spark 作业中的 RDD 持久化操作较少,shuffle 操作较多时,建议降低持久化操作的内存占比,提高 shuffle 操作的内存占比比例,避免 shuffle 过程中数据过多时内存不够用,必须溢写到磁盘上,降低了性能。此外,如果发现作业由于频繁的 GC 导致运行缓慢,意味着 task 执行用户代码的内存不够用,那么同样建议调低这个参数的值。

Spark 中 Standalone 模式特点,有哪些优点和缺点?

  • 特点:
    • Standalone 是 Master/Slave 架构,集群由 Master 与 Worker 节点组成,程序通过与 Master 节点交互申请资源,Worker 节点启动 Executor 运行;
    • Standalone 调度模式使用 FIFO 调度方式;
    • 无依赖任何其他资源管理系统,Master 负责管理集群资源
  • 优点:
    • 部署简单;
    • 不依赖其他资源管理系统。
  • 缺点:
    • 默认每个应用程序会独占所有可用节点的资源,当然可以通过 spark.cores.max 来决定一个应用可以申请的 CPU cores 个数;
    • 可能有单点故障,需要自己配置 master HA。

FIFO 调度模式的基本原理、优点和缺点?

​基本原理:按照先后顺序决定资源的使用,资源优先满足最先来的 job。第一个 job 优先获取所有可用的资源,接下来第二个 job 再获取剩余资源。以此类推,如果第一个 job 没有占用所有的资源,那么第二个 job 还可以继续获取剩余资源,这样多个 job 可以并行运行,如果第一个 job 很大,占用所有资源,则第二个 job 就需要等待,等到第一个 job 释放所有资源。

​优点和缺点:

  • 适合长作业,不适合短作业;
  • 适合 CPU 繁忙型作业(计算时间长,相当于长作业),不利于 IO 繁忙型作业(计算时间短,相当于短作业)。

FAIR 调度模式的优点和缺点?

​所有的任务拥有大致相当的优先级来共享集群资源,Spark 多以轮询的方式为任务分配资源,不管长任务还是短任务都可以获得资源,并且获得不错的响应时间,对于短任务,不会像 FIFO 那样等待较长时间了,通过参数 spark.scheduler.mode 为 FAIR 指定。

CAPACITY 调度模式的优点和缺点?

  • 原理:

    计算能力调度器支持多个队列,每个队列可配置一定的资源量,每个队列采用 FIFO 调度策略,为了防止同一个用户的作业独占队列中的资源,该调度器会对同一用户提交的作业所占资源量进行限定。调度时,首先按以下策略选择一个合适队列:计算每个队列中正在运行的任务数与其应该分得的计算资源之间的比值(即比较空闲的队列),选择一个该比值最小的队列;然后按以下策略选择该队列中一个作业:按照作业优先级和提交时间顺序选择,同时考虑用户资源量限制和内存限制

  • 优点:
    • 计算能力保证:支持多个队列,某个作业可被提交到某一个队列中。每个队列会配置一定比例的计算资源,且所有提交到队列中的作业共享该队列中的资源;
    • 灵活性:空闲资源会被分配给那些未达到资源使用上限的队列,当某个未达到资源的队列需要资源时,一旦出现空闲资源资源,便会分配给他们;
    • 支持优先级:队列支持作业优先级调度(默认是 FIFO);
    • 多重租赁:综合考虑多种约束防止单个作业、用户或者队列独占队列或者集群中的资源;
    • 基于资源的调度:支持资源密集型作业,允许作业使用的资源量高于默认值,进而可容纳不同资源需求的作业。不过,当前仅支持内存资源的调度。

常见的数据压缩方式,你们生产集群采用了什么压缩方式,提升了多少效率?

  • 数据压缩,大片连续区域进行数据存储并且存储区域中数据重复性高的状况下,可以使用适当的压缩算法。数组,对象序列化后都可以使用压缩,数更紧凑,减少空间开销。常见的压缩方式有 snappy,LZO,gz 等
  • Hadoop 生产环境常用的是 snappy 压缩方式(使用压缩,实际上是 CPU 换 IO 吞吐量和磁盘空间,所以如果 CPU 利用率不高,不忙的情况下,可以大大提升集群处理效率)。snappy 压缩比一般 20%~30% 之间,并且压缩和解压缩效率也非常高:
    • GZIP 的压缩率最高,但是其实 CPU 密集型的,对 CPU 的消耗比其他算法要多,压缩和解压速度也慢;
    • LZO 的压缩率居中,比 GZIP 要低一些,但是压缩和解压速度明显要比 GZIP 快很多,其中解压速度快的更多;
    • Zippy/Snappy 的压缩率最低,而压缩和解压速度要稍微比 LZO 要快一些。
  • 提升了多少效率可以从两个方面回答:
    • 数据存储节约多少存储,
    • 任务执行消耗时间节约了多少,可以举个实际例子展开描述。

使用 scala 代码实现 WordCount?

1
2
3
4
5
val conf = new SparkConf()
val sc = new SparkContext(conf)
val line = sc.textFile("xxxx.txt")
line.flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_).collect().foreach(println)
sc.stop()

Spark RDD 和 MapReduce2 的区别?

  • MR2 只有 2 个阶段,数据需要大量访问磁盘,数据来源相对单一,Spark RDD,可以无数个阶段进行迭代计算,数据来源非常丰富,数据落地介质也非常丰富 Spark 计算基于内存;
  • MR2 需要频繁操作磁盘 IO,需要大家明确的是,如果是 SparkRDD 的话,你要知道每一种数据来源对应的是什么,RDD 从数据源加载数据,将数据放到不同的 partition 针对这些 partition 中的数据进行迭代式计算计算完成之后,落地到不同的介质当中。

Spark 和 MapReduce 快?为什么快呢?快在哪里呢?

​Spark 更加快的主要原因有几点:

  • 基于内存计算,减少低效的磁盘交互;
  • 高效的调度算法,基于 DAG;
  • 容错机制 Lineage,主要是 DAG 和 Lineage,即使 Spark 不使用内存技术,也大大快于 MapReduce。

Spark SQL 为什么比 Hive 快呢?

​计算引擎不一样,一个是 Spark 计算模型,一个是 MapReduce 计算模型。

RDD 的数据结构是怎么样的?

一个 RDD 对象,包含如下 5 个核心属性。

  • 一个分区列表,每个分区里是 RDD 的部分数据(或称数据块)。
  • 一个依赖列表,存储依赖的其他 RDD。
  • 一个名为 compute 的计算函数,用于计算 RDD 各分区的值。
  • 分区器(可选),用于键/值类型的 RDD,比如某个 RDD 是按散列来分区。
  • 计算各分区时优先的位置列表(可选),比如从 HDFS 上的文件生成 RDD 时,RDD 分区的位置优先选择数据所在的节点,这样可以避免数据移动带来的开销。

RDD 算子里操作一个外部 map,比如往里面 put 数据,然后算子外再遍历 map,会有什么问题吗?

​频繁创建额外对象,容易 OOM。

HBase region 多大会分区,Spark 读取 HBase 数据是如何划分 partition 的?

​region 超过了 hbase.hregion.max.filesize 这个参数配置的大小就会自动裂分,默认值是 1G。

​默认情况下,HBase 有多少个 region,Spark 读取时就会有多少个 partition

Spark 调优

数据倾斜

什么是数据倾斜?

数据倾斜指的是,并行处理的数据集中,某一部分(如 Spark 或 Kafka 的一个 Partition)的数据显著多于其它部分,从而使得该部分的处理速度成为整个数据集处理的瓶颈。

数据倾斜俩大直接致命后果。

  • 数据倾斜直接会导致一种情况:Out Of Memory。
  • 运行速度慢。

主要是发生在 Shuffle 阶段。同样 Key 的数据条数太多了。导致了某个 Key(下图中的 80 亿条)所在的 Task 数据量太大了。远远超过其他 Task 所处理的数据量。

一个经验结论是:一般情况下,OOM 的原因都是数据倾斜

数据倾斜

如何定位数据倾斜?

数据倾斜一般会发生在 shuffle 过程中。很大程度上是你使用了可能会触发 shuffle 操作的算子:distinct、groupByKey、reduceByKey、aggregateByKey、join、cogroup、repartition 等。

原因:查看任务 -> 查看 Stage -> 查看代码

  • 某个 task 执行特别慢的情况
  • 某个 task 莫名其妙内存溢出的情况
  • 查看导致数据倾斜的 key 的数据分布情况
  • 是不是有 OOM 情况出现,一般是少数内存溢出的问题
  • 是不是应用运行时间差异很大,总体时间很长
  • 需要了解你所处理的数据 Key 的分布情况,如果有些 Key 有大量的条数,那么就要小心数据倾斜的问题
  • 一般需要通过 Spark Web UI 和其他一些监控方式出现的异常来综合判断
  • 看看代码里面是否有一些导致 Shuffle 的算子出现

数据倾斜的几种典型情况?

  1. 数据源中的数据分布不均匀,Spark 需要频繁交互
  2. 数据集中的不同 Key 由于分区方式,导致数据倾斜
  3. JOIN 操作中,一个数据集中的数据分布不均匀,另一个数据集较小(主要)
  4. 聚合操作中,数据集中的数据分布不均匀(主要)
  5. JOIN 操作中,两个数据集都比较大,其中只有几个 Key 的数据分布不均匀
  6. JOIN 操作中,两个数据集都比较大,有很多 Key 的数据分布不均匀
  7. 数据集中少数几个 key 数据量很大,不重要,其他数据均匀

注意:

  • 需要处理的数据倾斜问题就是 Shuffle 后数据的分布是否均匀问题
  • 只要保证最后的结果是正确的,可以采用任何方式来处理数据倾斜,只要保证在处理过程中不发生数据倾斜就可以

数据倾斜的处理方法?

  1. 数据源中的数据分布不均匀,Spark 需要频繁交互

    解决方案:避免数据源的数据倾斜。

    实现原理:通过在 Hive 中对倾斜的数据进行预处理,以及在进行 Kafka 数据分发时尽量进行平均分配。这种方案从根源上解决了数据倾斜,彻底避免了在 Spark 中执行 shuffle 类算子,那么肯定就不会有数据倾斜的问题了。

    方案优点:实现起来简单便捷,效果还非常好,完全规避掉了数据倾斜,Spark 作业的性能会大幅度提升。

    方案缺点:治标不治本,Hive 或者 Kafka 中还是会发生数据倾斜。

    适用情况:在一些 Java 系统与 Spark 结合使用的项目中,会出现 Java 代码频繁调用 Spark 作业的场景,而且对 Spark 作业的执行性能要求很高,就比较适合使用这种方案。将数据倾斜提前到上游的 Hive ETL,每天仅执行一次,只有那一次是比较慢的,而之后每次 Java 调用 Spark 作业时,执行速度都会很快,能够提供更好的用户体验。

    总结:前台的 Java 系统和 Spark 有很频繁的交互,这个时候如果 Spark 能够在最短的时间内处理数据,往往会给前端有非常好的体验。这个时候可以将数据倾斜的问题抛给数据源端,在数据源端进行数据倾斜的处理。但是这种方案没有真正的处理数据倾斜问题。

  2. 数据集中的不同 Key 由于分区方式,导致数据倾斜

    解决方案一:调整并行度。

    实现原理:增加 shuffle read task 的数量,可以让原本分配给一个 task 的多个 key 分配给多个 task,从而让每个 task 处理比原来更少的数据。

    方案优点:实现起来比较简单,可以有效缓解和减轻数据倾斜的影响。

    方案缺点:只是缓解了数据倾斜而已,没有彻底根除问题,根据实践经验来看,其效果有限。

    实践经验:该方案通常无法彻底解决数据倾斜,因为如果出现一些极端情况,比如某个 key 对应的数据量有 100 万,那么无论你的 task 数量增加到多少,都无法处理。

    总结:调整并行度:适合于有大量 key 由于分区算法或者分区数的问题,将 key 进行了不均匀分区,可以通过调大或者调小分区数来试试是否有效。

    shuffle read task

    解决方案二:缓解数据倾斜(自定义 Partitioner)。

    适用场景:大量不同的 Key 被分配到了相同的 Task 造成该 Task 数据量过大。

    解决方案:使用自定义的 Partitioner 实现类代替默认的 HashPartitioner,尽量将所有不同的 Key 均匀分配到不同的 Task 中。

    方案优点:不影响原有的并行度设计。如果改变并行度,后续 Stage 的并行度也会默认改变,可能会影响后续 Stage。

    方案缺点:适用场景有限,只能将不同 Key 分散开,对于同一 Key 对应数据集非常大的场景不适用。效果与调整并行度类似,只能缓解数据倾斜而不能完全消除数据倾斜。而且需要根据数据特点自定义专用的 Partitioner,不够灵活。

  3. JOIN 操作中,一个数据集中的数据分布不均匀,另一个数据集较小(主要)

    解决方案:Reduce side Join 转变为 Map side Join。

    适用场景:在对 RDD 使用 join 类操作,或者是在 Spark SQL 中使用 join 语句时,而且 join 操作中的一个 RDD 或表的数据量比较小(比如几百兆),比较适用此方案。

    实现原理:普通的 join 是会走 shuffle 过程的,而一旦 shuffle,就相当于会将相同 key 的数据拉取到一个 shuffle read task 中再进行 join,此时就是 reduce join。但是如果一个 RDD 是比较小的,则可以采用广播小 RDD 全量数据 + map 算子来实现与 join 同样的效果,也就是 map join,此时就不会发生 shuffle 操作,也就不会发生数据倾斜。

    方案优点:对 join 操作导致的数据倾斜,效果非常好,因为根本就不会发生 shuffle,也就根本不会发生数据倾斜。

    方案缺点:适用场景较少,因为这个方案只适用于一个大表和一个小表的情况。

  4. 聚合操作中,数据集中的数据分布不均匀(主要)

    解决方案:两阶段聚合(局部聚合 + 全局聚合)。

    适用场景:对 RDD 执行 reduceByKey 等聚合类 shuffle 算子或者在 Spark SQL 中使用 group by 语句进行分组聚合时,比较适用这种方案。

    实现原理:将原本相同的 key 通过附加随机前缀的方式,变成多个不同的 key,就可以让原本被一个 task 处理的数据分散到多个 task 上去做局部聚合,进而解决单个 task 处理数据量过多的问题。接着去除掉随机前缀,再次进行全局聚合,就可以得到最终的结果。具体原理见下图。

    附加随机前缀两阶段聚合

    方案优点:对于聚合类的 shuffle 操作导致的数据倾斜,效果是非常不错的。通常都可以解决掉数据倾斜,或者至少是大幅度缓解数据倾斜,将 Spark 作业的性能提升数倍以上。

    方案缺点:仅仅适用于聚合类的 shuffle 操作,适用范围相对较窄。如果是 join 类的 shuffle 操作,还得用其他的解决方案,将相同 key 的数据分拆处理。

  5. JOIN 操作中,两个数据集都比较大,其中只有几个 Key 的数据分布不均匀

    解决方案:为倾斜 key 增加随机前/后缀。

    适用场景:两张表都比较大,无法使用 Map 侧 Join。其中一个 RDD 有少数几个 Key 的数据量过大,另外一个 RDD 的 Key 分布较为均匀。

    解决方案:将有数据倾斜的 RDD 中倾斜 Key 对应的数据集单独抽取出来加上随机前缀,另外一个 RDD 每条数据分别与随机前缀结合形成新的 RDD(笛卡尔积,相当于将其数据增到到原来的 N 倍,N 即为随机前缀的总个数),然后将二者 Join 后去掉前缀。然后将不包含倾斜 Key 的剩余数据进行 Join。最后将两次 Join 的结果集通过 union 合并,即可得到全部 Join 结果。

    方案优点:相对于 Map 侧 Join,更能适应大数据集的 Join。如果资源充足,倾斜部分数据集与非倾斜部分数据集可并行进行,效率提升明显。且只针对倾斜部分的数据做数据扩展,增加的资源消耗有限。

    方案缺点:如果倾斜 Key 非常多,则另一侧数据膨胀非常大,此方案不适用。而且此时对倾斜 Key 与非倾斜 Key 分开处理,需要扫描数据集两遍,增加了开销。

    注意:具有倾斜 Key 的 RDD 数据集中,key 的数量比较少。

    为倾斜 key 增加随机前/后缀

  6. JOIN 操作中,两个数据集都比较大,有很多 Key 的数据分布不均匀

    解决方案:随机前缀和扩容 RDD 进行 join。

    适用场景:如果在进行 join 操作时,RDD 中有大量的 key 导致数据倾斜,那么进行分拆 key 也没什么意义。

    实现思路:将该 RDD 的每条数据都打上一个 N 以内的随机前缀。同时对另外一个正常的 RDD 进行扩容,将每条数据都扩容成 N 条数据,扩容出来的每条数据都依次打上一个 1~N 的前缀。最后将两个处理后的 RDD 进行 join 即可。和上一种方案是尽量只对少数倾斜 key 对应的数据进行特殊处理,由于处理过程需要扩容 RDD,因此上一种方案扩容 RDD 后对内存的占用并不大;而这一种方案是针对有大量倾斜 key 的情况,没法将部分 key 拆分出来进行单独处理,因此只能对整个 RDD 进行数据扩容,对内存资源要求很高。

    方案优点:对 join 类型的数据倾斜基本都可以处理,而且效果也相对比较显著,性能提升效果非常不错。

    方案缺点:该方案更多的是缓解数据倾斜,而不是彻底避免数据倾斜。而且需要对整个 RDD 进行扩容,对内存资源要求很高。

    实践经验:曾经开发一个数据需求的时候,发现一个 join 导致了数据倾斜。优化之前,作业的执行时间大约是 60 分钟左右;使用该方案优化之后,执行时间缩短到 10 分钟左右,性能提升了 6 倍。

    注意:将倾斜 Key 添加 1-N 的随机前缀,并将被 Join 的数据集相应的扩大 N 倍(需要将 1-N 数字添加到每一条数据上作为前缀)

    随机前缀和扩容 RDD 进行 join

  7. 数据集中少数几个 key 数据量很大,不重要,其他数据均匀

    解决方案:过滤少数倾斜 Key。

    适用场景:如果发现导致倾斜的 key 就少数几个,而且对计算本身的影响并不大的话,那么很适合使用这种方案。比如 99% 的 key 就对应 10 条数据,但是只有一个 key 对应了 100 万数据,从而导致了数据倾斜。

    方案优点:实现简单,而且效果也很好,可以完全规避掉数据倾斜。

    方案缺点:适用场景不多,大多数情况下,导致倾斜的 key 还是很多的,并不是只有少数几个。

    实践经验:在项目中我们也采用过这种方案解决数据倾斜。有一次发现某一天 Spark 作业在运行的时候突然 OOM 了,追查之后发现,是 Hive 表中的某一个 key 在那天数据异常,导致数据量暴增。因此就采取每次执行前先进行采样,计算出样本中数据量最大的几个 key 之后,直接在程序中将那些 key 给过滤掉。

资源调优

资源运行中的集中情况

  • 实践中跑的 Spark job,有的特别慢,查看 CPU 利用率很低,可以尝试减少每个 executor 占用 CPU core 的数量,增加并行的 executor 数量,同时配合增加分片,整体上增加了 CPU 的利用率,加快数据处理速度。
  • 发现某 job 很容易发生内存溢出,我们就增大分片数量,从而减少了每片数据的规模,同时还减少并行的 executor 数量,这样相同的内存资源分配给数量更少的 executor,相当于增加了每个 task 的内存分配,这样运行速度可能慢了些,但是总比 OOM 强。
  • 数据量特别少,有大量的小文件生成,就减少文件分片,没必要创建那么多 task,这种情况,如果只是最原始的 input 比较小,一般都能被注意到;但是,如果是在运算过程中,比如应用某个 reduceBy 或者某个 filter 以后,数据大量减少,这种低效情况就很少被留意到。

运行资源优化配置

一个 CPU core 同一时间只能执行一个线程。而每个 Executor 进程上分配到的多个 task,都是以每个 task 一条线程的方式,多线程并发运行的。

一个应用提交的时候设置多大的内存?设置多少 Core? 设置几个 Executor?

1
2
3
4
5
6
7
8
9
./bin/spark-submit \
--master yarn-cluster \
--num-executors 100 \
--executor-memory 6G \
--executor-cores 4 \
--driver-memory 1G \
--conf spark.default.parallelism=1000 \
--conf spark.storage.memoryFraction=0.5 \
--conf spark.shuffle.memoryFraction=0.3

  1. -num-executors

    • 参数说明:该参数用于设置 Spark 作业总共要用多少个 Executor 进程来执行。Driver 在向 YARN 集群管理器申请资源时,YARN 集群管理器会尽可能按照你的设置来在集群的各个工作节点上,启动相应数量的 Executor 进程。这个参数非常之重要,如果不设置的话,默认只会给你启动少量的 Executor 进程,此时你的 Spark 作业的运行速度是非常慢的。

    • 调优建议:每个 Spark 作业的运行一般设置 50~100 个左右的 Executor 进程比较合适,设置太少或太多的 Executor 进程都不好。设置的太少,无法充分利用集群资源;设置的太多的话,大部分队列可能无法给予充分的资源。

  2. -executor-memory

    • 参数说明:该参数用于设置每个 Executor 进程的内存。Executor 内存的大小,很多时候直接决定了 Spark 作业的性能,而且跟常见的 JVM OOM 异常,也有直接的关联。

    • 调优建议:每个 Executor 进程的内存设置 4G~8G 较为合适。但是这只是一个参考值,具体的设置还是得根据不同部门的资源队列来定。可以看看自己团队的资源队列的最大内存限制是多少,num-executors * executor-memory,是不能超过队列的最大内存量的。此外,如果你是跟团队里其他人共享这个资源队列,那么申请的内存量最好不要超过资源队列最大总内存的 1/3~1/2,避免你自己的 Spark 作业占用了队列所有的资源,导致别的同事的作业无法运行。

  3. -executor-cores

    • 参数说明:该参数用于设置每个 Executor 进程的 CPU core 数量。这个参数决定了每个 Executor 进程并行执行 task 线程的能力。因为每个 CPU core 同一时间只能执行一个 task 线程,因此每个 Executor 进程的 CPU core 数量越多,越能够快速地执行完分配给自己的所有 task 线程。

    • 调优建议:Executor 的 CPU core 数量设置为 2~4 个较为合适。同样得根据不同部门的资源队列来定,可以看看自己的资源队列的最大 CPU core 限制是多少,再依据设置的 Executor 数量,来决定每个 Executor 进程可以分配到几个 CPU core。同样建议,如果是跟他人共享这个队列,那么 num-executors * executor-cores 不要超过队列总 CPU core 的 1/3~1/2 左右比较合适,也是避免影响其他同事的作业运行。

  4. -driver-memory

    • 参数说明:该参数用于设置 Driver 进程的内存。

    • 调优建议:Driver 的内存通常来说不设置,或者设置 1G 左右应该就够了。唯一需要注意的一点是,如果需要使用 collect 算子将 RDD 的数据全部拉取到 Driver 上进行处理(或者是用 map side join 操作),那么必须确保 Driver 的内存足够大,否则会出现 OOM 内存溢出的问题。

  5. -spark.default.parallelism

    • 参数说明:该参数用于设置每个 stage 的默认 task 数量,也可以认为是分区数。这个参数极为重要,如果不设置可能会直接影响你的 Spark 作业性能。

    • 调优建议:Spark 作业的默认 task 数量为 500~1000 个较为合适。很多人常犯的一个错误就是不去设置这个参数,那么此时就会导致 Spark 自己根据底层 HDFS 的 block 数量来设置 task 的数量,默认是一个 HDFS block 对应一个 task。通常来说,Spark 默认设置的数量是偏少的(比如就几十个 task),如果 task 数量偏少的话,就会导致你前面设置好的 Executor 的参数都前功尽弃。试想一下,无论你的 Executor 进程有多少个,内存和 CPU 有多大,但是 task 只有 1 个或者 10 个,那么 90% 的 Executor 进程可能根本就没有 task 执行,也就是白白浪费了资源!因此 Spark 官网建议的设置原则是,设置该参数为 num-executors * executor-cores 的 2~3 倍较为合适,比如 Executor 的总 CPU core 数量为 300 个,那么设置 1000 个 task 是可以的,此时可以充分地利用 Spark 集群的资源。

  6. -spark.storage.memoryFraction

    • 参数说明:该参数用于设置 RDD 持久化数据在 Executor 内存中能占的比例,默认是 0.6。也就是说,默认 Executor 60% 的内存,可以用来保存持久化的 RDD 数据。根据你选择的不同的持久化策略,如果内存不够时,可能数据就不会持久化,或者数据会写入磁盘。

    • 调优建议:如果 Spark 作业中,有较多的 RDD 持久化操作,该参数的值可以适当提高一些,保证持久化的数据能够容纳在内存中。避免内存不够缓存所有的数据,导致数据只能写入磁盘中,降低了性能。但是如果 Spark 作业中的 shuffle 类操作比较多,而持久化操作比较少,那么这个参数的值适当降低一些比较合适。此外,如果发现作业由于频繁的 GC 导致运行缓慢(通过 spark web ui 可以观察到作业的 GC 耗时),意味着 task 执行用户代码的内存不够用,那么同样建议调低这个参数的值。

  7. -spark.shuffle.memoryFraction

    • 参数说明:该参数用于设置 shuffle 过程中一个 task 拉取到上个 stage 的 task 的输出后,进行聚合操作时能够使用的 Executor 内存的比例,默认是 0.2。也就是说,Executor 默认只有 20% 的内存用来进行该操作。shuffle 操作在进行聚合时,如果发现使用的内存超出了这个 20% 的限制,那么多余的数据就会溢写到磁盘文件中去,此时就会极大地降低性能。

    • 调优建议:如果 Spark 作业中的 RDD 持久化操作较少,shuffle 操作较多时,建议降低持久化操作的内存占比,提高 shuffle 操作的内存占比比例,避免 shuffle 过程中数据过多时内存不够用,必须溢写到磁盘上,降低了性能。此外,如果发现作业由于频繁的 GC 导致运行缓慢,意味着 task 执行用户代码的内存不够用,那么同样建议调低这个参数的值。

总结:

  • num-executors:应用运行时 executor 的数量,推荐 50-100 左右比较合适
  • executor-memory:应用运行时 executor 的内存,推荐 4-8G 比较合适
  • executor-cores:应用运行时 executor 的 CPU 核数,推荐 2-4 个比较合适
  • driver-memory:应用运行时 driver 的内存量,主要考虑如果使用 map side join 或者一些类似于 collect 的操作,那么要相应调大内存量
  • spark.default.parallelism:每个 stage 默认的 task 数量,推荐参数为 num-executors * executor-cores 的 2~3 倍较为合适
  • spark.storage.memoryFraction:每一个 executor 中用于 RDD 缓存的内存比例,如果程序中有大量的数据缓存,可以考虑调大整个的比例,默认为 60%
  • spark.shuffle.memoryFraction:每一个 executor 中用于 Shuffle 操作的内存比例,默认是 20%,如果程序中有大量的 Shuffle 类算子,那么可以考虑其它的比例

程序开发调优

避免创建重复的 RDD

需要对名为 hello.txt 的 HDFS 文件进行一次 map 操作,再进行一次 reduce 操作。也就是说,需要对一份数据执行两次算子操作。

错误的做法:对于同一份数据执行多次算子操作时,创建多个 RDD。这里执行了两次 textFile 方法,针对同一个 HDFS 文件,创建了两个 RDD 出来,然后分别对每个 RDD 都执行了一个算子操作。这种情况下,Spark 需要从 HDFS 上两次加载 hello.txt 文件的内容,并创建两个单独的 RDD;// 第二次加载 HDFS 文件以及创建 RDD 的性能开销,很明显是白白浪费掉的。

1
2
3
4
val rdd1 = sc.textFile("hdfs://master:9000/hello.txt")
rdd1.map(...)
val rdd2 = sc.textFile("hdfs://master:9000/hello.txt")
rdd2.reduce(...)

正确的用法:对于一份数据执行多次算子操作时,只使用一个 RDD。

尽可能复用同一个 RDD

错误的做法:有一个 <long , String> 格式的 RDD,即 rdd1。接着由于业务需要,对 rdd1 执行了一个 map 操作,创建了一个 rdd2,而 rdd2 中的数据仅仅是 rdd1 中的 value 值而已,也就是说,rdd2 是 rdd1 的子集。

1
2
3
4
5
6
JavaPairRDD<long , String> rdd1 = ...
JavaRDD<string> rdd2 = rdd1.map(...)

// 分别对 rdd1 和 rdd2 执行了不同的算子操作。
rdd1.reduceByKey(...)
rdd2.map(...)

正确的做法:rdd2 的数据完全就是 rdd1 的子集而已,却创建了两个 rdd,并对两个 rdd 都执行了一次算子操作。此时会因为对 rdd1 执行 map 算子来创建 rdd2,而多执行一次算子操作,进而增加性能开销。其实在这种情况下完全可以复用同一个 RDD。我们可以使用 rdd1,既做 reduceByKey 操作,也做 map 操作。

1
2
3
JavaPairRDD<long , String> 
rdd1 = ...rdd1.reduceByKey(...)
rdd1.map(tuple._2...)

对多次使用的 RDD 进行持久化

正确的做法:cache 方法表示:使用非序列化的方式将 RDD 中的数据全部尝试持久化到内存中。此时再对 rdd1 执行两次算子操作时,只有在第一次执行 map 算子时,才会将这个 rdd1 从源头处计算一次。第二次执行 reduce 算子时,就会直接从内存中提取数据进行计算,不会重复计算一个 rdd。

1
2
3
val rdd1 = sc.textFile("hdfs://192.168.0.1:9000/hello.txt").cache()
rdd1.map(...)
rdd1.reduce(...)

序列化的方式可以减少持久化的数据对内存/磁盘的占用量,进而避免内存被持久化数据占用过多,从而发生频繁 GC。

1
2
3
val rdd1 = sc.textFile("hdfs://192.168.0.1:9000/hello.txt").persist(StorageLevel.MEMORY_AND_DISK_SER)
rdd1.map(...)
rdd1.reduce(...)

注意:通常不建议使用 DISK_ONLY 和后缀为 _2 的级别:因为完全基于磁盘文件进行数据的读写,会导致性能急剧降低,导致网络较大开销

尽量避免使用 shuffle 类算子

如果有可能的话,要尽量避免使用 shuffle 类算子,最消耗性能的地方就是 shuffle 过程。shuffle 过程中,各个节点上的相同 key 都会先写入本地磁盘文件中,然后其他节点需要通过网络传输拉取各个节点上的磁盘文件中的相同 key。而且相同 key 都拉取到同一个节点进行聚合操作时,还有可能会因为一个节点上处理的 key 过多,导致内存不够存放,进而溢写到磁盘文件中。因此在 shuffle 过程中,可能会发生大量的磁盘文件读写的 IO 操作,以及数据的网络传输操作。磁盘 IO 和网络数据传输也是 shuffle 性能较差的主要原因。

尽可能避免使用 reduceByKey、join、distinct、repartition 等会进行 shuffle 的算子,尽量使用 map 类的非 shuffle 算子。传统的 join 操作会导致 shuffle 操作。因为两个 RDD 中,相同的 key 都需要通过网络拉取到一个节点上,由一个 task 进行 join 操作。

1
val rdd3 = rdd1.join(rdd2)

Broadcast + map 的 join 操作,不会导致 shuffle 操作。使用 Broadcast 将一个数据量较小的 RDD 作为广播变量。

1
2
3
val rdd2Data = rdd2.collect()
val rdd2DataBroadcast = sc.broadcast(rdd2Data)
val rdd3 = rdd1.map(rdd2DataBroadcast...)

注意:以上操作,建议仅仅在 rdd2 的数据量比较少(比如几百 M,或者一两 G)的情况下使用。因为每个 Executor 的内存中,都会驻留一份 rdd2 的全量数据。

使用 map-side 预聚合的 shuffle 操作

如果因为业务需要,一定要使用 shuffle 操作,无法用 map 类的算子来替代,那么尽量使用可以 map-side 预聚合的算子,类似于 MapReduce 中的本地 combiner。map-side 预聚合之后,每个节点本地就只会有一条相同的 key,因为多条相同的 key 都被聚合起来了。其他节点在拉取所有节点上的相同 key 时,就会大大减少需要拉取的数据数量,从而也就减少了磁盘 IO 以及网络传输开销。

建议使用 reduceByKey 或者 aggregateByKey 算子来替代掉 groupByKey 算子

map-side 预聚合

使用高性能的算子

  • 使用 reduceByKey/aggregateByKey 替代 groupByKey:map-side
  • 使用 mapPartitions 替代普通 map:函数执行频率
  • 使用 foreachPartitions 替代 foreach:函数执行频率
  • 使用 filter 之后进行 coalesce 操作:filter 后对分区进行压缩
  • 使用 repartitionAndSortWithinPartitions 替代 repartition 与 sort 类操作
  • repartitionAndSortWithinPartitions 是 Spark 官网推荐的一个算子,官方建议,如果需要在 repartition 重分区之后,还要进行排序,建议直接使用 repartitionAndSortWithinPartitions 算子

广播大变量

有时在开发过程中,会遇到需要在算子函数中使用外部变量的场景(尤其是大变量,比如 100M 以上的大集合),那么此时就应该使用 Spark 的广播(Broadcast)功能来提升性能。

默认情况下,Spark 会将该变量复制多个副本,通过网络传输到 task 中,此时每个 task 都有一个变量副本。如果变量本身比较大的话(比如 100M,甚至 1G),那么大量的变量副本在网络中传输的性能开销,以及在各个节点的 Executor 中占用过多内存导致的频繁 GC,都会极大地影响性能。

广播后的变量,会保证每个 Executor 的内存中,只驻留一份变量副本,而 Executor 中的 task 执行时共享该 Executor 中的那份变量副本。

使用 Kryo 优化序列化性能

  • 在算子函数中使用到外部变量时,该变量会被序列化后进行网络传输。
  • 将自定义的类型作为 RDD 的泛型类型时(比如 JavaRDD,Student 是自定义类型),所有自定义类型对象,都会进行序列化。因此这种情况下,也要求自定义的类必须实现 Serializable 接口。
  • 使用可序列化的持久化策略时(比如 MEMORY_ONLY_SER),Spark 会将 RDD 中的每个 partition 都序列化成一个大的字节数组。

Spark 默认使用的是 Java 的序列化机制,你可以使用 Kryo 作为序列化类库,效率要比 Java 的序列化机制要高

1
2
3
4
5
6
// 创建 SparkConf 对象。
val conf = new SparkConf().setMaster(...).setAppName(...)
// 设置序列化器为 KryoSerializer。
conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
// 注册要序列化的自定义类型。
conf.registerKryoClasses(Array(classOf[MyClass1], classOf[MyClass2]))

分区 Shuffle 优化

例如当遇到 userData 和 events 进行 join 时,userData 比较大,而且 join 操作比较频繁,这个时候,可以先将 userData 调用了 partitionBy() 分区,可以极大提高效率。

cogroup()、 groupWith()、join()、leftOuterJoin()、rightOuterJoin()、groupByKey()、reduceByKey()、 combineByKey() 以及 lookup() 等都能够受益

分区 Shuffle 优化

总结:如果遇到一个 RDD 频繁和其他 RDD 进行 Shuffle 类操作,比如 cogroup()、 groupWith()、join()、leftOuterJoin()、rightOuterJoin()、groupByKey()、reduceByKey()、 combineByKey() 以及 lookup() 等,那么最好将该 RDD 通过 partitionBy() 操作进行预分区,这些操作在 Shuffle 过程中会减少 Shuffle 的数据量

优化数据结构

Java 中,有三种类型比较耗费内存:

  • 对象,每个 Java 对象都有对象头、引用等额外的信息,因此比较占用内存空间。
  • 字符串,每个字符串内部都有一个字符数组以及长度等额外信息。
  • 集合类型,比如 HashMap、LinkedList 等,因为集合类型内部通常会使用一些内部类来封装集合元素,比如 Map.Entry。

Spark 官方建议,在 Spark 编码实现中,特别是对于算子函数中的代码,尽量不要使用上述三种数据结构,尽量使用字符串替代对象,使用原始类型(比如 Int、Long)替代字符串,使用数组替代集合类型,这样尽可能地减少内存占用,从而降低 GC 频率,提升性能。

Shuffle 配置调优

spark.shuffle.file.buffer

  • 默认值:32k

  • 参数说明:该参数用于设置 shuffle write task 的 BufferedOutputStream 的 buffer 缓冲大小。将数据写到磁盘文件之前,会先写入 buffer 缓冲中,待缓冲写满之后,才会溢写到磁盘。

  • 调优建议:如果作业可用的内存资源较为充足的话,可以适当增加这个参数的大小(比如 64k),从而减少 shuffle write 过程中溢写磁盘文件的次数,也就可以减少磁盘 IO 次数,进而提升性能。在实践中发现,合理调节该参数,性能会有 1%~5% 的提升。

spark.reducer.maxSizeInFlight

  • 默认值:48m

  • 参数说明:该参数用于设置 shuffle read task 的 buffer 缓冲大小,而这个 buffer 缓冲决定了每次能够拉取多少数据。

  • 调优建议:如果作业可用的内存资源较为充足的话,可以适当增加这个参数的大小(比如 96m),从而减少拉取数据的次数,也就可以减少网络传输的次数,进而提升性能。在实践中发现,合理调节该参数,性能会有 1%~5% 的提升。

spark.shuffle.io.maxRetries

  • 默认值:3

  • 参数说明:shuffle read task 从 shuffle write task 所在节点拉取属于自己的数据时,如果因为网络异常导致拉取失败,是会自动进行重试的。该参数就代表了可以重试的最大次数。如果在指定次数之内拉取还是没有成功,就可能会导致作业执行失败。

  • 调优建议:对于那些包含了特别耗时的 shuffle 操作的作业,建议增加重试最大次数(比如 60 次),以避免由于 JVM 的 FULL GC 或者网络不稳定等因素导致的数据拉取失败。在实践中发现,对于针对超大数据量(数十亿~上百亿)的 shuffle 过程,调节该参数可以大幅度提升稳定性。

spark.shuffle.io.retryWait

  • 默认值:5s

  • 参数说明:shuffle read task 从 shuffle write task 所在节点拉取属于自己的数据时,如果因为网络异常导致拉取失败,是会自动进行重试的,该参数代表了每次重试拉取数据的等待间隔,默认是 5s。

  • 调优建议:建议加大间隔时长(比如 60s),以增加 shuffle 操作的稳定性。

spark.shuffle.memoryFraction

  • 默认值:0.2

  • 参数说明:该参数代表了 Executor 内存中,分配给 shuffle read task 进行聚合操作的内存比例,默认是 20%。

  • 调优建议:在资源参数调优中讲解过这个参数。如果内存充足,而且很少使用持久化操作,建议调高这个比例,给 shuffle read 的聚合操作更多内存,以避免由于内存不足导致聚合过程中频繁读写磁盘。在实践中发现,合理调节该参数可以将性能提升 10% 左右。

spark.shuffle.manager

  • 默认值:sort

  • 参数说明:该参数用于设置 ShuffleManager 的类型。Spark 1.5 以后,有三个可选项:hashsorttungsten-sort。HashShuffleManager 是 Spark 1.2 以前的默认选项,但是 Spark 1.2 以及之后的版本默认都是 SortShuffleManager 了。tungsten-sortsort 类似,但是使用了 tungsten 计划中的堆外内存管理机制,内存使用效率更高。

  • 调优建议:由于 SortShuffleManager 默认会对数据进行排序,因此如果你的业务逻辑中需要该排序机制的话,则使用默认的 SortShuffleManager 就可以;而如果你的业务逻辑不需要对数据进行排序,那么建议参考后面的几个参数调优,通过 bypass 机制或优化的 HashShuffleManager 来避免排序操作,同时提供较好的磁盘读写性能。这里要注意的是,tungsten-sort 要慎用,因为之前发现了一些相应的 bug。

spark.shuffle.sort.bypassMergeThreshold

  • 默认值:200

  • 参数说明:当 ShuffleManager 为 SortShuffleManager 时,如果 shuffle read task 的数量小于这个阈值(默认是 200),则 shuffle write 过程中不会进行排序操作,而是直接按照未经优化的 HashShuffleManager 的方式去写数据,但是最后会将每个 task 产生的所有临时磁盘文件都合并成一个文件,并会创建单独的索引文件。

  • 调优建议:当你使用 SortShuffleManager 时,如果的确不需要排序操作,那么建议将这个参数调大一些,大于 shuffle read task 的数量。那么此时就会自动启用 bypass 机制,map-side 就不会进行排序了,减少了排序的性能开销。但是这种方式下,依然会产生大量的磁盘文件,因此 shuffle write 性能有待提高。

spark.shuffle.consolidateFiles

  • 默认值:false

  • 参数说明:如果使用 HashShuffleManager,该参数有效。如果设置为 true,那么就会开启 consolidate 机制,会大幅度合并 shuffle write 的输出文件,对于 shuffle read task 数量特别多的情况下,这种方法可以极大地减少磁盘 IO 开销,提升性能。

  • 调优建议:如果的确不需要 SortShuffleManager 的排序机制,那么除了使用 bypass 机制,还可以尝试将 spark.shuffle.manager 参数手动指定为 hash,使用 HashShuffleManager,同时开启 consolidate 机制。在实践中尝试过,发现其性能比开启了 bypass 机制的 SortShuffleManager 要高出 10%~30%。

Shuffle 配置调优总结:

  • spark.shuffle.file.buffer:主要是设置的 Shuffle 过程中写文件的缓冲,默认 32k,如果内存足够,可以适当调大,来减少写入磁盘的数量。
  • spark.reducer.maxSizeInFight:主要是设置 Shuffle 过程中读文件的缓冲区,一次能够读取多少数据,如果内存足够,可以适当扩大,减少整个网络传输次数。
  • spark.shuffle.io.maxRetries:主要是设置网络连接失败时,重试次数,适当调大能够增加稳定性。
  • spark.shuffle.io.retryWait:主要设置每次重试之间的间隔时间,可以适当调大,增加程序稳定性。
  • spark.shuffle.memoryFraction:Shuffle 过程中的内存占用,如果程序中较多使用了 Shuffle 操作,那么可以适当调大该区域。
  • spark.shuffle.manager:hash 和 sort 方式,sort 是默认,hash 在 reduce 数量 比较少的时候,效率会很高。
  • spark.shuffle.sort. bypassMergeThreshold:设置的是 Sort 方式中,启用 Hash 输出方式的临界值,如果你的程序数据不需要排序,而且 reduce 数量比较少,那推荐可以适当增大临界值。
  • spark. shuffle.consolidateFiles:如果你使用 Hash shuffle 方式,推荐打开该配置,实现更少的文件输出。