Spark 的算子的分类

  • 从大方向来说,Spark 算子大致可以分为以下两类:

    • Transformation 变换/转换算子:这种变换并不触发提交作业,完成作业中间过程处理。

      Transformation 操作是延迟计算的,也就是说从一个 RDD 转换生成另一个 RDD 的转换操作不是马上执行,需要等到有 Action 操作的时候才会真正触发运算。

    • Action 行动算子:这类算子会触发 SparkContext 提交 Job 作业。

      Action 算子会触发 Spark 提交作业(Job),并将数据输出 Spark 系统。

  • 从小方向来说,Spark 算子大致可以分为以下三类:

Value 数据类型的 Transformation 算子

输入分区与输出分区一对一型

map 算子

  • 说明:通过将函数应用于此 RDD 的所有元素来返回新的 RDD。

    将原来 RDD 的每个数据项通过 map 中的用户自定义函数 f 映射转变为一个新的元素。源码中 map 算子相当于初始化一个 RDD, 新 RDD 叫做 MappedRDD(this, sc.clean(f))。

    图中每个方框表示一个 RDD 分区,左侧的分区经过用户自定义函数 f:T->U 映射为右侧的新 RDD 分区。但是,实际只有等到 Action 算子触发后,这个 f 函数才会和其他函数在一个 stage 中对数据进行运算。在图中的第一个分区,数据记录 V1 输入 f,通过 f 转换输出为转换后的分区中的数据记录 V'1。

    map 算子

  • 用法:

    1
    2
    def map[U](f: (T) ⇒ U)(implicit arg0: ClassTag[U]): RDD[U]
    Return a new RDD by applying a function to all elements of this RDD.

  • 示例:

    1
    2
    3
    4
    5
    6
    7
    8
    val data = sc.parallelize(1 to 10, 3)
    data.map(item => item + 1).collect
    data.map(_ + 1).collect

    // 输出结果
    data: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at parallelize at <console>:25
    res0: Array[Int] = Array(2, 3, 4, 5, 6, 7, 8, 9, 10, 11)
    res1: Array[Int] = Array(2, 3, 4, 5, 6, 7, 8, 9, 10, 11)

flatMap 算子

  • 说明:首先向该 RDD 的所有元素应用函数,然后将结果展平,以返回新的 RDD。

    将原来 RDD 中的每个元素通过函数 f 转换为新的元素,并将生成的 RDD 的每个集合中的元素合并为一个集合,内部创建 FlatMappedRDD(this,sc.clean(f))。

    下图表示 RDD 的一个分区,进行 flatMap 函数操作,flatMap 中传入的函数为 f:T->U, T 和 U 可以是任意的数据类型。将分区中的数据通过用户自定义函数 f 转换为新的数据。外部大方框可以认为是一个 RDD 分区,小方框代表一个集合。 V1、 V2、 V3 在一个集合作为 RDD 的一个数据项,可能存储为数组或其他容器,转换为 V'1、 V'2、 V'3 后,将原来的数组或容器结合拆散,拆散的数据形成为 RDD 中的数据项。

    flatMap 算子

  • 用法:

    1
    2
    def flatMap[U](f: (T) ⇒ TraversableOnce[U])(implicit arg0: ClassTag[U]): RDD[U]
    Return a new RDD by first applying a function to all elements of this RDD, and then flattening the results.

  • 示例:

    1
    2
    3
    4
    5
    6
    7
    8
    val data = sc.parallelize(1 to 10, 3)
    data.flatMap(item => item to 10).collect
    data.flatMap(_ to 10).collect

    // 输出结果
    data: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at parallelize at <console>:25
    res0: Array[Int] = Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 2, 3, 4, 5, 6, 7, 8, 9, 10, 3, 4, 5, 6, 7, 8, 9, 10, 4, 5, 6, 7, 8, 9, 10, 5, 6, 7, 8, 9, 10, 6, 7, 8, 9, 10, 7, 8, 9, 10, 8, 9, 10, 9, 10, 10)
    res1: Array[Int] = Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 2, 3, 4, 5, 6, 7, 8, 9, 10, 3, 4, 5, 6, 7, 8, 9, 10, 4, 5, 6, 7, 8, 9, 10, 5, 6, 7, 8, 9, 10, 6, 7, 8, 9, 10, 7, 8, 9, 10, 8, 9, 10, 9, 10, 10)

mapPartitions 算子

  • 说明:通过将函数应用于此 RDD 的每个分区来返回新的 RDD。

    mapPartitions 函数获取到每个分区的迭代器,在函数中通过这个分区整体的迭代器对整个分区的元素进行操作。内部实现是生成 MapPartitionsRDD。图中的方框代表一个 RDD 分区,用户通过函数 f(iter)=>iter.filter(_>=3) 对分区中所有数据进行过滤,大于和等于 3 的数据保留。一个方块代表一个 RDD 分区,含有 1、2、3 的分区过滤只剩下元素 3。

    mapPartitions 算子

  • 用法:

    1
    2
    def mapPartitions[U](f: (Iterator[T]) ⇒ Iterator[U], preservesPartitioning: Boolean = false)(implicit arg0: ClassTag[U]): RDD[U]
    Return a new RDD by applying a function to each partition of this RDD.

  • 示例:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    val data = sc.parallelize(1 to 10, 3)
    def func(it: Iterator[Int]): Iterator[Int] = {
    it.filter(_ >= 3)
    }
    data.mapPartitions(func).collect
    data.mapPartitions(_.filter(_ >= 3)).collect

    // 输出结果
    data: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at parallelize at <console>:25
    func: (it: Iterator[Int])Iterator[Int]
    res0: Array[Int] = Array(3, 4, 5, 6, 7, 8, 9, 10)
    res1: Array[Int] = Array(3, 4, 5, 6, 7, 8, 9, 10)

glom 算子

  • 说明:返回通过将每个分区内的所有元素合并到数组中而创建的 RDD。

    glom 函数将每个分区形成一个数组,内部实现是返回的 GlommedRDD。 图中的每个方框代表一个 RDD 分区。该图表示含有 V1、 V2、 V3 的分区通过函数 glom 形成一数组 Array[(V1),(V2),(V3)]。

    glom 算子

  • 用法:

    1
    2
    def glom(): RDD[Array[T]]
    Return an RDD created by coalescing all elements within each partition into an array.

  • 示例:

    1
    2
    3
    4
    5
    6
    val data = sc.parallelize(1 to 10, 3)
    data.glom().collect

    // 输出结果
    data: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at parallelize at <console>:25
    res0: Array[Array[Int]] = Array(Array(1, 2, 3), Array(4, 5, 6), Array(7, 8, 9, 10))

