Spark:5、Spark RDD 编程(Action)

常用算子:Action

1) reduce(func)

通过func函数聚集RDD中的所有元素,这个功能必须是可交换且可并联的。

scala> val rdd1 = sc.makeRDD(1 to 10,2)
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[85] at makeRDD at <console>:24

scala> rdd1.reduce(_+_)
res50: Int = 55

scala> val rdd2 = sc.makeRDD(Array(("a",1),("a",3),("c",3),("d",5)))
rdd2: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[86] at makeRDD at <console>:24

scala> rdd2.reduce((x,y)=>(x._1 + y._1,x._2 + y._2))
res51: (String, Int) = (adca,12)]

2) collect()

在驱动程序中,以数组的形式返回数据集的所有元素。

scala> var rdd1 = sc.makeRDD(1 to 10, 2)
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at makeRDD at <console>:24

scala> rdd1.collect()
res0: Array[Int] = Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)

3) count()

返回RDD的元素个数。

scala> var rdd2 = sc.makeRDD(1 to 10, 2)
rdd2: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[1] at makeRDD at <console>:24

scala> rdd2.count()
res1: Long = 10

4) first()

返回RDD的第一个元素(类似于take(1))。

scala> var rdd3 = sc.makeRDD(1 to 10, 2)
rdd3: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[2] at makeRDD at <console>:24

scala> rdd3.first()
res2: Int = 1

5) take(n)

返回一个由数据集的前n个元素组成的数组。

scala> var rdd4 = sc.makeRDD(1 to 10, 2)
rdd4: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[3] at makeRDD at <console>:24

scala> rdd4.take(5)
res3: Array[Int] = Array(1, 2, 3, 4, 5)

6) takeSample(withReplacement,num, [seed])

返回一个数组,该数组由从数据集中随机采样的num个元素组成,可以选择是否用随机数替换不足的部分,seed用于指定随机数生成器种子。

scala> var rdd5 = sc.parallelize(1 to 10, 2)
rdd5: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[4] at parallelize at <console>:24

scala> rdd5.collect
res4: Array[Int] = Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)

scala> rdd5.takeSample(true, 5, 3)
res5: Array[Int] = Array(3, 5, 5, 9, 7)

7) takeOrdered(n)

返回前几个的排序。

scala> val rdd6 = sc.makeRDD(Seq(10, 4, 2, 12, 3))
rdd6: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[6] at makeRDD at <console>:24

scala> rdd6.top(2)
res6: Array[Int] = Array(12, 10)

scala> rdd6.takeOrdered(2)
res7: Array[Int] = Array(2, 3)

scala> rdd6.takeOrdered(4)
res8: Array[Int] = Array(2, 3, 4, 10)

8) aggregate (zeroValue: U)(seqOp: (U, T) ⇒ U, combOp: (U, U) ⇒ U)

aggregate函数将每个分区里面的元素通过seqOp和初始值进行聚合,然后用combine函数将每个分区的结果和初始值(zeroValue)进行combine操作。这个函数最终返回的类型不需要和RDD中元素类型一致。

scala> var rdd7 = sc.makeRDD(1 to 10, 2)
rdd7: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[10] at makeRDD at <console>:24

scala> rdd7.aggregate(1)(
     | {(x: Int, y: Int) => x + y},
     | {(a: Int, b: Int) => a + b})
res9: Int = 58

scala> rdd7.aggregate(1)(
     | {(x: Int, y: Int) => x * y},
     | {(a: Int, b: Int) => a + b})
res10: Int = 30361

9) fold(num)(func)

折叠操作,aggregate的简化操作,seqop和combop一样。

scala> var rdd8 = sc.makeRDD(1 to 4,2)
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[90] at makeRDD at <console>:24

scala> rdd8.aggregate(1)(
     | {(x : Int,y : Int) => x + y},
     | {(a : Int,b : Int) => a + b}
     | )
res59: Int = 13

scala> rdd8.fold(1)(_+_)
res60: Int = 13

10) saveAsTextFile(path)

将数据集的元素以textfile的形式保存到HDFS文件系统或者其他支持的文件系统,对于每个元素,Spark将会调用toString方法,将它装换为文件中的文本。

scala> val rdd8 = sc.parallelize(1 to 10, 2)
rdd8: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[11] at parallelize at <console>:24

scala> rdd8.saveAs
saveAsObjectFile   saveAsTextFile

