spark算子(spark算子类型)

本篇文章给大家谈谈spark算子,以及spark算子类型对应的知识点,希望对各位有所帮助,不要忘了收藏本站喔。

本文目录一览:

Spark 中 RDD 算子 ReduceByKey 和 GroupByKey 使用方法和区别

在对RDDPair(一种特殊的 RDD,即RDD[(key, Row)])进行操作时经常会用埋隐到 reduceByKey() 和 groupByKey() 两个算子。下面看看两者的区别和使用方法:

使用reduceByKey()的时候, 本地的数据先进行merge 然后再传输到不同节点再进行merge,最终得到最终结果。

而使用groupByKey()的时候,并 不进行本地的merge ,全码茄部数据传出,弯模厅得到全部数据后才会进行聚合成一个sequence,groupByKey()传输速度明显慢于reduceByKey()。

虽然groupByKey().map(func)也能实现reduceByKey(func)功能,但是 优先使用reduceByKey(func) 。

测试结果:

[img]

关于Spark算子aggregateByKey、foldByKey、combineByKey

关于spark的算子有很多,今天详细介绍下3个算子 aggregateByKey、foldByKey、combineByKey

首先我们看下源码里关于这个算子的介绍:

然后我idea装了翻译插件,这里看下机器翻译的效果:

可能第一次看不是特别理解它表达的意思,比如V啊U啊看不懂,这不要紧,重点是中间的一句话:

我们可以很明显的看到'分区中'和'分区之前袜间'这样的字眼,想必这个算子的操作对象应该是分区内和分区间的。然后在看看这个函数,接受2个参数,这种入参的方式第一次看的人可能不太习惯,这叫‘柯里化’,这里不细说,简单理解就是我要传入2个参数到这个算子中,然后返回一个RDD[(K,U)],接下来我们直接看这个算子的效果。(先看看猪是怎么跑的)

(1)zeroValue:给每一个分区中的每一个key一个初始值;

(2)seqOp:函数用于在每一个分区中用初始值逐步迭代value;

(3)combOp:函数用于合并每个分区中的结果。

以上看完后,你应该有个初级的认知了,但是在细节上可能还是有点懵,接下来在详细说下,首先既然是分区间和分区内,我们先看下刚刚那个rdd的算子是在分区上是怎么发布的。

哦,原来key为a,a,c在同一个分区,bcc在同一个分区上。

看到这你应该已经清楚很多了吧(这软件属实不好用,画的太累- -)。整体这个算子就是说:对每个分区内,第一个出现的key赋值0(这边0只是表示初始值,并不是只能为0),让它和下一个value进行比较,因为后面的逻辑处理需要两个数,但第一个出现key的value是没有值和它比较的,所以要 对第一次出现key给个初始值。之后就是分区内我要做什么操作,分区间我要做什么操作。

因此这边我如果要用aggregateByKey写个wordcount其实就是

最后我们深入下源码细节:

喔~  原来它底层是用combineByKeyWithClassTag这个方法。我们先记着这个。

这时候就发现foldByKey底层调用和刚才那个aggregateBeKey有些眼熟啊

果然,看样子他们底层确实是类似,主要区别在于foldByKey的分区内和分区间的函数逻辑是一样的。所以他的参数缩减为1个, 既作用于分区内又作用于分区间 。

这边我就不画图了,foldByKey写个wordcount其实就是:

看到这里你应该明白了其实这个3个算子底层调用都是同一个函数。大体套路都是:初始,分区内,分区间。combineByKey会比较特别一些。我们先看下它是怎么用的。

可能我表述的比较粗糙,但大体意思就是说第一次假如是(a,3),然后value转化结构,变成(3,1),然后对于下一个进来的(a,2) ,就是(3+2,1+1),就是a的值是(5,2),这就是分区内相同key的操作。分区间操作斗配就是空悔指比如一个分区a(5,2),另一个分区是a(4,3),那分区间操作就是(5+4,2+3)。关于combineByKey 还有一个细节就是参数 上,它参数是有附带类型的,这边假如我acc那个形参如果没有加上类型,是会报错的。那为什么aggregateByKey和foldByKey没有标注类型却不会报错呢?

先看下aggregateByKey:

而combineByKey是没有这个的,所以需要注明上这个类型。(这边可能表述不是很准确)

以上就是关于spark算子的一些介绍,欢迎大家纠正补充,一起学习~

Spark算子flatMap一对多生成数据,map一对一生成数据

Spark的map算子只能一对一生成数据,要想一对多生成,用flatMap。