输入分区与输出分区多对一型

union 算子

  • 说明:返回此 RDD 和另一个 RDD 的联合。

    使用 union 函数时需要保证两个 RDD 元素的数据类型相同,返回的 RDD 数据类型和被合并的 RDD 元素数据类型相同,并不进行去重操作,保存所有元素。如果想去重可以使用 distinct()。同时 Spark 还提供更为简洁的使用 union 的 API,通过 ++ 符号相当于 union 函数操作。

    图中左侧大方框代表两个 RDD,大方框内的小方框代表 RDD 的分区。右侧大方框代表合并后的 RDD,大方框内的小方框代表分区。含有 V1、V2、U1、U2、U3、U4 的 RDD 和含有 V1、V8、U5、U6、U7、U8 的 RDD 合并所有元素形成一个 RDD。V1、V1、V2、V8 形成一个分区,U1、U2、U3、U4、U5、U6、U7、U8 形成一个分区。

    union 算子

  • 用法:

    1
    2
    def union(other: RDD[T]): RDD[T]
    Return the union of this RDD and another one.

  • 示例:

    1
    2
    3
    4
    5
    6
    7
    8
    val data1 = sc.parallelize(1 to 10, 3)
    val data2 = sc.parallelize(5 to 15, 3)
    data1.union(data2).collect

    // 输出结果
    data1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at parallelize at <console>:25
    data2: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[1] at parallelize at <console>:25
    res0: Array[Int] = Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15)

cartesian 算子

  • 说明:返回此 RDD 和另一个 RDD 的进行笛卡尔积运算,即返回 a 和 b 的所有元素对 (a,b) 的 RDD。

    对两个 RDD 内的所有元素进行笛卡尔积操作,该操作不会执行 shuffle 操作。操作后,内部实现返回 CartesianRDD。图中左侧大方框代表两个 RDD,大方框内的小方框代表 RDD 的分区。右侧大方框代表合并后的 RDD,大方框内的小方框代表分区。图中的大方框代表 RDD,大方框中的小方框代表 RDD 分区。例如:V1 和另一个 RDD 中的 W1、W2、Q5 进行笛卡尔积运算形成 (V1,W1)、(V1,W2)、(V1,Q5)。

    cartesian 算子

  • 用法:

    1
    2
    def cartesian[U](other: RDD[U])(implicit arg0: ClassTag[U]): RDD[(T, U)]
    Return the Cartesian product of this RDD and another one, that is, the RDD of all pairs of elements (a, b) where a is in this and b is in other.

  • 示例:

    1
    2
    3
    4
    5
    6
    7
    8
    val data1 = sc.parallelize(1 to 5, 3)
    val data2 = sc.parallelize(5 to 10, 3)
    data1.cartesian(data2).collect

    // 输出结果
    data1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at parallelize at <console>:25
    data2: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[1] at parallelize at <console>:25
    res0: Array[(Int, Int)] = Array((1,5), (1,6), (1,7), (1,8), (1,9), (1,10), (2,5), (2,6), (3,5), (3,6), (2,7), (2,8), (3,7), (3,8), (2,9), (2,10), (3,9), (3,10), (4,5), (4,6), (5,5), (5,6), (4,7), (4,8), (5,7), (5,8), (4,9), (4,10), (5,9), (5,10))

输入分区与输出分区多对多型

groupBy 算子

  • 说明:返回分组元素的 RDD。每个组由一个键和映射到该键的一系列元素组成。不能保证每个组中元素的顺序,并且每次生成的 RDD 时甚至可能会有所不同。

    将元素通过函数生成相应的 Key,数据就转化为 Key-Value 格式,之后将 Key 相同的元素分为一组。 函数实现如下:将用户函数预处理,对数据 map 进行函数操作,最后再进行 groupByKey 分组操作。this.map(t => (cleanF(t), t)).groupByKey(p),其中, p 确定了分区个数和分区函数,也就决定了并行化的程度。

    图中方框代表一个 RDD 分区,相同 key 的元素合并到一个组。例如 V1 和 V2 合并为 V,Value 为 V1,V2。形成 V,Seq(V1,V2)。

    groupBy 算子

  • 用法:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    def groupBy[K](f: (T) ⇒ K)(implicit kt: ClassTag[K]): RDD[(K, Iterable[T])]
    def groupBy[K](f: (T) ⇒ K, numPartitions: Int)(implicit kt: ClassTag[K]): RDD[(K, Iterable[T])]
    def groupBy[K](f: (T) ⇒ K, p: Partitioner)(implicit kt: ClassTag[K], ord: Ordering[K] = null): RDD[(K, Iterable[T])]
    Return an RDD of grouped elements. Each group consists of a key and a sequence of elements mapping to that key. The ordering of elements within each group is not guaranteed, and may even differ each time the resulting RDD is evaluated.

    def groupByKey(): RDD[(K, Iterable[V])]
    def groupByKey(numPartitions: Int): RDD[(K, Iterable[V])]
    def groupByKey(partitioner: Partitioner): RDD[(K, Iterable[V])]
    Group the values for each key in the RDD into a single sequence. Hash-partitions the resulting RDD with the existing partitioner/parallelism level. The ordering of elements within each group is not guaranteed, and may even differ each time the resulting RDD is evaluated.

  • 示例:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    val data1 = sc.parallelize(1 to 10, 3)
    data1.groupBy(_ % 2).collect // 分成两组

    val data2 = sc.parallelize(Array("a", "b", "c", "aa", "bb", "cc", "aaa", "bbb", "ccc"), 1)
    val data3 = data2.keyBy(_.length) // 给 value 加上 key,key 为对应 string 的长度
    data3.groupByKey.collect

    // 输出结果
    data1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at parallelize at <console>:25
    res0: Array[(Int, Iterable[Int])] = Array((0,CompactBuffer(4, 6, 2, 8, 10)), (1,CompactBuffer(1, 3, 7, 9, 5)))

    data2: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[1] at parallelize at <console>:25
    data3: org.apache.spark.rdd.RDD[(Int, String)] = MapPartitionsRDD[2] at keyBy at <console>:25
    res1: Array[(Int, Iterable[String])] = Array((1,CompactBuffer(a, b, c)), (3,CompactBuffer(aaa, bbb, ccc)), (2,CompactBuffer(aa, bb, cc)))

