大数据技术 storm的StreamId特性
沉沙 2018-09-29 来源 : 阅读 1066 评论 0

摘要:本篇教程探讨了大数据技术 storm的StreamId特性,希望阅读本篇文章以后大家有所收获,帮助大家对大数据技术的理解更加深入。

本篇教程探讨了大数据技术 storm的StreamId特性,希望阅读本篇文章以后大家有所收获,帮助大家对大数据技术的理解更加深入。

<

  序:StreamId是storm中实现DAG有向无环图的重要一个特性,但是从实际生产环境来看,这个功能其实蛮影响生产环境的稳定性的,我们系统在迭代时会带来整体服务的不可用。
  StreamId是storm中实现DAG有向无环图的重要一个特性,官方也提供对应的接口实现让开发者自己灵活化构造自己的ADG图。但是从我这一年从事流式计算的工作中得到的结果也很尴尬的,很多人不知道storm的这一个特性,甚至某些数据中也没有提及。当然这也比较幸运,不知道这个特性就可以少踩点坑了。因为从实际生产环境来看,这个功能其实蛮影响生产环境的稳定性的,为什么这么说,hey,hey,look dowm。

   

  实际开发中,很多人没有用streamid,其实只是没有显示指定罢了,默认streamid的名称为default,这也就是为什么消息可以由一个bolt发往另一个bolt了。我们自己显示指定streamid可以实现进入某一个bolt的消息,某些消息发给下游的Abolt,另一些消息发给下游的Bbolt。
  比如有这样一个需求砸向你的脸上,有很多其他系统的消息发送到kafka某一个主题中,现在用storm去kafka消费该主题,在bolt-业务这个节点进行消息类型的判断,然后根据判断将消息发送到不同的下游bolt进行处理以便将这些消息发往不同的渠道接口中。这样一个需求我们利用streamid很容易实现,看起来也没有什么问题。关于sreamid的使用可以文章末尾。

  为什么在实际生产我不建议这样使用,生产中经常会面对迭代开发的情况,业务不断的变化,你的代码也要不断的修改,第三方接口的变动,你也要不断的修改与第三方交互的程序。如果这周要修改bolt-微信,然后到发布的时候,你必须停掉整个拓扑任务这明显不是我们想要的,我们期望的是只停掉bolt-微信而不影响其他的业务线。这个时候就会发现这个实现方式很鸡肋的。那我们应该怎么做,看一下我在某信用卡中心的实现方案,看了后,你会替我庆幸我没有为了图前期的简单而采用显示streamid导致后面每该一处很小的功能导致整个拓扑任务不提供服务一段时间。

  我们的系统会收到交易信息,然后根据业务bolt进行处理,然后形成话术推送给不同的渠道bolt,这些渠道bolt对接各个部门(这些部门接受到我们的话术后,将话术推送给微信用户,支付宝用户等),而我们的对外渠道多大15个左右。同时应为业务的不断提出,以及对接部门接口的变化我们这些渠道bolt也要跟随变化。所以我们在业务bolt和渠道bolt中引入了第三方消息系统kafka队列,而不是用storm内部的Disruptor队列。这样原本一个拓扑任务,我们进行拆分为一个业务拓扑,以及多个其渠道拓扑,渠道拓扑与业务拓扑通信通过kafka的主题来协调。如果某一天我们要修改微信渠道的业务,我们只需要停掉微信拓扑即可,整个系统并不会受到影响,原本推送给微信渠道的消息也不会因此丢失它保存在kafka主题中,一旦微信拓扑上线即可立马消费掉。    
  后话,我这样说有点绝对了,具体看系统的情况来权衡。
  streamid在storm中的正确打开方式。

public class ProduceRecordSpout extends BaseRichSpout {

    private static final long serialVersionUID = 1L;

    private SpoutOutputCollector collector;

    private String recordLines;
    private String type;

    public ProduceRecordSpout(String type, String lines) {
        this.type = type;
        recordLines = lines;
    }
    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
        this.collector = collector;
    }
    public void nextTuple() {
        Utils.sleep(5000);
        System.out.println("record is "+recordLines);
        List<Object> values = new Values(type, recordLines);
        collector.emit(values, values);
    }

    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("type", "record"));
    }
}

public class DistributeByTypeBolt extends BaseRichBolt {
    private static final long serialVersionUID = 1L;
    private OutputCollector collector;

    public void prepare(Map stormConf, TopologyContext context,
                        OutputCollector collector) {
        this.collector = collector;
    }

    public void execute(Tuple input) {
        String type = input.getString(0);
        String word = input.getString(1);
        switch (type) {
            case Type.NUMBER:
                collector.emit("stream-number-saver", input, new Values(type, word));
                collector.emit(input, new Values("other", "message coming"));
                break;
            case Type.STRING:
                collector.emit("stream-string-saver", input, new Values(type, word));
                collector.emit(input, new Values("other", "message coming"));
                break;
            default:
                collector.emit(input, new Values(type, word));
        }
        collector.ack(input);
    }

    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declareStream("stream-number-saver", new Fields("type", "word"));
        declarer.declareStream("stream-string-saver", new Fields("type", "word"));
        declarer.declare(new Fields("type", "word"));
    }
}

