一、背景
已知(userid, goodsid),(goodsid, cateid)组合,需要计算cate类目之间的相似度
getCateSim 分别用 map 和 mapPartitions 两种方式实现。在本例的大数据计算中,map 方式明显更快:map 约 5 分钟完成的计算,mapPartitions 需要约 10 分钟(mapPartitions 版本额外做了 partitionBy 与 repartition,引入了多余的 shuffle)。
二、代码
package test
import org.apache.spark.sql.types._
import scala.collection.mutable
import org.apache.spark.sql.{Row, SQLContext, SparkSession}
import org.apache.spark.{HashPartitioner, SparkConf, SparkContext}
object Demo {
/**
 * Demo entry point: builds two small in-memory tables
 * (cookieId -> goodsId clicks, goodsId -> cateId mapping), joins them via
 * Spark SQL to get (cookieId, cateId) pairs, then computes category-to-category
 * similarity with getCateSim and prints the result.
 */
def main(args: Array[String]): Unit = {
  val conf = new SparkConf().setMaster("local").setAppName("demo")
  val sc = new SparkContext(conf)
  // getOrCreate reuses the SparkContext created above (same SparkConf).
  val spark = SparkSession.builder().config(conf).getOrCreate()
  // Sample facts: (cookieId, goodsId) click pairs and (goodsId, cateId) catalog.
  val cookie2Goods = Array((1001, "G11"), (1002, "G12"), (1003, "G13"), (1004, "G12"), (1001, "G21"), (1002, "G21"), (1003, "G22"), (1002, "G31"), (1003, "G31"), (1005, "G31"), (1001, "G41"), (1005, "G42"))
  val goods2Cate = Array(("G11", 1), ("G12", 1), ("G13", 1), ("G21", 2), ("G22", 2), ("G31", 3), ("G41", 4), ("G42", 4))
  val Cookie2GoodsSchema: StructType = StructType(mutable.ArraySeq(
    StructField("CookieId", IntegerType, nullable = false),
    StructField("GoodsId", StringType, nullable = false)
  ))
  val Goods2CateSchema: StructType = StructType(mutable.ArraySeq(
    StructField("GoodsId", StringType, nullable = false),
    StructField("CateId", IntegerType, nullable = false)
  ))
  val cookie2GoodsData = sc.parallelize(cookie2Goods.map(data => Row(data._1, data._2)))
  val cookie2GoodsTB = spark.createDataFrame(cookie2GoodsData, Cookie2GoodsSchema)
  cookie2GoodsTB.createOrReplaceTempView("Cookie2Goods")
  val goods2CateData = sc.parallelize(goods2Cate.map(data => Row(data._1, data._2)))
  val goods2CateTB = spark.createDataFrame(goods2CateData, Goods2CateSchema)
  goods2CateTB.createOrReplaceTempView("Goods2Cate")
  // Join clicks with the catalog to obtain (cookieId, cateId) pairs.
  val cookie2Cate = spark.sql(
    "select a.CookieId, b.CateId from Cookie2Goods a " +
    "inner join Goods2Cate b on (a.GoodsId = b.GoodsId)"
  )
  // collect() already returns an Array; the former trailing .toArray was redundant.
  val cookieCateList = cookie2Cate.rdd.map(line => (line(0).toString.toInt, line(1).toString.toInt)).collect()
  val result = getCateSim(sc, cookieCateList, 2, 2).collect()
  // BUG FIX: println(result) only printed the Array's identity hash
  // (e.g. "[Lscala.Tuple2;@1a2b3c"); print each (cateId, topN) row instead.
  result.foreach(println)
}
mapPartitions方式
/**
 * mapPartitions variant: for every category, computes the topN most similar
 * other categories by Jaccard similarity over their cookie sets.
 *
 * @param sc             active SparkContext
 * @param cookieCateList (cookieId, cateId) pairs
 * @param topN           number of most-similar categories to keep per category
 * @param partionerNum   partition count used for the HashPartitioner/repartition
 * @return RDD of (cateId, List("otherCateId_similarity", ...))
 */
def getCateSim(sc: SparkContext, cookieCateList : Array[(Int, Int)], topN: Int, partionerNum: Int) = {
  // (cateId, all cookieIds that touched the category)
  val cate2CookiesPair = sc.parallelize(cookieCateList).map(elem => (elem._2, elem._1.toString)).groupByKey()
  // Broadcast the full category -> cookies table so each task compares locally.
  val broadprk = sc.broadcast(cate2CookiesPair.collect())
  // NOTE(review): partitionBy immediately followed by repartition below forces
  // two shuffles and discards the hash partitioning — likely why this variant
  // is slower than the plain map version. Kept to preserve the original setup.
  val cate2CookiesPairPart = cate2CookiesPair.partitionBy(new HashPartitioner(partionerNum)).persist()
  // (removed a useless identity map(r => (r._1, r._2)) that preceded repartition)
  val cateSim = cate2CookiesPairPart.repartition(partionerNum).mapPartitions({ iter =>
    for ((cateIdLeft, cookies) <- iter) yield {
      val cookieIdSetLeft = cookies.toSet
      // Jaccard similarity against every other category; the self pair is
      // excluded up front instead of being emitted with a stale score and
      // filtered later.
      val cateSimInfoList = broadprk.value
        .filter(_._1 != cateIdLeft)
        .map { case (cateIdRight, cookiesRight) =>
          val cookieIdSetRight = cookiesRight.toSet
          val sameSetNum = cookieIdSetLeft.intersect(cookieIdSetRight).size
          val unionSetNum = cookieIdSetLeft.union(cookieIdSetRight).size
          (cateIdRight, sameSetNum * 1.0 / unionSetNum)
        }
      // BUG FIX: was sortWith(_._1 > _._2), which compared a category id (Int)
      // against a similarity score (Double). Sort descending by similarity.
      val cateTopNSimList = cateSimInfoList.sortWith(_._2 > _._2).take(topN).map(data => data._1 + "_" + data._2)
      (cateIdLeft, cateTopNSimList)
    }
  })
  cateSim
}
}
Map方式
/**
 * map variant: for every category, computes the topN most similar other
 * categories by Jaccard similarity over their cookie sets.
 *
 * @param sc             active SparkContext
 * @param cookieCateList (cookieId, cateId) pairs
 * @param topN           number of most-similar categories to keep per category
 * @return RDD of (cateId, List("otherCateId_similarity", ...))
 */
