加载中…
个人资料
新技术笔记
新技术笔记
  • 博客等级:
  • 博客积分:0
  • 博客访问:403,191
  • 关注人气:197
  • 获赠金笔:0支
  • 赠出金笔:0支
  • 荣誉徽章:
相关博文
推荐博文
正文 字体大小:

实时计算平台Storm的入门教程

(2014-09-26 19:24:48)
标签:

杂谈

分类: 技术荟萃

关于Storm

Twitter将Storm正式开源了,这是一个分布式的、容错的实时计算系统,它被托管在GitHub上,遵循 Eclipse Public License 1.0。Storm是由BackType开发的实时处理系统,BackType现在已在Twitter麾下。GitHub上的最新版本是Storm 0.9.2,基本是用Clojure写的。

Storm为分布式实时计算提供了一组通用原语,可被用于“流处理”之中,实时处理消息并更新数据库。这是管理队列及工作者集群的另一种方式。Storm也可被用于“连续计算”(continuous computation),对数据流做连续查询,在计算时就将结果以流的形式输出给用户。它还可被用于“分布式RPC”,以并行的方式运行昂贵的运算。 Storm的主工程师Nathan Marz表示:

Storm可以方便地在一个计算机集群中编写与扩展复杂的实时计算,Storm之于实时处理,就好比 Hadoop之于批处理。Storm保证每个消息都会得到处理,而且它很快——在一个小集群中,每秒可以处理数以百万计的消息。更棒的是你可以使用任意编程语言来做开发。

Storm的主要特点如下:

  1. 简单的编程模型。类似于MapReduce降低了并行批处理复杂性,Storm降低了进行实时处理的复杂性。

  2. 可以使用各种编程语言。你可以在Storm之上使用各种编程语言。默认支持Clojure、Java、Ruby和Python。要增加对其他语言的支持,只需实现一个简单的Storm通信协议即可。

  3. 容错性。Storm会管理工作进程和节点的故障。

  4. 水平扩展。计算是在多个线程、进程和服务器之间并行进行的。

  5. 可靠的消息处理。Storm保证每个消息至少能得到一次完整处理。任务失败时,它会负责从消息源重试消息。

  6. 快速。系统的设计保证了消息能得到快速的处理,使用ØMQ(ZeroMQ)作为其底层消息队列。

  7. 本地模式。Storm有一个“本地模式”,可以在处理过程中完全模拟Storm集群。这让你可以快速进行开发和单元测试。

可以和Storm相提并论的系统有Esper、Streambase、 HStreaming和Yahoo S4。其中和Storm最接近的就是S4。两者最大的区别在于Storm会保证消息得到处理。这些系统中有的拥有内建数据存储层,这是Storm所没有的,如果需要持久化,可以使用一个类似于Cassandra或Riak这样的外部数据库。

开发过程中,可以用本地模式来运行Storm,这样就能在本地开发,在进程中测试Topology。一切就绪后,以远程模式运行Storm,提交用于在集群中运行的Topology。Maven用户可以使用clojars.org提供的Storm依赖,地址是 http://clojars.org/repo。

要运行Storm集群,你需要Apache Zookeeper、ØMQ、JZMQ、Java 6和Python 2.6.6。ZooKeeper用于管理集群中的不同组件,ØMQ是内部消息系统,JZMQ是ØMQ的Java Binding。有个名为storm-deploy的子项目,可以在AWS上一键部署Storm集群。关于详细的步骤,可以阅读Storm Wiki上的《Setting up a Storm cluster》。设置开发环境参考 Setting up a development environment,创建一个新Storm项目参考 Creating a new Storm project

本教程使用了 storm-starter 项目中的例子。推荐你复制一份项目并按照例子演练一遍。

Storm集群的组件

Storm集群非常类似Hadoop集群。Hadoop上运行的是MapReduce jobs,而Storm运行的是topologies。Jobs和topologies本身是不同的,其中一个最大的不同就是,Mapreduce job最终会结束,而topology则会持续的处理消息(直到你杀掉它)。

