import org.apache.spark.streaming.Durations
import org.apache.spark.SparkConf
import org.apache.spark.streaming.StreamingContext
import com.topvid.config.GlobalConfig
import kafka.serializer.StringDecoder
import org.apache.log4j.{Level, Logger}
import org.apache.spark.streaming.kafka.KafkaUtils
import com.topvid.util.TimeUtil
import scala.collection.mutable
import scala.util.parsing.json.JSON
/**
 * Spark Streaming job that reads JSON events from Kafka and keeps, per cookie id,
 * the videos seen recently together with how many days ago each one was seen.
 *
 * The per-cookie state is a triple `(videoIds, dayAges, date)`:
 *  - `videoIds`: "_"-joined video ids, newest first;
 *  - `dayAges`: "_"-joined day ages, aligned index-by-index with `videoIds`;
 *  - `date`: the value of `TimeUtil.getNowDate()` when the state was last updated.
 */
object NetworkWordCount {

  /** Videos whose age (in days, per `TimeUtil.getDaysDiff`) exceeds this are dropped. */
  private val MaxDayDiff = 3

  def main(args: Array[String]): Unit = {
    Logger.getLogger("org.apache.spark").setLevel(Level.WARN)

    val sparkConf = new SparkConf().setMaster("local[2]").setAppName("demo")
    val ssc = new StreamingContext(sparkConf, Durations.seconds(20))
    // Stateful operations such as updateStateByKey require a checkpoint directory.
    ssc.checkpoint(GlobalConfig.HADOOP_BASE + "/spark/checkpoint")

    val kafkaParams = Map[String, String]("metadata.broker.list" -> "127.0.0.1:9092")
    // Topics to read; the direct stream can consume several topics in parallel.
    val topics = Set[String]("test")

    val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
      ssc, kafkaParams, topics)

    val cookie2Video = messages
      .map(_._2)
      // Skip records that are not valid JSON or lack a required string field instead of
      // letting one bad message fail the batch (and fail again after a restart from the
      // same offsets).
      .flatMap(line => scala.util.Try(parseKafkaInput(line)).toOption)
      .map { case (cookieId, _, _, videoId) => (cookieId, videoId) }
      .reduceByKey(_ + "_" + _)

    val totalCookie2Videos = cookie2Video.updateStateByKey(mergeImpressionState _)
    totalCookie2Videos.print()

    ssc.start()
    ssc.awaitTermination()
  }

  /**
   * State-update function for `updateStateByKey`: merges this batch's videos for one cookie
   * into that cookie's previous state.
   *
   * @param currValues     this batch's values for the key (each a "_"-joined list of video
   *                       ids); empty when the key only has previous state
   * @param prevValueState the key's previous `(videoIds, dayAges, date)` state, if any
   * @return the new state: this batch's videos (age 0) first, followed by the previous
   *         videos that survived [[updatePrevImpData]]; `None` when nothing remains, which
   *         removes the key from the state so idle cookies do not accumulate forever
   */
  private def mergeImpressionState(
      currValues: Seq[String],
      prevValueState: Option[(String, String, String)]): Option[(String, String, String)] = {
    // For local testing this can be replaced with a fixed date such as "20180501".
    val nowDate = TimeUtil.getNowDate()
    val current = currValues.mkString("_")
    // Every video seen in this batch has an age of 0 days.
    val currentAges = Seq.fill(current.split("_").length)("0").mkString("_")
    // Age the previous state to today; treat it as absent if no video survived.
    val surviving = prevValueState
      .map(updatePrevImpData(_, nowDate))
      .filter { case (items, _, _) => items.nonEmpty }

    surviving match {
      case None if current.isEmpty => None
      case None => Some((current, currentAges, nowDate))
      case Some((items, ages, _)) if current.isEmpty => Some((items, ages, nowDate))
      case Some((items, ages, _)) =>
        Some((current + "_" + items, currentAges + "_" + ages, nowDate))
    }
  }

  /**
   * Parses one Kafka message value, expected to be a JSON object with string fields
   * `cookieid`, `column`, `eventkey` and `video`.
   *
   * @param inputContent the raw message value
   * @return `(cookieId, column, eventKey, videoId)`
   * @throws NoSuchElementException if the input is not a JSON object or a field is missing
   * @throws ClassCastException if a field is present but is not a string
   */
  def parseKafkaInput(inputContent: String): Tuple4[String, String, String, String] = {
    val fields = JSON.parseFull(inputContent) match {
      // JSON objects are parsed into Map[String, Any]; the type arguments are erased.
      case Some(obj: Map[String, Any] @unchecked) => obj
      case _ => throw new NoSuchElementException("Kafka message is not a JSON object")
    }
    def field(key: String): String = fields(key).asInstanceOf[String]
    (field("cookieid"), field("column"), field("eventkey"), field("video"))
  }

  /**
   * Ages a previous state to `nowDate` and drops expired and duplicate videos.
   *
   * Each stored age is increased by `TimeUtil.getDaysDiff(prevDate, nowDate)`; entries whose
   * new age exceeds `MaxDayDiff` are dropped. Ids are stored newest first, so when an id
   * occurs more than once only its first surviving occurrence is kept. If the id list and
   * the age list differ in length, the extra entries of the longer one are ignored.
   *
   * @param prevImpData previous `(videoIds, dayAges, date)` state
   * @param nowDate     the current date, in the format `TimeUtil` uses
   * @return `(videoIds, dayAges, nowDate)` for the surviving videos (empty strings if none)
   */
  def updatePrevImpData(
      prevImpData: Tuple3[String, String, String],
      nowDate: String): Tuple3[String, String, String] = {
    val (prevItemIds, prevDayAges, prevDate) = prevImpData
    val dayDiff = TimeUtil.getDaysDiff(prevDate, nowDate)
    val seen = mutable.Set.empty[String]
    val (items, ages) = prevItemIds.split("_").iterator
      .zip(prevDayAges.split("_").iterator)
      .map { case (item, age) => (item, age.toInt + dayDiff) }
      // `seen.add` returns false for an id already kept, so only the newest occurrence survives;
      // it is only evaluated for entries that are still young enough.
      .filter { case (item, age) => age <= MaxDayDiff && seen.add(item) }
      .toVector
      .unzip
    (items.mkString("_"), ages.mkString("_"), nowDate)
  }
}