加载中…
个人资料
  • 博客等级:
  • 博客积分:
  • 博客访问:
  • 关注人气:
  • 获赠金笔:0支
  • 赠出金笔:0支
  • 荣誉徽章:
正文 字体大小:

spark示例之计算类目相似度

(2017-12-26 21:18:59)
标签:

spark

map

groupbykey

broadcast

sql

分类: 大数据处理
一、背景
     已知(userid, goodsid),(goodsid, cateid)组合,需要计算cate类目之间的相似度

getCateSim 分别使用 map 和 mapPartitions 两种方式实现。在大数据量的计算中前者完胜后者:map 方式 5 分钟即可完成的计算,mapPartitions 方式需要 10 分钟
二、代码



package test

import org.apache.spark.sql.types._

import scala.collection.mutable
import org.apache.spark.sql.{Row, SQLContext, SparkSession}
import org.apache.spark.{HashPartitioner, SparkConf, SparkContext}


object Demo {
/**
 * Demo entry point: builds two in-memory tables
 * (CookieId -> GoodsId and GoodsId -> CateId), joins them via Spark SQL
 * into (CookieId, CateId) pairs, then computes the top-N most similar
 * categories per category with getCateSim and prints the result.
 */
def main(args: Array[String]): Unit = {

  val conf = new SparkConf().setMaster("local").setAppName("demo")
  val sc = new SparkContext(conf)
  val spark = SparkSession.builder().config(conf).getOrCreate()

  // Demo input: (userId, goodsId) interaction pairs and (goodsId, cateId) mappings.
  val cookie2Goods = Array((1001, "G11"), (1002, "G12"), (1003, "G13"), (1004, "G12"), (1001, "G21"), (1002, "G21"), (1003, "G22"), (1002, "G31"), (1003, "G31"), (1005, "G31"), (1001, "G41"), (1005, "G42"))
  val goods2Cate = Array(("G11", 1), ("G12", 1), ("G13", 1), ("G21", 2), ("G22", 2), ("G31", 3), ("G41", 4), ("G42", 4))

  // StructType accepts any Seq[StructField]; no need for mutable.ArraySeq here.
  val cookie2GoodsSchema: StructType = StructType(Seq(
    StructField("CookieId", IntegerType, nullable = false),
    StructField("GoodsId", StringType, nullable = false)
  ))
  val goods2CateSchema: StructType = StructType(Seq(
    StructField("GoodsId", StringType, nullable = false),
    StructField("CateId", IntegerType, nullable = false)
  ))

  // Register both arrays as temp views so they can be joined with SQL.
  val cookie2GoodsData = sc.parallelize(cookie2Goods.map { case (cookie, goods) => Row(cookie, goods) })
  val cookie2GoodsTB = spark.createDataFrame(cookie2GoodsData, cookie2GoodsSchema)
  cookie2GoodsTB.createOrReplaceTempView("Cookie2Goods")

  val goods2CateData = sc.parallelize(goods2Cate.map { case (goods, cate) => Row(goods, cate) })
  val goods2CateTB = spark.createDataFrame(goods2CateData, goods2CateSchema)
  goods2CateTB.createOrReplaceTempView("Goods2Cate")

  // Join through GoodsId to obtain (CookieId, CateId) pairs.
  val cookie2Cate = spark.sql(
    "select a.CookieId, b.CateId from Cookie2Goods a " +
      "inner join Goods2Cate b on (a.GoodsId = b.GoodsId)"
  )
  // collect() already returns an Array; the redundant .toArray is dropped.
  val cookieCateList = cookie2Cate.rdd
    .map(line => (line(0).toString.toInt, line(1).toString.toInt))
    .collect()

  val result = getCateSim(sc, cookieCateList, 2, 2).collect()
  // BUGFIX: println(result) only printed the array reference
  // (e.g. "[Lscala.Tuple2;@1a2b3c"); print each entry instead.
  result.foreach { case (cateId, sims) => println(s"$cateId -> ${sims.mkString(", ")}") }

  // Release Spark resources (also stops the underlying SparkContext).
  spark.stop()
}

// mapPartitions 方式
/**
 * Computes, for every category, the topN most similar categories using
 * Jaccard similarity (|intersection| / |union|) over the sets of cookies
 * that touched each category. mapPartitions variant: the full
 * (cateId -> cookies) table is broadcast so every partition can compare
 * its categories against all others without a shuffle join.
 *
 * @param sc             active SparkContext
 * @param cookieCateList (cookieId, cateId) pairs
 * @param topN           number of most-similar categories kept per category
 * @param partionerNum   partition count for the hash partitioner
 * @return RDD of (cateId, list of "otherCateId_similarity"), most similar first
 */
def getCateSim(sc: SparkContext, cookieCateList : Array[(Int, Int)], topN: Int, partionerNum: Int) = {
  // (cateId, all cookieIds that touched this category)
  val cate2CookiesPair = sc.parallelize(cookieCateList).map(elem => (elem._2, elem._1.toString)).groupByKey()

  // Ship the whole table to every executor for the pairwise comparison.
  val broadprk = sc.broadcast(cate2CookiesPair.collect())

  val cate2CookiesPairPart = cate2CookiesPair.partitionBy(new HashPartitioner(partionerNum)).persist()
  // The original no-op .map(r => (r._1, r._2)) identity step is dropped.
  val cateSim = cate2CookiesPairPart.repartition(partionerNum).mapPartitions { iter =>
    for ((cateIdLeft, cookies) <- iter) yield {
      val cookieIdSetLeft = cookies.toSet
      val sites = broadprk.value

      // Jaccard similarity of this category against every broadcast category.
      // BUGFIX: the original kept `var simPer` outside the loop, so the
      // self-comparison entry carried a stale value from the previous
      // iteration; computed as a local val instead (self entry is 0.0 and
      // filtered out below).
      val cateSimInfoList = for (site <- sites) yield {
        val (cateIdRight, cookiesRight) = site
        val simPer =
          if (cateIdLeft != cateIdRight) {
            val cookieIdSetRight = cookiesRight.toSet
            val sameSetNum = cookieIdSetLeft.intersect(cookieIdSetRight).size
            val unionSetNum = cookieIdSetLeft.union(cookieIdSetRight).size
            sameSetNum * 1.0 / unionSetNum
          } else 0.0
        (cateIdRight, simPer)
      }

      // BUGFIX: the original sorted with `_._1 > _._2`, comparing a cateId
      // (Int) against a similarity (Double), so "topN" was not ordered by
      // similarity at all. Sort by similarity descending.
      val cateTopNSimList = cateSimInfoList
        .filter(_._1 != cateIdLeft)
        .sortWith(_._2 > _._2)
        .take(topN)
        .map(data => data._1 + "_" + data._2)
      (cateIdLeft, cateTopNSimList)
    }
  }
  cateSim
}

}

