沉沙
2018-09-27
来源 :
阅读 1542
评论 0
摘要:本篇教程探讨了大数据技术 Mapreduce -- PageRank,希望阅读本篇文章以后大家有所收获,帮助大家对大数据技术的理解更加深入。
本篇教程探讨了大数据技术 Mapreduce -- PageRank,希望阅读本篇文章以后大家有所收获,帮助大家对大数据技术的理解更加深入。
<
PageRank 简单理解为网页排名,但是网页是根据什么排名的,接下来就简单介绍一下。
举例:
假设网页 A 的内容中有网页 B,C 和 D 的链接,并且 A 的 PageRank的值为0.25。
那接下里我们就可以计算在网页 A 中的其他网页的PageRank的值了。我们拿网页 B 来进行说明,
在网页 A 中的网页 B 的 PageRank 为 0.25 * (1/n) 其中n为网页 A 中网页链接数,结果则为 0.25*(1/3)。
可以简单理解为A的PageRank被B,C 和 D 平分了,B分到了0.25的三分之一。
然后将所有网页中的关于网页B的pagerank值求出来,就是网页B真实的pagerank了。
但是上面的例子没有考虑到如下的特殊情况:
1 网页A中只有指向自己的网页链接。
2 网页A中没有任何链接。
如果出现以上情况,会导致pagerank结果不准确。
所以出现了下面的公式:
result = sum * n + (1-n)/N
sum 为上面计算出来的,如网页B在所有网页中的pagerank值的总和。
n 可以理解为停留在当前网页继续进行网页跳转浏览的概率
1-n 可以理解为不访问当前网页的任何链接,从浏览器的地址栏输入,转去其他网页的概率。
N 为网页的数量
下面介绍通过MapReduce实现PageRank
简单的流程分析:
Map
取一行数据进行说明
A B C D
网页A中有网页B,C,D的链接;
刚开始给网页A一个默认的pagerank值,然后根据这个值计算其他网页链接在网页A中的PageRank。处理后的数据如下:
A 0.25 B C D
B 0.25*(1/3)
C 0.25*(1/3)
D 0.25*(1/3)
Reduce
然后通过 Reduce 计算出各个网页在其他网页中的 PageRank 的总和 sum,然后代入公式计算实际的PageRank,并更新 A 0.25 B C D 中的数据,这里将0.25更新为计算出来真实的 PageRank。
重复计算各个网页的 PageRank 的值,直到 PageRank 的结果收敛,值趋于稳定。
A 0.25 B C D =>A ? B C D
测试数据:
A B C D
B A D
C C
D B C
测试代码:
RunJob.java
1 import org.apache.hadoop.conf.Configuration;
2 import org.apache.hadoop.fs.FileSystem;
3 import org.apache.hadoop.fs.Path;
4 import org.apache.hadoop.io.Text;
5 import org.apache.hadoop.mapreduce.Job;
6 import org.apache.hadoop.mapreduce.Mapper;
7 import org.apache.hadoop.mapreduce.Reducer;
8 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
9 import org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat;
10 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
11
12 import java.io.IOException;
13
14 /**
15 * Created by Edward on 2016/7/13.
16 */
17 public class RunJob {
18
19 public static enum ValueEnum{
20 CLOSURE_VALUE;
21 }
22
23 public static void main(String[] args)
24 {
25
26 //access hdfs's user
27 System.setProperty("HADOOP_USER_NAME","root");
28
29 Configuration conf = new Configuration();
30 conf.set("fs.defaultFS", "hdfs://node1:8020");
31
32 try {
33 int i = 0;
34 while(true) {
35 i++;
36 conf.setInt("count", i);
37
38 FileSystem fs = FileSystem.get(conf);
39 Job job = Job.getInstance(conf);
40 job.setJarByClass(RunJob.class);
41 job.setMapperClass(MyMapper.class);
42 job.setReducerClass(MyReducer.class);
43
44 //需要指定 map out 的 key 和 value
45 job.setOutputKeyClass(Text.class);
46 job.setOutputValueClass(Text.class);
47
48 //设置输入类的
49 job.setInputFormatClass(KeyValueTextInputFormat.class);
50 if(i==1)
51 FileInputFormat.addInputPath(job, new Path("/test/pagerank/input"));
52 else
53 FileInputFormat.addInputPath(job, new Path("/test/pagerank/output/pr"+(i-1)));
54
55 Path path = new Path("/test/pagerank/output/pr"+i);
56 if (fs.exists(path))//如果目录存在,则删除目录
57 {
58 fs.delete(path, true);
59 }
60 FileOutputFormat.setOutputPath(job, path);
61
62 boolean b = job.waitForCompletion(true);
63 if (b) {
64 long closure =job.getCounters().findCounter(ValueEnum.CLOSURE_VALUE).getValue();
65 double avg= closure/4000.0;//计算收敛的平均值,浮动小于0.001则认为收敛
66 System.out.println("执行第"+i+"次, closure="+closure+",avg="+avg);
67 if(avg < 0.001){
68 System.out.println("总共执行了"+i+"次,之后收敛");
69 break;
70 }
71 }
72 }
73
74 } catch (Exception e) {
75 e.printStackTrace();
76 }
77 }
78
79
80 public static class MyMapper extends Mapper
81 @Override
82 protected void map(Text key, Text value, Context context) throws IOException, InterruptedException {
83 AdjacentNodes adjacentNodes = new AdjacentNodes();
84 int count = context.getConfiguration().getInt("count", 1);
85
86 if(count == 1)
87 {
88 AdjacentNodes firstAdj = new AdjacentNodes();
89 firstAdj.setValue(1.0);
90 firstAdj.setNodes(value.toString().split("\t"));
91 adjacentNodes = firstAdj;
92 }
93 else
94 {
95 //格式化 value: 1.0 B C D
96 adjacentNodes.formatInfo(value.toString());
97 }
98 //A 1.0 B C D
99 context.write(key, new Text(adjacentNodes.toString()));
100
101 double pagerank = adjacentNodes.getValue()/adjacentNodes.getNum();
102 for(int i=0; i<adjacentNodes.getNum(); i++)
103 {
104 String node = adjacentNodes.getNodes()[i];
105 //B 0.333
106 context.write(new Text(node), new Text(pagerank+""));
107 }
108 }
109 }
110
111 public static class MyReducer extends Reducer
112 @Override
113 protected void reduce(Text key, Iterable
114
115 AdjacentNodes adjacentNodes = new AdjacentNodes();
116
117 Double sum = 0.0;
118 for(Text adj : values)
119 {
120 String str = adj.toString();
121 if(str.split("\t").length>1)
122 {
123 adjacentNodes.formatInfo(str);
124 }
125 else{
126 sum += Double.parseDouble(str); //对节点的 pagerank 求和,
127 }
128 }
129
130 //计算pagerank
131 double n = 0.80;
132 double pagerank = sum * n + (1-n)/4.0;// 计算pagerank 公式 sum * n + (1-n)/N
133
134 //计算收敛的差值
135 int closure =(int)(Math.abs(pagerank - adjacentNodes.getValue()) * 1000);
136 //通过context.getCounter(ENUM)方法,每次执行reduce将closure的值进行累加,结果传递给主函数,
137
138 context.getCounter(ValueEnum.CLOSURE_VALUE).increment(closure);
139
140 adjacentNodes.setValue(pagerank);
141 context.write(key, new Text(adjacentNodes.toString()));
142 }
143 }
144 }
AdjacentNodes.java
/**
* Created by Edward on 2016/7/14.
*/
public class AdjacentNodes {
double value = 0.0;
int num = 0;
String[] nodes = null;
public void formatInfo(String str)
{
String[] val = str.split("\t");
this.setValue(Double.parseDouble(val[0]));
this.setNum(val.length-1);
if(this.num != 0)
this.nodes = new String[this.num];
for(int i=1; i<val.length; i++)
{
this.nodes[i-1] = val[i];
}
}
public boolean isEmpty()
{
if(this.num == 0)
return true;
else
return false;
}
public void setNodes(String[] nodes) {
this.nodes = nodes;
this.num = nodes.length;
}
public void setNum(int num) {
this.num = num;
}
public void setValue(double value) {
this.value = value;
}
public double getValue() {
return value;
}
public int getNum() {
return num;
}
public String[] getNodes() {
return nodes;
}
@Override
public String toString() {
StringBuffer stringBuffer = new StringBuffer(this.value+"");
for(int i=0; i<this.num; i++)
{
stringBuffer.append("\t"+this.nodes[i]);
}
return stringBuffer.toString();
}
}
结果数据:
A 0.10135294176208584 B C D
B 0.12838069628609527 A D
C 0.6560527651143326 C
D 0.12838069628609527 B C
总共执行了24次,之后收敛;
PageRank值越高,越值得推荐。
本文由职坐标整理发布,学习更多的大数据技术相关知识,请关注职坐标大技术云计算大技术技术频道!
喜欢 | 0
不喜欢 | 0
您输入的评论内容中包含违禁敏感词
我知道了

请输入正确的手机号码
请输入正确的验证码
您今天的短信下发次数太多了,明天再试试吧!
我们会在第一时间安排职业规划师联系您!
您也可以联系我们的职业规划师咨询:
版权所有 职坐标-一站式AI+学习就业服务平台 沪ICP备13042190号-4
上海海同信息科技有限公司 Copyright ©2015 www.zhizuobiao.com,All Rights Reserved.
沪公网安备 31011502005948号