加载中…
个人资料
铁血柔情
铁血柔情
  • 博客等级:
  • 博客积分:0
  • 博客访问:92,935
  • 关注人气:31
  • 获赠金笔:0支
  • 赠出金笔:0支
  • 荣誉徽章:
相关博文
推荐博文
谁看过这篇博文
加载中…
正文 字体大小:

基于storm的实时GPS数据客流特征分析系统 源码分析之(二):DistrictMatchingBolt

(2013-02-19 11:14:50)
标签:

gps

storm

实时客流矩阵

云计算

it

分类: storm/haddop/云计算
最重要的两个函数是 excute() 和declareOutputFields(OutputFieldsDeclarer declarer)。

excute() 将GPSReceiverSpout接受到的数据提取经度和纬度,调用Sects类 Sect.fetchSect(GPSrecord)方法,查询本地的地理信息数据库,返回该条GPS记录所在的区域标号 districtID,并将这个字段添加到GPS后面,发射给下一个bolt : countBolt。

declareOutputFields() 告诉下一个countBolt , 这个DistrcitMatchingBolt的输出数据格式是:"viechleID", "dateTime", "occupied", "speed","bearing", "latitude", "longitude", "districtID"。

需要说明的是Sects类调用了开源的地理信息系统工具geotools,感兴趣的朋友可以去http://www.geotools.org/ 下载安装包,并将相关的jar包全部添加到Eclipse 的building path里面,就可以调用geotools查询本地的地理信息数据库了。

 

  1.  
  2. 
    
    
    
    package main.java.realODMatrix.bolt;

    import java.io.IOException;
    import java.util.List;
    import java.util.Map;


    import backtype.storm.task.OutputCollector;
    import backtype.storm.task.TopologyContext;
    import backtype.storm.topology.IRichBolt;
    import backtype.storm.topology.OutputFieldsDeclarer;
    import backtype.storm.tuple.Fields;
    import backtype.storm.tuple.Values;

    import backtype.storm.tuple.Tuple;
    import main.java.realODMatrix.spout.FieldListenerSpout;
    import main.java.realODMatrix.struct.*;

    public class DistrictMatchingBolt implements IRichBolt {

    private static final long serialVersionUID = -433427751113113358L;

    private OutputCollector _collector;

    Integer districtID ;
    GPSRcrd record;
    Map<<span style="margin: 0px; padding: 0px; border: 0px;">GPSRcrd, Integer> gpsMatch; //map
    Integer taskID;
    String taskname;
    List<<span style="margin: 0px; padding: 0px; border: 0px;">Object> inputLine;
     
    
    Fields matchBoltDeclare=null;

    static String path = "/home/ghchen/sects/sects.shp";
    static Sects sects=null ;
    int count=0;

    @Override
    public void prepare(Map stormConf, TopologyContext context,
    OutputCollector collector) {
    // TODO Auto-generated method stub
    this._collector=collector;
    this.taskID=context.getThisTaskId();
    this.taskname=context.getThisComponentId();

    }


    @Override
    public void execute(Tuple input) {

    try {
    if(sects==null){
    sects= new Sects(path);
    }

    List<</span>Object> inputLine = input.getValues();//getFields();
    Fields inputLineFields = input.getFields();

    record=new GPSRcrd(Double.parseDouble((String) inputLine.get(6)),
    Double.parseDouble((String) inputLine.get(5)), Integer.parseInt((String) inputLine.get(3)),
    Integer.parseInt((String) inputLine.get(4)));

    if( Double.parseDouble((String) inputLine.get(6)) > 114.5692938 ||
    Double.parseDouble((String) inputLine.get(6)) <</span> 113.740000 ||
    Double.parseDouble((String) inputLine.get(5)) > 22.839945 ||
    Double.parseDouble((String) inputLine.get(5)) <</span> 22.44
    ) return;


    districtID = sects.fetchSect(record);

    if(districtID!=-1)
    {
    System.out.println(count++ +": GPS Point falls into Sect No. :" + districtID);


    inputLine.add(Integer.toString(districtID));
    //input.getFields().toList().add("districtID");
    List<<span style="margin: 0px; padding: 0px; border: 0px;">String> fieldList= input.getFields().toList();
    fieldList.add("districtID");
    matchBoltDeclare=new Fields(fieldList);
    //FieldListenerSpout.writeToFile("/home/ghchen/output","matchBoltDeclare="+matchBoltDeclare);


    String[] obToStrings=new String[inputLine.size()];
    obToStrings=inputLine.toArray(obToStrings);

    _collector.emit(new Values(obToStrings));
    //_collector.emit(new Values(inputLine));
    }

    } catch (Exception e) {

    e.printStackTrace();
    }

    _collector.ack(input);

    }


    @Override
    public void cleanup() {
    // TODO Auto-generated method stub

    System.out.println("-- District Mathchier ["+taskname+"-"+districtID+"] --");
    for(Map.Entry<</span>GPSRcrd, Integer> entry : gpsMatch.entrySet()){
    System.out.println(entry.getKey()+": "+entry.getValue());
    }

    }


    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
    declarer.declare(new Fields ("viechleID", "dateTime", "occupied", "speed",
    "bearing", "latitude", "longitude", "districtID"));
    }


    @Override
    public Map<</span>String, Object> getComponentConfiguration() {
    // TODO Auto-generated method stub
    return null;
    }

    }

---------

相关博文:

0

阅读 评论 收藏 转载 喜欢 打印举报/Report
  • 评论加载中,请稍候...
发评论

    发评论

    以上网友发言只代表其个人观点,不代表新浪网的观点或立场。

      

    新浪BLOG意见反馈留言板 电话:4000520066 提示音后按1键(按当地市话标准计费) 欢迎批评指正

    新浪简介 | About Sina | 广告服务 | 联系我们 | 招聘信息 | 网站律师 | SINA English | 会员注册 | 产品答疑

    新浪公司 版权所有