输出分区为输入分区子集型

filter 算子

  • 说明:返回仅包含满足条件的元素的新 RDD。

    filter 函数功能是对元素进行过滤,对每个元素应用 f 函 数,返回值为 true 的元素在 RDD 中保留,返回值为 false 的元素将被过滤掉。内部实现相当于生成 FilteredRDD(this,sc.clean(f))。图中每个方框代表一个 RDD 分区, T 可以是任意的类型。通过用户自定义的过滤函数 f,对每个数据项操作,将满足条件、返回结果为 true 的数据项保留。例如,过滤掉 V2 和 V3 保留了 V1,为区分命名为 V'1。

    下面代码为函数的本质实现:def filter(f:T=>Boolean):RDD[T]=newFilteredRDD(this,sc.clean(f))

    filter 算子

  • 用法:

    1
    2
    def filter(f: (T) ⇒ Boolean): RDD[T]
    Return a new RDD containing only the elements that satisfy a predicate.

  • 示例:

    1
    2
    3
    4
    5
    6
    val data = sc.parallelize(1 to 10, 3)
    data.filter(_ % 2 == 0).collect

    // 输出结果
    data: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at parallelize at <console>:25
    res0: Array[Int] = Array(2, 4, 6, 8, 10)

distinct 算子

  • 说明:返回一个包含该 RDD 中不同元素的新 RDD。

    distinct 将 RDD 中的元素进行去重操作。图中的每个方框代表一个 RDD 分区,通过 distinct 函数,将数据去重。例如,重复数据 V1、V1 去重后只保留一份 V1。

    distinct 算子

  • 用法:

    1
    2
    3
    def distinct(): RDD[T]
    def distinct(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T]
    Return a new RDD containing the distinct elements in this RDD.

  • 示例:

    1
    2
    3
    4
    5
    6
    val data = sc.parallelize(Array(1, 2, 3, 3, 2, 1), 3)
    data.distinct.collect

    // 输出结果
    data: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at parallelize at <console>:25
    res0: Array[Int] = Array(3, 1, 2)

subtract 算子

  • 说明:返回一个新 RDD,其中的元素在第一个 RDD 有,第二个 RDD 没有。

    subtract 相当于进行集合的差操作,RDD1 去除 RDD1 和 RDD2 交集中的所有元素。图中左侧的大方框代表两个 RDD,大方框内的小方框代表 RDD 的分区。右侧大方框代表合并后的 RDD,大方框内的小方框代表分区。V1 在两个 RDD 中均有,根据差集运算规则,新 RDD 不保留,V2 在第一个 RDD 有,第二个 RDD 没有,则在新 RDD 元素中包含 V2。

    subtract 算子

  • 用法:

    1
    2
    3
    4
    def subtract(other: RDD[T]): RDD[T]
    def subtract(other: RDD[T], numPartitions: Int): RDD[T]
    def subtract(other: RDD[T], p: Partitioner)(implicit ord: Ordering[T] = null): RDD[T]
    Return an RDD with the elements from this that are not in other.

  • 示例:

    1
    2
    3
    4
    5
    6
    7
    8
    val data1 = sc.parallelize(1 to 10, 3)
    val data2 = sc.parallelize(1 to 5, 3)
    data1.subtract(data2).collect

    // 输出结果
    data1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at parallelize at <console>:25
    data2: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[1] at parallelize at <console>:25
    res0: Array[Int] = Array(6, 9, 7, 10, 8)

sample 算子

  • 说明:返回此 RDD 的采样子集。

    sample 将 RDD 这个集合内的元素进行采样,获取所有元素的子集。用户可以设定是否有放回的抽样、百分比、随机种子,进而决定采样方式。内部实现是生成 SampledRDD(withReplacement, fraction, seed)。图中的每个方框是一个 RDD 分区。通过 sample 函数, 采样 50% 的数据。V1、V2、U1、U2、U3、U4 采样出数据 V1 和 U1、U2 形成新的 RDD。

    sample 算子

  • 用法:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    def sample(withReplacement: Boolean, fraction: Double, seed: Long = Utils.random.nextLong): RDD[T]
    Return a sampled subset of this RDD.

    withReplacement:can elements be sampled multiple times (replaced when sampled out)
    可以多次采样元素(采样时替换),true 表示有放回抽样,false 表示无放回抽样,

    fraction:expected size of the sample as a fraction of this RDD's size without replacement: probability that each element is chosen; fraction must be [0, 1] with replacement: expected number of times each element is chosen; fraction must be greater than or equal to 0
    样本的预期大小,占该 RDD 大小的一部分,无需替换:选择每个元素的概率;分数必须为 [01],并带有替换:选择每个元素的预期次数;小数必须大于或等于 0

    seed:seed for the random number generator
    随机数生成器的种子

  • 示例:

    1
    2
    3
    4
    5
    6
    7
    8
    val data = sc.parallelize(1 to 10, 3)
    data.sample(true, 0.5, 9).collect
    data.sample(false, 0.5, 9).collect

    // 输出结果
    data: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at parallelize at <console>:25
    res0: Array[Int] = Array(1, 1, 3, 4, 4, 5, 6, 8, 9, 9)
    res1: Array[Int] = Array(2, 3, 4, 6, 8, 9)

Cache 型

cache 算子

  • 说明:使用默认存储级别(MEMORY_ONLY)缓存该 RDD。

    cache 将 RDD 元素从磁盘缓存到内存。相当于 persist(MEMORY_ONLY) 函数的功能。图中每个方框代表一个 RDD 分区,左侧相当于数据分区都存储在磁盘,通过 cache 算子将数据缓存在内存。

    cache 算子

  • 用法:

    1
    2
    def cache(): RDD.this.type
    Persist this RDD with the default storage level (MEMORY_ONLY).

  • 示例:

    1
    2
    3
    4
    5
    6
    val data = sc.parallelize(1 to 10, 3)
    data.cache()

    // 输出结果
    data: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at parallelize at <console>:25
    res0: data.type = ParallelCollectionRDD[0] at parallelize at <console>:25

