开发环境
开发工具语言:Nodejs
开发依赖包:kafka-node-master
和socket.io
包安装方式:npm install
包名
前段引用本地脚本:《script src="/socket.io/socket.io.js"》《/script》
流程说明
前端每打开一个客户端就会和后台建立一个websocket的长连接,每个连接中都会建立kafka的消息监听。当kafka监听到消息时,会通过每个websocket连接发送的相应的客户端,从而实现数据流的实时显示。
源码说明
1)
Kafka消费端连接并返回连接对象(文件
consumerkafka.js)
exports.initkafka = function initkafka() {
var kafka = require('kafka-node-master');
//
var Consumer = kafka.HighLevelConsumer;
var Consumer = kafka.Consumer;
var Offset = kafka.Offset;
var Client = kafka.Client;
var topic = 'radarobs';
var client = new Client('10.10.243.192:2181');
var topics = [{
topic: topic,
partition: 0,
//offset: 1800000
} ]
var options = {
groupId: 'radar-obs',
autoCommit: true,
//fetchMaxWaitMs: 1000,
autoCommitIntervalMs: 2000,
//fetchMaxBytes: 1024 * 1024,
fromOffset: false
};
var consumer = new Consumer(client, topics, options);
var offset = new Offset(client);
consumer.on('error', function(err) {
console.log('error', err);
});
consumer.on('offsetOutOfRange', function(topic) {
topic.maxNum = 2;
offset.fetch([topic], function(err, offsets) {
var min = Math.min.apply(null,
offsets[topic.topic][topic.partition]);
consumer.setOffset(topic.topic, topic.partition, min);
});
});
return consumer;
}
2)
Websocket后台连接和kafka监听
(文件websocket.js)
var consumerkafka = require('./consumerkafka');
var consumer = consumerkafka.initkafka(); //init kafka
consumer
var io = require('socket.io').listen(server);
io.sockets.on('connection', function(socket) {
consumer.on('message', function(message) {
socket.emit('radarobs', {
msg: message
});
});
});
3)
客户端websocket连接和接收
define([], function() {
function init(callback) {
var socket = io.connect('localhost');
socket.on('radarobs', function(msg) {
var msg = msg.msg.value;
callback(msg);
console.log(msg)
});
}
return {
init: init
};
})
加载中,请稍候......