加载中…
个人资料
  • 博客等级:
  • 博客积分:
  • 博客访问:
  • 关注人气:
  • 获赠金笔:0支
  • 赠出金笔:0支
  • 荣誉徽章:
正文 字体大小:

Kafka和 websocket应用

(2017-03-15 09:28:50)
标签:

kafka

websocket

nodejs

开发环境

开发工具语言:Nodejs

开发依赖包:kafka-node-master socket.io

包安装方式:npm install 包名

前段引用本地脚本:《script src="/socket.io/socket.io.js"》《/script》

 

流程说明

前端每打开一个客户端就会和后台建立一个websocket的长连接,每个连接中都会建立kafka的消息监听。当kafka监听到消息时,会通过每个websocket连接发送的相应的客户端,从而实现数据流的实时显示。

 

源码说明

1)         Kafka消费端连接并返回连接对象(文件 consumerkafka.js

 

exports.initkafka = function initkafka() {

         var kafka = require('kafka-node-master');

//      var Consumer = kafka.HighLevelConsumer;

         var Consumer = kafka.Consumer;

         var Offset = kafka.Offset;

         var Client = kafka.Client;

         var topic = 'radarobs';

 

         var client = new Client('10.10.243.192:2181');

         var topics = [{

                   topic: topic,

                   partition: 0,

                   //offset: 1800000

         } ]

 

         var options = {

        groupId: 'radar-obs',

                   autoCommit: true,

                   //fetchMaxWaitMs: 1000,

                   autoCommitIntervalMs: 2000,

                   //fetchMaxBytes: 1024 * 1024,

                   fromOffset: false

         };

 

         var consumer = new Consumer(client, topics, options);

         var offset = new Offset(client);

 

         consumer.on('error', function(err) {

                   console.log('error', err);

         });

        

         consumer.on('offsetOutOfRange', function(topic) {

                   topic.maxNum = 2;

                   offset.fetch([topic], function(err, offsets) {

                            var min = Math.min.apply(null, offsets[topic.topic][topic.partition]);

                            consumer.setOffset(topic.topic, topic.partition, min);

                   });

         });

         return consumer;

}

 

2)         Websocket后台连接和kafka监听 (文件websocket.js

 

         var consumerkafka = require('./consumerkafka');

         var consumer = consumerkafka.initkafka(); //init kafka consumer

 

         var io = require('socket.io').listen(server);

        

         io.sockets.on('connection', function(socket) {

                   consumer.on('message', function(message) {

                            socket.emit('radarobs', {

                                     msg: message

                            });

                   });

         });

 

 

3)         客户端websocket连接和接收

 

define([], function() {

         function init(callback) {

                   var socket = io.connect('localhost');

                   socket.on('radarobs', function(msg) {

                            var msg = msg.msg.value;

                            callback(msg);

                            console.log(msg)

                   });

         }

 

         return {

                   init: init

         };

 

})

0

阅读 收藏 喜欢 打印举报/Report
  

新浪BLOG意见反馈留言板 欢迎批评指正

新浪简介 | About Sina | 广告服务 | 联系我们 | 招聘信息 | 网站律师 | SINA English | 产品答疑

新浪公司 版权所有