摘要:本篇教程探讨了大数据技术全面解读 流式计算之storm详解(二)常用命令与wc实例,希望阅读本篇文章以后大家有所收获,帮助大家对大数据技术的理解更加深入。
本篇教程探讨了大数据技术全面解读 流式计算之storm详解(二)常用命令与wc实例,希望阅读本篇文章以后大家有所收获,帮助大家对大数据技术的理解更加深入。
<
一、常用命令
1.提交命令
提交任务命令格式:storm jar 【jar路径】 【拓扑包名.拓扑类名】 【拓扑名称】
torm jar examples/storm-starter/storm-starter-topologies-0.9.6.jar storm.starter.WordCountTopology wordcount
2.杀死任务
storm kill 【拓扑名称】 -w 10(执行kill命令时可以通过-w [等待秒数]指定拓扑停用以后的等待时间)
storm kill topology-name -w 10
3.停用任务
storm deactivte 【拓扑名称】
storm deactivte topology-name
#我们能够挂起或停用运行中的拓扑。当停用拓扑时,所有已分发的元组都会得到处理,但是spouts的nextTuple方法不会被调用。
销毁一个拓扑,可以使用kill命令。它会以一种安全的方式销毁一个拓扑,首先停用拓扑,在等待拓扑消息的时间段内允许拓扑完成当前的数据流。
4.启用任务
storm activate【拓扑名称】
storm activate topology-name
5.重新部署任务
storm rebalance 【拓扑名称】
storm rebalance topology-name
再平衡使你重分配集群任务。这是个很强大的命令。比如,你向一个运行中的集群增加了节点。
再平衡命令将会停用拓扑,然后在相应超时时间之后重分配工人,并重启拓扑。
二、wordCount示例程序
###以下内容可以替换为Jstrom的依赖!包是backtype的topologyBuilder!
1.引入依赖
JStorm是阿里巴巴基于storm的二次开发,完全兼容storm!
// 本地提交时请注释掉作用域(provided不参与打包)
2。编写相关程序
参考://..net/u010454030/article/details/52576346
//m635674608.iteye.com//2221179
package com.jiangbei;
import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.StormSubmitter;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.tuple.Fields;
/**
* wordcount类
*
* @author zcc ON 2018/3/6
**/
public class WordCount {
public static void main(String[] args) throws Exception{
// 创建TopologyBuilder用来创建topology
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("mySpout", new MySpout(), 1);
builder.setBolt("split", new MyBolt1(), 10).shuffleGrouping("mySpout");
builder.setBolt("count", new MyBolt2(), 2).fieldsGrouping("split", new Fields("word"));
// 创建configuration
Config config = new Config();
config.setNumWorkers(2);
// 本地模式很有用
// config.setDebug(true);
// 向集群提交
// StormSubmitter.submitTopologyWithProgressBar("wordcount", config,builder.createTopology());
// 本地模式
LocalCluster localCluster = new LocalCluster();
localCluster.submitTopology("wordcount", config, builder.createTopology());
}
}
package com.jiangbei;
import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichSpout;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;
import java.util.Map;
/**
* 自定义spout
*
* @author zcc ON 2018/3/6
**/
public class MySpout extends BaseRichSpout{
private SpoutOutputCollector collector;
/**
* 初始化方法
* @param map
* @param topologyContext
* @param spoutOutputCollector
*/
@Override
public void open(Map map, TopologyContext topologyContext, SpoutOutputCollector spoutOutputCollector) {
this.collector = spoutOutputCollector;
}
/**
* storm框架的操作(类似于while true中的循环体)
*/
@Override
public void nextTuple() {
// 这里的Values是arrayList的一个子类
collector.emit(new Values("i love china"));
}
/**
* 声明tuple发送流
* @param outputFieldsDeclarer
*/
@Override
public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
outputFieldsDeclarer.declare(new Fields("love"));
}
}
package com.jiangbei;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;
import java.util.Map;
/**
* 自定义bolt
*
* @author zcc ON 2018/3/6
**/
public class MyBolt1 extends BaseRichBolt{
private OutputCollector collector;
/**
* 初始化方法
* @param map
* @param topologyContext
* @param outputCollector
*/
@Override
public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) {
this.collector = outputCollector;
}
/**
* 循环调用的循环体
* @param tuple
*/
@Override
public void execute(Tuple tuple) {
// 从上一步中的tuple取出value,由于知道是String,而values是list的子类,故通过角标即可
String line = tuple.getString(0);
String[] words = line.split(" ");
for (String word : words) {
collector.emit(new Values(word, 1));
}
}
/**
* 声明方法
* @param outputFieldsDeclarer
*/
@Override
public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
outputFieldsDeclarer.declare(new Fields("word","num"));
}
}
package com.jiangbei;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.tuple.Tuple;
import java.util.HashMap;
import java.util.Map;
/**
* 自定义bolt
*
* @author zcc ON 2018/3/6
**/
public class MyBolt2 extends BaseRichBolt{
private OutputCollector collector;
private Map
@Override
public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) {
this.collector = outputCollector;
}
@Override
public void execute(Tuple tuple) {
String word = tuple.getString(0);
Integer num = tuple.getInteger(1);
if (map.containsKey(word)) {
Integer count = map.get(word);
map.put(word, count + num);
} else {
map.put(word, 1);
}
System.out.println("count==========>" + map);
}
@Override
public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
}
}
本地运行直接运行即可!
以上的spout、bolt等都是采用的自定义的,主要通过继承BaseRichSpout等来实现!;这里解释一下 其中的一些概念与相关类:
最后一个bolt必须按照field进行分组,这样才能进行计数!,这里的new Field()里的值必须是上游里面declare的值。才能对应上。
SpoutOutputCollector——对象提供了发射tuple的方法
整个过程原理图如下所示:
3.Stream Grouping详解
Storm里面有7种类型的stream grouping
l Shuffle Grouping: 随机分组, 随机派发stream里面的tuple,保证每个bolt接收到的tuple数目大致相同。(类似MR的hash算法)
l Fields Grouping:按字段分组,比如按userid来分组,具有同样userid的tuple会被分到相同的Bolts里的一个task,而不同的userid则会被分配到不同的bolts里的task。
l All Grouping:广播发送,对于每一个tuple,所有的bolts都会收到。
l Global Grouping:全局分组, 这个tuple被分配到storm中的一个bolt的其中一个task。再具体一点就是分配给id值最低的那个task。
l Non Grouping:不分组,这stream grouping个分组的意思是说stream不关心到底谁会收到它的tuple。目前这种分组和Shuffle grouping是一样的效果, 有一点不同的是storm会把这个bolt放到这个bolt的订阅者同一个线程里面去执行。
l Direct Grouping: 直接分组, 这是一种比较特别的分组方法,用这种分组意味着消息的发送者指定由消息接收者的哪个task处理这个消息。只有被声明为Direct Stream的消息流可以声明这种分组方法。而且这种消息tuple必须使用emitDirect方法来发射。消息处理者可以通过TopologyContext来获取处理它的消息的task的id (OutputCollector.emit方法也会返回task的id)。
l Local or shuffle grouping:如果目标bolt有一个或者多个task在同一个工作进程中,tuple将会被随机发生给这些tasks。否则,和普通的Shuffle Grouping行为一致。
本文由职坐标整理发布,学习更多的大数据技术相关知识,请关注职坐标大技术云计算大技术技术频道!
您输入的评论内容中包含违禁敏感词
我知道了
请输入正确的手机号码
请输入正确的验证码
您今天的短信下发次数太多了,明天再试试吧!
我们会在第一时间安排职业规划师联系您!
您也可以联系我们的职业规划师咨询:
版权所有 职坐标-IT技术咨询与就业发展一体化服务 沪ICP备13042190号-4
上海海同信息科技有限公司 Copyright ©2015 www.zhizuobiao.com,All Rights Reserved.
沪公网安备 31011502005948号