spark常用算子(spark的collect算子)
## Spark 常用算子详解### 简介Spark 作为一个强大的分布式计算框架,其核心是
RDD (弹性分布式数据集)
,而 RDD 上的各种操作则是通过
算子
实现的。算子可以对 RDD 进行转换和操作,例如:-
转换算子 (Transformations)
:产生新的 RDD,不会立即执行,通常用于构建数据处理流程。 -
行动算子 (Actions)
:触发实际计算,返回结果到驱动程序,通常用于输出结果或进行统计分析。本文将详细介绍 Spark 中常用的算子,并结合代码示例说明其用法。### 1. 转换算子#### 1.1 map
作用:
将函数应用于 RDD 中的每个元素,并生成一个新的 RDD。
语法:
```scala rdd.map(func) ```
示例:
```scala val rdd = sc.parallelize(List(1, 2, 3, 4)) val squaredRDD = rdd.map(x => x
x) squaredRDD.collect().foreach(println) // 输出: 1, 4, 9, 16 ```#### 1.2 flatMap
作用:
将函数应用于 RDD 中的每个元素,并将结果展平为一个新的 RDD。
语法:
```scala rdd.flatMap(func) ```
示例:
```scala val rdd = sc.parallelize(List("spark", "hadoop")) val wordsRDD = rdd.flatMap(x => x.split(" ")) wordsRDD.collect().foreach(println) // 输出: spark, hadoop ```#### 1.3 filter
作用:
根据条件过滤 RDD 中的元素,并生成一个新的 RDD。
语法:
```scala rdd.filter(func) ```
示例:
```scala val rdd = sc.parallelize(List(1, 2, 3, 4, 5)) val evenRDD = rdd.filter(x => x % 2 == 0) evenRDD.collect().foreach(println) // 输出: 2, 4 ```#### 1.4 distinct
作用:
去除 RDD 中的重复元素,生成一个新的 RDD。
语法:
```scala rdd.distinct() ```
示例:
```scala val rdd = sc.parallelize(List(1, 2, 2, 3, 3, 3)) val distinctRDD = rdd.distinct() distinctRDD.collect().foreach(println) // 输出: 1, 2, 3 ```#### 1.5 union
作用:
合并两个 RDD,生成一个新的 RDD。
语法:
```scala rdd1.union(rdd2) ```
示例:
```scala val rdd1 = sc.parallelize(List(1, 2, 3)) val rdd2 = sc.parallelize(List(4, 5, 6)) val unionRDD = rdd1.union(rdd2) unionRDD.collect().foreach(println) // 输出: 1, 2, 3, 4, 5, 6 ```#### 1.6 intersection
作用:
获取两个 RDD 的交集,生成一个新的 RDD。
语法:
```scala rdd1.intersection(rdd2) ```
示例:
```scala val rdd1 = sc.parallelize(List(1, 2, 3)) val rdd2 = sc.parallelize(List(2, 3, 4)) val intersectionRDD = rdd1.intersection(rdd2) intersectionRDD.collect().foreach(println) // 输出: 2, 3 ```#### 1.7 subtract
作用:
从第一个 RDD 中移除第二个 RDD 中的元素,生成一个新的 RDD。
语法:
```scala rdd1.subtract(rdd2) ```
示例:
```scala val rdd1 = sc.parallelize(List(1, 2, 3, 4, 5)) val rdd2 = sc.parallelize(List(2, 4)) val subtractRDD = rdd1.subtract(rdd2) subtractRDD.collect().foreach(println) // 输出: 1, 3, 5 ```#### 1.8 groupBy
作用:
将 RDD 中的元素根据给定的函数分组,生成一个新的 RDD,其中每个元素是一个 (Key, Iterable[Value]) 形式的元组。
语法:
```scala rdd.groupBy(func) ```
示例:
```scala val rdd = sc.parallelize(List(1, 2, 3, 4, 5, 6)) val groupedRDD = rdd.groupBy(x => x % 2) groupedRDD.collect().foreach(println) // 输出: (0,[2,4,6]), (1,[1,3,5]) ```#### 1.9 reduceByKey
作用:
对每个 Key 的 Value 使用给定的函数进行累加操作,生成一个新的 RDD。
语法:
```scala rdd.reduceByKey(func) ```
示例:
```scala val rdd = sc.parallelize(List(("a", 1), ("b", 2), ("a", 3), ("b", 4))) val reducedRDD = rdd.reduceByKey((x, y) => x + y) reducedRDD.collect().foreach(println) // 输出: (a,4), (b,6) ```#### 1.10 sortByKey
作用:
根据 Key 对 RDD 进行排序,生成一个新的 RDD。
语法:
```scala rdd.sortByKey(ascending = true) ```
示例:
```scala val rdd = sc.parallelize(List(("b", 2), ("a", 1), ("c", 3))) val sortedRDD = rdd.sortByKey() sortedRDD.collect().foreach(println) // 输出: (a,1), (b,2), (c,3) ```### 2. 行动算子#### 2.1 collect
作用:
将 RDD 中的所有元素收集到驱动程序的集合中。
语法:
```scala rdd.collect() ```
示例:
```scala val rdd = sc.parallelize(List(1, 2, 3, 4)) val data = rdd.collect() data.foreach(println) // 输出: 1, 2, 3, 4 ```#### 2.2 take
作用:
获取 RDD 中的前 n 个元素。
语法:
```scala rdd.take(n) ```
示例:
```scala val rdd = sc.parallelize(List(1, 2, 3, 4)) val firstTwo = rdd.take(2) firstTwo.foreach(println) // 输出: 1, 2 ```#### 2.3 first
作用:
获取 RDD 中的第一个元素。
语法:
```scala rdd.first() ```
示例:
```scala val rdd = sc.parallelize(List(1, 2, 3, 4)) val firstElement = rdd.first() println(firstElement) // 输出: 1 ```#### 2.4 count
作用:
统计 RDD 中元素的个数。
语法:
```scala rdd.count() ```
示例:
```scala val rdd = sc.parallelize(List(1, 2, 3, 4)) val count = rdd.count() println(count) // 输出: 4 ```#### 2.5 reduce
作用:
将 RDD 中的元素使用给定的函数进行累加操作,最终返回一个单一的值。
语法:
```scala rdd.reduce(func) ```
示例:
```scala val rdd = sc.parallelize(List(1, 2, 3, 4)) val sum = rdd.reduce((x, y) => x + y) println(sum) // 输出: 10 ```#### 2.6 foreach
作用:
将函数应用于 RDD 中的每个元素,但不返回新的 RDD。
语法:
```scala rdd.foreach(func) ```
示例:
```scala val rdd = sc.parallelize(List(1, 2, 3, 4)) rdd.foreach(x => println(x
2)) // 输出: 2, 4, 6, 8 ```#### 2.7 saveAsTextFile
作用:
将 RDD 中的元素保存到文本文件中。
语法:
```scala rdd.saveAsTextFile(path) ```
示例:
```scala val rdd = sc.parallelize(List(1, 2, 3, 4)) rdd.saveAsTextFile("output/data.txt") ```#### 2.8 saveAsSequenceFile
作用:
将 RDD 中的元素保存到 SequenceFile 格式文件中。
语法:
```scala rdd.saveAsSequenceFile(path) ```
示例:
```scala val rdd = sc.parallelize(List(1, 2, 3, 4)) rdd.saveAsSequenceFile("output/data.seq") ```### 总结Spark 的算子是其功能的核心,掌握这些算子可以帮助你高效地处理数据。本文介绍了 Spark 中常用的算子,并提供了相应的代码示例。希望本文能够帮助你更好地理解 Spark 的工作原理,并将其应用于实际的项目开发中。
Spark 常用算子详解
简介Spark 作为一个强大的分布式计算框架,其核心是 **RDD (弹性分布式数据集)**,而 RDD 上的各种操作则是通过 **算子** 实现的。算子可以对 RDD 进行转换和操作,例如:- **转换算子 (Transformations)**:产生新的 RDD,不会立即执行,通常用于构建数据处理流程。 - **行动算子 (Actions)**:触发实际计算,返回结果到驱动程序,通常用于输出结果或进行统计分析。本文将详细介绍 Spark 中常用的算子,并结合代码示例说明其用法。
1. 转换算子
1.1 map**作用:** 将函数应用于 RDD 中的每个元素,并生成一个新的 RDD。**语法:**```scala rdd.map(func) ```**示例:**```scala val rdd = sc.parallelize(List(1, 2, 3, 4)) val squaredRDD = rdd.map(x => x * x) squaredRDD.collect().foreach(println) // 输出: 1, 4, 9, 16 ```
1.2 flatMap**作用:** 将函数应用于 RDD 中的每个元素,并将结果展平为一个新的 RDD。**语法:**```scala rdd.flatMap(func) ```**示例:**```scala val rdd = sc.parallelize(List("spark", "hadoop")) val wordsRDD = rdd.flatMap(x => x.split(" ")) wordsRDD.collect().foreach(println) // 输出: spark, hadoop ```
1.3 filter**作用:** 根据条件过滤 RDD 中的元素,并生成一个新的 RDD。**语法:**```scala rdd.filter(func) ```**示例:**```scala val rdd = sc.parallelize(List(1, 2, 3, 4, 5)) val evenRDD = rdd.filter(x => x % 2 == 0) evenRDD.collect().foreach(println) // 输出: 2, 4 ```
1.4 distinct**作用:** 去除 RDD 中的重复元素,生成一个新的 RDD。**语法:**```scala rdd.distinct() ```**示例:**```scala val rdd = sc.parallelize(List(1, 2, 2, 3, 3, 3)) val distinctRDD = rdd.distinct() distinctRDD.collect().foreach(println) // 输出: 1, 2, 3 ```
1.5 union**作用:** 合并两个 RDD,生成一个新的 RDD。**语法:**```scala rdd1.union(rdd2) ```**示例:**```scala val rdd1 = sc.parallelize(List(1, 2, 3)) val rdd2 = sc.parallelize(List(4, 5, 6)) val unionRDD = rdd1.union(rdd2) unionRDD.collect().foreach(println) // 输出: 1, 2, 3, 4, 5, 6 ```
1.6 intersection**作用:** 获取两个 RDD 的交集,生成一个新的 RDD。**语法:**```scala rdd1.intersection(rdd2) ```**示例:**```scala val rdd1 = sc.parallelize(List(1, 2, 3)) val rdd2 = sc.parallelize(List(2, 3, 4)) val intersectionRDD = rdd1.intersection(rdd2) intersectionRDD.collect().foreach(println) // 输出: 2, 3 ```
1.7 subtract**作用:** 从第一个 RDD 中移除第二个 RDD 中的元素,生成一个新的 RDD。**语法:**```scala rdd1.subtract(rdd2) ```**示例:**```scala val rdd1 = sc.parallelize(List(1, 2, 3, 4, 5)) val rdd2 = sc.parallelize(List(2, 4)) val subtractRDD = rdd1.subtract(rdd2) subtractRDD.collect().foreach(println) // 输出: 1, 3, 5 ```
1.8 groupBy**作用:** 将 RDD 中的元素根据给定的函数分组,生成一个新的 RDD,其中每个元素是一个 (Key, Iterable[Value]) 形式的元组。**语法:**```scala rdd.groupBy(func) ```**示例:**```scala val rdd = sc.parallelize(List(1, 2, 3, 4, 5, 6)) val groupedRDD = rdd.groupBy(x => x % 2) groupedRDD.collect().foreach(println) // 输出: (0,[2,4,6]), (1,[1,3,5]) ```
1.9 reduceByKey**作用:** 对每个 Key 的 Value 使用给定的函数进行累加操作,生成一个新的 RDD。**语法:**```scala rdd.reduceByKey(func) ```**示例:**```scala val rdd = sc.parallelize(List(("a", 1), ("b", 2), ("a", 3), ("b", 4))) val reducedRDD = rdd.reduceByKey((x, y) => x + y) reducedRDD.collect().foreach(println) // 输出: (a,4), (b,6) ```
1.10 sortByKey**作用:** 根据 Key 对 RDD 进行排序,生成一个新的 RDD。**语法:**```scala rdd.sortByKey(ascending = true) ```**示例:**```scala val rdd = sc.parallelize(List(("b", 2), ("a", 1), ("c", 3))) val sortedRDD = rdd.sortByKey() sortedRDD.collect().foreach(println) // 输出: (a,1), (b,2), (c,3) ```
2. 行动算子
2.1 collect**作用:** 将 RDD 中的所有元素收集到驱动程序的集合中。**语法:**```scala rdd.collect() ```**示例:**```scala val rdd = sc.parallelize(List(1, 2, 3, 4)) val data = rdd.collect() data.foreach(println) // 输出: 1, 2, 3, 4 ```
2.2 take**作用:** 获取 RDD 中的前 n 个元素。**语法:**```scala rdd.take(n) ```**示例:**```scala val rdd = sc.parallelize(List(1, 2, 3, 4)) val firstTwo = rdd.take(2) firstTwo.foreach(println) // 输出: 1, 2 ```
2.3 first**作用:** 获取 RDD 中的第一个元素。**语法:**```scala rdd.first() ```**示例:**```scala val rdd = sc.parallelize(List(1, 2, 3, 4)) val firstElement = rdd.first() println(firstElement) // 输出: 1 ```
2.4 count**作用:** 统计 RDD 中元素的个数。**语法:**```scala rdd.count() ```**示例:**```scala val rdd = sc.parallelize(List(1, 2, 3, 4)) val count = rdd.count() println(count) // 输出: 4 ```
2.5 reduce**作用:** 将 RDD 中的元素使用给定的函数进行累加操作,最终返回一个单一的值。**语法:**```scala rdd.reduce(func) ```**示例:**```scala val rdd = sc.parallelize(List(1, 2, 3, 4)) val sum = rdd.reduce((x, y) => x + y) println(sum) // 输出: 10 ```
2.6 foreach**作用:** 将函数应用于 RDD 中的每个元素,但不返回新的 RDD。**语法:**```scala rdd.foreach(func) ```**示例:**```scala val rdd = sc.parallelize(List(1, 2, 3, 4)) rdd.foreach(x => println(x * 2)) // 输出: 2, 4, 6, 8 ```
2.7 saveAsTextFile**作用:** 将 RDD 中的元素保存到文本文件中。**语法:**```scala rdd.saveAsTextFile(path) ```**示例:**```scala val rdd = sc.parallelize(List(1, 2, 3, 4)) rdd.saveAsTextFile("output/data.txt") ```
2.8 saveAsSequenceFile**作用:** 将 RDD 中的元素保存到 SequenceFile 格式文件中。**语法:**```scala rdd.saveAsSequenceFile(path) ```**示例:**```scala val rdd = sc.parallelize(List(1, 2, 3, 4)) rdd.saveAsSequenceFile("output/data.seq") ```
总结Spark 的算子是其功能的核心,掌握这些算子可以帮助你高效地处理数据。本文介绍了 Spark 中常用的算子,并提供了相应的代码示例。希望本文能够帮助你更好地理解 Spark 的工作原理,并将其应用于实际的项目开发中。