Storm集群由一个主节点和多个工作节点组成:master节点和worker节点。master节点运行一个守护进程,叫Nimbus,类似Hadoop中的JobTracker。Nimbus负责在集群中分发代码,分配任务,以及故障检测。

每个worker节点运行一个守护进程,叫Supervisor。Supervisor监听分配到该服务器的任务,开始和结束工作进程。每个worker进程执行topology的一个子集;一个运行中的topology由许多分布在多台机器上的worker进程组成。

Nimbus和Supervisors之间是通过Zookeeper协调。此外,Nimbus和Supervisor是能快速失败(fail-fast)和无状态的(stateless);所有的状态都保存在Zookeeper或者在本地磁盘中。这意味这你可以kill –9杀掉Nimbus或者Supervisors,随后它们会自动恢复好像什么也没发生过。这项设计使得Storm集群变得非常稳定健壮。

image

Storm的术语解释

Storm的术语包括Stream、Spout、Bolt、Task、Worker、Stream Grouping和Topology。Stream是被处理的数据。Spout是数据源。Bolt处理数据。Task是运行于Spout或Bolt中的线程。Worker是运行这些线程的进程。Stream Grouping规定了Bolt接收什么东西作为输入数据。数据可以随机分配(术语为Shuffle),或者根据字段值分配(术语为Fields),或者 广播(术语为All),或者总是发给一个Task(术语为Global),也可以不关心该数据(术语为None),或者由自定义逻辑来决定(术语为 Direct)。Topology是由Stream Grouping连接起来的Spout和Bolt节点网络。在Storm Concepts页面里对这些术语有更详细的描述。

(1)Topologies 用于封装一个实时计算应用程序的逻辑,类似于Hadoop的MapReduce Job

(2)Stream 消息流,是一个没有边界的tuple序列,这些tuples会被以一种分布式的方式并行地创建和处理

(3)Spouts 消息源,是消息生产者,他会从一个外部源读取数据并向topology里面面发出消息:tuple

(4)Bolts 消息处理者,所有的消息处理逻辑被封装在bolts里面,处理输入的数据流并产生输出的新数据流,可执行过滤,聚合,查询数据库等操作

(5)Task 每一个Spout和Bolt会被当作很多task在整个集群里面执行,每一个task对应到一个线程.

Topologies

在Storm中进行实时计算,你可以创建所谓的topologies。一个topology是一个计算图(a graph of computation)。topology的每个节点包括处理逻辑,节点之间数据如何传输的连接。

运行一个topology非常简单。将代码和相关依赖打包成一个简单jar包,运行如下命令:

storm jar all-my-code.jar backtype.storm.MyTopology arg1 arg2

将执行这个类:backtype.storm.MyTopology,参数arg1和arg2。这个类的最主要功能定义了topology,并将其提交给Nimbus。storm jar 部分负责连接Nimbus和上传jar。

因为topology定义是Thrift结构,Nimbus是一个Thrift service,所以你可以使用任何编程语言创建和提交topologies。上面的例子是最简单的方式,使用JVM-based的语言。

Streams

Storm中最核心的抽象就是stream。stream是一个无边界的tuples序列。Storm提供了基本流转换的分布式的可靠的方法。例如,你可以将tweets流转换成一个trending topics流。

基本的原始Storm提供了spouts和bolts的流转换。Spouts和bolts提供了接口,实现你的应用逻辑。

spout是流源头。例如,一个spout可以是从Kestrel队列中读取tuples并以流形式发射(emit)出。或者一个spout可以连接Twitter API,发出一个tweets流。

一个bolt使用任意数量的输入流,做些处理,可能再发射出新的流。复杂的流转换,例如从tweets流中计算出trending topics流,需要多个步骤,所以需要多个bolts。bolts可以做任何事情,包括运行函数、过滤tuples、流的聚合、流连接、数据库交互等等。

