沉沙
2019-01-14
来源 :
阅读 1601
评论 0
摘要:本篇教程探讨了大数据技术之Spark入门Core(1),希望阅读本篇文章以后大家有所收获,帮助大家对相关内容的理解更加深入。
本篇教程探讨了大数据技术之Spark入门Core(1),希望阅读本篇文章以后大家有所收获,帮助大家对相关内容的理解更加深入。

一:spark 简介
1.1 spark 的来源
Spark是UC Berkeley AMP lab (加州大学伯克利分校的AMP实验室)所开源的类Hadoop MapReduce的通用并行框架,Spark,拥有Hadoop MapReduce所具有的优点;但不同于MapReduce的是Job中间输出结果可以保存在内存中,从而不再需要读写HDFS,因此Spark能更好地适用于数据挖掘与机器学习等需要迭代的MapReduce的算法。Spark 是一种与 Hadoop 相似的开源集群计算环境,但是两者之间还存在一些不同之处,这些有用的不同之处使 Spark 在某些工作负载方面表现得更加优越,换句话说,Spark 启用了内存分布数据集,除了能够提供交互式查询外,它还可以优化迭代工作负载。Spark 是在 Scala 语言中实现的,它将 Scala 用作其应用程序框架。与 Hadoop 不同,Spark 和 Scala 能够紧密集成,其中的 Scala 可以像操作本地集合对象一样轻松地操作分布式数据集。尽管创建 Spark 是为了支持分布式数据集上的迭代作业,但是实际上它是对 Hadoop 的补充,可以在 Hadoop 文件系统中并行运行。通过名为 Mesos 的第三方集群框架可以支持此行为。Spark 由加州大学伯克利分校 AMP 实验室 (Algorithms, Machines, and People Lab) 开发,可用来构建大型的、低延迟的数据分析应用程序。
1.2 spark 的生态环境
1.3 spark 与hadoop的 mapreduce 对比
MapReduce Hive Storm Mahout GriphSpark Core Spark SQL Spark Streaming Spark ML Spark GraphX Spark R
1.4 spark 可以运行在什么地方
Spark Application运行everywhere local、yarn、memsos、standalon、ec2 .....
二 spark的安装与配置
2.1 配置好hadoop的环境安装scala-2.10.4.tgz
tar -zxvf scala-2.10.4.tgz /opt/modulesvim /etc/profile export JAVA_HOME=/opt/modules/jdk1.7.0_67export HADOOP_HOME=/opt/modules/hadoop-2.5.0-cdh5.3.6export SCALA_HOME=/opt/modules/scala-2.10.4export SPARK_HOME=/opt/modules/spark-1.6.1-bin-2.5.0-cdh5.3.6PATH=$PATH:$HOME/bin:$JAVA_HOME/bin:$HADOOP_HOME/bin:$SCALA_HOME/bin:$SPARK_HOME/bin
2.2 安装 spark-1.6.1-bin-2.5.0-cdh5.3.6.tgz
tar -zxvf spark-1.6.1-bin-2.5.0-cdh5.3.6.tgz mv spark-1.6.1-bin-2.5.0-cdh5.3.6 /opt/modules cd /opt/modules/spark-1.6.1-bin-2.5.0-cdh5.3.6/conf cp -p spark-env.sh.template spark-env.sh cp -p log4j.properties.template log4j.properties vim spark-env.sh 增加:JAVA_HOME=/opt/modules/jdk1.7.0_67SCALA_HOME=/opt/modules/scala-2.10.4HADOOP_CONF_DIR=/opt/modules/hadoop-2.5.0-cdh5.3.6/etc/hadoop
2.3 spark 命令执行与调用
执行spark 命令 bin/spark-shell
2.4 运行测试文件:
hdfs dfs -mkdir /input hdfs dfs -put READ.md /input
2.4.1 执行统计
scala> val rdd = sc.textFile("/input/README.md")
rdd.count (统计多少行)rdd.first (统计第一行)rdd.filter(line => line.contains("Spark")).count (统计存在Spark的字符的有多少行)
scala> rdd.map(line => line.split(" ").size).reduce(_ + _)
三: spark 的wordcount统计
3.1 spark 的wc统计
val rdd=sc.textFile("/input") ####rdd 读文件rdd.collect ###rdd 显示文件的内容 rdd.count ####rdd 显示有多少行数据
3.2 spark 处理数据三步骤
input scala> val rdd =sc.textFile("/input") ####(输入数据) processval WordCountRDD = rdd.flatMap(line => line.split(" ")).map(word => (word,1)).reduceByKey(( a , b ) => ( a + b )) ######(处理数据) 简写: val WordCountRDD = rdd.flatMap(_.split(" ")).map(_,1)).reduceByKey(_ + _) outputscala> WordCountRDD.saveAsTextFile("/output3")scala> WordCountRDD.collect
四、spark 处理数据:
4.1 spark的数据统计
spark 处理pageview 数据:hdfs dfs -mkdir /pagehdfs dfs -put page_views.data /page 读取数据:val rdd = sc.textFile("/page")处理数据: val PageRdd = rdd.map(line => line.split("\t")).map(arr => (arr(2), 1)).reduceByKey(_ + _) 取数据的前十条数据:PageRdd.take(10);
将数据放入内存:rdd.cacherdd.count rdd.map(line => line.split("\t")).map(arr => (arr(2), 1)).reduceByKey(_ + _).take(10)
五:spark 的Application
5.1 spark 的运行模式
spark 的application -1. Yarn 目前最多 -2. standalone 自身分布式资源管理管理和任务调度 -3 Mesos hadoop 2.x release 2.2.0 2013/10/15 hadoop 2.0.x - al cloudera 2.1.x -bete cdh3.x - 0.20.2 cdh4.x - 2.0.0 hdfs -> HA: QJM : Federation Cloudera Manager 4.x cdh5.x
5.2 spark 的 Standalone mode
Spark 本身知道的一个分布式资源管理系列以及任务调度框架类似于 Yarn 这样的框架 分布式 主节点 Master - ResourceManager 从节点: work -> nodemanager 打开 spark-env.sh 最后增加:SPARK_MASTER_IP=192.168.3.1SPARK_MASTER_PORT=7077SPARK_MASTER_WEBUI_PORT=8080SPARK_WORKER_CORES=2SPARK_WORKER_MEMORY=2gSPARK_WORKER_PORT=7078SPARK_WORKER_WEBUI_PORT=8081SPARK_WORKER_INSTANCES=1 ## 每台机器可以运行几个work cd /soft/spark/conf cp -p slaves.template slaves echo "flyfish01.yangyang.com" > slaves ------启动spark cd /soft/spark/sbinstart-slaves.sh 启动所有的从节点,也就是work节点 注意: 使用此命名,运行此命令机器,必须要配置与主节点的无密钥登录,否则启动时时候会出现一些问题,比如说输入密码之类的。./start-master.sh./start-slaves.sh
job 运行在standalone 上面bin/spark-shell --master spark://192.168.3.1:7077
5.3 standalone 上面运行
读取数据:val rdd = sc.textFile("/page")处理数据: val PageRdd = rdd.map(line => line.split("\t")).map(arr => (arr(2), 1)).reduceByKey(_ + _) 取数据的前十条数据:PageRdd.take(10);
5.4 对于一个spark application 两个部分组成
- 1、 Driver program -> 4040 4041 4042 main 方法 SparkContext -- 最最重要 - 2、Executor 资源 一个 jvm (进程) 运行我们的job的task REPL: shell 交互式命令 spark Application job -01 count job -02 stage-01 task-01 (线程) -> map task (进程) task-02 (线程) -> map task (进程) 每个stage 中的所有的task,业务都是相同的,处理的数据不同 stage -02 job -03 从上述运行的程序来看: 如果RDD 调用的函数,返回值不是RDD的时候,就会触发一个job 进行执行 思考: reduceByKey 到底做了什么事情: -1. 分组 将相同的key 的value 进行合并 -2.对value 进行reduce 进行合并经分析,对比mapreduce 中的worldcount 程序运行,推断出spark job 中 stage 的划分依据RDD 之间否产生shuffle 进行划分
倒序查询:val rdd = sc.textFile("/input")val WordContRdd = rdd.flatMap(_.split(" ")).map((_,1)).reduceByKey(_ + _)val sortRdd = WordContRdd.map(tuple => (tuple._2, tuple._1)).sortByKey(false)sortRdd.collectsortRdd.take(3)sortRdd.take(3).map(tuple => (tuple._2, tuple._1))
scala 的隐式转换: 隐式转换: 将某个类型转换为另外一个类型。 隐式函数 implicit def
5.5 在企业中开发spark的任务
如何开发spark applicationspark-shell + idea -1, 在idea 中编写代码-2,在spark-shell 中执行代码-3. 使用IDEA 将代码打包成jar包,使用bin/spark-submint 提交运行
5.6 spark 在Linux下面的idea 编程 10万条数据取前10条
package com.ibeifeng.bigdata.senior.coreimport org.apache.spark.SparkConfimport org.apache.spark.SparkContext/** * Created by root on 17-11-2. * * Driver Program * */object SparkApp { def main(args: Array[String]) { // step0: sSparkContext val sparkConf = new SparkConf() .setAppName("SparkApplication") .setMaster("local[2]") // create SparkContext val sc = new SparkContext(sparkConf) //**=========================================*/ //step 1: input data val rdd = sc.textFile("/page/page_views.data") //step 2: process data val pageWordRddTop10 = rdd .map(line => line.split("\t")) .map(x => (x(2),1)) .reduceByKey(_ + _) .map(tuple => (tuple. _2, tuple._1)) .sortByKey(false) .take(10) //Step 3 : output data pageWordRddTop10.foreach(println(_)) //**=========================================*/ //close spark sc.stop() }}
5.7 将代码打包成一个jar包运行
5.8 spark 提交任务
5.8.1 运行在local
bin/spark-submint Scala_Project.jar
5.8.2 运行在standalone
启动spark 的standalone bin/start-master.shbin/start-slave2.sh
bin/spark-submit --master spark://192.168.3.1:7077 Scala_Project.jar
5.7 spark 的historyserver配置
spark 监控运行完成的spark application 分为两个部分:第一: 设置sparkApplication 在运行时,需要记录日志信息第二: 启动historyserver 通过界面查看------配置historyserver cd /soft/spark/confcp -p spark-defaults.conf.template spark-defaults.confvim defaults.conf spark.master spark://192.168.3.1:7077spark.eventLog.enabled truespark.eventLog.dir hdfs://192.168.3.1:8020/SparkJobLogsspark.eventLog.compress true
启动spark-shellbin/spark-shell
bin/spark-submit --master spark://192.168.3.1:7077 Scala_Project.jar
配置spark的服务端historyserver vim spark-env.shSPARK_MASTER_IP=192.168.3.1SPARK_MASTER_PORT=7077SPARK_MASTER_WEBUI_PORT=8080SPARK_WORKER_CORES=2SPARK_WORKER_MEMORY=2gSPARK_WORKER_PORT=7078SPARK_WORKER_WEBUI_PORT=8081SPARK_WORKER_INSTANCES=1 ## 每台机器可以运行几个work----#增加SPARK_HISTORY_OPTS="-Dspark.history.fs.logDirectory=hdfs://flyfish01.yangyang.com:8020/SparkJobLogs -Dspark.history.fs.cleaner.enabled=true"-------------#启动historyserver cd /soft/spark sbin/start-history-server.sh
六: spark 的日志分析
需求一:The average, min, and max content size of responses returned from the server. ContentSize需求二:A count of response code's returned. responseCode需求三:All IPAddresses that have accessed this server more than N times. ipAddresses需求四:The top endpoints requested by count. endPoint
6.1 maven 创建工程:
6.1.1 使用命令行创建
mvn archetype:generate -DarchetypeGroupId=org.scala-tools.archetypes -DarchetypeArtifactId=scala-archetype-simple -DremoteRepositories=//scala-tools.org/repo-releases -DgroupId=com.ibeifeng.bigdata.spark.app -DartifactId=log-analyzer -Dversion=1.0
6.1.2 导入工程
<img style="cursor: pointer;"
本文由职坐标整理发布,学习更多的相关知识,请关注职坐标IT知识库!
喜欢 | 0
不喜欢 | 0
您输入的评论内容中包含违禁敏感词
我知道了

请输入正确的手机号码
请输入正确的验证码
您今天的短信下发次数太多了,明天再试试吧!
我们会在第一时间安排职业规划师联系您!
您也可以联系我们的职业规划师咨询:
版权所有 职坐标-一站式AI+学习就业服务平台 沪ICP备13042190号-4
上海海同信息科技有限公司 Copyright ©2015 www.zhizuobiao.com,All Rights Reserved.
沪公网安备 31011502005948号