沉沙
2018-10-08
来源 :
阅读 2071
评论 0
摘要:本篇教程探讨了大数据技术学习笔记(18)Pig的自定义函数,希望阅读本篇文章以后大家有所收获,帮助大家对大数据技术的理解更加深入。
本篇教程探讨了大数据技术学习笔记(18)Pig的自定义函数,希望阅读本篇文章以后大家有所收获,帮助大家对大数据技术的理解更加深入。
<
Pig的自定义函数有三种:
1、自定义过滤函数:相当于where条件
2、自定义运算函数:
3、自定义加载函数:使用load语句加载数据,生成一个bag
默认:一行解析成一个Tuple
需要MR的jar包
复制代码
一.自定义过滤函数
复制代码
package demo.pig;
import java.io.IOException;
import org.apache.pig.FilterFunc;
import org.apache.pig.data.Tuple;
//实现自定义的过滤函数,实现:查询过滤薪水大于2000的员工
public class IsSalaryTooHigh extends FilterFunc{
@Override
public Boolean exec(Tuple tuple) throws IOException {
/*参数tuple:调用的时候 传递的参数
*
* 在PigLatin调用
* myresult1 = filter emp by demo.pig.IsSalaryTooHigh(sal)
*/
//取出薪水
int sal = (int) tuple.get(0);
return sal>2000?true:false;
}
}
复制代码
二.自定义运算函数
复制代码
package demo.pig;
import java.io.IOException;
import org.apache.pig.EvalFunc;
import org.apache.pig.data.Tuple;
//根据员工的薪水判断级别
public class CheckSalaryGrade extends EvalFunc<String>{
@Override
public String exec(Tuple tuple) throws IOException {
// myresult2 = foreach emp generate ename,sal,demo.pig.CheckSalaryGrade(sal);
int sal = (int)tuple.get(0);
if(sal<1000) return "Grade A";
else if(sal>=1000 && sal<3000) return "Grade B";
else return "Grade C";
}
}
复制代码
三.自定义加载函数
复制代码
package demo.pig;
import java.io.IOException;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.pig.LoadFunc;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigSplit;
import org.apache.pig.data.BagFactory;
import org.apache.pig.data.DataBag;
import org.apache.pig.data.Tuple;
import org.apache.pig.data.TupleFactory;
public class MyLoadFunc extends LoadFunc{
//定义一个变量保存输入流
private RecordReader reader ;
@Override
public InputFormat getInputFormat() throws IOException {
// 输入数据的格式:字符串
return new TextInputFormat();
}
@Override
public Tuple getNext() throws IOException {
// 从输入流读取一行,如何解析生成返回的tuple
//数据:I love Beijing
Tuple result = null;
try{
//判断是否读入了数据
if(!this.reader.nextKeyValue()){
//没有数据
return result; //----> 是nullֵ
}
//数据:I love Beijing
String data = this.reader.getCurrentValue().toString();
//生成返回的结果:Tuple
result = TupleFactory.getInstance().newTuple();
//分词
String[] words = data.split(" ");
//每一个单词单独生成一个tuple,再把tuple放入bag中
//再把这个bag放入result中
//创建一个表
DataBag bag = BagFactory.getInstance().newDefaultBag();
for(String w:words){
//为每个单词生成一个tuple
Tuple aTuple = TupleFactory.getInstance().newTuple();
aTuple.append(w); //将单词放到tuple中
//把这些tuple放入一个bag中
bag.add(aTuple);
}
//把bag放入result
result.append(bag);
}catch(Exception ex){
ex.printStackTrace();
}
return result;
}
@Override
public void prepareToRead(RecordReader reader, PigSplit arg1) throws IOException {
// RecordReader reader:代表HDFS输入流
this.reader = reader;
}
@Override
public void setLocation(String path, Job job) throws IOException {
// 从HDFS输入的路径
FileInputFormat.setInputPaths(job, new Path(path));
}
}
复制代码
注册jar包: register define
register /root/temp/p1.jar
myresult3 = load '/input/data.txt' using demo.pig.MyLoadFunc();
定义别名:define myload demo.pig.MyLoadFunc;
本文由职坐标整理发布,学习更多的大数据技术相关知识,请关注职坐标大技术云计算大技术技术频道!
喜欢 | 0
不喜欢 | 0
您输入的评论内容中包含违禁敏感词
我知道了

请输入正确的手机号码
请输入正确的验证码
您今天的短信下发次数太多了,明天再试试吧!
我们会在第一时间安排职业规划师联系您!
您也可以联系我们的职业规划师咨询:
版权所有 职坐标-一站式AI+学习就业服务平台 沪ICP备13042190号-4
上海海同信息科技有限公司 Copyright ©2015 www.zhizuobiao.com,All Rights Reserved.
沪公网安备 31011502005948号