大数据技术 kafkaspot在ack机制下如何保证内存不溢
沉沙 2018-09-27 来源 : 阅读 2156 评论 0

摘要:本篇教程探讨了大数据技术 kafkaspot在ack机制下如何保证内存不溢,希望阅读本篇文章以后大家有所收获,帮助大家对大数据技术的理解更加深入。

本篇教程探讨了大数据技术 kafkaspot在ack机制下如何保证内存不溢,希望阅读本篇文章以后大家有所收获,帮助大家对大数据技术的理解更加深入。

<

 
storm框架中的kafkaspout类实现的是BaseRichSpout,它里面已经重写了fail和ack方法,所以我们的bolt必须实现ack机制,就可以保证消息的重新发送;如果不实现ack机制,那么kafkaspout就无法得到消息的处理响应,就会在超时以后再次发送消息,导致消息的重复发送。
 
但是回想一下我们自己写一个spout类实现BaseRichSpout并让他具备消息重发,那么我们是会在我们的spout类里面定义一个map集合,并以msgId作为key。
public class MySpout extends BaseRichSpout {
    private static final long serialVersionUID = 5028304756439810609L;
    // key:messageId,Data
    private HashMap waitAck = new HashMap();
    private SpoutOutputCollector collector;
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("sentence"));
    }
    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
        this.collector = collector;
    }
    public void nextTuple() {
        String sentence = "the cow jumped over the moon";
        String messageId = UUID.randomUUID().toString().replaceAll("-", "");
        waitAck.put(messageId, sentence);
        //指定messageId,开启ackfail机制
        collector.emit(new Values(sentence), messageId);
    }
    @Override
    public void ack(Object msgId) {
        System.out.println("消息处理成功:" + msgId);
        System.out.println("删除缓存中的数据...");
        waitAck.remove(msgId);
    }
    @Override
    public void fail(Object msgId) {
        System.out.println("消息处理失败:" + msgId);
        System.out.println("重新发送失败的信息...");
        //重发如果不开启ackfail机制,那么spout的map对象中的该数据不会被删除的,而且下游
        collector.emit(new Values(waitAck.get(msgId)),msgId);
    }
}
 
那么kafkaspout会不会也是这样还保存这已发送未收到bolt响应的消息呢?如果这样,如果消息处理不断失败,不断重发,消息不断积累在kafkaspout节点上,kafkaspout端会不就会出现内存溢出?
 
其实并没有,回想kafka的原理,Kafka会为每一个consumergroup保留一些metadata信息–当前消费的消息的position,也即offset。这个offset由consumer控制。正常情况下consumer会在消费完一条消息后线性增加这个offset。当然,consumer也可将offset设成一个较小的值,重新消费一些消息。也就是说,kafkaspot在消费kafka的数据是,通过offset读取到消息并发送给bolt后,kafkaspot只是保存者当前的offset值。
当失败或成功根据msgId查询offset值,然后再去kafka消费该数据来确保消息的重新发送。
 
那么虽然offset数据小,但是当offset的数据量上去了还是会内存溢出的?
其实并没有,kafkaspout发现缓存的数据超过限制了,会把某端的数据清理掉的。
 
 
kafkaspot中发送数据的代码
collector.emit(tup, new KafkaMessageId(_partition, toEmit.offset));
可以看到msgID里面包装了offset参数。
它不缓存已经发送出去的数据信息。
 
当他接收到来至bolt的响应后,会从接收到的msgId中得到offset。以下是从源码中折取的关键代码:
public void ack(Object msgId) {
     KafkaMessageId id = (KafkaMessageId) msgId;
     PartitionManager m = _coordinator.getManager(id.partition);
     if (m != null) {
          m.ack(id.offset);
     }
 }
 m.ack(id.offset);
 public void ack(Long offset) {
     _pending.remove(offset);//处理成功移除offset
     numberAcked++;
 }
 public void fail(Object msgId) {
     KafkaMessageId id = (KafkaMessageId) msgId;
     PartitionManager m = _coordinator.getManager(id.partition);
     if (m != null) {
         m.fail(id.offset);
      }
  }
  m.fail(id.offset);
  public void fail(Long offset) {
     failed.add(offset);//处理失败添加offset
        numberFailed++;
   }
    
    SortedSet _pending = new TreeSet();
    SortedSet failed = new TreeSet();
 
源码解析中涉及了很多kafka的概念,所以仅仅理解kafka的概念想完全理解kafkaspot源码是很难的,如果不理解kafka概念,那么就只需要在理解storm的ack机制上明白kafkaspot做了上面的两件事就可以了。
   

本文由职坐标整理发布,学习更多的大数据技术相关知识,请关注职坐标大技术云计算大技术技术频道!

本文由 @沉沙 发布于职坐标。未经许可,禁止转载。
喜欢 | 0 不喜欢 | 0
看完这篇文章有何感觉?已经有0人表态,0%的人喜欢 快给朋友分享吧~
评论(0)
后参与评论

您输入的评论内容中包含违禁敏感词

我知道了

助您圆梦职场 匹配合适岗位
验证码手机号,获得海同独家IT培训资料
选择就业方向:
人工智能物联网
大数据开发/分析
人工智能Python
Java全栈开发
WEB前端+H5

请输入正确的手机号码

请输入正确的验证码

获取验证码

您今天的短信下发次数太多了,明天再试试吧!

提交

我们会在第一时间安排职业规划师联系您!

您也可以联系我们的职业规划师咨询:

小职老师的微信号:z_zhizuobiao
小职老师的微信号:z_zhizuobiao

版权所有 职坐标-一站式AI+学习就业服务平台 沪ICP备13042190号-4
上海海同信息科技有限公司 Copyright ©2015 www.zhizuobiao.com,All Rights Reserved.
 沪公网安备 31011502005948号    

©2015 www.zhizuobiao.com All Rights Reserved