大数据技术学习之spark streaming快速入门
沉沙 2019-01-07 来源 : 阅读 248 评论 0

摘要:本篇教程探讨了大数据技术学习之spark streaming快速入门,希望阅读本篇文章以后大家有所收获,帮助大家对相关内容的理解更加深入。

本篇教程探讨了大数据技术学习之spark streaming快速入门,希望阅读本篇文章以后大家有所收获,帮助大家对相关内容的理解更加深入。

大数据技术学习之spark streaming快速入门

简介

  Spark Streaming是Spark核心API的扩展,可以实现可伸缩、高吞吐量、具备容错机制的实时流时数据的处理。支持多种数据源,比如Kafka、Flume、Twitter、ZeroMQ、Kinesis 以及TCP sockets。

  可以使用诸如map、reduce、join和window等高级函数进行复杂算法(比如,机器学习和图计算)的处理。最后还可以将处理结果存储到文件系统,数据库和仪表盘。

架构与抽象

抽象

  Spark Streaming接收实时流的数据,并根据一定的时间间隔拆分成一批批的数据,然后通过Spark Engine处理这些批数据,最终得到处理后的一批批结果数据。

  Spark Streaming提供了一个叫做DStream(discretized stream,离散流)的抽象概念,DStream由一系列的RDD组成,表示每个批次中连续的数据流。DStream可以从输入源(比如,Kafka、Flume、Kinesis等)中创建,也可以从其他DStream中使用高级算子操作转换生成。

  DStream的所有操作其实都是对DStream中所有RDD的操作。比如,在单词统计案例中,flatMap转化操作会应用到每个行RDD上来生成单词RDD。

架构

Receiver:Spark Streaming内置的数据流接收器或自定义接收器,用于从数据源接收源源不断的数据流。

CurrentBuffer:用于缓存输入流接收器接收的数据流。

BlockIntervalTimer:一个定时器,用于将CurrentBuffer中缓存的数据流封装为Block后放入blocksForPushing队列中。

BlocksForPushing:待处理的Block

BlockPushingThread:此线程每隔100毫秒从BlocksForPushing队列中取出一个Block存入存储系统,并缓存到ReceivedBlockQueue队列中。

Block Batch:Block批次,按照批次时间间隔,从ReceivedBlockQueue队列中获取一批Block。

JobGenerator:Job生成器,用于给每一批Block生成一个Job。

DStream 转化操作

  DStream转化操作分为无状态(stateless)和有状态(stateful)两种。

无状态转化操作中,每个批次的处理不依赖于之前批次的数据。

有状态转化操作需要使用之前批次的数据或中间结果来计算当前批次的数据。

无状态转化操作

  无状态转化操作就是把简单的RDD转化操作应用到每个批次上,转化DStream中的每个RDD。

常用的无状态转化操作

函数名称作用scala示例
map()对DStream中的每个元素应用指定函数,返回由各元素输出的元素组成的DStreamds.map(x => x+1)
flatMap()对DStream中的每个元素应用指定函数,返回由各元素输出的迭代器组成的DStreamds.flatMap(x => x.split(" "))
filter返回由给定DStream中通过筛选的元素组成的DStreamds.filter(x => x!=1)
repartition()改变DStream的分区数ds.repartition(10)
reduceByKey将每个批次中键相同的记录聚合ds.reduceByKey((x,y) => x+y)
groupByKey将每个批次中的记录根据键分组ds.groupByKey()

使用map()和reduceByKey()在每个时间区间中对日志根据IP地址进行计数。

//假设ApacheAccessingLog是用来从Apache日志中解析条目的工具类val accessLogDStream = logData.map(line => ApacheAccessingLog.parseFromLogLine(line))val ipDStream = accessLogsDStream.map(entry => (entry.getIpAddress(), 1)val ipCountsDStream = ipDStream.reduceByKey((x,y) => x+y)
//假设ApacheAccessingLog是用来从Apache日志中解析条目的工具类static final class IpTuple implements PairFunction<ApacheAccessLog, String, Long> {    public Tuple2<String, Long> call(ApacheAccessLog log) {        return new Tuple2<>(log.getIpAddress(), 1L);    }}JavaDStream<ApacheAccessLog> accessLogDStream = logData.map(new ParseFromLogLine());JavaPairDStream<String, Long> ipDStream = accessLogDStream.mapToPair(new IpTuple());JavaPairDStream(String, Long) ipCountsDStream = ipDStream.reduceByKey(new LongSumReducer());