persist 算子

  • 说明:设置此 RDD 的存储级别,以在第一次计算它之后将其值持久化到各个操作中。如果 RDD 尚未设置存储级别,则只能用于分配新的存储级别。本地检查点是一个例外。

    persist 函数对 RDD 进行缓存操作。数据缓存在哪里依据 StorageLevel 这个枚举类型进行确定。有以下几种类型的组合,DISK 代表磁盘,MEMORY 代表内存,SER 代表数据是否进行序列化存储。

    下面为函数定义,StorageLevel 是枚举类型,代表存储模式,用户可以按需进行选择。persist(newLevel:StorageLevel),下面列出 persist 函数可以进行缓存的模式。例如,MEMORY_AND_DISK_SER 代表数据可以存储在内存和磁盘,并且以序列化的方式存储,其他同理。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    class StorageLevel private(
    private var _useDisk: Boolean, //是否使用磁盘
    private var _useMemory: Boolean, //是否使用内存
    private var _useOffHeap: Boolean, //是否使用堆外内存
    private var _deserialized: Boolean, //是否反序列化
    private var _replication: Int = 1) //备份因子,默认为 1
    extends Externalizable {}

    object StorageLevel {
    val NONE = new StorageLevel(false, false, false, false)
    val DISK_ONLY = new StorageLevel(true, false, false, false)
    val DISK_ONLY_2 = new StorageLevel(true, false, false, false, 2)
    val MEMORY_ONLY = new StorageLevel(false, true, false, true)
    val MEMORY_ONLY_2 = new StorageLevel(false, true, false, true, 2)
    val MEMORY_ONLY_SER = new StorageLevel(false, true, false, false)
    val MEMORY_ONLY_SER_2 = new StorageLevel(false, true, false, false, 2)
    val MEMORY_AND_DISK = new StorageLevel(true, true, false, true)
    val MEMORY_AND_DISK_2 = new StorageLevel(true, true, false, true, 2)
    val MEMORY_AND_DISK_SER = new StorageLevel(true, true, false, false)
    val MEMORY_AND_DISK_SER_2 = new StorageLevel(true, true, false, false, 2)
    val OFF_HEAP = new StorageLevel(true, true, true, false, 1)
    }

    下图中方框代表 RDD 分区。disk 代表存储在磁盘,mem 代表存储在内存。数据最初全部存储在磁盘,通过 persist(MEMORY_AND_DISK) 将数据缓存到内存,但是有的分区无法容纳在内存,将含有 V1、 V2、 V3 的 RDD 存储到磁盘,将含有 U1,U2 的 RDD 仍旧存储在内存。

    persist 算子

  • 用法:

    1
    2
    3
    4
    5
    def persist(): RDD.this.type
    Persist this RDD with the default storage level (MEMORY_ONLY).

    def persist(newLevel: StorageLevel): RDD.this.type
    Set this RDD's storage level to persist its values across operations after the first time it is computed. This can only be used to assign a new storage level if the RDD does not have a storage level set yet. Local checkpointing is an exception.

  • 示例:

    1
    2
    3
    4
    5
    6
    val data = sc.parallelize(1 to 10, 3)
    data.persist(StorageLevel.MEMORY_AND_DISK)

    // 输出结果
    data: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at parallelize at <console>:25
    res0: data.type = ParallelCollectionRDD[0] at parallelize at <console>:25

Key-Value 数据类型的 Transformation 算子

输入分区与输出分区一对一

mapValues 算子

  • 说明:在不更改键的情况下,通过映射函数传递键值对 RDD 中的每个值;这也保留了原始 RDD 的分区。

    针对 (Key, Value) 型数据中的 Value 进行 Map 操作,而不对 Key 进行处理。图中的方框代表 RDD 分区。 a=>a+2 代表对 (V1, 1) 这样的 Key-Value 数据对,数据只对 Value 中的 1 进行加 2 操作,返回结果为 3。

    mapValues 算子

  • 用法:

    1
    2
    def mapValues[U](f: (V) ⇒ U): RDD[(K, U)]
    Pass each value in the key-value pair RDD through a map function without changing the keys; this also retains the original RDD's partitioning.

  • 示例:

    1
    2
    3
    4
    5
    6
    val data = sc.parallelize(Array(1, 2, 3, 4, 5, 6), 3)
    data.map(x => (x, x)).mapValues(a => (a + 2)).collect

    // 输出结果
    data: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at parallelize at <console>:25
    res0: Array[(Int, Int)] = Array((1,3), (2,4), (3,5), (4,6), (5,7), (6,8))

对单个 RDD 或两个 RDD 聚合

单个 RDD 聚合

combineByKey 算子

  • 说明:CombineByKeyWithClassTag 的简化版本,它使用现有的分区程序/并行度级别对生成的 RDD 进行哈希分区。此方法是为了向后兼容。它不向混洗提供组合器类标签信息。

    例如,相当于将元素为 (Int, Int) 的 RDD 转变为了 (Int, Seq[Int]) 类型元素的 RDD。图中的方框代表 RDD 分区。如图,通过 combineByKey,将 (V1,2), (V1,1) 数据合并为 (V1, Seq(2, 1))。

    combineByKey 算子

  • 用法:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    def combineByKey[C](createCombiner: (V) ⇒ C, mergeValue: (C, V) ⇒ C, mergeCombiners: (C, C) ⇒ C): RDD[(K, C)]
    def combineByKey[C](createCombiner: (V) ⇒ C, mergeValue: (C, V) ⇒ C, mergeCombiners: (C, C) ⇒ C, numPartitions: Int): RDD[(K, C)]
    def combineByKey[C](createCombiner: (V) ⇒ C, mergeValue: (C, V) ⇒ C, mergeCombiners: (C, C) ⇒ C, partitioner: Partitioner, mapSideCombine: Boolean = true, serializer: Serializer = null): RDD[(K, C)]

    createCombiner,createCombiner, which turns a V into a C (e.g., creates a one-element Array)
    V 变成 C(例如,创建一个元素列表),C 不存在的情况下,比如通过 V 创建 seq C

    mergeValue, to merge a V into a C (e.g., adds it to the end of a Array)
    V 合并为 C(例如,将其添加到列表的末尾),当 C 已经存在的情况下,需要 merge,比如把 item V 加到 seq C 中,或者叠加。

    mergeCombiners, to combine two C's into a single one.
    将两个 C 合并为一个。

    mapSideCombine Boolean = true
    为了减小传输量,很多 combine 可以在 map 端先做,比如叠加,可以先在一个 partition 中把所有相同的 key 的 value 叠加,再 shuffle。

    serializerClass: String = null
    传输需要序列化,用户可以自定义序列化类。

  • 示例:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    val data = sc.parallelize(Array(("a", 88), ("a", 95), ("a", 91), ("b", 93), ("b", 95), ("b", 98)))
    data.combineByKey(
    (v) => (v, 1),
    (c: (Int, Int), v) => (c._1 + v, c._2 + 1),
    (c1: (Int, Int), c2: (Int, Int)) => (c1._1 + c2._1, c1._2 + c2._2)
    ).collect

    // 输出结果
    data: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[0] at parallelize at <console>:25
    res0: Array[(String, (Int, Int))] = Array((a,(274,3)), (b,(286,3)))