spouts和bolts的网络封装到topology,后者是你提交给storm集群运行的最上层的抽象。一个topology是一个由spout或bolt节点组成的流变换的图,图的边指示哪个bolts订阅给哪个stream。当一个spout或bolt发射一个tuple给stream,它将发送这个tuple到订阅了这个stream的每一个bolt。

image

在topology中,节点之间的连接指示这tuples如何传递。例如,如果Spout A和Bolt B之间存在连接,Spout A和Bolt C之间存在连接,Bolt B和BoltC之间存在连接,那么每时刻Spout A发送出一个tuple,它将发给Bolt B和Bolt C,所有Bolt B的输出tuple将都会发给Bolt  C。

每个Storm topology中,在各个节点中都是并行执行。在topology中,你可以指定每个节点的并行数,然后Storm将会开启相应数量的线程来运行。

每个topology会永久运行,直到你kill它。Storm会自动再分配失败的任务。此外,Storm会保证不会有数据丢失,即使是宕机消息丢失。

Data Model

Storm使用tuples作为它的数据模型。每个tuple是值的名字序列(a named list of values),且tuple中域(field)可以是任何类型对象。Storm支持所有的原始类型,strings、byte arrays,作为tuple field values。如果要使用新类型对象,需要为这个类型实现一个序列化(a serializer)。

每个节点都必须声明它发送出的tuple的输出域。例如,这个bolt声明了它发送2个tuple,对应的域类型是double和triple

```java public class DoubleAndTripleBolt extends BaseRichBolt { private OutputCollectorBase _collector;@Override public void prepare(Map conf, TopologyContext context, OutputCollectorBase collector) { _collector = collector; } @Override public void execute(Tuple input) { int val = input.getInteger(0); _collector.emit(input, new Values(val*2, val*3)); _collector.ack(input); } @Override public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(new Fields("double", "triple")); } } ```

A simple topology

来看看一个简单topology实例,深入探索相关的概念和编码。下面是ExclamationTopology的定义:

java TopologyBuilder builder = new TopologyBuilder(); builder.setSpout("words", new TestWordSpout(), 10); builder.setBolt("exclaim1", new ExclamationBolt(), 3) .shuffleGrouping("words"); builder.setBolt("exclaim2", new ExclamationBolt(), 2) .shuffleGrouping("exclaim1");

这个topology包括1个spout和2个bolts。这个spout发送words,每个bolt将输入字符串附加上“!!!”。各节点安排在一条线:spout发给第一个bolt,后者在发给第二个bolt。例如spout发送[“bob”] and [“john”],经过两个bolt,将会发送出[“bob!!!!!!”] and [“john!!!!!!”]。

代码中使用的setSpout和setBolt方法,三个参数:用户自定义的ID,包含处理逻辑的对象,在节点上并行的数量。

这个对象包含两个接口的处理逻辑的实现:spout的一个接口 IRichSpout ,bolt的一个接口 IRichBolt。

最后一个参数,你希望节点上并行数,是可选项。它声明了这个集群中由多少个线程来运行这个组件。如果省略这个参数,Storm缺省下只分配一个线程给一个节点。

setBolt返回一个 InputDeclarer 对象,用于声明Bolt的输入。这个例子中,“exclaim1”定义了它需要读取所有来自“words”组件以shuffle grouping方式发出的tuples,“exclaim2”定义了需要读取所有来自“exclaim1”组件以shuffle grouping方式发出的tuples。shuffle grouping意思是将tuples从输入任务中随机分配给bolts任务。关于组件之间分组数据的方式将后面grouping章节。

如果你想让组件exclaim2同时读取words组件和exclaim1组件两者的tuples,你可以这个定义exclaim2,输入定义可以指定多个来源,形成链式:

java builder.setBolt("exclaim2", new ExclamationBolt(), 5) .shuffleGrouping("words") .shuffleGrouping("exclaim1");

