加载中…
个人资料
  • 博客等级:
  • 博客积分:
  • 博客访问:
  • 关注人气:
  • 获赠金笔:0支
  • 赠出金笔:0支
  • 荣誉徽章:
正文 字体大小:

spark增量更新updateStateByKey使用示例

(2018-05-04 09:56:02)
标签:

updatestatebykey

spark

增量

分类: 大数据处理
    在推荐系统开发中我们会遇到这样一个问题:流式的推荐系统过,有些视频已经曝光过了,怎么存放这些曝光过的数据以便推荐时过滤掉。这同时需要处理以下几个问题:
1、曝光数的存放位置,hbase、hdfs或是内存
2、曝光数据怎么可以及时更新
3、数据怎么可以定时清理,以便控制数据量
    问题一:流式计算中如果定时(10分钟一次),那么需要频繁的访问hbase,会造成hbase压力很大。hdfs可以保存数据,但是每次必须全量更新hdfs的数据,无法增量更新。所以对于流式系统来说,最好的方式是存放在内存中。
    问题二:曝光数据的更新,我们需要增量更新,可以采用spark自带的updateStateByKey功能。
    问题三:内存中保存的数据不能一直增加,如果有好几份这样的数据,那么内存将逐步减少直至内存为空。曝光数据我们可以只保存K(比如5)天的数据,其他的数据过滤掉即可,从而使得数据量根据DAU稳步增长。
    示例:
一、发送的数据
{"cookieid":"123456", "eventkey":"click", "column":"music", "video":"1235"}
{"cookieid":"123456", "eventkey":"like", "column":"music", "video":"1236"}
{"cookieid":"2345678", "eventkey":"like", "column":"music", "video":"12445"}
二、曝光的数据的存储形式
(123456,(1239_1240_1235_1236,0_0_0_2,20180503))
第一个数据是cookieid(userid),第2个是曝光的videoid,第4个是上次数据更新时间。第3个是相对时间的天数差异,这样方便过于超过阈值的数据。
三、代码实现
import org.apache.spark.streaming.Durations
import org.apache.spark.SparkConf
import org.apache.spark.streaming.StreamingContext
import com.topvid.config.GlobalConfig
import kafka.serializer.StringDecoder
import org.apache.log4j.{Level, Logger}
import org.apache.spark.streaming.kafka.KafkaUtils
import com.topvid.util.TimeUtil

import scala.collection.mutable
import scala.util.parsing.json.JSON

object NetworkWordCount {

  def main(args: Array[String]): Unit = {
    Logger.getLogger("org.apache.spark").setLevel(Level.WARN)
    val sparkConf = new SparkConf().setMaster("local[2]").setAppName("demo")
    val ssc = new StreamingContext(sparkConf, Durations.seconds(20))
    ssc.checkpoint(GlobalConfig.HADOOP_BASE + "/spark/checkpoint")
    val kafkaParams = Map[String, String]("metadata.broker.list" -> "127.0.0.1:9092") // 然后创建一个set,里面放入你要读取的Topic,这个就是我们所说的,它给你做的很好,可以并行读取多个topic
    var topics = Set[String]("test")

    // 一般计数器的功能
    val addFunc = (currValues: Seq[Int], preValueState: Option[Int]) => {
      val currentCount = currValues.sum
      val previousCount = preValueState.getOrElse(0)
      Some(currentCount + previousCount)
    }
    val addFunc2 = (currValues: Seq[String], prevValueState: Option[String]) => {
      val currentCount = currValues.mkString // 负责将之前的数据合并 CompactBuffer
      val previousCount = prevValueState.getOrElse("")
      if(prevValueState.isEmpty) Some(currentCount) else Some(currentCount + "_" + previousCount)
    }
    val addFunc3 = (currValues: Seq[String], prevValueState: Option[(String, String, String)]) => {
      val nowDate = TimeUtil.getNowDate()
      val current = currValues.mkString // 负责将之前的数据合并 CompactBuffer
      val videoNum = current.split("_").length
      val impDays = Array.fill(videoNum)(0).map(x => x.toString).reduce(_ + "_" + _)
      val previous = prevValueState.getOrElse(("", "", ""))
      if(prevValueState.isEmpty){
        Some(current, impDays, nowDate)
      } else{
        val newPrevious = updatePrevImpData(previous, nowDate)
        if(newPrevious._1.isEmpty){
          Some(current, impDays, nowDate)  // 测试时可替换成 "20180501"
        }else if(current.isEmpty){
          Some(newPrevious._1, newPrevious._2, nowDate)
        }else{
          Some(current + "_" + newPrevious._1, impDays + "_" + newPrevious._2, nowDate)
        }
      }
    }

    val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
      ssc, kafkaParams, topics)
    val lines = messages.map(_._2)
    val cookie2Video = lines.map(x => parseKafkaInput(x))
      .map(tup => (tup._1, tup._4))
      .reduceByKey(_ + "_" + _)

    val totalCookie2Videos = cookie2Video.updateStateByKey(addFunc3)
    totalCookie2Videos.print()

    ssc.start()
    ssc.awaitTermination()
  }

  def parseKafkaInput(inputContent: String): Tuple4[String, String, String, String] = {
    val json: Option[Any] = JSON.parseFull(inputContent)
    val map: Map[String, Any] = json.get.asInstanceOf[Map[String, Any]]
    val cookieId = map.get("cookieid").get.asInstanceOf[String]
    val column = map.get("column").get.asInstanceOf[String]
    val eventkey = map.get("eventkey").get.asInstanceOf[String]
    val videoid = map.get("video").get.asInstanceOf[String]
    (cookieId, column, eventkey, videoid)
  }

  def updatePrevImpData(prevImpData: Tuple3[String, String, String], nowDate: String): Tuple3[String, String, String] = {
    val MAX_DAY_DIFF = 3

    val prevItemIds = prevImpData._1
    val prevDayDiffs = prevImpData._2
    val prevDate = prevImpData._3

    val dayDiff = TimeUtil.getDaysDiff(prevDate, nowDate)
    val prevItemArr = prevItemIds.split("_")
    val prevDayDiffArr = prevDayDiffs.split("_")
    val prevItemNum = prevItemArr.length

    var newPrevItemStr = ""
    var newPrevDayDiffStr = ""
    val itemSet = mutable.Set.empty[String]
    for(i <- 0 until prevItemNum){
      val dayAdd = prevDayDiffArr(i).toInt + dayDiff
      if(dayAdd <= MAX_DAY_DIFF && !itemSet.contains(prevItemArr(i))){  // 前面的是最新的item
        newPrevItemStr += prevItemArr(i) + "_"
        newPrevDayDiffStr += dayAdd.toString + "_"
        itemSet += prevItemArr(i)
      }
    }
    (newPrevItemStr.dropRight(1), newPrevDayDiffStr.dropRight(1), nowDate)
  }

}

0

阅读 收藏 喜欢 打印举报/Report
  

新浪BLOG意见反馈留言板 欢迎批评指正

新浪简介 | About Sina | 广告服务 | 联系我们 | 招聘信息 | 网站律师 | SINA English | 产品答疑

新浪公司 版权所有