import org.apache.spark.streaming.Durations
import org.apache.spark.SparkConf
import org.apache.spark.streaming.StreamingContext
import com.topvid.config.GlobalConfig
import kafka.serializer.StringDecoder
import org.apache.log4j.{Level, Logger}
import org.apache.spark.streaming.kafka.KafkaUtils
import com.topvid.util.TimeUtil
import scala.collection.mutable
import scala.util.parsing.json.JSON
object NetworkWordCount {
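  // Consumes JSON impression events from Kafka, groups video ids per cookie within each
  // 20-second batch, and uses updateStateByKey (addFunc3) to keep a rolling, de-duplicated
  // per-cookie impression history together with the age (in days) of each impression.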
  def main(args: Array[String]): Unit = {
    Logger.getLogger("org.apache.spark").setLevel(Level.WARN)

    val sparkConf = new SparkConf().setMaster("local[2]").setAppName("demo")
    val ssc = new StreamingContext(sparkConf, Durations.seconds(20))
    ssc.checkpoint(GlobalConfig.HADOOP_BASE + "/spark/checkpoint")

    val kafkaParams = Map[String, String]("metadata.broker.list" -> "127.0.0.1:9092")
    // Put the topics to read into a Set; the direct stream can consume several topics in parallel.
    val topics = Set[String]("test")
    // Plain counter: add this batch's counts to the previously accumulated count.
    val addFunc = (currValues: Seq[Int], prevValueState: Option[Int]) => {
      val currentCount = currValues.sum
      val previousCount = prevValueState.getOrElse(0)
      Some(currentCount + previousCount)
    }
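    // Variant for string state: prepends this batch's video ids to the previously stored ids, joined with "_".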
    val addFunc2 = (currValues: Seq[String], prevValueState: Option[String]) => {
      val currentCount = currValues.mkString // collapse this batch's values (a CompactBuffer) into one string
      val previousCount = prevValueState.getOrElse("")
      if (prevValueState.isEmpty) Some(currentCount) else Some(currentCount + "_" + previousCount)
    }
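    // State is a (videoIds, impressionAges, lastDate) triple. New impressions from the current
    // batch are prepended with age "0"; older entries are re-aged and pruned by updatePrevImpData.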
    val addFunc3 = (currValues: Seq[String], prevValueState: Option[(String, String, String)]) => {
      val nowDate = TimeUtil.getNowDate()
      val current = currValues.mkString // collapse this batch's values (a CompactBuffer) into one string
      val videoNum = current.split("_").length
      val impDays = Array.fill(videoNum)(0).map(x => x.toString).reduce(_ + "_" + _)
      val previous = prevValueState.getOrElse(("", "", ""))
      if (prevValueState.isEmpty) {
        Some((current, impDays, nowDate))
      } else {
        val newPrevious = updatePrevImpData(previous, nowDate)
        if (newPrevious._1.isEmpty) {
          Some((current, impDays, nowDate)) // nowDate can be replaced with e.g. "20180501" when testing
        } else if (current.isEmpty) {
          Some((newPrevious._1, newPrevious._2, nowDate))
        } else {
          Some((current + "_" + newPrevious._1, impDays + "_" + newPrevious._2, nowDate))
        }
      }
    }
    val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
      ssc, kafkaParams, topics)
    val lines = messages.map(_._2)
    val cookie2Video = lines.map(x => parseKafkaInput(x))
      .map(tup => (tup._1, tup._4))
      .reduceByKey(_ + "_" + _)
    val totalCookie2Videos = cookie2Video.updateStateByKey(addFunc3)
    totalCookie2Videos.print()

    ssc.start()
    ssc.awaitTermination()
  }
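  /** Parses one Kafka message (a JSON object with "cookieid", "column", "eventkey" and
    * "video" string fields) into a (cookieId, column, eventkey, videoId) tuple. */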
  def parseKafkaInput(inputContent: String): (String, String, String, String) = {
    val json: Option[Any] = JSON.parseFull(inputContent)
    val map: Map[String, Any] = json.get.asInstanceOf[Map[String, Any]]
    val cookieId = map.get("cookieid").get.asInstanceOf[String]
    val column = map.get("column").get.asInstanceOf[String]
    val eventkey = map.get("eventkey").get.asInstanceOf[String]
    val videoid = map.get("video").get.asInstanceOf[String]
    (cookieId, column, eventkey, videoid)
  }
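  /** Re-ages the previous impression state: adds the number of days between prevDate and
    * nowDate to every stored age, drops items older than MAX_DAY_DIFF days, and removes
    * duplicate video ids (keeping the first, i.e. most recent, occurrence). */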
  def updatePrevImpData(prevImpData: (String, String, String), nowDate: String): (String, String, String) = {
    val MAX_DAY_DIFF = 3
    val prevItemIds = prevImpData._1
    val prevDayDiffs = prevImpData._2
    val prevDate = prevImpData._3
    val dayDiff = TimeUtil.getDaysDiff(prevDate, nowDate)
    val prevItemArr = prevItemIds.split("_")
    val prevDayDiffArr = prevDayDiffs.split("_")
    val prevItemNum = prevItemArr.length
    var newPrevItemStr = ""
    var newPrevDayDiffStr = ""
    val itemSet = mutable.Set.empty[String]
    for (i <- 0 until prevItemNum) {
      val dayAdd = prevDayDiffArr(i).toInt + dayDiff
      // Items near the front of the string are the most recent, so the first occurrence wins.
      if (dayAdd <= MAX_DAY_DIFF && !itemSet.contains(prevItemArr(i))) {
        newPrevItemStr += prevItemArr(i) + "_"
        newPrevDayDiffStr += dayAdd.toString + "_"
        itemSet += prevItemArr(i)
      }
    }
    (newPrevItemStr.dropRight(1), newPrevDayDiffStr.dropRight(1), nowDate)
  }
}