reduceByKey 算子

  • 说明:使用关联和可交换的归约函数合并每个键的值。在将结果发送给 reducer 之前,这还将在每个 Mapper 上本地执行合并,这与 MapReduce 中的 combiner 类似。输出将使用现有分区/并行级别进行哈希分区。

    reduceByKey 是比 combineByKey 更简单的一种情况,只是两个值合并成一个值,(Int, Int V) to (Int, Int C),比如叠加。所以 createCombiner reduceByKey 很简单,就是直接返回 v,而 mergeValue 和 mergeCombiners 逻辑是相同的,没有区别。图中的方框代表 RDD 分区。通过用户自定义函数 (A,B) => (A + B) 函数,将相同 key 的数据 (V1,2) 和 (V1,1) 的 value 相加运算,结果为 (V1,3)。

    reduceByKey 算子

  • 用法:

    1
    2
    3
    4
    def reduceByKey(func: (V, V) ⇒ V): RDD[(K, V)]
    def reduceByKey(func: (V, V) ⇒ V, numPartitions: Int): RDD[(K, V)]
    def reduceByKey(partitioner: Partitioner, func: (V, V) ⇒ V): RDD[(K, V)]
    Merge the values for each key using an associative and commutative reduce function.

  • 示例:

    1
    2
    3
    4
    5
    6
    7
    8
    val data = sc.parallelize(Array(("a", 1), ("a", 2), ("b", 3), ("b", 4)))
    data.reduceByKey((x, y) => x + y).collect
    data.reduceByKey(_ + _).collect

    // 输出结果
    data: org.apache.spark.rdd.RDD[(Int, Int)] = ParallelCollectionRDD[0] at parallelize at <console>:25
    res0: Array[(String, Int)] = Array((b,7), (a,3))
    res1: Array[(String, Int)] = Array((b,7), (a,3))

partitionBy 算子

  • 说明:返回使用指定分区程序分区的 RDD 的副本。

    如果原有 RDD 的分区器和现有分区器(partitioner)一致,则不重分区,如果不一致,则相当于根据分区器生成一个新的 ShuffledRDD。图中的方框代表 RDD 分区。通过新的分区策略将原来在不同分区的 V1、 V2 数据都合并到了一个分区。

    partitionBy 算子

  • 用法:

    1
    2
    def partitionBy(partitioner: Partitioner): RDD[(K, V)]
    Return a copy of the RDD partitioned using the specified partitioner.

  • 示例:

    1
    2
    3
    4
    5
    6
    val data = sc.parallelize(Array("a", 1), ("a", 2), ("b", 1), ("b", 3), ("c", 1), ("e", 5), 2)
    data.partitionBy(new HashPartitioner(4)).glom.collect

    // 输出结果
    data: org.apache.spark.rdd.RDD[(Int, Int)] = ParallelCollectionRDD[0] at parallelize at <console>:25
    res0: Array[Array[(String, Int)]] = Array(Array((a,2), (c,1)), Array((b,2), (e,5)), Array(), Array((a,1), (b,3)))

两个 RDD 聚合

cogroup 算子

  • 说明:对在两个 RDD 中的 Key-Value 类型的元素,每个 RDD 相同 Key 的元素分别聚合为一个集合,并且返回两个 RDD 中对应 Key 的元素集合的迭代器。

    其中,Key 和 Value,Value 是两个 RDD 下相同 Key 的两个数据集合的迭代器所构成的元组。图中的大方框代表 RDD,大方框内的小方框代表 RDD 中的分区。将 RDD1 中的数据 (U,1)、(U1,2) 和 RDD2 中的数据 (U1,2) 合并为 (U1,((1,2),(2)))。

    cogroup 算子

  • 用法:

    1
    2
    def cogroup[W1, W2](other1: RDD[(K, W1)], other2: RDD[(K, W2)], numPartitions: Int): RDD[(K, (Iterable[V], Iterable[W1], Iterable[W2]))]
    For each key k in this or other1 or other2, return a resulting RDD that contains a tuple with the list of values for that key in this, other1 and other2.

  • 示例:

    1
    2
    3
    4
    5
    6
    7
    8
    val data1 = sc.parallelize(Array(("a", 1), ("a", 2)))
    val data2 = sc.parallelize(Array(("a", 3), ("a", 4)))
    data1.cogroup(data2).collect

    // 输出结果
    data1: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[0] at parallelize at <console>:25
    data2: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[1] at parallelize at <console>:25
    res0: Array[(String, (Iterable[Int], Iterable[Int]))] = Array((a,(CompactBuffer(1, 2),CompactBuffer(4, 3))))

连接

join 算子

  • 说明:返回一个 RDD,其中包含所有成对的元素。

    join 对两个需要连接的 RDD 进行 cogroup 函数操作,将相同 key 的数据能够放到一个分区,在 cogroup 操作之后形成的新 RDD 对每个 key 下的元素进行笛卡尔积的操作,返回的结果再展平,对应 key 下的所有元组形成一个集合。最后返回 RDD[(K, (V, W))]。

    下面代码为 join 的函数实现,本质是通过 cogroup 算子先进行协同划分,再通过 flatMapValues 将合并的数据打散。

    this.cogroup(other, partitioner).flatMapValues{case(vs, ws) => for(v<-vs; w<-ws) yield(v, w) }

    下图是对两个 RDD 的 join 操作示意图。大方框代表 RDD,小方框代表 RDD 中的分区。函数对相同 key 的元素,如 V1 为 key 做连接后结果为 (V1,(1,1)) 和 (V1,(1,2))。

    join 算子

  • 用法:

    1
    2
    3
    4
    def join[W](other: RDD[(K, W)]): RDD[(K, (V, W))]
    def join[W](other: RDD[(K, W)], numPartitions: Int): RDD[(K, (V, W))]
    def join[W](other: RDD[(K, W)], partitioner: Partitioner): RDD[(K, (V, W))]
    Return an RDD containing all pairs of elements with matching keys in this and other.

  • 示例:

    1
    2
    3
    4
    5
    6
    7
    8
    val data1 = sc.parallelize(Array(("a", 1), ("a", 2)))
    val data2 = sc.parallelize(Array(("a", 3), ("a", 4)))
    data1.join(data2).collect

    // 输出结果
    data1: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[0] at parallelize at <console>:25
    data2: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[1] at parallelize at <console>:25
    res0: Array[(String, (Int, Int))] = Array((a,(1,3)), (a,(1,4)), (a,(2,3)), (a,(2,4)))

