
reduceByKey, combineByKey, foldByKey, join, union

(2016-02-01 09:34:50)
Category: Scala study
To write an algorithm implementation:
first break the problem down,
then work out the implementation approach for each part.
http://www.tuicool.com/articles/miueaqv
http://www.iteblog.com/archives/1291

combineByKey()

From the Spark source doc comment:

 * - `createCombiner`, which turns a V into a C (e.g., creates a one-element list)
 * - `mergeValue`, to merge a V into a C (e.g., adds it to the end of a list)
 * - `mergeCombiners`, to combine two C's into a single one.
 * In addition, users can control the partitioning of the output RDD, and whether to perform
 * map-side aggregation (if a mapper can produce multiple items with the same key).

def combineByKey[C](
    createCombiner: V => C,
    mergeValue: (C, V) => C,
    mergeCombiners: (C, C) => C,
    partitioner: Partitioner,
    mapSideCombine: Boolean = true,
    serializer: Serializer = null): RDD[(K, C)] = { /* implementation omitted */ }

createCombiner: V => C
While traversing the RDD's data, when combineByKey encounters a key k (of type K) for the first time, it calls createCombiner on that (k, v). createCombiner turns v into c (of type C, the type of the aggregation object); this c serves as the initial aggregation value for the key.

mergeValue: (C, V) => C
When combineByKey encounters a key k it has already seen (the second, third, ... occurrence), it calls mergeValue on that (k, v), which merges v into the key's existing aggregation object (of type C).

mergeCombiners: (C, C) => C
Because combineByKey executes in a distributed environment, each partition of the RDD performs the aggregation independently, and the per-partition results must be merged at the end. mergeCombiners, of type (C, C) => C, takes two aggregation objects produced by different partitions and combines them into one.
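
To see all three functions doing different work (with C different from V), here is a minimal sketch, not from the original post, that computes a per-key average; it assumes a spark-shell with the usual SparkContext named sc:

val scores = sc.parallelize(Seq(("a", 1.0), ("a", 3.0), ("b", 2.0)))
val avg = scores.combineByKey(
  (v: Double) => (v, 1),                                   // createCombiner: first value for a key starts a (sum, count) pair
  (c: (Double, Int), v: Double) => (c._1 + v, c._2 + 1),   // mergeValue: fold a later value into the pair
  (a: (Double, Int), b: (Double, Int)) => (a._1 + b._1, a._2 + b._2)  // mergeCombiners: merge per-partition pairs
).mapValues { case (sum, count) => sum / count }
// avg.collect() yields (a,2.0) and (b,2.0) (order may vary)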

scala> val data = sc.parallelize(List((1, "www"), (1, "iteblog"), (1, "com"),
  (2, "bbs"), (2, "iteblog"), (2, "com"), (3, "good")))
data: org.apache.spark.rdd.RDD[(Int, String)] =
  ParallelCollectionRDD[15] at parallelize at <console>:12

scala> val result = data.combineByKey(List(_),
  (x: List[String], y: String) => y :: x, (x: List[String], y: List[String]) => x ::: y)
result: org.apache.spark.rdd.RDD[(Int, List[String])] =
  ShuffledRDD[19] at combineByKey at <console>:14

scala> result.collect
res20: Array[(Int, List[String])] = Array((1,List(www, iteblog, com)),
   (2,List(bbs, iteblog, com)), (3,List(good)))

scala> val data = sc.parallelize(List(("iteblog", 1), ("bbs", 1), ("iteblog", 3)))
data: org.apache.spark.rdd.RDD[(String, Int)] =
  ParallelCollectionRDD[24] at parallelize at <console>:12

scala> val result = data.combineByKey(x => x,
  (x: Int, y: Int) => x + y, (x: Int, y: Int) => x + y)
result: org.apache.spark.rdd.RDD[(String, Int)] =
  ShuffledRDD[25] at combineByKey at <console>:14

scala> result.collect
res27: Array[(String, Int)] = Array((iteblog,4), (bbs,1))
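
The second example is plain per-key summation, which the higher-level operators from this post's title express more directly; in Spark both reduceByKey and foldByKey are themselves built on combineByKey. A quick sketch against the same data:

val data = sc.parallelize(List(("iteblog", 1), ("bbs", 1), ("iteblog", 3)))
data.reduceByKey(_ + _).collect()    // Array((iteblog,4), (bbs,1))
data.foldByKey(0)(_ + _).collect()   // same result, starting each key's fold from an explicit zero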