public class SaveBolt extends BaseRichBolt {
    private static final long serialVersionUID = 1L;
    private OutputCollector collector;

    public void prepare(Map stormConf, TopologyContext context,
                        OutputCollector collector) {
        this.collector = collector;
    }

    public void execute(Tuple input) {
        System.out.println("个人微信:intsmaze"+
                "SourceComponent=" + input.getSourceComponent() +
                ", SourceStreamId=" + input.getSourceStreamId() +
                ", type=" + input.getString(0) +
                ", value=" + input.getString(1));
    }

    public void declareOutputFields(OutputFieldsDeclarer declarer) {
    }
}

public class SaveDefaultBolt extends BaseRichBolt {
    private static final long serialVersionUID = 1L;
    private OutputCollector collector;

    public void prepare(Map stormConf, TopologyContext context,
                        OutputCollector collector) {
        this.collector = collector;
    }

    public void execute(Tuple input) {
        System.out.println("个人微博:猥琐发育的码农"+
                "SourceComponent=" + input.getSourceComponent() +
                ", SourceStreamId=" + input.getSourceStreamId() +
                ", type=" + input.getString(0) +
                ", value=" + input.getString(1));
    }

    public void declareOutputFields(OutputFieldsDeclarer declarer) {
    }
}

public class SaveTwoBolt extends BaseRichBolt {
    private static final long serialVersionUID = 1L;
    private OutputCollector collector;

    public void prepare(Map stormConf, TopologyContext context,
                        OutputCollector collector) {
        this.collector = collector;
    }

    public void execute(Tuple input) {
        System.out.println("文章链接://www.cnblogs.com/intsmaze/p/7283442.html"+
                "SourceComponent=" + input.getSourceComponent() +
                ", SourceStreamId=" + input.getSourceStreamId() +
                ", type=" + input.getString(0) +
                ", value=" + input.getString(1));
    }

    public void declareOutputFields(OutputFieldsDeclarer declarer) {
    }
}


public class StreamTopologyMain {
    public static void main(String[] args) throws Exception {

        TopologyBuilder builder = new TopologyBuilder();

        builder.setSpout("spout-number", new ProduceRecordSpout(Type.NUMBER,  "80966 31"), 1);
        builder.setSpout("spout-string", new ProduceRecordSpout(Type.STRING, "hello the word"), 1);

        builder.setBolt("bolt-distributor", new DistributeByTypeBolt(), 2)
                .shuffleGrouping("spout-number")
                .shuffleGrouping("spout-string");

        builder.setBolt("bolt-number-saver", new SaveBolt(), 1).shuffleGrouping("bolt-distributor", "stream-number-saver");
        builder.setBolt("bolt-string-saver", new SaveTwoBolt(), 1).shuffleGrouping("bolt-distributor", "stream-string-saver");
        builder.setBolt("bolt-default-saver", new SaveDefaultBolt(), 1).shuffleGrouping("bolt-distributor");

        Config conf = new Config();
        conf.setDebug(false);
        String name = StreamTopologyMain.class.getSimpleName();
        LocalCluster cluster = new LocalCluster();
        cluster.submitTopology(name, conf, builder.createTopology());
        Thread.sleep(60 * 60 * 1000);
        cluster.shutdown();
        
    }
}


interface Type {
    String NUMBER = "NUMBER";
    String STRING = "STRING";
}

     

本文由职坐标整理发布,学习更多的大数据技术相关知识,请关注职坐标大技术云计算大技术技术频道!

本文由 @沉沙 发布于职坐标。未经许可,禁止转载。
喜欢 | 0 不喜欢 | 0
看完这篇文章有何感觉?已经有0人表态,0%的人喜欢 快给朋友分享吧~
评论(0)
后参与评论

您输入的评论内容中包含违禁敏感词

我知道了

助您圆梦职场 匹配合适岗位
验证码手机号,获得海同独家IT培训资料
选择就业方向:
人工智能物联网
大数据开发/分析
人工智能Python
Java全栈开发
WEB前端+H5

请输入正确的手机号码

请输入正确的验证码

获取验证码

您今天的短信下发次数太多了,明天再试试吧!

提交

我们会在第一时间安排职业规划师联系您!

您也可以联系我们的职业规划师咨询:

小职老师的微信号:z_zhizuobiao
小职老师的微信号:z_zhizuobiao

版权所有 职坐标-一站式IT培训就业服务领导者 沪ICP备13042190号-4
上海海同信息科技有限公司 Copyright ©2015 www.zhizuobiao.com,All Rights Reserved.
 沪公网安备 31011502005948号    

©2015 www.zhizuobiao.com All Rights Reserved

208小时内训课程