加载中…
个人资料
  • 博客等级:
  • 博客积分:
  • 博客访问:
  • 关注人气:
  • 获赠金笔:0支
  • 赠出金笔:0支
  • 荣誉徽章:
正文 字体大小:

spark中combineByKey的使用

(2017-12-18 20:29:52)
标签:

spark

combinebykey

createcombiner

mergevalue

分类: 大数据处理
参考链接:
http://blog.csdn.net/jiangpeng59/article/details/52538254
https://www.cnblogs.com/yongjian/p/6425772.html

一、函数定义
def combineByKey[C](  
      createCombiner: V => C,  
      mergeValue: (C, V) => C,  
      mergeCombiners: (C, C) => C,  
      partitioner: Partitioner,  
      mapSideCombine: Boolean = true,  
      serializer: Serializer = null)
里面的每个参数分别对应聚合操作的各个阶段

二、参数说明
1、createCombiner:V => C ,分区内创建组合函数。这个函数把当前的值作为参数,此时我们可以对其做些附加操作(类型转换)并把它返回 (这一步类似于初始化操作)。
2、mergeValue: (C, V) => C,分区内合并值函数。该函数把元素V合并到之前的元素C(createCombiner)上 (这个操作在每个分区内进行)。
3、mergeCombiners: (C, C) => C,多分区合并组合器函数。该函数把2个元素C合并 (这个操作在不同分区间进行)。
4、partitioner:自定义分区数,默认为HashPartitioner
5、mapSideCombine:是否在map端进行Combine操作,默认为true

三、工作流程
1、combineByKey会遍历分区中的所有元素,因此每个元素的key要么没遇到过,要么和之前某个元素的key相同。
2、如果这是一个新的元素,函数会调用createCombiner创建那个key对应的累加器初始值。
3、如果这是一个在处理当前分区之前已经遇到的key,会调用mergeCombiners把该key累加器对应的当前value与这个新的value合并。

四、举例说明
val initialScores = Array((("1", "011"), 1), (("1", "012"), 1), (("2", "011"), 1), (("2", "013"),1), (("2", "014"),1))
    val d1 = sc.parallelize(initialScores)

    d1.map(x=>(x._1._1,(x._1._2,1))).combineByKey(
      (v) => (v),
      (acc: (String,Int), v) => (v._1+":"+acc._1,v._2+acc._2),
      (acc1: (String,Int), acc2: (String,Int)) => (acc1._1 +":"+ acc2._1, acc1._2 +acc2._2)
    ).collect().foreach(println)
1、map操作将格式转化为("1", ("011", 1))
2、(acc: (String,Int), v) => (v._1+":"+acc._1,v._2+acc._2):这个时分区内合并值函数,所以("011", 1)等价于(String, Int),这个表达式的意思是将同一个key对应的值的第一个字符串用":"串起来,然后第二个值相加。
3、(acc1: (String,Int), acc2: (String,Int)) => (acc1._1 +":"+ acc2._1, acc1._2 +acc2._2):分区间的合并操作,将第2步的操作扩展到分区间。
4、最后输出值为:
(1,(012:011,2))
(2,(014:013:011,3))

0

阅读 收藏 喜欢 打印举报/Report
  

新浪BLOG意见反馈留言板 欢迎批评指正

新浪简介 | About Sina | 广告服务 | 联系我们 | 招聘信息 | 网站律师 | SINA English | 产品答疑

新浪公司 版权所有