进一步看看这个topology中spout和bolt的实现。Spouts负责产生新的消息给这个topology。TestWordSpout 是从一个列表中每隔100ms随机选择单词生成一个tuple。在 TestWordSpout 的nextTuple()实现如下:

java public void nextTuple() { Utils.sleep(100); final String[] words = new String[] {"nathan", "mike", "jackson", "golda", "bertels"}; final Random rand = new Random(); final String word = words[rand.nextInt(words.length)]; _collector.emit(new Values(word)); }

Blot中的ExclamationBolt完整实现如下:


```java public static class ExclamationBolt implements IRichBolt { OutputCollector _collector;
public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
    _collector = collector;
}

public void execute(Tuple tuple) {
    _collector.emit(tuple, new Values(tuple.getString(0) + "!!!"));
    _collector.ack(tuple);
}

public void cleanup() {
}

public void declareOutputFields(OutputFieldsDeclarer declarer) {
    declarer.declare(new Fields("word"));
}

public Map getComponentConfiguration() {
    return null;
} } ```

其中prepare方法提供给bolt一个OutputCollector,用于发送出一个tuple。Tuple可以被任何时间发送,在prepare、execute、cleanup方法,甚至是其他线程中异步方式。这里的prepare实现的很简单,将OutputCollector存成一个实例变量,在后面的execute方法中使用。

其中execute方法是从一个bolt输入中接收一个tuple,ExclaimationBolt提取tuple中第一个域,附加上“!!!”成一个新字符串,发送成一个新的tuple。如果你实现一个bolt读取多个输入源,你可以使用Tuple#getSourceComponent方法找到Tuple来自哪个组件。execute中执行了一些方法:输入tuple被作为emit的第一个参数,输入tuple is acked on the final line。这些API是Storm保证不丢失数据的基础。

其中cleanup方法,是在Bolt关闭时调用,需要清理掉所有被打开的资源。在集群中并不会保证执行这个方法:例如,服务器宕机了,是没有办法调用这个方法的。cleanup方法主要还是用于你以local mode方式运行topologies,在运行和和杀掉很多topologies时没有资源泄露。

其中declareOutputFields方法声明了ExclamationBolt发送的1-tuples,是以名叫word的一个域。

其中getComponentConfiguration方法允许你为这个组件运行配置不同参数。参看Configuration说明。

方法cleanup和getComponentConfiguration通常在bolt实现时是不需要的。你可以通过继承BaseRichBolt(这个基类提供了一个缺省的实现)来更简单的定义一个bolt。如下:


```java public static class ExclamationBolt extends BaseRichBolt { OutputCollector _collector;
public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
    _collector = collector;
}

public void execute(Tuple tuple) {
    _collector.emit(tuple, new Values(tuple.getString(0) + "!!!"));
    _collector.ack(tuple);
}

public void declareOutputFields(OutputFieldsDeclarer declarer) {
    declarer.declare(new Fields("word"));
}     } ```

Running ExclamationTopology in local mode

Storm有两种操作模式:local mode和distributed mode。在local mode中,Storm通过线程来模拟worker nodes,全部运行在进程中。Local mode主要是方便topologies的测试和开发。在distributed mode中,提交topology给master同时,也需要将运行的代码提交。master会分发代码,并分配worker来运行topology。如果worker挂了,master会重新分配任务到其他地方。

下面是以local mode方式运行ExclamationTopology的代码:

```java Config conf = new Config(); conf.setDebug(true); conf.setNumWorkers(2);

LocalCluster cluster = new LocalCluster(); cluster.submitTopology(“test”, conf, builder.createTopology()); Utils.sleep(10000); cluster.killTopology(“test”); cluster.shutdown(); ```

其中,setNumWorkers可以指定分配多少进程用于运行topology。topology中每个组件以多线程方式运行。setDebug可以设置debug模式,每个组件发送的每个message都会log记录。

学习如何创建你的开发环境,以local mode运行topologies,详情可以参考: Creating a new Storm project.

Stream grouping分类