// map 方式


  def getCateSim(sc: SparkContext, cookieCateList : Array[(Int, Int)], topN: Int) = {
    val cate2CookiesPair = sc.parallelize(cookieCateList).map(elem => (elem._2, elem._1.toString)).groupByKey()

    val broadprk = sc.broadcast(cate2CookiesPair.collect())

    val cateSim = cate2CookiesPair.map({iter =>
      val cateIdLeft = iter._1
      val cookieIdSetLeft = iter._2.toSet

      val sites = broadprk.value
      var simPer = 0.0

      // 获得每个cateId对应的所有cateId的关联度
      val cateSimInfoList = for(site <- sites)
        yield{
          val cateIdRight = site._1

          if(cateIdLeft != cateIdRight) {
            val cookieIdSetRight = site._2.toSet
            val sameSetNum = cookieIdSetLeft.intersect(cookieIdSetRight).size
            val unionSetNum = cookieIdSetLeft.union(cookieIdSetRight).size
            simPer = sameSetNum * 1.0 / unionSetNum
          }
          (cateIdRight, simPer)
        }

      // 获取每个cateId对应topN的关联cateId,通过列表返回
      val cateTopNSimList = cateSimInfoList.filter(!_._1.equals(cateIdLeft)).sortWith(_._2 > _._2).take(topN).map(data => data._1 + "_" + data._2)
      (cateIdLeft, cateTopNSimList)
    })
    cateSim
  }

0

阅读 收藏 喜欢 打印举报/Report
  

新浪BLOG意见反馈留言板 欢迎批评指正

新浪简介 | About Sina | 广告服务 | 联系我们 | 招聘信息 | 网站律师 | SINA English | 产品答疑

新浪公司 版权所有