spark三类算子小总结

其实很早之前就想对spark做一下自己的阐述,一直也无奈于不能系统的进行以下自己的解释,现在还是想粗略的说一下我自己对spark的一些认识。

spark相对于mapreduce来说,计算模型可以提供更强大的功吵运能,他使用的是迭代模型,我们在处理完一个纳碰山阶段以后,可以继续往下洞中处理很多个阶段,而不只是像mapreduce一样只有两个阶段。

spark大致分为这三种算子:

 1、Value数据类型的Transformation算子,这种变换不触发提交作业,针对处理的数据项是Value型的数据。

          在这里,我会将对map、flatMap、glom、union、cartesian(笛卡尔操作)、groupBy、filter、distinct(去重)、subtract这9种算子进行描述。

 2、Key-Value数据类型的Transformation算子,这种变换不触发提交作业,针对处理的数据项是Key-Value型的数据。

            而对于Key-Value的算子,就简单的解释一下mapValues、combineByKey、reduceByKey、partitionBy、cogroup、join、leftOutJoin、rightOutJoin这几类进行我的解释。

    3、Action算子,这类算子会触发SparkContext提交作业。

            针对action算子,foreach、collect、collectAsMap、reduceByKeyLocally、lookup、count、top、

reduce、fold、aggregate。大致就是这几项了。

一、Value数据类型的Transformation算子

1)map

        val a = sc.parallelize(List("dog", "salmon", "salmon", "rat", "elephant"), 3)

        //rdd有5个元素,将他们分成3个partition

        val b = a.map(_.length)//导入数据使用parallelize方式

        val c = a.zip(b)

        c .collect

        res0: Array[(String, Int)] = Array((dog,3), (salmon,6), (salmon,6), (rat,3), (elephant,8))

        Map是把操作映射到每个values里面去。

       上述示意图为:

2)flatMap

        val a = sc.parallelize(1 to 10, 5)  

        //rdd有10个元素,将1到10分成5个partition

        a.flatMap(1 to _).collect

        res47: Array[Int] = Array(1, 1, 2, 1, 2, 3, 1, 2, 3, 4, 1, 2, 3, 4, 5, 1, 2, 3, 4, 5,6, 1, 2, 3, 4, 5, 6, 7, 1, 2, 3, 4, 5, 6, 7, 8, 1, 2, 3, 4, 5, 6, 7, 8, 9, 1, 2, 3,           4,5, 6, 7, 8, 9, 10)

        //每个元素输入项都可以被映射到0个或多个的输出项,最终将结果”扁平化“后输出

        sc.parallelize(List(1, 2, 3), 2).flatMap(x = List(x, x, x)).collect

        res85: Array[Int] = Array(1, 1, 1, 2, 2, 2, 3, 3, 3)

        flatMap是把一个vlaue变成数组,再打断。

        该实例的操作示意图为:

3)subtract

          val a = sc.parallelize(1 to 9, 3)

          val b = sc.parallelize(1 to 3, 3)

          val c = a.subtract(b)

          c.collect

          res3: Array[Int] = Array(6, 9, 4,7, 5, 8)

          针对这个实例画出示意图:

4)glom

            val a = sc.parallelize(1 to 100, 3)

            a.glom.collect

            res8: Array[Array[Int]] = Array(Array(1, 2, 3,..., 33), Array(34,35,..., 65, 66), Array(67, ..., 100))

        针对这个实例画出示意图:

5)union

         val a = sc.parallelize(1 to 3, 1)

         val b = sc.parallelize(5 to 7, 1)

         (a ++ b).collect

         res0: Array[Int] = Array(1, 2, 3, 5, 6, 7)

        针对这个实例画出示意图:

6)cartesian(笛卡尔操作)

          val x =sc.parallelize(List(1,2,3,4,5))

          val y =sc.parallelize(List(6,7,8,9,10))

          x.cartesian(y).collect

          res0: Array[(Int, Int)] =Array((1,6), (1,7), (1,8), (1,9),(1,10), (2,6), (2,7), (2,8), (2,9),(2,10), (3,6), (3,7), (3,8), (3,9),(3,10), (4,6), (5,6), (4,7), (5,7),

(4,8), (5,8), (4,9), (4,10), (5,9),(5,10))

        针对这个实例画出示意图:

7)groupBy(生成相应的key,相同的放在一起)

         val a = sc.parallelize(1 to 9, 3)

         a.groupBy(x = { if (x % 2 == 0) "even" else "odd" }).collect

         res42: Array[(String, Seq[Int])] =Array((even,ArrayBuffer(2, 4, 6,8)), (odd,ArrayBuffer(1, 3, 5, 7,9)))

        针对这个实例画出示意图:

8)filter

         val a = sc.parallelize(1 to 10, 3)

         val b = a.filter(_ % 2 == 0)

         b.collect

         res3: Array[Int] = Array(2, 4, 6, 8, 10)

        针对这个实例画出示意图:

9)distinct(去重)

        val c =sc.parallelize(List("Gnu", "Cat","Rat", "Dog", "Gnu", "Rat"), 2)

        rdd有6个元素,将这六个元素分成2个partition

        c.distinct.collect//将重复出现的元素使用distinct函数去除再形成数组

        res6: Array[String] = Array(Dog,Gnu, Cat, Rat)

        针对这个实例画出示意图:

二、Key-Value数据类型的Transformation算子

 1)mapValues

        val a = sc.parallelize(List("dog", "tiger", "lion", "cat", "panther", "eagle"), 2)

       //RDD有6个元素,分别为dog,lion,cat...,将他们分成2个partition

       val b = a.map(x = (x.length, x))

       b.mapValues("x" + _ + "x").collect

       res5: Array[(Int, String)] = Array((3,xdogx), (5,xtigerx), (4,xlionx), (3,xcatx), (7,xpantherx), (5,xeaglex))

针对这个实例画出示意图:

 2)combineByKey

          val a =sc.parallelize(List("dog","cat","gnu","salmon","rabbit","turkey","wolf","bear","bee"),3)

          val b = sc.parallelize(List(1,1,2,2,2,1,2,2,2), 3)

          val c = b.zip(a)

          val d = c.combineByKey(List(_), (x:List[String], y:String) = y :: x,(x:List[String], y:List[String]) = x ::: y)

          d.collect

          res16: Array[(Int, List[String])] = Array((1,List(cat, dog, turkey)), (2,List(gnu,rabbit, salmon, bee, bear, wolf)))

          该实例的操作示意图为:

 3)reduceByKey

          val a = sc.parallelize(List("dog", "tiger", "dog", "cat", "dog", "eagle", "cat"), 2)

          //rdd有7个元素,将他们分成2个partition

          val b = a.map(x = (x.length, x))

          b.reduceByKey(_ + _).collect   //使用reduceByKey(_+_)的方式

          res87: Array[(Int, String)] = Array((3,dog), (1,tiger), (2,cat),(1,eagle))

          该实例的操作示意图为:

 4)partitionBy