java

scala

以IP地址为键,将请求计数的数据和传输数据量的数据连接起来

val ipBytesDStream = accessLogsDStream.map(entry => (entry.getIpAddress(), entry.getContentSize()))val ipBytesSumDStream = ipBytesDStream.reduceByKey((x,y) => x+y)val ipBytesRequestCountDStream = ipCountsDStream.join(ipBytesSumDStream)
JavaPairDStream<String, Long> ipBytesDStream = accessLogsDStream.mapToPair(new IpContentTuple());JavaPairDStream<String, Long> ipBytesSumDStream = ipBytesDStream.reduceByKey(new LongSumReducer());JavaPairDStream<String, Tuple2<Long,Long>> ipBytesRequestCountDStream = ipCountsDStream.join(ipBytesSumDStream);

java

scala

使用transform()操作实现自定义转化操作,从日志记录中提取异常值。

val outlierDStream = accessLogsDStream.transform{    rdd => extractOutliers(rdd)}
JavaPairDStream<String, Long> ipRawDStream = accessLogsDStream.transform(    new Function<JavaPairRDD<ApacheAccessLog>, JavaRDD<ApacheAccessLog>>() {        public JavaPairRDD<ApacheAccessLog> call(JavaRDD<ApacheAccessLog> rdd) {            return extractOutliers(rdd);        }    });

java

scala

有状态转化操作

  DStream的有状态转化操作是跨时间区间跟踪数据的操作,先前批次的数据也被用来在新的批次中计算结果。

  有状态转化操作主要有两种类型:滑动窗口和updateStateByKey()。前者以一个时间阶段为滑动窗口进行操作,后者用来跟踪每个键的状态变化。

设置检查点

  有状态转化操作需要在StreamingContext中打开检查点机制确保容错性。

ssc.checkpoint("hdfs://...")

基于窗口的转化操作

简介

  基于窗口的操作会在一个比StreamingContext批次间隔更长的时间范围内,通过整合多个批次的结果,计算出整个窗口的结果。

  基于窗口的转化操作需要两个参数,分别是窗口时长和滑动时长。两者都是批次间隔的整数倍。

窗口时长:控制每次计算最近的windowDuration/batchInterval个批次的数据。

滑动步长:默认值与批次间隔相等。用来控制对新DStream进行计算的时间间隔。

简单案例

使用window()对窗口进行计数

val accessLogsWindow = accessLogsDStream.window(Seconds(30), Seconds(10))val windowCounts = accessLogsWindow.count()
JavaDStream<ApacheAccessLog> accessLogsWindow = accessLogsDStream.window(Durations.seconds(30), Duration.seconds(10));JavaDStream<Integer> windowCounts = accessLogsWindow.count();

java

scala

使用reduceByKeyAndWindow对每个IP地址的访问量计数