1、Shuffle Grouping:随机分组, 随机派发stream里面的tuple, 保证每个bolt接收到的tuple数目相同.

2、Fields Grouping:按字段分组, 比如按userid来分组, 具有同样userid的tuple会被分到相同的Bolts, 而不同的userid则会被分配到不同的Bolts.

3、All Grouping: 广播发送, 对于每一个tuple, 所有的Bolts都会收到.

4、Global Grouping: 全局分组,这个tuple被分配到storm中的一个bolt的其中一个task.再具体一点就是分配给id值最低的那个task.

5、Non Grouping: 不分组,意思是说stream不关心到底谁会收到它的tuple.目前他和Shuffle grouping是一样的效果,有点不同的是storm会把这个bolt放到这个bolt的订阅者同一个线程去执行.

6、Direct Grouping: 直接分组,这是一种比较特别的分组方法,用这种分组意味着消息的发送者举鼎由消息接收者的哪个task处理这个消息.只有被声明为Direct Stream的消息流可以声明这种分组方法.而且这种消息tuple必须使用emitDirect方法来发射.消息处理者可以通过TopologyContext来或者处理它的消息的taskid (OutputCollector.emit方法也会返回taskid)

Storm如何保证消息被处理

storm保证每个tuple会被topology完整的执行。storm会追踪由每个spout tuple所产生的tuple树(一个bolt处理一个tuple之后可能会发射别的tuple从而可以形成树状结构),并且跟踪这棵tuple树什么时候成功处理完。每个topology都有一个消息超时的设置, 如果storm在这个超时的时间内检测不到某个tuple树到底有没有执行成功, 那么topology会把这个tuple标记为执行失败,并且过一会会重新发射这个tuple。一个tuple能根据新获取到的spout而触发创建基于此的上千个tuple
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout(1, new KestrelSpout("kestrel.backtype.com",
                                     22133,
                                     "sentence_queue",
                                     new StringScheme()));
builder.setBolt(2, new SplitSentence(), 10).shuffleGrouping(1);
builder.setBolt(3, new WordCount(), 20).fieldsGrouping(2, new Fields("word"));

这个topology从kestrel queue读取句子,并把句子划分成单词,然后汇总每个单词出现的次数,一个tuple负责读取句子,每一个tuple分别对应计算每一个单词出现的次数,大概样子如下所示:

image

一个tuple的生命周期:

public interface ISpout extends Serializable {
    void open(Map conf, TopologyContext context, SpoutOutputCollector collector);
    void close();
    void nextTuple();
    void ack(Object msgId);
    void fail(Object msgId);
}

首先storm通过调用spout的nextTuple方法来获取下一个tuple, Spout通过open方法参数里面提供的SpoutOutputCollector来发射新tuple到它的其中一个输出消息流, 发射tuple的时候spout会提供一个message-id, 后面我们通过这个tuple-id来追踪这个tuple。举例来说, KestrelSpout从kestrel队列里面读取一个消息,并且把kestrel提供的消息id作为message-id, 看例子:

collector.emit(new Values("field1", "field2", 3) , msgId);

接下来, 这个发射的tuple被传送到消息处理者bolt那里, storm会跟踪这个消息的树形结构是否创建,根据messageid调用Spout里面的ack函数以确认tuple是否被完全处理。如果tuple超时就会调用spout的fail方法。由此看出同一个tuple不管是acked还是fail都是由创建他的那个spout发出的,所以即使spout在集群环境中执行了很多的task,这个tule也不会被不同的task去acked或failed.

当kestrelspout从kestrel队列中得到一个消息后会打开这个他,这意味着他并不会把此消息拿走,消息的状态会显示为pending,直到等待确认此消息已经处理完成,处于pending状态直到ack或者fail被调用,处于"Pending"的消息不会再被其他队列消费者使用.如果在这过程中spout中处理此消息的task断开连接或失去响应则此pending的消息会回到"等待处理"状态.

Storm的一些常见应用场景

1、流聚合

