大数据技术全面解读 流式计算之storm详解(二)常用命令与wc实例
沉沙 2018-10-10 来源 : 阅读 1644 评论 0

摘要:本篇教程探讨了大数据技术全面解读 流式计算之storm详解(二)常用命令与wc实例,希望阅读本篇文章以后大家有所收获,帮助大家对大数据技术的理解更加深入。

本篇教程探讨了大数据技术全面解读 流式计算之storm详解(二)常用命令与wc实例,希望阅读本篇文章以后大家有所收获,帮助大家对大数据技术的理解更加深入。

<

一、常用命令

  1.提交命令

提交任务命令格式:storm jar 【jar路径】 【拓扑包名.拓扑类名】 【拓扑名称】
torm jar examples/storm-starter/storm-starter-topologies-0.9.6.jar storm.starter.WordCountTopology wordcount

   2.杀死任务

storm kill 【拓扑名称】 -w 10(执行kill命令时可以通过-w [等待秒数]指定拓扑停用以后的等待时间)
storm kill topology-name -w 10

  3.停用任务

storm deactivte  【拓扑名称】
storm deactivte topology-name
#我们能够挂起或停用运行中的拓扑。当停用拓扑时,所有已分发的元组都会得到处理,但是spouts的nextTuple方法不会被调用。
销毁一个拓扑,可以使用kill命令。它会以一种安全的方式销毁一个拓扑,首先停用拓扑,在等待拓扑消息的时间段内允许拓扑完成当前的数据流。

  4.启用任务

storm activate【拓扑名称】
storm activate topology-name

  5.重新部署任务

storm rebalance  【拓扑名称】
storm rebalance topology-name
再平衡使你重分配集群任务。这是个很强大的命令。比如,你向一个运行中的集群增加了节点。
再平衡命令将会停用拓扑,然后在相应超时时间之后重分配工人,并重启拓扑。


二、wordCount示例程序

 ###以下内容可以替换为Jstrom的依赖!包是backtype的topologyBuilder!

   1.引入依赖




    org.apache.storm
    storm-core
    1.0.6
    provided




   JStorm是阿里巴巴基于storm的二次开发,完全兼容storm!



        
            com.alibaba.jstorm
            jstorm-core
            2.1.1
            
        




  // 本地提交时请注释掉作用域(provided不参与打包)

  2。编写相关程序

参考://..net/u010454030/article/details/52576346

    //m635674608.iteye.com//2221179

 


package com.jiangbei;

import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.StormSubmitter;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.tuple.Fields;

/**
 * wordcount类
 *
 * @author zcc ON 2018/3/6
 **/
public class WordCount {
    public static void main(String[] args) throws Exception{
        // 创建TopologyBuilder用来创建topology
        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("mySpout", new MySpout(), 1);
        builder.setBolt("split", new MyBolt1(), 10).shuffleGrouping("mySpout");
        builder.setBolt("count", new MyBolt2(), 2).fieldsGrouping("split", new Fields("word"));
        // 创建configuration
        Config config = new Config();
        config.setNumWorkers(2);
        // 本地模式很有用
        // config.setDebug(true);

        // 向集群提交
        // StormSubmitter.submitTopologyWithProgressBar("wordcount", config,builder.createTopology());

        // 本地模式
        LocalCluster localCluster = new LocalCluster();
        localCluster.submitTopology("wordcount", config, builder.createTopology());
    }
}




package com.jiangbei;

import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichSpout;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;

import java.util.Map;

/**
 * 自定义spout
 *
 * @author zcc ON 2018/3/6
 **/
public class MySpout extends BaseRichSpout{
    private SpoutOutputCollector collector;
    /**
     * 初始化方法
     * @param map
     * @param topologyContext
     * @param spoutOutputCollector
     */
    @Override
    public void open(Map map, TopologyContext topologyContext, SpoutOutputCollector spoutOutputCollector) {
        this.collector = spoutOutputCollector;
    }

    /**
     * storm框架的操作(类似于while true中的循环体)
     */
    @Override
    public void nextTuple() {
        // 这里的Values是arrayList的一个子类
        collector.emit(new Values("i love china"));
    }

    /**
     * 声明tuple发送流
     * @param outputFieldsDeclarer
     */
    @Override
    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
        outputFieldsDeclarer.declare(new Fields("love"));
    }
}




package com.jiangbei;

import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;

import java.util.Map;

/**
 * 自定义bolt
 *
 * @author zcc ON 2018/3/6
 **/