对两个RDD中的KV元素,每个RDD中相同key中的元素分别聚合成一个集合。与reduceByKey不同的是针对两个RDD中相同的key的元素进行合并

 5)cogroup

         对两个RDD中的KV元素,每个RDD相同key中元素分别聚合成一个集合。与reduceByKey不同的是针对两个RDD中相同key的元素进行合并

          val a = sc.parallelize(List(1, 2, 1, 3), 1)

          val b = a.map((_, "b"))

          val c = a.map((_, "c"))

          b.cogroup(c).collect

          res7: Array[(Int, (Iterable[String], Iterable[String]))] = Array((2,(ArrayBuffer(b),ArrayBuffer(c))),(3,(ArrayBuffer(b),ArrayBuffer(c))),(1,(ArrayBuffer(b, b),ArrayBuffer(c, c))))

          该实例的操作示意图为:

  6)join

          val a = sc.parallelize(List("dog", "salmon", "salmon", "rat", "elephant"), 3)

          val b = a.keyBy(_.length)

          val c =sc.parallelize(List("dog","cat","gnu","salmon","rabbit","turkey","wolf","bear","bee"),3)

          val d = c.keyBy(_.length)

          b.join(d).collect

          res0: Array[(Int, (String, String))] = Array((6,(salmon,salmon)), (6,(salmon,rabbit)), (6,(salmon,turkey)), (6,(salmon,salmon)), (6,(salmon,rabbit)), (6,(salmon,turkey)), (3,(dog,dog)), (3,(dog,cat)), (3,(dog,gnu)), (3,(dog,bee)), (3,(rat,dog)), (3,(rat,cat)), (3,(rat,gnu)), (3,(rat,bee)))

          该实例的操作示意图为:

  7)leftOutJoin

        将LEFT左边的表名1中的所有记录全部保留,而将右边的表名2中的字段B与表名1.字段A相对应的记录显示出来

          val a = sc.parallelize(List("dog", "salmon", "salmon", "rat", "elephant"), 3)

          val b = a.keyBy(_.length)

          val c =sc.parallelize(List("dog","cat","gnu","salmon","rabbit","turkey","wolf","bear","bee"),3)

          val d = c.keyBy(_.length)

          b.leftOuterJoin(d).collect

          res1: Array[(Int, (String, Option[String]))] = Array((6,(salmon,Some(salmon))), (6,(salmon,Some(rabbit))), (6,(salmon,Some(turkey))), (6,(salmon,Some(salmon))), (6,(salmon,Some(rabbit))), (6,(salmon,Some(turkey))), (3,(dog,Some(dog))), (3,(dog,Some(cat))), (3,(dog,Some(gnu))), (3,(dog,Some(bee))), (3,(rat,Some(dog))), (3,(rat,Some(cat))), (3,(rat,Some(gnu))), (3,(rat,Some(bee))), (8,(elephant,None)))

          该实例的操作示意图为:

  8)rightOutJoin(右外连接)

          val a = sc.parallelize(List("dog", "salmon", "salmon", "rat", "elephant"), 3)

          val b = a.keyBy(_.length)

          val c =sc.parallelize(List("dog","cat","gnu","salmon","rabbit","turkey","wolf","bear","bee"),3)

          val d = c.keyBy(_.length)

          b.rightOuterJoin(d).collect

          res2: Array[(Int, (Option[String], String))] = Array((6,(Some(salmon),salmon)), (6,(Some(salmon),rabbit)), (6,(Some(salmon),turkey)), (6,(Some(salmon),salmon)), (6,(Some(salmon),rabbit)), (6,(Some(salmon),turkey)), (3,(Some(dog),dog)), (3,(Some(dog),cat)), (3,(Some(dog),gnu)), (3,(Some(dog),bee)), (3,(Some(rat),dog)), (3,(Some(rat),cat)), (3,(Some(rat),gnu)), (3,(Some(rat),bee)), (4,(None,wolf)), (4,(None,bear)))

          该实例的操作示意图为:

三、Action算子

  1)foreach

           val c = sc.parallelize(List("cat", "dog", "tiger", "lion", "gnu", "crocodile", "ant", "whale", "dolphin", "spider"), 3)//导入数据使用parallelize方式

           c.foreach(x = println(x + "s are yummy"))//得到一条数据就处理一条数据

           该实例的操作示意图为:

 2)fold

            val a = sc.parallelize(List(1,2,3), 3)

            a.fold(0)(_ + _)

            res59: Int = 6

           针对这个实例画出示意图:

 3)aggregate

            val z = sc.parallelize(List(1,2,3,4,5,6), 2)

            // lets first print out the contents of the RDD with partition labels

            def myfunc(index: Int, iter: Iterator[(Int)]) : Iterator[String] = {

                iter.toList.map(x = "[partID:" + index + ", val: " + x + "]").iterator

           }

            z.mapPartitionsWithIndex(myfunc).collect

           res28: Array[String] = Array([partID:0, val: 1], [partID:0, val: 2], [partID:0, val:3], [partID:1, val: 4], [partID:1, val: 5], [partID:1, val: 6])

           z.aggregate(0)(math.max(_, _), _ + _)

           res40: Int = 9

           针对这个实例画出示意图:

  4)collect

          val c = sc.parallelize(List("Gnu", "Cat", "Rat", "Dog", "Gnu", "Rat"), 2)

          c.collect  //通过collect算子将两个partition结合成一个

          res29: Array[String] = Array(Gnu, Cat, Rat, Dog, Gnu, Rat)

          针对这个实例画出示意图:

  5)collectAsMap

            RDD中同一个Key中存在多个Value,那么后面的Value将会把前面的Value覆盖,最终得到的结果就是Key唯一,而且对应一个Value

            val a = sc.parallelize(List(1, 2, 1, 3), 1)

            val b = a.zip(a)

            b.collectAsMap

            res1: scala.collection.Map[Int,Int] = Map(2 - 2, 1 - 1, 3 - 3)

           针对这个实例画出示意图:

  6)reduceByKeyLocally

            该函数将RDD[K,V]中每个K对应的V值根据映射函数来运算,运算结果映射到一个Map[K,V]中,而不是RDD[K,V]。

            val a =sc.parallelize(List("dog","cat", "owl", "gnu", "ant"), 2)

            val b = a.map(x = (x.length,x))

            b.reduceByKey(_ + _).collect

            res86: Array[(Int, String)] =Array((3,dogcatowlgnuant))

           针对这个实例画出示意图:

  7)lookup

           val a =sc.parallelize(List("dog","tiger", "lion", "cat", "panther", "eagle"), 2)

           val b = a.map(x = (x.length, x))

           b.lookup(5)

           res0: Seq[String] = WrappedArray(tiger, eagle)

           针对这个实例画出示意图:

  8)count

            val c = sc.parallelize(List("Gnu", "Cat", "Rat", "Dog"), 2)

            c.count

            res2: Long = 4

           针对这个实例画出示意图:

  9)top 

            val c = sc.parallelize(Array(6, 9, 4,7, 5, 8), 2)

            c.top(2)

            res28: Array[Int] = Array(9, 8)

           针对这个实例画出示意图:

  10)reduce

            val a = sc.parallelize(1 to 100, 3)

            a.reduce(_ + _)

            res41: Int = 5050

           针对这个实例画出示意图:

这篇文章也是写了一个礼拜才出来的成果,本身不是很熟悉,需要一个算子一个算子去理解,有些地方也没有很准确,但也是我理解能力范围之内了,网站上不太能查到有关于这方面的知识,所以绝大部分靠自学,理解不对的地方也请多多包涵啦。对于mapreduce和spark的理解也仅限于参考网上的知识加上自己的一些见解。

-------------------------------------------参考:-------------------------------------------

------------------------------------------------------------参考:-----------------------------------------------------------

Spark之RDD算子-转换算子

转换(Transformation)算子 就是对RDD进行操作的接口函数,其作用是将一个或多个RDD变换成新的RDD。

使用Spark进行数据计算,在利用创建算子生成RDD后,数据处理的春前算法设计和程序编写的最关键部分,就是利用变换算子对原始数据产生的RDD进行一步一步的变换,最终得到期望的计算结果。

对于变换算子可理解为分两类:1,对Value型RDD进行变换的算子;2,对Key/Value型RDD进行变换算子。在每个变换中有仅对一个RDD进行变换的,也有是对两个RDD进行变换的。

将当前RDD进行重新分区,生成一个以numPartitions参数指定的分区数存储的新RDD。参数shuffle为true时在变换过程中进行shuffle操作,否则不进行shuffle。

在Linux系统中,有许多对数据进行处理的shell命令,我们可能通过pipe变换将一些shell命令用于Spark中生成新的RDD。

对原RDD中的元素按照函数f指定的规则进行排扒敬清序,并可通过ascending参数进行升序或降序设置,排序后的结果生成新的RDD,新的RDD的分区数量可以由参数numPartitions指定,默认与原RDD相同的分区数。

输入参数为另一个RDD,返回两个RDD中所有元素的笛卡尔积。

输入参数为另一个RDD,返回原始RDD与输入参数RDD的补集,即生成由原始RDD中而不在输入参数RDD中的元素构成新的RDD,参数numPartitions指定新RDD分区数。

返回原始RDD与另一个RDD的并集。

生成由原始RDD的值为Key,另一个RDD的值为Value依次配对稿销构成的所有Key/Value对,并返回这些Key/Value对集合构成的新RDD

将Key/Value型RDD中的元素的Key提取出来,所有Key值构成一个序列形成新的RDD。

将Key/Value型RDD中的元素的Value值使用输入参数函数f进行变换构成一个新的RDD。

Spark算子:RDD基本转换操作(7)–zipWithIndex、zipWithUniqueId

关键字:Spark算子、Spark RDD基本转换、zipWithIndex、zipWithUniqueId

zipWithIndex

def zipWithIndex(): RDD[(T, Long)]

该函数将RDD中的元素和这个元素在RDD中的ID(索引号)组合成键/值对。

该函数将RDD中元素和一卖竖个唯一ID组合成键/值对,该唯一ID生成算法如下:

每个分区中第一个元素的唯一ID值为:该分区索引号,

每个分区中第N个元素的唯一ID值为:(前一个元素的唯一ID值) + (该RDD总的分虚春区数)

看下面的例子:

//总分区数为2

/差配耐/第一个分区第一个元素ID为0,第二个分区第一个元素ID为1

//第一个分区第二个元素ID为0+2=2,第一个分区第三个元素ID为2+2=4

//第二个分区第二个元素ID为1+2=3,第二个分区第三个元素ID为3+2=5

关于spark算子和spark算子类型的介绍到此就结束了,不知道你从中找到你需要的信息了吗 ?如果你还想了解更多这方面的信息,记得收藏关注本站。

标签列表