val ipDStream = accessLogsDStream.map(logEntry => (logEntry.getIpAddress(), 1))val ipCountDStream = ipDStream.reduceByKeyAndWindow(    {(x,y) => x+y}, //加入新进入窗口的批次中的元素    {(x,y) => x-y}, //移除离开窗口的老批次中的元素    Seconds(30), //窗口时长    Seconds(10) //滑动步长)
class ExtractIp extends PairFunction<ApacheAccessLog, String, Long> {    public Tuple2<String, Long> call(ApacheAccessLog entry) {        return new Tuple2(entry.getIpAddress(), 1L);    }}class AddLongs extends Function2<Long, Long, Long>() {    public Long call(Long v1, Long v2) {        return v1 + v2;    }}class SubtractLongs extends Function2<Long, Long, Long>() {    public Long call(Long v1, Long v2) {        return v1 - v2;    }}JavaPairDStream<String, Long> ipAddressPairDStream = accessLogsDStream.mapToPair(new ExtractIp());JavaPairDStream<String, Long> ipCountDStream = ipAddressPairDStream.reduceByKeyAndWindow(    new AddLongs(), //加上新进入窗口的批次中的元素    new SubtractLongs(), //移除离开窗口的老批次中的元素    Durations.seconds(30), //窗口时长    Durations.seconds(10) //滑动步长)

java

scala

使用countByWindow和countByValueAndWindow对窗口计数

JavaDStream<String> ip = accessLogsDStream.map(new Function<ApacheAccessLog, String>() {   public String call(ApacheAccessLog entry) {        return entry.getIpAddress();   }});JavaDStream<Long> requestCount = accessLogsDStream.countByWindow(Dirations.seconds(30), Durations.seconds(10));JavaPairDStream<String, Long> ipAddre***equestCount = ip.countByValueAndWindow(Dirations.seconds(30), Durations.seconds(10));

scala

val ipDStream = accessLogsDStream.map{entry => entry.getIpAddress()}val ipAddre***equestCount = ipDStream.countByValueAndWindow(Seconds(30), Seconds(10))val requestCount = accessLogsDStream.countByWindow(Seconds(30), Seconds(10))

java

updateStateByKey转化操作

简介

  updateStateByKey提供了跨批次维护状态的功能,用于键值对形式的DStream。

  updateStateByKey提供了一个update(events, oldState)函数,接收与某键相关的事件及该键之前对应的状态,返回该键对应的新状态。

events:当前批次中收到的事件列表

oldState:一个可选的状态对象,存放在Option内;如果一个键没有之前的状态,这个值为空。

newState:由函数返回,也以Option形式存在;可以返回一个空的Option表示删除该状态。

简单案例

  使用updateStateByKey()跟踪日志消息中各HTTP响应代码的计数。

scala

def updateRunningSum(values: Seq[Long], state: Option[Long]) = {    Some(state.getOrElse(0L) + values.size)}val responseCodeDStream = accessLogsDStream.map(log => (log.getResponseCode(), 1L))val responseCodeCountDStream = responseCodeDStream.updateStateByKey(updateRunningSum _)

java

class UpdateRunningSum implements Function2<List<Long>, Optional<Long>, Optional<Long>> {    public Optional<Long> call(List<Long> nums, Optional<Long> current) {        long sum = current.or(0L);        return Optional.of(sum + nums.size());    }};JavaPairDStream<Integer, Long> responseCodeCountDStream = accessLogsDStream.mapToPair(    new PairFunction<ApacheAccessLog, Integer, Long>() {        public Tuple2<Integer, Long> call(ApacheAccessLog log) {            return new Tuple2(log.getResponseCode(), 1L);        }    }).updateStateByKey(new UpdateRunningSum());

DStream 行动操作

  DStream行动操作同RDD的行动操作。比如,将DStream保存为SequenceFile文件。

scala

val writableIpAddre***equestCount = ipAddre***equestCount.map{    (ip, count) => <new Text(ip), new LongWritable(count))}writableIpAddre***equestCount.saveAsHadoopFiles[SequenceFileOutputFormat[Text, LongWritable]]("outputDir", "txt")}

java

JavaPairDStream<Text, LongWritable> writableDStream = ipDStream.mapToPair(    new PairFunction<Tuple2<String, Long>, Text, LongWritable>() {        public Tuple2<Text, LongWritable> call(Tuple2<String, Long> e) {            return new Tuple2(new Text(e._1()), new LongWritable(e._2()));        }    });writableDStream.saveAsHadoopFiles("outputDir", "txt", Text.class, LongWritable.class, SequenceFileOutputFormat.class);


本文由职坐标整理发布,学习更多的相关知识,请关注职坐标IT知识库!


本文由 @沉沙 发布于职坐标。未经许可,禁止转载。
喜欢 | 0 不喜欢 | 0
看完这篇文章有何感觉?已经有0人表态,0%的人喜欢 快给朋友分享吧~
评论(0)
后参与评论

您输入的评论内容中包含违禁敏感词

我知道了

助您圆梦职场 匹配合适岗位
验证码手机号,获得海同独家IT培训资料
选择就业方向:
人工智能物联网
大数据开发/分析
人工智能Python
Java全栈开发
WEB前端+H5

请输入正确的手机号码

请输入正确的验证码

获取验证码

您今天的短信下发次数太多了,明天再试试吧!

提交

我们会在第一时间安排职业规划师联系您!

您也可以联系我们的职业规划师咨询:

小职老师的微信号:z_zhizuobiao
小职老师的微信号:z_zhizuobiao

版权所有 职坐标-一站式IT培训就业服务领导者 沪ICP备13042190号-4
上海海同信息科技有限公司 Copyright ©2015 www.zhizuobiao.com,All Rights Reserved.
 沪公网安备 31011502005948号    

©2015 www.zhizuobiao.com All Rights Reserved

208小时内训课程