您的当前位置:首页正文

Storm配置运行

来源:华佗健康网

Storm核心概念

Nimbus:Storm集群主节点,负责资源分配和任务调度。我们提交任务和截止任务都是在Nimbus上操作的。一个Storm集群只有一个Nimbus节点。

Supervisor:Storm集群工作节点,接受Nimbus分配任务,管理所有Worker。

Worker:工作进程,每个工作进程中都有多个Task。

Task:任务,每个Spout和Bolt都是一个任务,每个任务都是一个线程。

Topology:计算拓扑,包含了应用程序的逻辑。

Stream:消息流,关键抽象,是没有边界的Tuple序列。

Spout:消息流的源头,Topology的消息生产者。

Bolt:消息处理单元,可以过滤、聚合、查询数据库。

Stream grouping:消息分发策略,一共6种,定义每个Bolt接受何种输入。

Reliability:可靠性,Storm保证每个Tuple都会被处理。

                  Storm.yaml常用配置项

配置选项名称

配置选项作用

topology.max.task.parallelism

每个Topology运行时最大的executor数目

topology.workers

每个Topology运行时的worker的默认数目,若在代码中设置,则此选项值被覆盖

storm.zookeeper.servers

zookeeper集群的节点列表

storm.local.dir

storm.zookeeper.root

ui.port

nimbus.host:

Nimbus节点的host

supervisor.slots.ports

Supervisor 节点的worker占位槽,集群中的所有Topology公用这些槽位数,即使提交时设置了较大数值的槽位数,系统也会按照当前集群中实际剩余的槽位数来 进行分配,当所有的槽位数都分配完时,新提交的Topology只能等待,系统会一直监测是否有空余的槽位空出来,如果有,就再次给新提交的 Topology分配

supervisor.worker.timeout.secs

Worker的超时时间,单位为秒,超时后,Storm认为当前worker进程死掉,会重新分配其运行着的task任务

drpc.servers

在使用drpc服务时,drpc server的服务器列表

drpc.port

在使用drpc服务时,drpc 的服务端口

 

Storm运行依赖zookeeper,需启动zookeeper,在此不做过多介绍可参考:https://www.cnblogs.com/zhaojiankai/p/7126181.html

Zookeeper集群在Storm集群中的作用:

Zookeeper集群负责Nimbus节点和Supervior节点之间的通信,监控各个节点之间的状态。比如通常我们提交任务的时候是在Nimbus节点上执行的,Nimbus节点通过zk集群将任务分发下去,而Supervisor是真正执行任务的地方。Nimbus节点通过zk集群监控各个Supervisor节点的状态,当某个Supervisor节点出现故障的时候,Nimbus节点就会通过zk集群将那个Supervisor节点上的任务重新分发,在其他Supervisor节点上执行。这就意味着Storm集群也是高可用集群,如果Nimbus节点出现故障的时候,整个任务并不会停止,但是任务的管理会出现影响,通常这种情况下我们只需要将Nimbus节点恢复就可以了。Nimbus节点不支持高可用,这也是Storm目前面临的问题之一。不过一般情况下,Nimbus节点的压力不大,通常不会出现问题。

一般情况下,Zookeeper集群的压力并不大,一般只需要部署3台就够了。Zookeeper集群在Storm集群中逻辑上是独立的,但在实际部署的时候,一般会将zk节点部署在Nimbus节点或Supervisor节点上。

 

安装:

wget 
sudo tar -zxvf  -C /usr/local 
cd /usr/local
sudo mv apache-storm-0.9.6 storm
sudo vi /etc/profile
export STORM_HOME=/usr/local/storm
export PATH=$PATH:${STORM_HOME}/bin
sudo source /etc/profile

 

2.配置storm.yaml

 

storm.zookeeper.servers:
- "node3"

nimbus.seeds: ["node3"]

supervisor.slots.ports: 
- 6700
- 6701
- 6702
- 6703

ui.port: 8889 

storm.local.dir: "/usr/local/storm/data"

注意:以上配置,凡是有冒号的地方,冒号后都要有个空格。

将配置好的拷贝到其余两台机器上

[root@log1 local]# scp -pr apache-storm-1.0.0 root@114.55.29.241:/usr/local/
[root@log1 local]# scp -pr apache-storm-1.0.0 root@114.55.253.15:/usr/local/

 启动 nimbus、supervisor、ui

cd /usr/local/storm/bin

./storm nimbus

./storm supervisor

./storm ui

 

node1:启动、nimbus、supervisor、ui

node2:supervisor

node3:supervisor

 

 

 

Storm常用命令

  1、任务提交命令:storm jar 【jar路径】 【拓扑包名.拓扑类名】 【拓扑名称】

  storm jar /export/servers/storm/examples/storm-starter/storm-starter-topologies-1.0.3.jar org.apache.storm.starter.WordCountTopology  wordcount

  与hadoop不同的是:不需要指定输入输出路径 如:hadoop jar /usr/local/wordcount.jar /data.txt /wcout

##杀死任务命令格式:storm kill name -w seconds
storm kill wordcount -w 10

##停用任务命令格式:storm deactivte name 
storm deactivte wordcount

##启用任务命令格式:storm activate name
storm activate wordcount

##平衡任务命令格式:storm rebalance name
storm rebalance wordcount

storm策略和storm并行度

