沉沙
2018-10-10
来源 :
阅读 2316
评论 0
摘要:本篇教程探讨了大数据技术大数据技术全面解读 spark(三)自定义分区、排序与查找,希望阅读本篇文章以后大家有所收获,帮助大家对大数据技术的理解更加深入。
本篇教程探讨了大数据技术大数据技术全面解读 spark(三)自定义分区、排序与查找,希望阅读本篇文章以后大家有所收获,帮助大家对大数据技术的理解更加深入。
<
一、自定义分区
1.概述
默认的是Hash的分区策略,这点和Hadoop是类似的
2.实现
package cn.itcast.spark.day3
import java.net.URL
import org.apache.spark.{HashPartitioner, Partitioner, SparkConf, SparkContext}
import scala.collection.mutable
/**
* Created by root on 2016/5/18.
*/
object UrlCountPartition {
def main(args: Array[String]) {
val conf = new SparkConf().setAppName("UrlCountPartition").setMaster("local[2]")
val sc = new SparkContext(conf)
//rdd1将数据切分,元组中放的是(URL, 1)
val rdd1 = sc.textFile("c://itcast.log").map(line => {
val f = line.split("\t")
(f(1), 1)
})
val rdd2 = rdd1.reduceByKey(_ + _)
val rdd3 = rdd2.map(t => {
val url = t._1
val host = new URL(url).getHost
(host, (url, t._2))
})
val ints = rdd3.map(_._1).distinct().collect()
val hostParitioner = new HostParitioner(ints)
// val rdd4 = rdd3.partitionBy(new HashPartitioner(ints.length))
val rdd4 = rdd3.partitionBy(hostParitioner).mapPartitions(it => {
it.toList.sortBy(_._2._2).reverse.take(2).iterator
})
rdd4.saveAsTextFile("c://out4")
//println(rdd4.collect().toBuffer)
sc.stop()
}
}
/**
* 决定了数据到哪个分区里面
* @param ins
*/
class HostParitioner(ins: Array[String]) extends Partitioner {
val parMap = new mutable.HashMap[String, Int]()
var count = 0
for(i <- ins){
parMap += (i -> count)
count += 1
}
override def numPartitions: Int = ins.length
override def getPartition(key: Any): Int = {
parMap.getOrElse(key.toString, 0)
}
}
// 与Hadoop相通,不再赘述
二、自定义排序
基本上就是结合之前的隐式转换了:(这里使用样例类可以不用new就能得到实例,另外也可以用于模式匹配)
package cn.itcast.spark.day3
import org.apache.spark.{SparkConf, SparkContext}
object OrderContext {
implicit val girlOrdering = new Ordering[Girl] {
override def compare(x: Girl, y: Girl): Int = {
if(x.faceValue > y.faceValue) 1
else if (x.faceValue == y.faceValue) {
if(x.age > y.age) -1 else 1
} else -1
}
}
}
/**
* Created by root on 2016/5/18.
*/
//sort =>规则 先按faveValue,比较年龄
//name,faveValue,age
object CustomSort {
def main(args: Array[String]) {
val conf = new SparkConf().setAppName("CustomSort").setMaster("local[2]")
val sc = new SparkContext(conf)
val rdd1 = sc.parallelize(List(("yuihatano", 90, 28, 1), ("angelababy", 90, 27, 2),("JuJingYi", 95, 22, 3)))
import OrderContext._
val rdd2 = rdd1.sortBy(x => Girl(x._2, x._3), false)
println(rdd2.collect().toBuffer)
sc.stop()
}
}
/**
* 第一种方式
* @param faceValue
* @param age
case class Girl(val faceValue: Int, val age: Int) extends Ordered[Girl] with Serializable {
override def compare(that: Girl): Int = {
if(this.faceValue == that.faceValue) {
that.age - this.age
} else {
this.faceValue -that.faceValue
}
}
}
*/
/**
* 第二种,通过隐式转换完成排序
* @param faceValue
* @param age
*/
case class Girl(faceValue: Int, age: Int) extends Serializable
本文由职坐标整理发布,学习更多的大数据技术相关知识,请关注职坐标大技术云计算大技术技术频道!
喜欢 | 0
不喜欢 | 0
您输入的评论内容中包含违禁敏感词
我知道了

请输入正确的手机号码
请输入正确的验证码
您今天的短信下发次数太多了,明天再试试吧!
我们会在第一时间安排职业规划师联系您!
您也可以联系我们的职业规划师咨询:
版权所有 职坐标-一站式AI+学习就业服务平台 沪ICP备13042190号-4
上海海同信息科技有限公司 Copyright ©2015 www.zhizuobiao.com,All Rights Reserved.
沪公网安备 31011502005948号