leftOuterJoin 和 rightOuterJoin 算子

  • 说明:leftOuterJoin(左外连接)和 rightOuterJoin(右外连接)相当于在 join 的基础上先判断一侧的 RDD 元素是否为空,如果为空,则填充为空。如果不为空,则将数据进行连接运算,并返回结果。

  • 用法:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    def leftOuterJoin[W](other: RDD[(K, W)]): RDD[(K, (V, Option[W]))]
    def leftOuterJoin[W](other: RDD[(K, W)], numPartitions: Int): RDD[(K, (V, Option[W]))]
    def leftOuterJoin[W](other: RDD[(K, W)], partitioner: Partitioner): RDD[(K, (V, Option[W]))]
    Perform a left outer join of this and other. For each element (k, v) in this, the resulting RDD will either contain all pairs (k, (v, Some(w))) for w in other, or the pair (k, (v, None)) if no elements in other have key k. Hash-partitions the output using the existing partitioner/parallelism level.

    def rightOuterJoin[W](other: RDD[(K, W)]): RDD[(K, (Option[V], W))]
    def rightOuterJoin[W](other: RDD[(K, W)], numPartitions: Int): RDD[(K, (Option[V], W))]
    def rightOuterJoin[W](other: RDD[(K, W)], partitioner: Partitioner): RDD[(K, (Option[V], W))]
    Perform a right outer join of this and other. For each element (k, w) in other, the resulting RDD will either contain all pairs (k, (Some(v), w)) for v in this, or the pair (k, (None, w)) if no elements in this have key k. Hash-partitions the resulting RDD using the existing partitioner/parallelism level.

  • 示例:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    val data1 = sc.parallelize(Array(("a", 1), ("a", 2)))
    val data2 = sc.parallelize(Array(("a", 3), ("b", 4)))
    data1.leftOuterJoin(data2).collect
    data2.leftOuterJoin(data1).collect
    data1.rightOuterJoin(data2).collect
    data2.rightOuterJoin(data1).collect

    // 输出结果
    data1: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[0] at parallelize at <console>:25
    data2: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[1] at parallelize at <console>:25
    res0: Array[(String, (Int, Option[Int]))] = Array((a,(2,Some(3))), (a,(1,Some(3))))
    res1: Array[(String, (Int, Option[Int]))] = Array((b,(4,None)), (a,(3,Some(1))), (a,(3,Some(2))))
    res2: Array[(String, (Option[Int], Int))] = Array((b,(None,4)), (a,(Some(1),3)), (a,(Some(2),3)))
    res3: Array[(String, (Option[Int], Int))] = Array((a,(Some(3),2)), (a,(Some(3),1)))

Action 算子

本质上在 Action 算子中通过 SparkContext 进行了提交作业的 runJob 操作,触发了 RDD DAG 的执行。

无输出

foreach 算子

  • 说明:foreach 对 RDD 中的每个元素都应用 f 函数操作,不返回 RDD 和 Array, 而是返回 Uint。下图表示 foreach 算子通过用户自定义函数对每个数据项进行操作。本例中自定义函数为 println(),控制台打印所有数据项。

    foreach 算子

  • 用法:

    1
    def foreach[U](f: (T) ⇒ U)(implicit executor: ExecutionContext): Unit

  • 示例:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    val data = sc.parallelize(1 to 5)
    var sum = sc.accumulator(0)
    data.foreach(sum += _)
    sum.value
    data.collect().foreach(println)

    // 输出结果
    data: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at parallelize at <console>:25
    sum: org.apache.spark.Accumulator[Int] = 0
    res0: Int = 15
    1
    2
    3
    4
    5

HDFS

saveAsTextFile 算子

  • 说明:使用元素的字符串表示形式将此 RDD 保存为文本文件。将数据输出,存储到 HDFS 的指定目录。

    下面为 saveAsTextFile 函数的内部实现,其内部通过调用 saveAsHadoopFile 进行实现:

    this.map(x => (NullWritable.get(), new Text(x.toString))).saveAsHadoopFile[TextOutputFormat[NullWritable, Text]](path)

    将 RDD 中的每个元素映射转变为 (null, x.toString),然后再将其写入 HDFS。下图中左侧方框代表 RDD 分区,右侧方框代表 HDFS 的 Block。通过函数将 RDD 的每个分区存储为 HDFS 中的一个 Block。

    saveAsTextFile 算子

  • 用法:

    1
    2
    def saveAsTextFile(path: String): Unit
    Save this RDD as a text file, using string representations of elements.

  • 示例:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    val data = sc.parallelize(1 to 10, 3)
    data.partitions.size
    data.saveAsTextFile("/test/rdd")

    // 输出结果
    data: org.apache.spark.rdd.RDD[scala.collection.immutable.Range.Inclusive] = ParallelCollectionRDD[0] at parallelize at <console>:25
    res0: Int = 3

    [root@master]# hdfs dfs -ls /test/rdd
    Found 4 items
    -rw-r--r-- 3 root supergroup 0 2021-05-07 16:57 /test/rdd/_SUCCESS
    -rw-r--r-- 3 root supergroup 0 2021-05-07 16:57 /test/rdd/part-00000
    -rw-r--r-- 3 root supergroup 0 2021-05-07 16:57 /test/rdd/part-00001
    -rw-r--r-- 3 root supergroup 0 2021-05-07 16:57 /test/rdd/part-00002