public class MyBolt1 extends BaseRichBolt{
    private OutputCollector collector;
    /**
     * 初始化方法
     * @param map
     * @param topologyContext
     * @param outputCollector
     */
    @Override
    public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) {
        this.collector = outputCollector;
    }

    /**
     * 循环调用的循环体
     * @param tuple
     */
    @Override
    public void execute(Tuple tuple) {
        // 从上一步中的tuple取出value,由于知道是String,而values是list的子类,故通过角标即可
        String line = tuple.getString(0);
        String[] words = line.split(" ");
        for (String word : words) {
            collector.emit(new Values(word, 1));
        }
    }

    /**
     * 声明方法
     * @param outputFieldsDeclarer
     */
    @Override
    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
        outputFieldsDeclarer.declare(new Fields("word","num"));
    }
}




package com.jiangbei;

import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.tuple.Tuple;

import java.util.HashMap;
import java.util.Map;

/**
 * 自定义bolt
 *
 * @author zcc ON 2018/3/6
 **/
public class MyBolt2 extends BaseRichBolt{
    private OutputCollector collector;
    private Map map = new HashMap<>();
    @Override
    public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) {
        this.collector = outputCollector;
    }

    @Override
    public void execute(Tuple tuple) {
        String word = tuple.getString(0);
        Integer num = tuple.getInteger(1);
        if (map.containsKey(word)) {
            Integer count = map.get(word);
            map.put(word, count + num);
        } else {
            map.put(word, 1);
        }
        System.out.println("count==========>" + map);
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {

    }
}



  本地运行直接运行即可!

  以上的spout、bolt等都是采用的自定义的,主要通过继承BaseRichSpout等来实现!;这里解释一下 其中的一些概念与相关类:

    最后一个bolt必须按照field进行分组,这样才能进行计数!,这里的new Field()里的值必须是上游里面declare的值。才能对应上。

    SpoutOutputCollector——对象提供了发射tuple的方法

  整个过程原理图如下所示:

  

 

  3.Stream Grouping详解

    Storm里面有7种类型的stream grouping

    l Shuffle Grouping: 随机分组, 随机派发stream里面的tuple,保证每个bolt接收到的tuple数目大致相同。(类似MR的hash算法)

    l Fields Grouping:按字段分组,比如按userid来分组,具有同样userid的tuple会被分到相同的Bolts里的一个task,而不同的userid则会被分配到不同的bolts里的task。

    l All Grouping:广播发送,对于每一个tuple,所有的bolts都会收到。

    l Global Grouping:全局分组, 这个tuple被分配到storm中的一个bolt的其中一个task。再具体一点就是分配给id值最低的那个task。

    l Non Grouping:不分组,这stream grouping个分组的意思是说stream不关心到底谁会收到它的tuple。目前这种分组和Shuffle grouping是一样的效果, 有一点不同的是storm会把这个bolt放到这个bolt的订阅者同一个线程里面去执行。

    l Direct Grouping: 直接分组, 这是一种比较特别的分组方法,用这种分组意味着消息的发送者指定由消息接收者的哪个task处理这个消息。只有被声明为Direct Stream的消息流可以声明这种分组方法。而且这种消息tuple必须使用emitDirect方法来发射。消息处理者可以通过TopologyContext来获取处理它的消息的task的id (OutputCollector.emit方法也会返回task的id)。

    l Local or shuffle grouping:如果目标bolt有一个或者多个task在同一个工作进程中,tuple将会被随机发生给这些tasks。否则,和普通的Shuffle Grouping行为一致。
   

本文由职坐标整理发布,学习更多的大数据技术相关知识,请关注职坐标大技术云计算大技术技术频道!

本文由 @沉沙 发布于职坐标。未经许可,禁止转载。
喜欢 | 0 不喜欢 | 0
看完这篇文章有何感觉?已经有0人表态,0%的人喜欢 快给朋友分享吧~
评论(0)
后参与评论

您输入的评论内容中包含违禁敏感词

我知道了

助您圆梦职场 匹配合适岗位
验证码手机号,获得海同独家IT培训资料
选择就业方向:
人工智能物联网
大数据开发/分析
人工智能Python
Java全栈开发
WEB前端+H5

请输入正确的手机号码

请输入正确的验证码

获取验证码

您今天的短信下发次数太多了,明天再试试吧!

提交

我们会在第一时间安排职业规划师联系您!

您也可以联系我们的职业规划师咨询:

小职老师的微信号:z_zhizuobiao
小职老师的微信号:z_zhizuobiao

版权所有 职坐标-IT技术咨询与就业发展一体化服务 沪ICP备13042190号-4
上海海同信息科技有限公司 Copyright ©2015 www.zhizuobiao.com,All Rights Reserved.
 沪公网安备 31011502005948号    

©2015 www.zhizuobiao.com All Rights Reserved

208小时内训课程