沉沙
2018-10-08
来源 :
阅读 1658
评论 0
摘要:本篇教程探讨了大数据技术学习笔记(31)SparkStreaming详细介绍,希望阅读本篇文章以后大家有所收获,帮助大家对大数据技术的理解更加深入。
本篇教程探讨了大数据技术学习笔记(31)SparkStreaming详细介绍,希望阅读本篇文章以后大家有所收获,帮助大家对大数据技术的理解更加深入。
<
Spark Streaming: Spark用于处理流式数据的模块,类似Storm
核心:DStream(离散流),就是一个RDD
============================================
一、Spark Streaming基础
1、什么是Spark Streaming?
(*)Spark Streaming makes it easy to build scalable fault-tolerant streaming applications.
(*)常见的流式处理框架
(1)Apache Storm
(2)Spark Streaming
(3)JStorm:阿里巴巴
(4)Flink:可以很好的管理内存
(*)离线计算和流式计算各自的特点
典型代表 数据的采集 数据源(结果)
离线计算: MR、Spark Core Sqoop 批量操作
流式计算: Storm等等 Flume(Kafka) 实时性
(*)典型的流式计算的框架:参考Hadoop的课件:P91
2、简介Spark Streaming内部结构
3、演示Demo:NetworkWordCount 处理的是流式数据
(*)工具:netcat
(*)文档://spark.apache.org/docs/latest/streaming-programming-guide.html#a-quick-example
(*)步骤:启动两个窗口
第一个窗口中:
bin/run-example streaming.NetworkWordCount bigdata11 9999
第二个窗口中:启动消息服务器(先启动)
nc -l -p 9999
注意:如果要演示成功,保证虚拟机的CPU的核数至少2以上
运行:
4、开发自己的NetworkWordCount程序
复制代码
1 package main.scala.demo
2
3 import org.apache.spark.SparkConf
4 import org.apache.spark.storage.StorageLevel
5 import org.apache.spark.streaming.{Seconds, StreamingContext}
6
7 /**
8 * Created by YOGA on 2018/2/27.
9 */
10 object MyNetworkWordCount {
11 def main(args: Array[String]) {
12 //核心:通过StreamingContext对象,去创建一个DStream
13 //DStream从外部接收数据(使用的是Linux上的netcat工具)
14
15 //创建一个SparkConf对象
16 //local[2]:相当于有两个工作线程,一个接收一个发送
17 val sparkconf = new SparkConf()
18 .setAppName("MyNetworkWordCount")
19 .setMaster("local[2]")
20
21 //创建StreamContext,表示每隔三秒采集一次数据
22 val ssc = new StreamingContext(sparkconf,Seconds(3))
23
24 //创建DStream,看成一个输入流
25 //IP,端口,缓存到硬盘
26
27 val lines = ssc.socketTextStream("192.168.153.11",1234,StorageLevel.MEMORY_AND_DISK_SER)
28
29 //执行WordCount
30 val words = lines.flatMap(_.split(" "))
31
32 //使用transform完成同样的计数,相当于map操作
33 //val wordPair = words.transform(x=>x.map(x=>(x,1)))
34 //val wordCount = wordPair.reduceByKey(_+_)
35 val wordCount = words.map((_,1)).reduceByKey(_+_)
36
37 /*
38 * 参数一:执行运算
39 * 参数二:窗口的大小
40 * 参数三:创建滑动的距离
41 *
42 * 例子:每9秒钟,把过去30秒的数据进行wordcount
43 * 注意:第二个参数 第三个参数 必须是采样频率的整数倍
44 * */
45 //val wordCount = words.map((_,1)).reduceByKeyAndWindow((a:Int,b:Int)=>(a+b),Seconds(30),Seconds(9))
46 //输出
47 wordCount.print()
48
49 //启动StreamingContext
50 ssc.start()
51
52 //等待计算完成
53 ssc.awaitTermination()
54 }
55
56 }
复制代码
二、Spark Streaming进阶
bin/spark-shell --master spark://bigdata11:7077
1、类:StreamingContext(类似:Spark Context、SQLContext)
上下文对象
创建的方式:
(1)通过SparkConf来创建
val sparkconf = new SparkConf().setAppName("MyNetworkWordCount").setMaster("local[2]")
//创建StreamingContext,表示每隔3秒采集一次数据
val ssc = new StreamingContext(sparkconf,Seconds(3))
(2)通过SparkContext对象来创建
import org.apache.spark.streaming.{Seconds, StreamingContext}
val ssc = new StreamingContext(sc,Seconds(3))
说明:
(1)setMaster("local[2]")
(2)当创建StreamingContext对象,内部会创建一个SparkContext对象
(3)当StreamingContext开始执行,不能添加新的任务
(4)同一个时刻上,JVM只能有一个活动的StreamingContext
2、DStream(离散流):把连续的数据流,变成不连续的离散流,表现形式就是RDD
简单来说:把连续的变成不连续的
操作:Transformation和Action
(*)transform(func)
通过RDD-to-RDD函数作用于源DStream中的各个RDD,可以是任意的RDD操作,从而返回一个新的RDD
改写上面WordCount例子,屏蔽35行
//使用transform完成同样的计数,相当于map操作
33 val wordPair = words.transform(x=>x.map(x=>(x,1)))
34 val wordCount = wordPair.reduceByKey(_+_)
(*)updateStateByKey(func)
可以进行累加操作。方法:设置检查点,定义一个累加功能的函数
复制代码
1 package main.scala.demo
2
3 import org.apache.spark.SparkConf
4 import org.apache.spark.storage.StorageLevel
5 import org.apache.spark.streaming.{Seconds, StreamingContext}
6
7 /**
8 * Created by YOGA on 2018/2/28.
9 */
10 object MyTotalNetworkWordCount {
11 def main(args: Array[String]) {
12 val sparkconf = new SparkConf()
13 .setAppName("MyNetworkWordCount")
14 .setMaster("local[2]")
15
16 //创建StreamContext,表示每隔三秒采集一次数据
17 val ssc = new StreamingContext(sparkconf,Seconds(3))
18
19 //注意:如果累计,在执行计算的时候,需要保持之前的状态信息
20 //设置检查点
21 ssc.checkpoint("hdfs://192.168.153.11:9000/spark/checkpoint0228")
22
23 //创建DStream,看成一个输入流
24 val lines = ssc.socketTextStream("192.168.153.11",1234,StorageLevel.MEMORY_AND_DISK_SER)
25
26 //执行WordCount
27 val words = lines.flatMap(_.split(" "))
28
29 //每个单词记一次数
30 val pairs = words.map((_,1))
31
32 //定义一个函数,进行累加
33 //参数:1、当前的值 2、之前的值
34 val addFunc = (currentValues:Seq[Int],preValues:Option[Int]) =>{
35 //得到当前的值
36 val currentCount = currentValues.sum
37
38 //先得到之前的值
39 val preCount = preValues.getOrElse(0)
40
41 //返回累加结果
42 Some(currentCount + preCount)
43 }
44
45 //统计每个单词出现的频率:累计
46 val totalCount = pairs.updateStateByKey(addFunc)
47 totalCount.print()
48
49 //启动任务
50 ssc.start()
51 ssc.awaitTermination()
52
53 }
54 }
复制代码
3、窗口操作
例子:每9秒钟,把过去30秒的数据进行WordCount
注释上面的代码35行,放开下面一行代码
复制代码
/*
38 * 参数一:执行运算
39 * 参数二:窗口的大小
40 * 参数三:创建滑动的距离
41 *
42 * 例子:每9秒钟,把过去30秒的数据进行wordcount
43 * 注意:第二个参数 第三个参数 必须是采样频率的整数倍,采样频率3s
44 * */
45 val wordCount = words.map((_,1)).reduceByKeyAndWindow((a:Int,b:Int)=>(a+b),Seconds(30),Seconds(9))
复制代码
4、输入和输出
(1)输入:接收器接收外部数据源的数据
(*)基本数据源:文件流、RDD队列流、Socket流
(*)高级数据源:Kafka、Flume
文件流:监听一个目录,当目录下的文件发生变化的时候,将变化的数据读入DStream
复制代码
package main.scala.demo
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
/**
* Created by YOGA on 2018/2/28.
*/
object MyFileDStream {
def main(args: Array[String]) {
//创建一个SparkConf对象
//local[2]:相当于有两个工作线程,一个接收一个发送
val sparkconf = new SparkConf()
.setAppName("MyNetworkWordCount")
.setMaster("local[2]")
//创建StreamContext,表示每隔三秒采集一次数据
val ssc = new StreamingContext(sparkconf,Seconds(3))
//监听一个目录,当目录下的文件发生变化的时候,将变化的数据读入DStream
val lines = ssc.textFileStream("D:\\temp\\aaa")
lines.print()
ssc.start()
ssc.awaitTermination()
}
}
复制代码
RDD队列流queueStream
:定义一个for循环,生成RDD放入队列
复制代码
package main.scala.demo
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import scala.collection.mutable
import scala.collection.mutable.Queue
import org.apache.spark.rdd.RDD
/**
* Created by YOGA on 2018/2/28.
*/
object MyRDDQueueDStream {
def main(args: Array[String]){
val sparkconf = new SparkConf()
.setAppName("MyNetworkWordCount")
.setMaster("local[2]")
//创建StreamContext,表示每隔三秒采集一次数据
val ssc = new StreamingContext(sparkconf,Seconds(3))
//创建一个队列,把生成RDD放入队列
val rddQueue = new mutable.Queue[RDD[Int]]()
//初始化
for(i <- 1 to 3){
rddQueue += ssc.sparkContext.makeRDD(1 to 10)
//让线程睡几秒
Thread.sleep(3000)
}
//创建一个RDD的DStream
val inputStream = ssc.queueStream(rddQueue)
//处理:乘以10
val result = inputStream.map(x=> (x,x*10))
result.print()
ssc.start()
ssc.awaitTermination()
}
}
复制代码
运行:
(2)输出操作
5、集成DataFrame和SQL: 使用SparkSQL的方式处理流式数据
把RDD转换成DataFrame,并生成临时表,然后就可以进行SQL查询
复制代码
1 package main.scala.demo
2
3 import org.apache.spark.SparkConf
4 import org.apache.spark.sql.SparkSession
5 import org.apache.spark.storage.StorageLevel
6 import org.apache.spark.streaming.{Seconds, StreamingContext}
7
8 /**
9 * Created by YOGA on 2018/2/28.
10 */
11 object MyNetWorkWordCountBySQL {
12 def main(args: Array[String]) {
13 //核心:通过StreamingContext对象,去创建一个DStream
14 //DStream从外部接收数据(使用的是Linux上的netcat工具)
15
16 //创建一个SparkConf对象
17 //local[2]:相当于有两个工作线程,一个接收一个发送
18 val sparkconf = new SparkConf()
19 .setAppName("MyNetworkWordCount")
20 .setMaster("local[2]")
21
22 //创建StreamContext,表示每隔三秒采集一次数据
23 val ssc = new StreamingContext(sparkconf,Seconds(3))
24
25 //创建DStream,看成一个输入流
26 val lines = ssc.socketTextStream("192.168.153.11",1234,StorageLevel.MEMORY_AND_DISK_SER)
27
28 //得到的所有单词
29 val words = lines.flatMap(_.split(" "))
30 //val wordPair = words.transform(x=> x.map(x=>(x,1)))
31 //val wordCount = wordPair.reduceByKey(_+_)
32
33 //使用sparkSQL处理Spark Streaming的数据
34 words.foreachRDD(rdd =>{
35 //使用SparkSession来创建
36 val spark = SparkSession.builder()
37 .config(rdd.sparkContext.getConf)
38 .getOrCreate()
39
40 //需要把RDD转成一个DataFrame
41 import spark.implicits._
42 val wordCountDF = rdd.toDF("word")
43
44 //注册成一个表
45 wordCountDF.createOrReplaceTempView("words")
46
47 //执行SQL
48 val result = spark.sql("select * from words group by word")
49 result.show()
50
51 Thread.sleep(5000)
52 })
53
54
55 //启动StreamingContext
56 ssc.start()
57
58 //等待计算完成
59 ssc.awaitTermination()
60 }
61 }
本文由职坐标整理发布,学习更多的大数据技术相关知识,请关注职坐标大技术云计算大技术技术频道!
喜欢 | 0
不喜欢 | 0
您输入的评论内容中包含违禁敏感词
我知道了

请输入正确的手机号码
请输入正确的验证码
您今天的短信下发次数太多了,明天再试试吧!
我们会在第一时间安排职业规划师联系您!
您也可以联系我们的职业规划师咨询:
版权所有 职坐标-一站式AI+学习就业服务平台 沪ICP备13042190号-4
上海海同信息科技有限公司 Copyright ©2015 www.zhizuobiao.com,All Rights Reserved.
沪公网安备 31011502005948号