def getCateSim(sc: SparkContext, cookieCateList : Array[(Int, Int)], topN: Int) = {
  // Group cookie ids by category: (cateId, Iterable[cookieId as String]).
  val cateToCookies = sc.parallelize(cookieCateList)
    .map { case (cookieId, cateId) => (cateId, cookieId.toString) }
    .groupByKey()
  // Small enough to ship whole to every executor.
  val allCateCookies = sc.broadcast(cateToCookies.collect())
  cateToCookies.map { case (cateId, cookies) =>
    val leftSet = cookies.toSet
    // Jaccard similarity of this category against every other one.
    // (The original emitted the self pair too and filtered it out before
    // sorting; excluding it up front yields the identical result.)
    val similarities = allCateCookies.value
      .filter { case (otherId, _) => otherId != cateId }
      .map { case (otherId, otherCookies) =>
        val rightSet = otherCookies.toSet
        val overlap = leftSet.intersect(rightSet).size
        val total = leftSet.union(rightSet).size
        (otherId, overlap * 1.0 / total)
      }
    // Descending by similarity, keep topN, render as "cateId_score".
    val topMatches = similarities
      .sortWith(_._2 > _._2)
      .take(topN)
      .map(pair => pair._1 + "_" + pair._2)
    (cateId, topMatches)
  }
}
加载中,请稍候......