storm策略
storm里面有6种类型的stream grouping:
1.Shuffle Grouping: 随机分组, 随机派发stream里面的tuple, 保证每个bolt接收到的tuple数目相同。轮询,平均分配。
2. Fields Grouping:按字段分组, 比如按userid来分组, 具有同样userid的tuple会被分到相同的Bolts, 而不同的userid则会被分配到不同的Bolts。
3. All Grouping: 广播发送, 对于每一个tuple, 所有的Bolts都会收到。
4. Global Grouping: 全局分组, 这个tuple被分配到storm中的一个bolt的其中一个task。再具体一点就是分配给id值最低的那个task。
5. Non Grouping: 不分组, 这个分组的意思是说stream不关心到底谁会收到它的tuple。目前这种分组和Shuffle grouping是一样的效果,不平均分配。
6. Direct Grouping: 直接分组, 这是一种比较特别的分组方法,用这种分组意味着消息的发送者举鼎由消息接收者的哪个task处理这个消息。 只有被声明为Direct Stream的消息流可以声明这种分组方法。而且这种消息tuple必须使用emitDirect方法来发射。消息处理者可以通过TopologyContext来或者处理它的消息的taskid(OutputCollector.emit方法也会返回taskid)

storm并行度
回顾之前,我们已经介绍过,一个Topology可以运行在多个worker上,这样可以提高数据处理能力。因为一个worker就是一个进程,很自然的,我们可以想到,如果一个worker中可以再起多个线程的话,那么效率就会更高。事实上,Storm就是这么干的,worker并不是Storm集群中的最小运行单位。Executer才是Storm集群中的最小运行单位。Executer实际上就是一个线程。你可以这样理解,worker是Topology的最小运行单位,而Executer是Spout或者Bolt的最小运行单位。回顾一下我们的WordCountApp案例中,创建Topology的代码
//定义拓扑
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout(“word-reader” , new WordReader());
builder.setBolt(“word-normalizer” , new WordNormalizer()).shuffleGrouping(“word-reader” );
builder.setBolt(“word-counter” , new WordCounter()).fieldsGrouping(“word-normalizer” , new Fields(“word”));
StormTopology topology = builder .createTopology();
在这段代码中,我们没有设置并发度,也没有设置worker的数量。Storm默认就会给这个Topology分配1个Worker,在这个Worker启动三个线程,1个用来运行WordReader,1个线程用来运行WordNormalizer,1个线程用来运行WordCounter。
如果我们用图形来表示的话,应该是这样:

 

这个图的这个图意思是,WordCountApp这个Topology运行在一个Worker上,在这个worker中,分别其了三个线程(executor),分别用来执行Topology的三个组件:WordReader、WordNormalizer、WordCounter。
那么假设我们想用两个线程来执行WordNormalizer,行不行呢?
很简单,目前我们setSpout和setBolt的时候,调用的分别是TopologyBuilder以下两个方法:
setSpout(String id, IRichSpout spout )
setBolt(String id, IRichBolt bolt )
这两个方法,表示使用默认的并发度,也就是1.
我们可以调用另外两个方法,显示的指定并发度。
setSpout(String id, IRichSpout spout , Number parallelism_hint)
setBolt (String id , IRichBolt bolt, Number parallelism_hint )
现在我们修改WordCountApp,设置WordNormalizer 并发度为2

1.builder.setBolt( “word-normalizer” , new WordNormalizer(),2).shuffleGrouping(“word-reader” );
为了更加方面的观察,我们将WordNormalizer中prepare方法打印的内容修改一下:
1.System.out.println( “WordNormalizer.prepare(),taskId:” +context.getThisTaskId()+ “,hashcode:” +this);
现在运行程序,观察输出,输出的日志中,应该包含以下两句话:
1.WordNormalizer.prepare(),taskId:3,hashcode:com.tianshouzhi.study.wordcountapp.bolts.WordNormalizer@41106aa2
2.WordNormalizer.prepare(),taskId:4,hashcode:com.tianshouzhi.study.wordcountapp.bolts.WordNormalizer@3f4c04b1
我们可以看到,我们设置并行度为2的时候, WordNormalizer被实例化了2次,意味着有2个 WordNormalizer实例,而Storm会分配2个executer来分别运行一个实例。所以,此时我们的Topology运行时,是这样的:

 

图中,用蓝色特别标记出,有2个线程分别用来执行一个WordNormalizer实例。
理论上,我们也同样可以给WordReader和WordCounter来设置并发度。但是具体问题要具体分析,在本案例,并不适合给WordReader和WordCounter设置过高的并发度。
---------------------
原文:https:///qq_37095882/article/details/77624340

 Storm容错机制:

storm acker 机制

首先来看一下什么叫做记录级容错?storm允许用户在spout中发射一个新的源tuple时为其指定一个message id, 这个message id可以是任意的object对象。多个源tuple可以共用一个message id,表示这多个源 tuple对用户来说是同一个消息单元。storm中记录级容错的意思是说,storm会告知用户每一个消息单元是否在指定时间内被完全处理了。那什么叫做完全处理呢,就是该message id绑定的源tuple及由该源tuple后续生成的tuple经过了topology中每一个应该到达的bolt的处理。举个例子。在图4-1中,在spout由message 1绑定的tuple1和tuple2经过了bolt1和bolt2的处理生成两个新的tuple,并最终都流向了bolt3。当这个过程完成处理完时,称message 1被完全处理了。 

转载于:https://www.cnblogs.com/Bkxk/p/10249039.html

因篇幅问题不能全部显示,请点此查看更多更全内容