saveAsObjectFile 算子

  • 说明:将此 RDD 保存为序列化对象的 SequenceFile。

    saveAsObjectFile 将分区中的每 10 个元素组成一个 Array,然后将这个 Array 序列化,映射为 (Null, BytesWritable(Y)) 的元素,写入 HDFS 为 SequenceFile 的格式。下面代码为函数内部实现。

    map(x=>(NullWritable.get(),new BytesWritable(Utils.serialize(x))))

    下图中的左侧方框代表 RDD 分区,右侧方框代表 HDFS 的 Block。通过函数将 RDD 的每个分区存储为 HDFS 上的一个 Block。

    saveAsObjectFile 算子

  • 用法:

    1
    2
    3
    4
    5
    def saveAsObjectFile(path: String): Unit
    Save this RDD as a SequenceFile of serialized objects.

    def saveAsTextFile(path: String, codec: Class[_ <: CompressionCodec]): Unit
    Save this RDD as a compressed text file, using string representations of elements.

  • 示例:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    val data = sc.parallelize(1 to 10, 3)
    data.partitions.size
    data.saveAsObjectFile("/test/rdd")

    // 输出结果
    data: org.apache.spark.rdd.RDD[scala.collection.immutable.Range.Inclusive] = ParallelCollectionRDD[0] at parallelize at <console>:25
    res0: Int = 3

    [root@master]# hdfs dfs -ls /test/rdd
    Found 4 items
    -rw-r--r-- 3 root supergroup 0 2021-05-07 16:57 /test/rdd/_SUCCESS
    -rw-r--r-- 3 root supergroup 0 2021-05-07 16:57 /test/rdd/part-00000
    -rw-r--r-- 3 root supergroup 0 2021-05-07 16:57 /test/rdd/part-00001
    -rw-r--r-- 3 root supergroup 0 2021-05-07 16:57 /test/rdd/part-00002

    [root@master]# hadoop fs -cat /test/rdd/part-00000
    SEQ !org.apache.hadoop.io.NullWritable"org.apache.hadoop.io.BytesWritableT

Scala 集合和数据类型

collect 算子

  • 说明:返回一个包含此 RDD 中所有元素的数组。

    collect 相当于 toArray,toArray 已经过时不推荐使用,collect 将分布式的 RDD 返回为一个单机的 scala Array 数组。在这个数组上运用 scala 的函数式操作。下图中左侧方框代表 RDD 分区,右侧方框代表单机内存中的数组。通过函数操作,将结果返回到 Driver 程序所在的节点,以数组形式存储。

    collect 算子

  • 用法:

    1
    2
    def collect(): List[T]
    Return an array that contains all of the elements in this RDD.

  • 示例:

    1
    2
    3
    4
    5
    6
    val data = sc.parallelize(1 to 10, 3)
    data.collect

    // 输出结果
    data: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at parallelize at <console>:25
    res0: Array[Int] = Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)

collectAsMap 算子

  • 说明:collectAsMap 对 (K,V) 型的 RDD 数据返回一个单机 HashMap。对于重复 K 的 RDD 元素,后面的元素覆盖前面的元素。

    下图中的左侧方框代表 RDD 分区,右侧方框代表单机数组。数据通过 collectAsMap 函数返回给 Driver 程序计算结果,结果以 HashMap 形式存储。

    collectAsMap 算子

  • 用法:

    1
    2
    def collectAsMap(): Map[K, V]
    Return the key-value pairs in this RDD to the master as a Map.

  • 示例:

    1
    2
    3
    4
    5
    6
    val data = sc.parallelize(Array(("a", 1), ("b,2"), ("a", 3)))
    data.collectAsMap

    // 输出结果
    data: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at parallelize at <console>:25
    res0: scala.collection.Map[String,Int] = Map(a -> 3, b -> 2)

reduceByKeyLocally 算子

  • 说明:使用关联和可交换的 reduce 函数合并每个键的值,但是将结果作为 Map 返回。

    实现的是先 reduce 再 collectAsMap 的功能,先对 RDD 的整体进行 reduce 操作,然后再收集所有结果返回为一个 HashMap。

  • 用法:

    1
    2
    def reduceByKeyLocally(func: Function2[V, V, V]): Map[K, V]
    Merge the values for each key using an associative and commutative reduce function, but return the result immediately to the master as a Map.

  • 示例:

    1
    2
    3
    4
    5
    6
    var data = sc.parallelize(Array(("a", 0), ("a", 2), ("b", 1), ("b", 2), ("c", 1)))
    data.reduceByKeyLocally((x, y) => x + y)

    // 输出结果
    data: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at parallelize at <console>:25
    res0: scala.collection.Map[String,Int] = Map(a -> 2, b -> 3, c -> 1)

lookup 算子

  • 说明:返回 RDD 中 key 的值列表。 lookup 函数对 (Key, Value) 型的 RDD 操作,返回指定 Key 对应的元素形成的 Seq。这个函数处理优化的部分在于,如果这个 RDD 包含分区器,则只会对应处理 K 所在的分区,然后返回由 (K,V) 形成的 Seq。如果 RDD 不包含分区器,则需要对全 RDD 元素进行暴力扫描处理,搜索指定 K 对应的元素。

    下图中的左侧方框代表 RDD 分区,右侧方框代表 Seq,最后结果返回到 Driver 所在节点的应用中。

    lookup 算子

  • 用法:

    1
    2
    def lookup(key: K): List[V]
    Return the list of values in the RDD for key.

  • 示例:

    1
    2
    3
    4
    5
    6
    7
    8
    var data = sc.parallelize(Array(("a", 0), ("a", 2), ("b", 1)))
    data.lookup("a")
    data.lookup("b")

    // 输出结果
    data: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[0] at parallelize at <console>:25
    res0: Seq[Int] = WrappedArray(0, 2)
    res1: Seq[Int] = WrappedArray(1)

count 算子

  • 说明: 返回整个 RDD 的元素个数。

    下图中,返回数据的个数为 5。一个方块代表一个 RDD 分区。

    count 算子

  • 用法:

    1
    2
    def count(): Long
    Return the number of elements in the RDD.

  • 示例:

    1
    2
    3
    4
    5
    6
    val data = sc.parallelize(1 to 5)
    data.count

    // 输出结果
    data: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at parallelize at <console>:25
    res0: Long = 5