scala> rdd8.saveAsTextFile("hdfs://hadoop001:9000/my_rdd")

11) saveAsObjectFile(path)

用于将RDD中的元素序列化成对象,存储到文件中。

scala> val rdd10 = sc.parallelize(List((1, 3), (1, 2), (1, 4), (2, 3), (3, 6), (3, 8)))
rdd10: org.apache.spark.rdd.RDD[(Int, Int)] = ParallelCollectionRDD[16] at parallelize at <console>:24

scala> rdd10.saveAsObjectFile("hdfs://hadoop001:9000/my_obj")

12) countByKey()

针对(K,V)类型的RDD,返回一个(K,Int)的map,表示每一个key对应的元素个数。

scala> val rdd = sc.parallelize(List((1,3),(1,2),(1,4),(2,3),(3,6),(3,8)),3)
rdd: org.apache.spark.rdd.RDD[(Int, Int)] = ParallelCollectionRDD[95] at parallelize at <console>:24

scala> rdd.countByKey()
res63: scala.collection.Map[Int,Long] = Map(3 -> 2, 1 -> 3, 2 -> 1)

13) foreach(func)

在数据集的每一个元素上,运行函数func进行更新。

scala> var rdd = sc.makeRDD(1 to 10,2)
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[107] at makeRDD at <console>:24

scala> var sum = sc.accumulator(0)
warning: there were two deprecation warnings; re-run with -deprecation for details
sum: org.apache.spark.Accumulator[Int] = 0

scala> rdd.foreach(sum+=_)

scala> sum.value
res68: Int = 55

scala> rdd.collect().foreach(println)
1
2
3
4
5
6
7
8
9
10

RDD中数值元素的统计操作

Spark 对包含数值数据的 RDD 提供了一些描述性的统计操作。Spark 的数值操作是通过流式算法实现的,允许以每次一个元素的方式构建出模型。这些统计数据都会在调用 stats() 时通过一次遍历数据计算出来,并以StatsCounter对象返回。

scala> var rdd1 = sc.makeRDD(1 to 100)
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[42] at makeRDD at <console>:32

scala> rdd1.sum()
res34: Double = 5050.0

scala> rdd1.max()
res35: Int = 100

RDD算子中接受的函数注意事项

Spark 的大部分转化操作和一部分行动操作,都需要依赖用户传递的函数来计算。 在 Scala 中,我们可以把定义的内联函数、方法的引用或静态方法传递给 Spark,就像 Scala 的其他函数式 API 一样。我们还要考虑其他一些细节,比如所传递的函数及其引用 的数据需要是可序列化的(实现了 Java 的 Serializable 接口)。 传递一个对象的方法或者字段时,会包含对整个对象的引用。

class SearchFunctions(val query: String) extends java.io.Serializable{
  def isMatch(s: String): Boolean = {
    s.contains(query)
  }
  def getMatchesFunctionReference(rdd: org.apache.spark.rdd.RDD[String]): org.apache.spark.rdd.RDD[String] = {
    // 问题:"isMatch"表示"this.isMatch",因此我们要传递整个"this" 
    rdd.filter(isMatch)
  }
  def getMatchesFieldReference(rdd: org.apache.spark.rdd.RDD[String]): org.apache.spark.rdd.RDD[String] = { 
// 问题:"query"表示"this.query",因此我们要传递整个"this" 
rdd.filter(x => x.contains(query)) 
  }
  def getMatchesNoReference(rdd: org.apache.spark.rdd.RDD[String]): org.apache.spark.rdd.RDD[String] = {
    // 安全:只把我们需要的字段拿出来放入局部变量中 
val query_ = this.query
    rdd.filter(x => x.contains(query_))
  } 
}
如果在 Scala 中出现了 NotSerializableException,通常问题就在于我们传递了一个不可序列 化的类中的函数或字段。

不同RDD类型的转换

有些函数只能用于特定类型的 RDD,比如 mean() 和 variance() 只能用在数值 RDD 上, 而 join() 只能用在键值对 RDD 上。在 Scala 和 Java 中,这些函数都没有定义在标准的 RDD 类中,所以要访问这些附加功能,必须要确保获得了正确的专用 RDD 类。
在 Scala 中,将 RDD 转为有特定函数的 RDD(比如在 RDD[Double] 上进行数值操作)是 由隐式转换来自动处理的。

您的支持将鼓励我继续创作!
-------------本文结束感谢您的阅读-------------