Atguigu (尚硅谷) Big Data Technology: Movie Recommender System

(2019-09-18 09:41:58)
Tags: it, Java training, linux, big data, Atguigu

Category: Big Data

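5.3.5 Updating the Real-Time Recommendation Results

Once the array of recommendation priorities for the candidate movies, updatedRecommends, has been computed, it is sent to the Web backend server. There it is merged with recentRecommends, the previous real-time recommendation result stored for this userId, duplicates are replaced, and the K movies with the highest priority E are selected as the new real-time recommendation. Specifically:

a. Merge: take the union of updatedRecommends and recentRecommends to form a new array;

b. Replace (deduplicate): when updatedRecommends and recentRecommends contain the same movieId, the priority recorded in recentRecommends comes from the previous round and is therefore stale; it is discarded and replaced by the priority for that movieId in updatedRecommends, which reflects the latest update;

c. Select TopK: on the merged and deduplicated array, pick the K movies with the highest recommendation priority as the final result of this round of real-time recommendation.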
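The three steps above can be sketched in plain Java. This is only an illustration under stated assumptions: the class RecommendMerger, the pair type Rec and the method mergeTopK are hypothetical names that do not appear in the tutorial's code, and the two arrays are modeled as lists of (movieId, priority) pairs.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class RecommendMerger {

    /** Hypothetical (movieId, priority) pair standing in for one array element. */
    public static final class Rec {
        public final int movieId;
        public final double priority;

        public Rec(int movieId, double priority) {
            this.movieId = movieId;
            this.priority = priority;
        }
    }

    public static List<Rec> mergeTopK(List<Rec> recentRecommends,
                                      List<Rec> updatedRecommends,
                                      int k) {
        // Steps a and b: insert the old results first and the new ones second,
        // so a duplicate movieId ends up with the updated priority.
        Map<Integer, Double> merged = new HashMap<>();
        for (Rec r : recentRecommends) {
            merged.put(r.movieId, r.priority);
        }
        for (Rec r : updatedRecommends) {
            merged.put(r.movieId, r.priority);
        }

        // Step c: sort by priority in descending order and keep at most k entries.
        List<Rec> result = new ArrayList<>();
        for (Map.Entry<Integer, Double> e : merged.entrySet()) {
            result.add(new Rec(e.getKey(), e.getValue()));
        }
        result.sort(Comparator.comparingDouble((Rec r) -> r.priority).reversed());
        int limit = Math.max(0, Math.min(k, result.size()));
        return new ArrayList<>(result.subList(0, limit));
    }

    public static void main(String[] args) {
        List<Rec> recent = new ArrayList<>();
        recent.add(new Rec(1, 0.9));
        recent.add(new Rec(2, 0.5));
        recent.add(new Rec(3, 0.4));

        List<Rec> updated = new ArrayList<>();
        updated.add(new Rec(2, 0.95)); // same movieId as in recent: replaces 0.5
        updated.add(new Rec(4, 0.3));

        for (Rec r : mergeTopK(recent, updated, 2)) {
            System.out.println(r.movieId + " -> " + r.priority);
        }
    }
}
```

Keying the intermediate map by movieId performs the merge and the replacement in one pass; the order among movies with exactly equal priority is not defined in this sketch.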

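5.4 Integration Testing of the Real-Time System

The real-time recommendation data flow in our system is: business system -> logs -> flume log collection -> kafka streaming data cleaning and preprocessing -> spark streaming stream computation. Once the code for the real-time recommendation service is complete, it should be tested together with the other tools to make sure the whole system runs correctly.

5.4.1 Start the Basic Components of the Real-Time System

Start the real-time recommender StreamingRecommender, as well as mongodb and redis.

5.4.2 Start zookeeper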

bin/zkServer.sh start

5.4.3 Start kafka

bin/kafka-server-start.sh -daemon ./config/server.properties

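5.4.4 Build the Kafka Streaming Program

Under recommender, create a new module named KafkaStreaming. Its main job is to preprocess the log data and filter out the content we need. The pom.xml file needs the following dependencies: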


<dependencies>
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-streams</artifactId>
        <version>0.10.2.1</version>
    </dependency>
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-clients</artifactId>
        <version>0.10.2.1</version>
    </dependency>
</dependencies>

<build>
    <finalName>kafkastream</finalName>
    <plugins>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-assembly-plugin</artifactId>
            <configuration>
                <archive>
                    <manifest>
                        <mainClass>com.atguigu.kafkastream.Application</mainClass>
                    </manifest>
                </archive>
                <descriptorRefs>
                    <descriptorRef>jar-with-dependencies</descriptorRef>
                </descriptorRefs>
            </configuration>
            <executions>
                <execution>
                    <id>make-assembly</id>
                    <phase>package</phase>
                    <goals>
                        <goal>single</goal>
                    </goals>
                </execution>
            </executions>
        </plugin>
    </plugins>
</build>

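Under src/main/java, create a new Java class com.atguigu.kafkastreaming.Application (the package must be the same one named in the mainClass entry of pom.xml):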


import java.util.Properties;

import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.processor.TopologyBuilder;

public class Application {
    public static void main(String[] args) {
        String brokers = "localhost:9092";
        String zookeepers = "localhost:2181";

        // Input and output topics
        String from = "log";
        String to = "recommender";

        // Kafka Streams configuration
        Properties settings = new Properties();
        settings.put(StreamsConfig.APPLICATION_ID_CONFIG, "logFilter");
        settings.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, brokers);
        settings.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, zookeepers);

        StreamsConfig config = new StreamsConfig(settings);

        // Topology builder
        TopologyBuilder builder = new TopologyBuilder();

        // Define the stream-processing topology: SOURCE -> PROCESS -> SINK
        builder.addSource("SOURCE", from)
                .addProcessor("PROCESS", () -> new LogProcessor(), "SOURCE")
                .addSink("SINK", to, "PROCESS");

        KafkaStreams streams = new KafkaStreams(builder, config);
        streams.start();
    }
}

This program consumes the message stream of the topic "log", processes it, and forwards the result under the new topic "recommender".

The stream processor, LogProcessor.java:

import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorContext;

public class LogProcessor implements Processor<byte[], byte[]> {
    private ProcessorContext context;

    @Override
    public void init(ProcessorContext context) {
        this.context = context;
    }

    @Override
    public void process(byte[] dummy, byte[] line) {
        String input = new String(line);
        // Filter log lines by prefix and extract the content that follows it
        if (input.contains("MOVIE_RATING_PREFIX:")) {
            System.out.println("movie rating coming!!!!" + input);
            input = input.split("MOVIE_RATING_PREFIX:")[1].trim();
            context.forward("logProcessor".getBytes(), input.getBytes());
        }
    }

    @Override
    public void punctuate(long timestamp) {
    }

    @Override
    public void close() {
    }
}
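The prefix handling inside process can also be tried out without a running Kafka cluster. The following is a minimal sketch, not part of the tutorial's code: the class RatingLogParser and the method extractRating are hypothetical, and the sample log line (including its `|`-separated payload) is an assumed format used only for illustration. It locates the prefix with indexOf instead of split, so a line that ends right after the prefix yields an empty string rather than an exception.

```java
public class RatingLogParser {

    static final String PREFIX = "MOVIE_RATING_PREFIX:";

    /**
     * Returns the trimmed text that follows the first occurrence of the prefix,
     * or null when the line does not contain the prefix at all.
     */
    public static String extractRating(String line) {
        int at = line.indexOf(PREFIX);
        if (at < 0) {
            return null; // not a rating log line, nothing to forward
        }
        return line.substring(at + PREFIX.length()).trim();
    }

    public static void main(String[] args) {
        // Assumed sample line; the real log layout depends on the business system.
        String line = "INFO 2019-09-18 MOVIE_RATING_PREFIX:1|1193|5.0|1568771000";
        System.out.println(extractRating(line));
        System.out.println(extractRating("INFO some unrelated log line"));
    }
}
```

Keeping the parsing in a small static method like this makes it easy to unit test separately from the Processor, which then only has to forward non-null results.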

Once the code is complete, start Application.

5.4.5 Configure and Start flume

In flume's conf directory, create log-kafka.properties to configure the connection from flume to kafka:

agent.sources = exectail
agent.channels = memoryChannel
agent.sinks = kafkasink

# For each one of the sources, the type is defined
agent.sources.exectail.type = exec
# Absolute path of the log file to collect; change it to your own log directory
agent.sources.exectail.command = tail -f /mnt/d/Projects/BigData/MovieRecommenderSystem/businessServer/src/main/log/agent.log
agent.sources.exectail.interceptors=i1
agent.sources.exectail.interceptors.i1.type=regex_filter
# Regular expression that filters log lines by prefix
agent.sources.exectail.interceptors.i1.regex=.+MOVIE_RATING_PREFIX.+
# The channel can be defined as follows.
agent.sources.exectail.channels = memoryChannel

# Each sink's type must be defined
agent.sinks.kafkasink.type = org.apache.flume.sink.kafka.KafkaSink
agent.sinks.kafkasink.kafka.topic = log
agent.sinks.kafkasink.kafka.bootstrap.servers = localhost:9092
agent.sinks.kafkasink.kafka.producer.acks = 1
agent.sinks.kafkasink.kafka.flumeBatchSize = 20

# Specify the channel the sink should use
agent.sinks.kafkasink.channel = memoryChannel

# Each channel's type is defined.
agent.channels.memoryChannel.type = memory

# Other config values specific to each type of channel (sink or source)
# can be defined as well
# In this case, it specifies the capacity of the memory channel
agent.channels.memoryChannel.capacity = 10000

Once configured, start flume:

./bin/flume-ng agent -c ./conf/ -f ./conf/log-kafka.properties -n agent -Dflume.root.logger=INFO,console
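To get a feel for which log lines the regex_filter pattern from log-kafka.properties lets through, the pattern can be tried in plain Java. This is a sketch under an assumption: it treats a line as accepted when the pattern is found anywhere in it, which is how the interceptor is understood to behave in its default (non-excluding) mode. The class LogFilterCheck, the method passes and the sample lines are hypothetical.

```java
import java.util.regex.Pattern;

public class LogFilterCheck {

    // Same pattern as agent.sources.exectail.interceptors.i1.regex
    static final Pattern RATING_LINE = Pattern.compile(".+MOVIE_RATING_PREFIX.+");

    /** True when the pattern is found in the line (assumed filter semantics). */
    public static boolean passes(String line) {
        return RATING_LINE.matcher(line).find();
    }

    public static void main(String[] args) {
        // Prefix with text on both sides: accepted
        System.out.println(passes("INFO MOVIE_RATING_PREFIX:1|1193|5.0|1568771000"));
        // No prefix at all: rejected
        System.out.println(passes("INFO some unrelated log line"));
        // Prefix at the very start of the line: rejected, because the leading
        // ".+" requires at least one character before the prefix
        System.out.println(passes("MOVIE_RATING_PREFIX:1|1193|5.0|1568771000"));
    }
}
```

The last case is worth keeping in mind when choosing the log4j layout: with this pattern, the prefix has to be preceded by at least one character (for example a timestamp or log level) for the line to reach kafka.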

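5.4.6 Start the Business System Backend

Add the business code to the system. Note that in log4j.properties under src/main/resources/, the value of log4j.appender.file.File should be replaced with your own log directory, and it must match the path configured in flume.

Start the business system backend and visit localhost:8088/index.html. Click a movie and rate it, then check whether the real-time recommendation list changes.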









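This tutorial is produced by the Atguigu Education Big Data Research Institute. Please credit the source when reposting. You are welcome to follow the Atguigu official account (atguigu) to learn more.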
