沉沙
2018-10-10
来源 :
阅读 2174
评论 0
摘要:本篇教程探讨了大数据技术全面解读 流式计算之storm详解(三)集群相关进阶,希望阅读本篇文章以后大家有所收获,帮助大家对大数据技术的理解更加深入。
本篇教程探讨了大数据技术全面解读 流式计算之storm详解(三)集群相关进阶,希望阅读本篇文章以后大家有所收获,帮助大家对大数据技术的理解更加深入。
<
一、集群提交任务流程分析
1.集群提交操作
参考:https://www.jianshu.com/p/6783f1ec2da0
2.任务分配与启动流程
参考:https://www.cns.com/heitaok/p/5531535.html
二、相关目录树
1.组件本地目录树
2.storm zk目录树
三、集群通信
Worker间的通信经常需要通过网络跨节点进行,Storm使用ZeroMQ或Netty(0.9以后默认使用)作为进程间通信的消息框架。
Worker进程内部通信:不同worker的thread通信使用LMAX Disruptor来完成。
不同topologey之间的通信,Storm不负责,需要自己想办法实现,例如使用kafka等;
通信图解:
相关博文参考://..net/bbaiggey/article/details/55510010?locationNum=10&fps=1
四、消息容错机制ack-fail
1.概述
l 在storm中,可靠的信息处理机制是从spout开始的。
l 一个提供了可靠的处理机制的spout需要记录他发射出去的tuple,当下游bolt处理tuple或者子tuple失败时spout能够重新发射。
l Storm通过调用Spout的nextTuple()发送一个tuple。为实现可靠的消息处理,首先要给每个发出的tuple带上唯一的ID,并且将ID作为参数传递给 SoputOutputCollector的emit()方法:collector.emit(new Values("value1","value2"), msgId); messageid就是用来标示唯一的tuple的,而rootid是随机生成的
给每个tuple指定ID告诉Storm系统,无论处理成功还是失败,spout都要接收tuple树上所有节点返回的通知。
如果处理成功,spout的ack()方法将会对编号是 msgId的消息应答确认;如果处理失败或者超时,会调用fail()方法。
2.基本实现
Storm 系统中有一组叫做"acker"的特殊的任务,它们负责跟踪DAG(有向无环图)中的每个消息。
acker任务保存了spout id到一对值的映射。第一个值就是spout的任务id,通过这个id,acker就知道消息处理完成时该通知哪个spout任务。第二个值是一个64bit的数字,我们称之为"ack val", 它是树中所有消息的随机id的异或计算结果。
ack val表示了整棵树的的状态,无论这棵树多大,只需要这个固定大小的数字就可以跟踪整棵树。当消息被创建和被应答的时候都会有相同的消息id发送过来做异或。 每当acker发现一棵树的ack val值为0的时候,它就知道这棵树已经被完全处理了
要实现ack机制:
1,spout发射tuple的时候指定messageId
2,spout要重写BaseRichSpout的fail和ack方法
3,spout对发射的tuple进行缓存(否则spout的fail方法收到acker发来的messsageId,
spout也无法获取到发送失败的数据进行重发),看看系统提供的接口,
只有msgId这个参数,这里的设计不合理,其实在系统里是有cache整个msg的,只给用户一个messageid,
用户如何取得原来的msg貌似需要自己cache,然后用这个msgId去查询,太坑爹了
3,spout根据messageId对于ack的tuple则从缓存队列中删除,对于fail的tuple可以选择重发。
4,设置acker数至少大于0;Config.setNumAckers(conf, ackerParal);
3.代码示例
package ackfail;
import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.StormSubmitter;
import backtype.storm.topology.TopologyBuilder;
/**
* Created by maoxiangyi on 2016/4/25.
*/
public class MyAckFailTopology {
public static void main(String[] args) throws Exception {
TopologyBuilder topologyBuilder = new TopologyBuilder();
topologyBuilder.setSpout("mySpout", new MySpout(), 1);
topologyBuilder.setBolt("mybolt1", new MyBolt1(), 1).shuffleGrouping("mySpout");
Config conf = new Config();
String name = MyAckFailTopology.class.getSimpleName();
if (args != null && args.length > 0) {
String nimbus = args[0];
conf.put(Config.NIMBUS_HOST, nimbus);
conf.setNumWorkers(1);
StormSubmitter.submitTopologyWithProgressBar(name, conf, topologyBuilder.createTopology());
} else {
LocalCluster cluster = new LocalCluster();
cluster.submitTopology(name, conf, topologyBuilder.createTopology());
Thread.sleep(60 * 60 * 1000);
cluster.shutdown();
}
}
}
package ackfail;
import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.IControlSpout;
import backtype.storm.topology.IRichSpout;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichSpout;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;
import java.util.HashMap;
import java.util.Map;
import java.util.Random;
import java.util.UUID;
/**
* Created by maoxiangyi on 2016/4/25.
*/
public class MySpout extends BaseRichSpout {
private SpoutOutputCollector collector;
private Random rand;
private Map
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("sentence"));
rand = new Random();
}
@Override
public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
this.collector = collector;
}
@Override
public void nextTuple() {
String[] sentences = new String[]{"the cow jumped over the moon",
"the cow jumped over the moon",
"the cow jumped over the moon",
"the cow jumped over the moon", "the cow jumped over the moon"};
String sentence = sentences[rand.nextInt(sentences.length)];
String messageId = UUID.randomUUID().toString().replace("-", "");
Values tuple = new Values(sentence);
collector.emit(tuple, messageId);
buffer.put(messageId,tuple);
try {
Thread.sleep(20000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
@Override
public void ack(Object msgId) {
System.out.println("消息处理成功,id= " + msgId);
buffer.remove(msgId);
}
@Override
public void fail(Object msgId) {
System.out.println("消息处理失败,id= " + msgId);
Values tuple = buffer.get(msgId);
collector.emit(tuple,msgId);
}
}
package ackfail;
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.*;
import backtype.storm.topology.base.BaseBasicBolt;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;
import java.util.Map;
/**
* Created by maoxiangyi on 2016/4/25.
*/
public class MyBolt1 extends BaseRichBolt {
private OutputCollector collector;
@Override
public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
this.collector = collector;
}
@Override
public void execute(Tuple input) {
String sentence = input.getString(0);
String[] words = sentence.split(" ");
for (String word : words) {
word = word.trim();
if (!word.isEmpty()) {
word = word.toLowerCase();
collector.emit(input, new Values(word));
}
}
collector.ack(input);
}
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("word"));
}
}
本文由职坐标整理发布,学习更多的大数据技术相关知识,请关注职坐标大技术云计算大技术技术频道!
喜欢 | 0
不喜欢 | 0
您输入的评论内容中包含违禁敏感词
我知道了

请输入正确的手机号码
请输入正确的验证码
您今天的短信下发次数太多了,明天再试试吧!
我们会在第一时间安排职业规划师联系您!
您也可以联系我们的职业规划师咨询:
版权所有 职坐标-一站式AI+学习就业服务平台 沪ICP备13042190号-4
上海海同信息科技有限公司 Copyright ©2015 www.zhizuobiao.com,All Rights Reserved.
沪公网安备 31011502005948号