加载中…
个人资料
铁血柔情
铁血柔情
  • 博客等级:
  • 博客积分:0
  • 博客访问:92,935
  • 关注人气:31
  • 获赠金笔:0支
  • 赠出金笔:0支
  • 荣誉徽章:
相关博文
推荐博文
谁看过这篇博文
加载中…
正文 字体大小:

基于storm的实时GPS数据客流特征分析系统 源码分析之(一):GPSReceiverSpout

(2013-02-17 17:42:31)
标签:

storm

云计算

实时客流矩阵

gps

it

分类: storm/haddop/云计算
先介绍背景再贴源码:
项目背景:实时GPS数据客流特征分析系统,数据来源是大约5万两出租车和公交车的车载GPS仪,其目的是要研究出行者的出行特征、实时路况、客流特征等。


开发环境:请参考:storm的开发环境部署配置教程 

函数详解:

最重要的两个函数是 nextTuple() 和declareOutputFields(OutputFieldsDeclarer declarer)。

nextTuple()告诉storm下一个tuple是什么内容,其处理过程是先用一个socket函数接受来自网络的实时GPS数据,用lineSplit()将GPS以逗号分隔 成字符串数组,发送给下一个处理单元 DistrictMatchingBolt。
例如一条原始GPS记录:粤BXXXXX,114.121765,22.569218,2013-02-08 17:29:58,1065382,28,101,0,蓝色
 _collector.emit(new Values(GPSRecord[0],GPSRecord[3],GPSRecord[7],GPSRecord[5],GPSRecord[6] , GPSRecord[2],GPSRecord[1])) 这一条语句则提取了GPS记录中的第0、3、7、5、6、2、1列字符串发送给下一个处理单元。


declareOutputFields()告诉下一个处理单元DistrictMatchingBolt: spout的输出数据即DistrictMatchingBolt的输入数据格式的列数和内容,即:"vehicle_number","date_time","occupied","speed","bearing","lantitude","longitude" 共7列。

 

  1.  
  2. 
    
    public class GPSReceiverSpout implements IRichSpout {
        private static final long serialVersionUID = 1L;
    private SpoutOutputCollector _collector;
        private BufferedReader fileReader;
        //private TopologyContext context;
        //private String file="/home/ghchen/2013-01-05.1/2013-01-05--11_05_48.txt";
        private TupleInfo tupleInfo=new TupleInfo();
        
        static Socket sock=null;
        
        @Override
        public void close() {
        }

        @Override
        public void open(Map conf, TopologyContext context, SpoutOutputCollector collector)
    {
    _collector = collector;

    System.out.println("This is open function in FieldSpout !");


    }


        @SuppressWarnings("unused")
    @Override
    public void nextTuple() {
         int count=0;
         int ch=0;
         int err=0;
         try {
         if(sock==null){
         sock=new Socket("172.20.14.XXX",portXXXX);}
         while(true){
    byte[] b3= new byte[3];
    if(sock!=null ){
    try{
    sock.getInputStream().read(b3,0,3);
    ch=b3[0];
    }catch ( Exception e){
    System.out.println("connection reset, reconnecting ...");
    sock.close();
    Thread.sleep(100);
    sock=new Socket("172.20.14.XXX",portXXXX);;
    }

    }else{
    sock=new Socket("172.20.14.XXX",portXXXX);;
    break ;
    }
         int len=SocketJava.bytesToShort(b3, 1);
         if(len<<span style="margin: 0px; padding: 0px; border: 0px; color: rgb(0, 153, 153);">0) break;
         byte[] bytelen= new byte[len];
         sock.getInputStream().read(bytelen);
         if(bytelen==null){
         System.out.println("read the second part from byte from socket failed ! ");
         break;
         }
         sock.getInputStream().markSupported();
         sock.getInputStream().mark(3);

         String gpsString=SocketJava.DissectOneMessage(ch,bytelen);
           String[] GPSRecord=null;
         if(gpsString!=null){
         GPSRecord =gpsString.split(TupleInfo.getDelimiter());

         _collector.emit(new Values(GPSRecord[0],GPSRecord[3],GPSRecord[7],GPSRecord[5],
         GPSRecord[6] , GPSRecord[2],GPSRecord[1]));
         //}

         }else{
         break;
         }

         }
         } catch (IOException e) {
          e.printStackTrace();
         } catch (Exception e) {
          e.printStackTrace();
         }



        }

        @Override
        public void ack(Object id) {
         System.out.println("OK:"+id);
        }
        

        @Override
        public void fail(Object id) {
         System.out.println("Fail:"+id);
        }

        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
     
         TupleInfo tuple = new TupleInfo();
         Fields fieldsArr;
         try {
         fieldsArr= tuple.getFieldList();
         declarer.declare(fieldsArr);

    } catch (Exception e) {
    throw new RuntimeException("error:fail to new Tuple object in declareOutputFields, tuple is null",e);
    }

        }

    @Override
    public void activate() {
    }

    @Override
    public void deactivate() {
    }

    @Override
    public Map<<span style="margin: 0px; padding: 0px; border: 0px;">String, Object> getComponentConfiguration() {
    return null;
    }

    static int count=0;
    public static void writeToFile(String fileName, Object obj){
    try {

    FileWriter fwriter;
    fwriter= new FileWriter(fileName,true);
    BufferedWriter writer= new BufferedWriter(fwriter);

    writer.write(obj.toString());

    writer.close();

    } catch (IOException e1) {

    e1.printStackTrace();
    }
    }    
    }  

相关博文:

0

阅读 评论 收藏 转载 喜欢 打印举报/Report
  • 评论加载中,请稍候...
发评论

    发评论

    以上网友发言只代表其个人观点,不代表新浪网的观点或立场。

      

    新浪BLOG意见反馈留言板 电话:4000520066 提示音后按1键(按当地市话标准计费) 欢迎批评指正

    新浪简介 | About Sina | 广告服务 | 联系我们 | 招聘信息 | 网站律师 | SINA English | 会员注册 | 产品答疑

    新浪公司 版权所有