Storm核心概念
Nimbus:Storm集群主节点,负责资源分配和任务调度。我们提交任务和截止任务都是在Nimbus上操作的。一个Storm集群只有一个Nimbus节点。
Supervisor:Storm集群工作节点,接受Nimbus分配任务,管理所有Worker。
Worker:工作进程,每个工作进程中都有多个Task。
Task:任务,每个Spout和Bolt都是一个任务,每个任务都是一个线程。
Topology:计算拓扑,包含了应用程序的逻辑。
Stream:消息流,关键抽象,是没有边界的Tuple序列。
Spout:消息流的源头,Topology的消息生产者。
Bolt:消息处理单元,可以过滤、聚合、查询数据库。
Stream grouping:消息分发策略,一共6种,定义每个Bolt接受何种输入。
Reliability:可靠性,Storm保证每个Tuple都会被处理。
Storm.yaml常用配置项
配置选项名称 | 配置选项作用 |
topology.max.task.parallelism | 每个Topology运行时最大的executor数目 |
topology.workers | 每个Topology运行时的worker的默认数目,若在代码中设置,则此选项值被覆盖 |
storm.zookeeper.servers | zookeeper集群的节点列表 |
storm.local.dir | |
storm.zookeeper.root | |
ui.port | |
nimbus.host: | Nimbus节点的host |
supervisor.slots.ports | Supervisor 节点的worker占位槽,集群中的所有Topology公用这些槽位数,即使提交时设置了较大数值的槽位数,系统也会按照当前集群中实际剩余的槽位数来 进行分配,当所有的槽位数都分配完时,新提交的Topology只能等待,系统会一直监测是否有空余的槽位空出来,如果有,就再次给新提交的 Topology分配 |
supervisor.worker.timeout.secs | Worker的超时时间,单位为秒,超时后,Storm认为当前worker进程死掉,会重新分配其运行着的task任务 |
drpc.servers | 在使用drpc服务时,drpc server的服务器列表 |
drpc.port | 在使用drpc服务时,drpc 的服务端口 |
Storm运行依赖zookeeper,需启动zookeeper,在此不做过多介绍可参考:https://www.cnblogs.com/zhaojiankai/p/7126181.html
Zookeeper集群在Storm集群中的作用:
Zookeeper集群负责Nimbus节点和Supervior节点之间的通信,监控各个节点之间的状态。比如通常我们提交任务的时候是在Nimbus节点上执行的,Nimbus节点通过zk集群将任务分发下去,而Supervisor是真正执行任务的地方。Nimbus节点通过zk集群监控各个Supervisor节点的状态,当某个Supervisor节点出现故障的时候,Nimbus节点就会通过zk集群将那个Supervisor节点上的任务重新分发,在其他Supervisor节点上执行。这就意味着Storm集群也是高可用集群,如果Nimbus节点出现故障的时候,整个任务并不会停止,但是任务的管理会出现影响,通常这种情况下我们只需要将Nimbus节点恢复就可以了。Nimbus节点不支持高可用,这也是Storm目前面临的问题之一。不过一般情况下,Nimbus节点的压力不大,通常不会出现问题。
一般情况下,Zookeeper集群的压力并不大,一般只需要部署3台就够了。Zookeeper集群在Storm集群中逻辑上是独立的,但在实际部署的时候,一般会将zk节点部署在Nimbus节点或Supervisor节点上。
安装:
wget
sudo tar -zxvf -C /usr/local
cd /usr/local
sudo mv apache-storm-0.9.6 storm
sudo vi /etc/profile export STORM_HOME=/usr/local/storm export PATH=$PATH:${STORM_HOME}/bin sudo source /etc/profile
2.配置storm.yaml
storm.zookeeper.servers:
- "node3"
nimbus.seeds: ["node3"]
supervisor.slots.ports:
- 6700
- 6701
- 6702
- 6703
ui.port: 8889
storm.local.dir: "/usr/local/storm/data"
注意:以上配置,凡是有冒号的地方,冒号后都要有个空格。
将配置好的拷贝到其余两台机器上
[root@log1 local]# scp -pr apache-storm-1.0.0 root@114.55.29.241:/usr/local/ [root@log1 local]# scp -pr apache-storm-1.0.0 root@114.55.253.15:/usr/local/
启动 nimbus、supervisor、ui
cd /usr/local/storm/bin
./storm nimbus
./storm supervisor
./storm ui
node1:启动、nimbus、supervisor、ui
node2:supervisor
node3:supervisor
Storm常用命令
1、任务提交命令:storm jar 【jar路径】 【拓扑包名.拓扑类名】 【拓扑名称】
storm jar /export/servers/storm/examples/storm-starter/storm-starter-topologies-1.0.3.jar org.apache.storm.starter.WordCountTopology wordcount
与hadoop不同的是:不需要指定输入输出路径 如:hadoop jar /usr/local/wordcount.jar /data.txt /wcout
##杀死任务命令格式:storm kill name -w seconds storm kill wordcount -w 10 ##停用任务命令格式:storm deactivte name storm deactivte wordcount ##启用任务命令格式:storm activate name storm activate wordcount ##平衡任务命令格式:storm rebalance name storm rebalance wordcount
storm策略和storm并行度
storm策略
storm里面有6种类型的stream grouping:
1.Shuffle Grouping: 随机分组, 随机派发stream里面的tuple, 保证每个bolt接收到的tuple数目相同。轮询,平均分配。
2. Fields Grouping:按字段分组, 比如按userid来分组, 具有同样userid的tuple会被分到相同的Bolts, 而不同的userid则会被分配到不同的Bolts。
3. All Grouping: 广播发送, 对于每一个tuple, 所有的Bolts都会收到。
4. Global Grouping: 全局分组, 这个tuple被分配到storm中的一个bolt的其中一个task。再具体一点就是分配给id值最低的那个task。
5. Non Grouping: 不分组, 这个分组的意思是说stream不关心到底谁会收到它的tuple。目前这种分组和Shuffle grouping是一样的效果,不平均分配。
6. Direct Grouping: 直接分组, 这是一种比较特别的分组方法,用这种分组意味着消息的发送者举鼎由消息接收者的哪个task处理这个消息。 只有被声明为Direct Stream的消息流可以声明这种分组方法。而且这种消息tuple必须使用emitDirect方法来发射。消息处理者可以通过TopologyContext来或者处理它的消息的taskid(OutputCollector.emit方法也会返回taskid)
storm并行度
回顾之前,我们已经介绍过,一个Topology可以运行在多个worker上,这样可以提高数据处理能力。因为一个worker就是一个进程,很自然的,我们可以想到,如果一个worker中可以再起多个线程的话,那么效率就会更高。事实上,Storm就是这么干的,worker并不是Storm集群中的最小运行单位。Executer才是Storm集群中的最小运行单位。Executer实际上就是一个线程。你可以这样理解,worker是Topology的最小运行单位,而Executer是Spout或者Bolt的最小运行单位。回顾一下我们的WordCountApp案例中,创建Topology的代码
//定义拓扑
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout(“word-reader” , new WordReader());
builder.setBolt(“word-normalizer” , new WordNormalizer()).shuffleGrouping(“word-reader” );
builder.setBolt(“word-counter” , new WordCounter()).fieldsGrouping(“word-normalizer” , new Fields(“word”));
StormTopology topology = builder .createTopology();
在这段代码中,我们没有设置并发度,也没有设置worker的数量。Storm默认就会给这个Topology分配1个Worker,在这个Worker启动三个线程,1个用来运行WordReader,1个线程用来运行WordNormalizer,1个线程用来运行WordCounter。
如果我们用图形来表示的话,应该是这样:
这个图的这个图意思是,WordCountApp这个Topology运行在一个Worker上,在这个worker中,分别其了三个线程(executor),分别用来执行Topology的三个组件:WordReader、WordNormalizer、WordCounter。
那么假设我们想用两个线程来执行WordNormalizer,行不行呢?
很简单,目前我们setSpout和setBolt的时候,调用的分别是TopologyBuilder以下两个方法:
setSpout(String id, IRichSpout spout )
setBolt(String id, IRichBolt bolt )
这两个方法,表示使用默认的并发度,也就是1.
我们可以调用另外两个方法,显示的指定并发度。
setSpout(String id, IRichSpout spout , Number parallelism_hint)
setBolt (String id , IRichBolt bolt, Number parallelism_hint )
现在我们修改WordCountApp,设置WordNormalizer 并发度为2
1.builder.setBolt( “word-normalizer” , new WordNormalizer(),2).shuffleGrouping(“word-reader” );
为了更加方面的观察,我们将WordNormalizer中prepare方法打印的内容修改一下:
1.System.out.println( “WordNormalizer.prepare(),taskId:” +context.getThisTaskId()+ “,hashcode:” +this);
现在运行程序,观察输出,输出的日志中,应该包含以下两句话:
1.WordNormalizer.prepare(),taskId:3,hashcode:com.tianshouzhi.study.wordcountapp.bolts.WordNormalizer@41106aa2
2.WordNormalizer.prepare(),taskId:4,hashcode:com.tianshouzhi.study.wordcountapp.bolts.WordNormalizer@3f4c04b1
我们可以看到,我们设置并行度为2的时候, WordNormalizer被实例化了2次,意味着有2个 WordNormalizer实例,而Storm会分配2个executer来分别运行一个实例。所以,此时我们的Topology运行时,是这样的:
图中,用蓝色特别标记出,有2个线程分别用来执行一个WordNormalizer实例。
理论上,我们也同样可以给WordReader和WordCounter来设置并发度。但是具体问题要具体分析,在本案例,并不适合给WordReader和WordCounter设置过高的并发度。
---------------------
原文:https:///qq_37095882/article/details/77624340
Storm容错机制:
storm acker 机制
首先来看一下什么叫做记录级容错?storm允许用户在spout中发射一个新的源tuple时为其指定一个message id, 这个message id可以是任意的object对象。多个源tuple可以共用一个message id,表示这多个源 tuple对用户来说是同一个消息单元。storm中记录级容错的意思是说,storm会告知用户每一个消息单元是否在指定时间内被完全处理了。那什么叫做完全处理呢,就是该message id绑定的源tuple及由该源tuple后续生成的tuple经过了topology中每一个应该到达的bolt的处理。举个例子。在图4-1中,在spout由message 1绑定的tuple1和tuple2经过了bolt1和bolt2的处理生成两个新的tuple,并最终都流向了bolt3。当这个过程完成处理完时,称message 1被完全处理了。