流聚合把两个或者多个数据流聚合成一个数据流 — 基于一些共同的tuple字段。

builder.setBolt(5, new MyJoiner(), parallelism)
  .fieldsGrouping(1, new Fields("joinfield1", "joinfield2"))
  .fieldsGrouping(2, new Fields("joinfield1", "joinfield2"))
  .fieldsGrouping(3, new Fields("joinfield1", "joinfield2"))

2、批处理

有时候为了性能或者一些别的原因, 你可能想把一组tuple一起处理, 而不是一个个单独处理。

3、BasicBolt

(1)读一个输入tuple
(2)根据这个输入tuple发射一个或者多个tuple
(3)在execute的方法的最后ack那个输入tuple
遵循这类模式的bolt一般是函数或者是过滤器, 这种模式太常见,storm为这类模式单独封装了一个接口: IbasicBolt

4、内存内缓存+Fields grouping组合

在bolt的内存里面缓存一些东西非常常见。缓存在和fields grouping结合起来之后就更有用了。比如,你有一个bolt把短链接变成长链接(bit.ly, t.co之类的)。你可以把短链接到长链接的对应关系利用LRU算法缓存在内存里面以避免重复计算。比如组件一发射短链接,组件二把短链接转化成长链接并缓存在内存里面。看一下下面两段代码有什么不一样:

builder.setBolt(2, new ExpandUrl(), parallelism)
  .shuffleGrouping(1);
builder.setBolt(2, new ExpandUrl(), parallelism)
  .fieldsGrouping(1, new Fields("url"));

5、计算top N

比如你有一个bolt发射这样的tuple: "value", "count"并且你想一个bolt基于这些信息算出top N的tuple。最简单的办法是有一个bolt可以做一个全局的grouping的动作并且在内存里面保持这top N的值。
这个方式对于大数据量的流显然是没有扩展性的, 因为所有的数据会被发到同一台机器。一个更好的方法是在多台机器上面并行的计算这个流每一部分的top N, 然后再有一个bolt合并这些机器上面所算出来的top N以算出最后的top N, 代码大概是这样的:

builder.setBolt(2, new RankObjects(), parallellism)
  .fieldsGrouping(1, new Fields("value"));
builder.setBolt(3, new MergeObjects())
  .globalGrouping(2);

这个模式之所以可以成功是因为第一个bolt的fields grouping使得这种并行算法在语义上是正确的。
用TimeCacheMap来高效地保存一个最近被更新的对象的缓存

6、用TimeCacheMap来高效地保存一个最近被更新的对象的缓存

有时候你想在内存里面保存一些最近活跃的对象,以及那些不再活跃的对象。 TimeCacheMap 是一个非常高效的数据结构,它提供了一些callback函数使得我们在对象不再活跃的时候我们可以做一些事情.

7、分布式RPC:CoordinatedBolt和KeyedFairBolt

用storm做分布式RPC应用的时候有两种比较常见的模式:它们被封装在CoordinatedBolt和KeyedFairBolt里面. CoordinatedBolt包装你的bolt,并且确定什么时候你的bolt已经接收到所有的tuple,它主要使用Direct Stream来做这个。KeyedFairBolt同样包装你的bolt并且保证你的topology同时处理多个DRPC调用,而不是串行地一次只执行一个。

 

参考资料:

1、Storm Tutorial http://storm.incubator.apache.org/documentation/Tutorial.html
2、Twitter Storm:开源实时Hadoop http://www.infoq.com/cn/news/2011/09/twitter-storm-real-time-hadoop/

0

阅读 评论 收藏 转载 喜欢 打印举报/Report
  • 评论加载中,请稍候...
发评论

    发评论

    以上网友发言只代表其个人观点,不代表新浪网的观点或立场。

      

    新浪BLOG意见反馈留言板 欢迎批评指正

    新浪简介 | About Sina | 广告服务 | 联系我们 | 招聘信息 | 网站律师 | SINA English | 会员注册 | 产品答疑

    新浪公司 版权所有