top 算子

  • 说明:返回此 RDD 中的前 k 个(最大)元素,并保持顺序。

    相近函数:

    • top 返回最大的 k 个元素。
    • take 返回最小的 k 个元素。
    • takeOrdered 返回前 k 个(最小)元素,并且在返回的数组中保持元素的顺序。
    • first 相当于 top(1) 返回整个 RDD 中的前 k 个元素。
  • 用法:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    def top(num: Int): List[(K, V)]
    Returns the top k (largest) elements from this RDD using the natural ordering for T and maintains the order.

    def top(num: Int, comp: Comparator[(K, V)]): List[(K, V)]
    Returns the top k (largest) elements from this RDD as defined by the specified Comparator[T] and maintains the order.

    def take(num: Int): List[(K, V)]
    Take the first num elements of the RDD.

    def takeOrdered(num: Int): List[(K, V)]
    Returns the first k (smallest) elements from this RDD using the natural ordering for T while maintain the order.

    def takeOrdered(num: Int, comp: Comparator[(K, V)]): List[(K, V)]
    Returns the first k (smallest) elements from this RDD as defined by the specified Comparator[T] and maintains the order.

    def first(): (K, V)
    Return the first element in this RDD.

  • 示例:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    val data = sc.parallelize(1 to 10)
    data.top(3)
    data.take(3)
    data.takeOrdered(3)
    data.first

    // 输出结果
    data: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at parallelize at <console>:25
    res0: Array[Int] = Array(10, 9, 8)
    res1: Array[Int] = Array(1, 2, 3)
    res2: Array[Int] = Array(1, 2, 3)
    res3: Int = 1

reduce 算子

  • 说明:使用指定的可交换和关联的二进制运算符对 RDD 的元素进行运算。

    reduce 函数相当于对 RDD 中的元素进行 reduceLeft 函数的操作。

    reduceLeft 先对两个元素 (K, V) 进行 reduce 函数操作,然后将结果和迭代器取出的下一个元素 (K, V) 进行 reduce 函数操作,直到迭代器遍历完所有元素,得到最后结果。在 RDD 中,先对每个分区中的所有元素 (K, V) 的集合分别进行 reduceLeft。每个分区形成的结果相当于一个元素 (K, V),再对这个结果集合进行 reduceLeft 操作。

    下图中的方框代表一个 RDD 分区,通过用户自定函数 f 将数据进行 reduce 运算。

    reduce 算子

  • 用法:

    1
    2
    def reduce(f: Function2[(K, V), (K, V), (K, V)]): (K, V)
    Reduces the elements of this RDD using the specified commutative and associative binary operator.

  • 示例:

    1
    2
    3
    4
    5
    6
    var data = sc.parallelize(Array(("a", 1), ("b", 2), ("c", 1), ("d", 2)))
    data.reduce((x, y) => (x._1 + "@" + y._1, x._2 + y._2))

    // 输出结果
    data: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[0] at parallelize at <console>:25
    res0: (String, Int) = (c@d@a@b,6)

fold 算子

  • 说明:使用给定的关联函数和中性的“零值”,汇总每个分区的元素,然后汇总所有分区的结果。

    fold 和 reduce 的原理相同,但是与 reduce 不同,相当于每个 reduce 时,迭代器取的第一个元素是 zeroValue。

    下图中通过下面的用户自定义函数进行 fold 运算,图中的一个方框代表一个 RDD 分区。读者可以参照 reduce 函数理解。

    fold 算子

  • 用法:

    1
    2
    def fold(zeroValue: (K, V))(f: Function2[(K, V), (K, V), (K, V)]): (K, V)
    Aggregate the elements of each partition, and then the results for all the partitions, using a given associative function and a neutral "zero value".

  • 示例:

    1
    2
    3
    4
    5
    6
    var data = sc.parallelize(Array(("a", 1), ("a", 2), ("b", 1), ("b", 2)), 2)
    data.fold(("A", 10))((x, y) => (x._1 + "@" + y._1, x._2 + y._2))

    // 输出结果
    data: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[0] at parallelize at <console>:25
    res0: (String, Int) = (A@A@a@b@A@c@d,36)

aggregate 算子

  • 说明:使用给定的合并功能和中性的“零值”,汇总每个分区的元素,然后汇总所有分区的结果。此函数可以返回与该 RDD 的类型 T 不同的结果类型 U。因此,我们需要一个将 T 合并为 U 的操作,以及一个将两个 U 合并的操作。

    aggregate 先对每个分区的所有元素进行 aggregate 操作,再对分区的结果进行 fold 操作。

    aggregate 与 fold 和 reduce 的不同之处在于,aggregate 相当于采用归并的方式进行数据聚合,这种聚合是并行化的。而在 fold 和 reduce 函数的运算过程中,每个分区中需要进行串行处理,每个分区串行计算完结果,结果再按之前的方式进行聚合,并返回最终聚合结果。

    下图图通过用户自定义函数对 RDD 进行 aggregate 的聚合操作,图中的每个方框代表一个 RDD 分区。

    aggregate 算子

  • 用法:

    1
    2
    def aggregate[U](zeroValue: U)(seqOp: Function2[U, (K, V), U], combOp: Function2[U, U, U]): U
    Aggregate the elements of each partition, and then the results for all the partitions, using given combine functions and a neutral "zero value".

  • 示例:

    1
    2
    3
    4
    5
    6
    var data = sc.parallelize(Array(("a", 1), ("a", 2), ("b", 1), ("b", 2)), 2)
    data.aggregate(("A", 10))((x, y) => (x._1 + "@" + y._1, x._2 + y._2), (x, y) => (x._1 + "@" + y._1, x._2 + y._2))

    // 输出结果
    data: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[0] at parallelize at <console>:25
    res0: (String, Int) = (A@A@a@b@A@c@d,36)

takeSample 算子

  • 说明:在数组中返回此 RDD 的固定大小的采样子集

    takeSample 函数和上面的 sample 函数是一个原理,但是不使用相对比例采样,而是按设定的采样个数进行采样,同时返回结果不再是 RDD,而是相当于对采样后的数据进行 collect,返回结果的集合为单机的数组。图中左侧的方框代表分布式的各个节点上的分区,右侧方框代表单机上返回的结果数组。通过 takeSample 对数据采样,设置为采样一份数据,返回结果为 V1。

    takeSample 算子

  • 用法:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    def takeSample(withReplacement: Boolean, num: Int, seed: Long = Utils.random.nextLong): Array[T]
    Return a fixed-size sampled subset of this RDD in an array

    withReplacement
    whether sampling is done with replacement
    是否通过更换进行采样

    num
    size of the returned sample
    返回样本的大小

    seed
    seed for the random number generator
    随机数生成器的种子

  • 示例:

    1
    2
    3
    4
    5
    6
    7
    8
    val data = sc.parallelize(1 to 10, 3)
    data.takeSample(true, 1, 9)
    data.takeSample(false, 1, 9)

    // 输出结果
    data: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at parallelize at <console>:25
    res0: Array[Int] = Array(1)
    res1: Array[Int] = Array(3)