reduceByKey, combineByKey, foldByKey, join, union
(2016-02-01 09:34:50) Category: Scala study
Writing the algorithm implementation: first break the problem down, then work out how to implement each part.
http://www.tuicool.com/articles/miueaqv
http://www.iteblog.com/archives/1291
combineByKey()
 * Users provide three functions:
 *  - `createCombiner`, which turns a V into a C (e.g., creates a one-element list)
 *  - `mergeValue`, to merge a V into a C (e.g., adds it to the end of a list)
 *  - `mergeCombiners`, to combine two C's into a single one.
While traversing the RDD's records, when combineByKey encounters a pair (k, v) whose key k (of type K) it has not seen before, it calls createCombiner on v. This turns v into a c of type C (the type of the aggregation object), and that c becomes the initial aggregate value for key k.
 * In addition, users can control the partitioning of the output RDD, and whether to perform
 * map-side aggregation (if a mapper can produce multiple items with the same key).
def combineByKey[C](
    createCombiner: V => C,
    // Called the first time a key k is seen: builds the initial aggregate
    // c (type C) from the value v.
    mergeValue: (C, V) => C,
    // Called for every later (k, v) with an already-seen key k: folds v
    // into the existing aggregate of type C.
    mergeCombiners: (C, C) => C,
    // combineByKey runs in a distributed setting: each RDD partition runs
    // the aggregation independently, so the per-partition results must be
    // merged at the end. mergeCombiners has type (C, C) => C; each argument
    // is the aggregate produced by one partition.
    partitioner: Partitioner,
    mapSideCombine: Boolean = true,
    serializer: Serializer = null): RDD[(K, C)] = { /* implementation omitted */ }
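The interaction of the three functions can be sketched with plain Scala collections. This is only an illustration of the semantics, not Spark's actual implementation (the names `combineByKeyLocal` and `partitions` are made up for the sketch): each "partition" folds its records into a `Map[K, C]` with createCombiner/mergeValue, then the per-partition maps are merged with mergeCombiners.

```scala
// Local sketch of combineByKey's two-phase semantics (not Spark code).
def combineByKeyLocal[K, V, C](
    partitions: Seq[Seq[(K, V)]],
    createCombiner: V => C,
    mergeValue: (C, V) => C,
    mergeCombiners: (C, C) => C): Map[K, C] = {
  // Phase 1: aggregate within each partition independently.
  val perPartition: Seq[Map[K, C]] = partitions.map { records =>
    records.foldLeft(Map.empty[K, C]) { case (acc, (k, v)) =>
      acc.get(k) match {
        case None    => acc + (k -> createCombiner(v)) // first time key k is seen
        case Some(c) => acc + (k -> mergeValue(c, v))  // key k seen before
      }
    }
  }
  // Phase 2: merge the per-partition aggregates.
  perPartition.foldLeft(Map.empty[K, C]) { (acc, m) =>
    m.foldLeft(acc) { case (a, (k, c)) =>
      a.get(k) match {
        case None     => a + (k -> c)
        case Some(c0) => a + (k -> mergeCombiners(c0, c))
      }
    }
  }
}

// Per-key sum across two "partitions".
val parts  = Seq(Seq(("iteblog", 1), ("bbs", 1)), Seq(("iteblog", 3)))
val counts = combineByKeyLocal[String, Int, Int](parts, x => x, _ + _, _ + _)
// counts: Map("iteblog" -> 4, "bbs" -> 1)
```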
scala> val data = sc.parallelize(List((1, "www"), (1, "iteblog"), (1, "com"),
     |   (2, "bbs"), (2, "iteblog"), (2, "com"), (3, "good")))
data: org.apache.spark.rdd.RDD[(Int, String)] = ParallelCollectionRDD[15] at parallelize at <console>:12

scala> val result = data.combineByKey(List(_),
     |   (x: List[String], y: String) => y :: x,
     |   (x: List[String], y: List[String]) => x ::: y)
result: org.apache.spark.rdd.RDD[(Int, List[String])] = ShuffledRDD[19] at combineByKey at <console>:14

scala> result.collect
res20: Array[(Int, List[String])] = Array((1,List(www, iteblog, com)),
  (2,List(bbs, iteblog, com)), (3,List(good)))

scala> val data = sc.parallelize(List(("iteblog", 1), ("bbs", 1), ("iteblog", 3)))
data: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[24] at parallelize at <console>:12

scala> val result = data.combineByKey(x => x,
     |   (x: Int, y: Int) => x + y, (x: Int, y: Int) => x + y)
result: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[25] at combineByKey at <console>:14

scala> result.collect
res27: Array[(String, Int)] = Array((iteblog,4), (bbs,1))
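The second example is reduceByKey in disguise: when the combiner type C equals the value type V, createCombiner is the identity and both merge functions are the same reduce function f, which is exactly the shape reduceByKey(f) provides. A local check of this decomposition on plain collections (`reduceByKeyLocal` is a made-up name for the sketch, not a Spark API):

```scala
// reduceByKey(f) behaves like combineByKey(v => v, f, f).
def reduceByKeyLocal[K, V](records: Seq[(K, V)], f: (V, V) => V): Map[K, V] =
  records.foldLeft(Map.empty[K, V]) { case (acc, (k, v)) =>
    acc.get(k) match {
      case None    => acc + (k -> v)        // createCombiner: identity
      case Some(c) => acc + (k -> f(c, v))  // mergeValue == mergeCombiners == f
    }
  }

val counts = reduceByKeyLocal(
  List(("iteblog", 1), ("bbs", 1), ("iteblog", 3)),
  (a: Int, b: Int) => a + b)
// counts: Map("iteblog" -> 4, "bbs" -> 1), matching the spark-shell result above
```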