spark算子(spark算子类型)
本篇文章给大家谈谈spark算子,以及spark算子类型对应的知识点,希望对各位有所帮助,不要忘了收藏本站喔。
本文目录一览:
- 1、Spark 中 RDD 算子 ReduceByKey 和 GroupByKey 使用方法和区别
- 2、关于Spark算子aggregateByKey、foldByKey、combineByKey
- 3、Spark算子flatMap一对多生成数据,map一对一生成数据
- 4、spark三类算子小总结
- 5、Spark之RDD算子-转换算子
- 6、Spark算子:RDD基本转换操作(7)–zipWithIndex、zipWithUniqueId
Spark 中 RDD 算子 ReduceByKey 和 GroupByKey 使用方法和区别
在对RDDPair(一种特殊的 RDD,即RDD[(key, Row)])进行操作时经常会用埋隐到 reduceByKey() 和 groupByKey() 两个算子。下面看看两者的区别和使用方法:
使用reduceByKey()的时候, 本地的数据先进行merge 然后再传输到不同节点再进行merge,最终得到最终结果。
而使用groupByKey()的时候,并 不进行本地的merge ,全码茄部数据传出,弯模厅得到全部数据后才会进行聚合成一个sequence,groupByKey()传输速度明显慢于reduceByKey()。
虽然groupByKey().map(func)也能实现reduceByKey(func)功能,但是 优先使用reduceByKey(func) 。
测试结果:
[img]关于Spark算子aggregateByKey、foldByKey、combineByKey
关于spark的算子有很多,今天详细介绍下3个算子 aggregateByKey、foldByKey、combineByKey
首先我们看下源码里关于这个算子的介绍:
然后我idea装了翻译插件,这里看下机器翻译的效果:
可能第一次看不是特别理解它表达的意思,比如V啊U啊看不懂,这不要紧,重点是中间的一句话:
我们可以很明显的看到'分区中'和'分区之前袜间'这样的字眼,想必这个算子的操作对象应该是分区内和分区间的。然后在看看这个函数,接受2个参数,这种入参的方式第一次看的人可能不太习惯,这叫‘柯里化’,这里不细说,简单理解就是我要传入2个参数到这个算子中,然后返回一个RDD[(K,U)],接下来我们直接看这个算子的效果。(先看看猪是怎么跑的)
(1)zeroValue:给每一个分区中的每一个key一个初始值;
(2)seqOp:函数用于在每一个分区中用初始值逐步迭代value;
(3)combOp:函数用于合并每个分区中的结果。
以上看完后,你应该有个初级的认知了,但是在细节上可能还是有点懵,接下来在详细说下,首先既然是分区间和分区内,我们先看下刚刚那个rdd的算子是在分区上是怎么发布的。
哦,原来key为a,a,c在同一个分区,bcc在同一个分区上。
看到这你应该已经清楚很多了吧(这软件属实不好用,画的太累- -)。整体这个算子就是说:对每个分区内,第一个出现的key赋值0(这边0只是表示初始值,并不是只能为0),让它和下一个value进行比较,因为后面的逻辑处理需要两个数,但第一个出现key的value是没有值和它比较的,所以要 对第一次出现key给个初始值。之后就是分区内我要做什么操作,分区间我要做什么操作。
因此这边我如果要用aggregateByKey写个wordcount其实就是
最后我们深入下源码细节:
喔~ 原来它底层是用combineByKeyWithClassTag这个方法。我们先记着这个。
这时候就发现foldByKey底层调用和刚才那个aggregateBeKey有些眼熟啊
果然,看样子他们底层确实是类似,主要区别在于foldByKey的分区内和分区间的函数逻辑是一样的。所以他的参数缩减为1个, 既作用于分区内又作用于分区间 。
这边我就不画图了,foldByKey写个wordcount其实就是:
看到这里你应该明白了其实这个3个算子底层调用都是同一个函数。大体套路都是:初始,分区内,分区间。combineByKey会比较特别一些。我们先看下它是怎么用的。
可能我表述的比较粗糙,但大体意思就是说第一次假如是(a,3),然后value转化结构,变成(3,1),然后对于下一个进来的(a,2) ,就是(3+2,1+1),就是a的值是(5,2),这就是分区内相同key的操作。分区间操作斗配就是空悔指比如一个分区a(5,2),另一个分区是a(4,3),那分区间操作就是(5+4,2+3)。关于combineByKey 还有一个细节就是参数 上,它参数是有附带类型的,这边假如我acc那个形参如果没有加上类型,是会报错的。那为什么aggregateByKey和foldByKey没有标注类型却不会报错呢?
先看下aggregateByKey:
而combineByKey是没有这个的,所以需要注明上这个类型。(这边可能表述不是很准确)
以上就是关于spark算子的一些介绍,欢迎大家纠正补充,一起学习~
Spark算子flatMap一对多生成数据,map一对一生成数据
Spark的map算子只能一对一生成数据,要想一对多生成,用flatMap。
spark三类算子小总结
其实很早之前就想对spark做一下自己的阐述,一直也无奈于不能系统的进行以下自己的解释,现在还是想粗略的说一下我自己对spark的一些认识。
spark相对于mapreduce来说,计算模型可以提供更强大的功吵运能,他使用的是迭代模型,我们在处理完一个纳碰山阶段以后,可以继续往下洞中处理很多个阶段,而不只是像mapreduce一样只有两个阶段。
spark大致分为这三种算子:
1、Value数据类型的Transformation算子,这种变换不触发提交作业,针对处理的数据项是Value型的数据。
在这里,我会将对map、flatMap、glom、union、cartesian(笛卡尔操作)、groupBy、filter、distinct(去重)、subtract这9种算子进行描述。
2、Key-Value数据类型的Transformation算子,这种变换不触发提交作业,针对处理的数据项是Key-Value型的数据。
而对于Key-Value的算子,就简单的解释一下mapValues、combineByKey、reduceByKey、partitionBy、cogroup、join、leftOutJoin、rightOutJoin这几类进行我的解释。
3、Action算子,这类算子会触发SparkContext提交作业。
针对action算子,foreach、collect、collectAsMap、reduceByKeyLocally、lookup、count、top、
reduce、fold、aggregate。大致就是这几项了。
一、Value数据类型的Transformation算子
1)map
val a = sc.parallelize(List("dog", "salmon", "salmon", "rat", "elephant"), 3)
//rdd有5个元素,将他们分成3个partition
val b = a.map(_.length)//导入数据使用parallelize方式
val c = a.zip(b)
c .collect
res0: Array[(String, Int)] = Array((dog,3), (salmon,6), (salmon,6), (rat,3), (elephant,8))
Map是把操作映射到每个values里面去。
上述示意图为:
2)flatMap
val a = sc.parallelize(1 to 10, 5)
//rdd有10个元素,将1到10分成5个partition
a.flatMap(1 to _).collect
res47: Array[Int] = Array(1, 1, 2, 1, 2, 3, 1, 2, 3, 4, 1, 2, 3, 4, 5, 1, 2, 3, 4, 5,6, 1, 2, 3, 4, 5, 6, 7, 1, 2, 3, 4, 5, 6, 7, 8, 1, 2, 3, 4, 5, 6, 7, 8, 9, 1, 2, 3, 4,5, 6, 7, 8, 9, 10)
//每个元素输入项都可以被映射到0个或多个的输出项,最终将结果”扁平化“后输出
sc.parallelize(List(1, 2, 3), 2).flatMap(x = List(x, x, x)).collect
res85: Array[Int] = Array(1, 1, 1, 2, 2, 2, 3, 3, 3)
flatMap是把一个vlaue变成数组,再打断。
该实例的操作示意图为:
3)subtract
val a = sc.parallelize(1 to 9, 3)
val b = sc.parallelize(1 to 3, 3)
val c = a.subtract(b)
c.collect
res3: Array[Int] = Array(6, 9, 4,7, 5, 8)
针对这个实例画出示意图:
4)glom
val a = sc.parallelize(1 to 100, 3)
a.glom.collect
res8: Array[Array[Int]] = Array(Array(1, 2, 3,..., 33), Array(34,35,..., 65, 66), Array(67, ..., 100))
针对这个实例画出示意图:
5)union
val a = sc.parallelize(1 to 3, 1)
val b = sc.parallelize(5 to 7, 1)
(a ++ b).collect
res0: Array[Int] = Array(1, 2, 3, 5, 6, 7)
针对这个实例画出示意图:
6)cartesian(笛卡尔操作)
val x =sc.parallelize(List(1,2,3,4,5))
val y =sc.parallelize(List(6,7,8,9,10))
x.cartesian(y).collect
res0: Array[(Int, Int)] =Array((1,6), (1,7), (1,8), (1,9),(1,10), (2,6), (2,7), (2,8), (2,9),(2,10), (3,6), (3,7), (3,8), (3,9),(3,10), (4,6), (5,6), (4,7), (5,7),
(4,8), (5,8), (4,9), (4,10), (5,9),(5,10))
针对这个实例画出示意图:
7)groupBy(生成相应的key,相同的放在一起)
val a = sc.parallelize(1 to 9, 3)
a.groupBy(x = { if (x % 2 == 0) "even" else "odd" }).collect
res42: Array[(String, Seq[Int])] =Array((even,ArrayBuffer(2, 4, 6,8)), (odd,ArrayBuffer(1, 3, 5, 7,9)))
针对这个实例画出示意图:
8)filter
val a = sc.parallelize(1 to 10, 3)
val b = a.filter(_ % 2 == 0)
b.collect
res3: Array[Int] = Array(2, 4, 6, 8, 10)
针对这个实例画出示意图:
9)distinct(去重)
val c =sc.parallelize(List("Gnu", "Cat","Rat", "Dog", "Gnu", "Rat"), 2)
rdd有6个元素,将这六个元素分成2个partition
c.distinct.collect//将重复出现的元素使用distinct函数去除再形成数组
res6: Array[String] = Array(Dog,Gnu, Cat, Rat)
针对这个实例画出示意图:
二、Key-Value数据类型的Transformation算子
1)mapValues
val a = sc.parallelize(List("dog", "tiger", "lion", "cat", "panther", "eagle"), 2)
//RDD有6个元素,分别为dog,lion,cat...,将他们分成2个partition
val b = a.map(x = (x.length, x))
b.mapValues("x" + _ + "x").collect
res5: Array[(Int, String)] = Array((3,xdogx), (5,xtigerx), (4,xlionx), (3,xcatx), (7,xpantherx), (5,xeaglex))
针对这个实例画出示意图:
2)combineByKey
val a =sc.parallelize(List("dog","cat","gnu","salmon","rabbit","turkey","wolf","bear","bee"),3)
val b = sc.parallelize(List(1,1,2,2,2,1,2,2,2), 3)
val c = b.zip(a)
val d = c.combineByKey(List(_), (x:List[String], y:String) = y :: x,(x:List[String], y:List[String]) = x ::: y)
d.collect
res16: Array[(Int, List[String])] = Array((1,List(cat, dog, turkey)), (2,List(gnu,rabbit, salmon, bee, bear, wolf)))
该实例的操作示意图为:
3)reduceByKey
val a = sc.parallelize(List("dog", "tiger", "dog", "cat", "dog", "eagle", "cat"), 2)
//rdd有7个元素,将他们分成2个partition
val b = a.map(x = (x.length, x))
b.reduceByKey(_ + _).collect //使用reduceByKey(_+_)的方式
res87: Array[(Int, String)] = Array((3,dog), (1,tiger), (2,cat),(1,eagle))
该实例的操作示意图为:
4)partitionBy
对两个RDD中的KV元素,每个RDD中相同key中的元素分别聚合成一个集合。与reduceByKey不同的是针对两个RDD中相同的key的元素进行合并
5)cogroup
对两个RDD中的KV元素,每个RDD相同key中元素分别聚合成一个集合。与reduceByKey不同的是针对两个RDD中相同key的元素进行合并
val a = sc.parallelize(List(1, 2, 1, 3), 1)
val b = a.map((_, "b"))
val c = a.map((_, "c"))
b.cogroup(c).collect
res7: Array[(Int, (Iterable[String], Iterable[String]))] = Array((2,(ArrayBuffer(b),ArrayBuffer(c))),(3,(ArrayBuffer(b),ArrayBuffer(c))),(1,(ArrayBuffer(b, b),ArrayBuffer(c, c))))
该实例的操作示意图为:
6)join
val a = sc.parallelize(List("dog", "salmon", "salmon", "rat", "elephant"), 3)
val b = a.keyBy(_.length)
val c =sc.parallelize(List("dog","cat","gnu","salmon","rabbit","turkey","wolf","bear","bee"),3)
val d = c.keyBy(_.length)
b.join(d).collect
res0: Array[(Int, (String, String))] = Array((6,(salmon,salmon)), (6,(salmon,rabbit)), (6,(salmon,turkey)), (6,(salmon,salmon)), (6,(salmon,rabbit)), (6,(salmon,turkey)), (3,(dog,dog)), (3,(dog,cat)), (3,(dog,gnu)), (3,(dog,bee)), (3,(rat,dog)), (3,(rat,cat)), (3,(rat,gnu)), (3,(rat,bee)))
该实例的操作示意图为:
7)leftOutJoin
将LEFT左边的表名1中的所有记录全部保留,而将右边的表名2中的字段B与表名1.字段A相对应的记录显示出来
val a = sc.parallelize(List("dog", "salmon", "salmon", "rat", "elephant"), 3)
val b = a.keyBy(_.length)
val c =sc.parallelize(List("dog","cat","gnu","salmon","rabbit","turkey","wolf","bear","bee"),3)
val d = c.keyBy(_.length)
b.leftOuterJoin(d).collect
res1: Array[(Int, (String, Option[String]))] = Array((6,(salmon,Some(salmon))), (6,(salmon,Some(rabbit))), (6,(salmon,Some(turkey))), (6,(salmon,Some(salmon))), (6,(salmon,Some(rabbit))), (6,(salmon,Some(turkey))), (3,(dog,Some(dog))), (3,(dog,Some(cat))), (3,(dog,Some(gnu))), (3,(dog,Some(bee))), (3,(rat,Some(dog))), (3,(rat,Some(cat))), (3,(rat,Some(gnu))), (3,(rat,Some(bee))), (8,(elephant,None)))
该实例的操作示意图为:
8)rightOutJoin(右外连接)
val a = sc.parallelize(List("dog", "salmon", "salmon", "rat", "elephant"), 3)
val b = a.keyBy(_.length)
val c =sc.parallelize(List("dog","cat","gnu","salmon","rabbit","turkey","wolf","bear","bee"),3)
val d = c.keyBy(_.length)
b.rightOuterJoin(d).collect
res2: Array[(Int, (Option[String], String))] = Array((6,(Some(salmon),salmon)), (6,(Some(salmon),rabbit)), (6,(Some(salmon),turkey)), (6,(Some(salmon),salmon)), (6,(Some(salmon),rabbit)), (6,(Some(salmon),turkey)), (3,(Some(dog),dog)), (3,(Some(dog),cat)), (3,(Some(dog),gnu)), (3,(Some(dog),bee)), (3,(Some(rat),dog)), (3,(Some(rat),cat)), (3,(Some(rat),gnu)), (3,(Some(rat),bee)), (4,(None,wolf)), (4,(None,bear)))
该实例的操作示意图为:
三、Action算子
1)foreach
val c = sc.parallelize(List("cat", "dog", "tiger", "lion", "gnu", "crocodile", "ant", "whale", "dolphin", "spider"), 3)//导入数据使用parallelize方式
c.foreach(x = println(x + "s are yummy"))//得到一条数据就处理一条数据
该实例的操作示意图为:
2)fold
val a = sc.parallelize(List(1,2,3), 3)
a.fold(0)(_ + _)
res59: Int = 6
针对这个实例画出示意图:
3)aggregate
val z = sc.parallelize(List(1,2,3,4,5,6), 2)
// lets first print out the contents of the RDD with partition labels
def myfunc(index: Int, iter: Iterator[(Int)]) : Iterator[String] = {
iter.toList.map(x = "[partID:" + index + ", val: " + x + "]").iterator
}
z.mapPartitionsWithIndex(myfunc).collect
res28: Array[String] = Array([partID:0, val: 1], [partID:0, val: 2], [partID:0, val:3], [partID:1, val: 4], [partID:1, val: 5], [partID:1, val: 6])
z.aggregate(0)(math.max(_, _), _ + _)
res40: Int = 9
针对这个实例画出示意图:
4)collect
val c = sc.parallelize(List("Gnu", "Cat", "Rat", "Dog", "Gnu", "Rat"), 2)
c.collect //通过collect算子将两个partition结合成一个
res29: Array[String] = Array(Gnu, Cat, Rat, Dog, Gnu, Rat)
针对这个实例画出示意图:
5)collectAsMap
RDD中同一个Key中存在多个Value,那么后面的Value将会把前面的Value覆盖,最终得到的结果就是Key唯一,而且对应一个Value
val a = sc.parallelize(List(1, 2, 1, 3), 1)
val b = a.zip(a)
b.collectAsMap
res1: scala.collection.Map[Int,Int] = Map(2 - 2, 1 - 1, 3 - 3)
针对这个实例画出示意图:
6)reduceByKeyLocally
该函数将RDD[K,V]中每个K对应的V值根据映射函数来运算,运算结果映射到一个Map[K,V]中,而不是RDD[K,V]。
val a =sc.parallelize(List("dog","cat", "owl", "gnu", "ant"), 2)
val b = a.map(x = (x.length,x))
b.reduceByKey(_ + _).collect
res86: Array[(Int, String)] =Array((3,dogcatowlgnuant))
针对这个实例画出示意图:
7)lookup
val a =sc.parallelize(List("dog","tiger", "lion", "cat", "panther", "eagle"), 2)
val b = a.map(x = (x.length, x))
b.lookup(5)
res0: Seq[String] = WrappedArray(tiger, eagle)
针对这个实例画出示意图:
8)count
val c = sc.parallelize(List("Gnu", "Cat", "Rat", "Dog"), 2)
c.count
res2: Long = 4
针对这个实例画出示意图:
9)top
val c = sc.parallelize(Array(6, 9, 4,7, 5, 8), 2)
c.top(2)
res28: Array[Int] = Array(9, 8)
针对这个实例画出示意图:
10)reduce
val a = sc.parallelize(1 to 100, 3)
a.reduce(_ + _)
res41: Int = 5050
针对这个实例画出示意图:
这篇文章也是写了一个礼拜才出来的成果,本身不是很熟悉,需要一个算子一个算子去理解,有些地方也没有很准确,但也是我理解能力范围之内了,网站上不太能查到有关于这方面的知识,所以绝大部分靠自学,理解不对的地方也请多多包涵啦。对于mapreduce和spark的理解也仅限于参考网上的知识加上自己的一些见解。
-------------------------------------------参考:-------------------------------------------
------------------------------------------------------------参考:-----------------------------------------------------------
Spark之RDD算子-转换算子
转换(Transformation)算子 就是对RDD进行操作的接口函数,其作用是将一个或多个RDD变换成新的RDD。
使用Spark进行数据计算,在利用创建算子生成RDD后,数据处理的春前算法设计和程序编写的最关键部分,就是利用变换算子对原始数据产生的RDD进行一步一步的变换,最终得到期望的计算结果。
对于变换算子可理解为分两类:1,对Value型RDD进行变换的算子;2,对Key/Value型RDD进行变换算子。在每个变换中有仅对一个RDD进行变换的,也有是对两个RDD进行变换的。
将当前RDD进行重新分区,生成一个以numPartitions参数指定的分区数存储的新RDD。参数shuffle为true时在变换过程中进行shuffle操作,否则不进行shuffle。
在Linux系统中,有许多对数据进行处理的shell命令,我们可能通过pipe变换将一些shell命令用于Spark中生成新的RDD。
对原RDD中的元素按照函数f指定的规则进行排扒敬清序,并可通过ascending参数进行升序或降序设置,排序后的结果生成新的RDD,新的RDD的分区数量可以由参数numPartitions指定,默认与原RDD相同的分区数。
输入参数为另一个RDD,返回两个RDD中所有元素的笛卡尔积。
输入参数为另一个RDD,返回原始RDD与输入参数RDD的补集,即生成由原始RDD中而不在输入参数RDD中的元素构成新的RDD,参数numPartitions指定新RDD分区数。
返回原始RDD与另一个RDD的并集。
生成由原始RDD的值为Key,另一个RDD的值为Value依次配对稿销构成的所有Key/Value对,并返回这些Key/Value对集合构成的新RDD
将Key/Value型RDD中的元素的Key提取出来,所有Key值构成一个序列形成新的RDD。
将Key/Value型RDD中的元素的Value值使用输入参数函数f进行变换构成一个新的RDD。
Spark算子:RDD基本转换操作(7)–zipWithIndex、zipWithUniqueId
关键字:Spark算子、Spark RDD基本转换、zipWithIndex、zipWithUniqueId
zipWithIndex
def zipWithIndex(): RDD[(T, Long)]
该函数将RDD中的元素和这个元素在RDD中的ID(索引号)组合成键/值对。
该函数将RDD中元素和一卖竖个唯一ID组合成键/值对,该唯一ID生成算法如下:
每个分区中第一个元素的唯一ID值为:该分区索引号,
每个分区中第N个元素的唯一ID值为:(前一个元素的唯一ID值) + (该RDD总的分虚春区数)
看下面的例子:
//总分区数为2
/差配耐/第一个分区第一个元素ID为0,第二个分区第一个元素ID为1
//第一个分区第二个元素ID为0+2=2,第一个分区第三个元素ID为2+2=4
//第二个分区第二个元素ID为1+2=3,第二个分区第三个元素ID为3+2=5
关于spark算子和spark算子类型的介绍到此就结束了,不知道你从中找到你需要的信息了吗 ?如果你还想了解更多这方面的信息,记得收藏关注本站。