Hadoop MapReduce: a two-table join programming example
Published: 2019-06-29



Suppose file 1 (table 1) has the following contents (HDFS file name: t_user.txt):

1 wangming 男 计算机

2 hanmei 男 机械

3 lilei 女 法学

4 hanmeixiu 女 通信

5 chenyangxiu 男 设计

6 yangxiuping 男 英语

File 2 (table 2) contents (HDFS file name: t_user_card.txt):

1 wangming 360781100207230023

2 hanmei 362781100207300033

3 lilei 36201100207100033

4 hanmeixiu 362202199697652519

5 chenyangxiu 363654678906542785

6 yangxiuping 360876187618971008

7 machao 370875468820186543

The task: use MapReduce to join table 1 and table 2 on the user name, so that each user's ID-card number is shown alongside the other user fields.

Programming approach:

* In the map phase the job reads both filePath = /xxx/xxx/t_user.txt and filePath = /xxx/xxx/t_user_card.txt; each input split carries a different filePath, which tells us which table a record comes from.

* Define a JoinBean first; depending on which file is being read, set the corresponding fields on it.

* Emit the join field (the user name) as the map output key.

* Hadoop then groups the JoinBeans by key, so the reduce phase receives them as an Iterable<JoinBean> and can merge them into one joined record.

The code is as follows:

package com.chenjun.MRstudy.join;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class MrJoinTest extends Configured implements Tool {

    /**
     * Programming approach:
     * The map phase reads both filePath = /xxx/xxx/t_user.txt and
     * filePath = /xxx/xxx/t_user_card.txt; each input file has a different filePath.
     * Define JoinBean first, and set the corresponding fields depending on which file is being read.
     * Then emit the join field as the map output key,
     * so that the JoinBeans are grouped into an Iterable<JoinBean> in the reduce phase.
     *
     * @author CJ
     */
    public static class MyMapper extends Mapper<LongWritable, Text, Text, JoinBean> {
        String tableFlag = "";

        @Override
        public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            FileSplit fileSplit = (FileSplit) context.getInputSplit();
            // Get the path of the input file
            String filePath = fileSplit.getPath().toString();
            System.out.println("filePath======================" + filePath);

            Text textKey = new Text();
            JoinBean joinBean = new JoinBean();

            // Determine which table is being read from the input path
            if (filePath.substring(filePath.lastIndexOf('/') + 1, filePath.length()).equals("t_user.txt")) {
                tableFlag = "t_user.txt";
            } else if (filePath.substring(filePath.lastIndexOf('/') + 1, filePath.length()).equals("t_user_card.txt")) {
                tableFlag = "t_user_card.txt";
            } else {
                return;
            }

            // Depending on the table, fill the bean and use the join field (user name)
            // as the key sent to the reduce side
            String line = value.toString();
            String[] lineArray = line.split(" ");
            if ("t_user.txt".equals(tableFlag)) {
                String userid = lineArray[0];
                String userName = lineArray[1];
                String userSex = lineArray[2];
                String profession = lineArray[3];
                textKey.set(userName);
                joinBean.setUserId(userid);
                joinBean.setUserName(userName);
                joinBean.setUserSex(userSex);
                joinBean.setProfession(profession);
            } else if ("t_user_card.txt".equals(tableFlag)) {
                String userName = lineArray[1];
                String idNumber = lineArray[2];
                textKey.set(userName);
                joinBean.setIdNumber(idNumber);
                joinBean.setUserName(userName);
            } else {
                return;
            }
            System.out.println("textKey=" + textKey + " " + "joinBean=" + joinBean.toString());
            // Send to the reduce side
            context.write(textKey, joinBean);
        }
    }

    public static class MyReducer extends Reducer<Text, JoinBean, NullWritable, Text> {
        @Override
        public void reduce(Text key, Iterable<JoinBean> values, Context context) throws IOException, InterruptedException {
            // Merge all non-blank fields from the records sharing the same user name
            JoinBean joinBean = new JoinBean();
            for (JoinBean bean : values) {
                if (StringUtils.isNotBlank(bean.getUserId())) {
                    joinBean.setUserId(bean.getUserId());
                }
                if (StringUtils.isNotBlank(bean.getUserName())) {
                    joinBean.setUserName(bean.getUserName());
                }
                if (StringUtils.isNotBlank(bean.getUserSex())) {
                    joinBean.setUserSex(bean.getUserSex());
                }
                if (StringUtils.isNotBlank(bean.getProfession())) {
                    joinBean.setProfession(bean.getProfession());
                }
                if (StringUtils.isNotBlank(bean.getIdNumber())) {
                    joinBean.setIdNumber(bean.getIdNumber());
                }
            }
            Text text = new Text(joinBean.getUserId() + " " + joinBean.getUserName() + " " + joinBean.getUserSex()
                    + " " + joinBean.getProfession() + " " + joinBean.getIdNumber());
            context.write(NullWritable.get(), text);
        }
    }

    public int run(String[] allArgs) throws Exception {
        Job job = Job.getInstance(getConf());
        job.setJarByClass(MrJoinTest.class);

        job.setOutputKeyClass(NullWritable.class);
        job.setOutputValueClass(Text.class);

        job.setMapperClass(MyMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(JoinBean.class);

        job.setReducerClass(MyReducer.class);
        job.setNumReduceTasks(1);

        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);

        String[] args = new GenericOptionsParser(getConf(), allArgs).getRemainingArgs();
        FileInputFormat.addInputPaths(job, "/mrtest/joinInput/t_user.txt,/mrtest/joinInput/t_user_card.txt");
        FileOutputFormat.setOutputPath(job, new Path("/mrtest/joinOutput"));

        return job.waitForCompletion(true) ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        Configuration configuration = new Configuration();
        ToolRunner.run(configuration, new MrJoinTest(), args);
    }
}

class JoinBean implements Writable {
    private String userId = "";
    private String userName = "";
    private String userSex = "";
    private String profession = "";
    private String IdNumber = "";

    public String getUserId() { return userId; }
    public void setUserId(String userId) { this.userId = userId; }
    public String getUserName() { return userName; }
    public void setUserName(String userName) { this.userName = userName; }
    public String getUserSex() { return userSex; }
    public void setUserSex(String userSex) { this.userSex = userSex; }
    public String getProfession() { return profession; }
    public void setProfession(String profession) { this.profession = profession; }
    public String getIdNumber() { return IdNumber; }
    public void setIdNumber(String idNumber) { IdNumber = idNumber; }

    @Override
    public void write(DataOutput out) throws IOException {
        out.writeUTF(this.userId);
        out.writeUTF(this.userName);
        out.writeUTF(this.userSex);
        out.writeUTF(this.profession);
        out.writeUTF(this.IdNumber);
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        this.userId = in.readUTF();
        this.userName = in.readUTF();
        this.userSex = in.readUTF();
        this.profession = in.readUTF();
        this.IdNumber = in.readUTF();
    }

    @Override
    public String toString() {
        return "JoinBean [userId=" + userId + ", userName=" + userName + ", userSex=" + userSex
                + ", profession=" + profession + ", IdNumber=" + IdNumber + "]";
    }
}
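For reference, the job is packaged into MRstudy-1.0.jar and submitted with the same command that appears in the error logs below; reading the result back with hdfs dfs -cat and the part-r-00000 file name are assumptions based on the single-reducer setup in the driver above:

hadoop jar MRstudy-1.0.jar com.chenjun.MRstudy.join.MrJoinTest
hdfs dfs -cat /mrtest/joinOutput/part-r-00000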

_____________________________________________________________________________________________

Errors encountered along the way:

Error 1:

hadoop jar MRstudy-1.0.jar com.chenjun.MRstudy.join.MrJoinTest
18/03/15 16:35:07 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
18/03/15 16:35:08 INFO Configuration.deprecation: session.id is deprecated. Instead, use dfs.metrics.session-id
18/03/15 16:35:08 INFO jvm.JvmMetrics: Initializing JVM Metrics with processName=JobTracker, sessionId=
18/03/15 16:35:09 INFO input.FileInputFormat: Total input paths to process : 1
18/03/15 16:35:09 INFO mapreduce.JobSubmitter: number of splits:1
18/03/15 16:35:09 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_local2133625459_0001
18/03/15 16:35:09 INFO mapreduce.Job: The url to track the job: http://localhost:8080/
18/03/15 16:35:09 INFO mapred.LocalJobRunner: OutputCommitter set in config null
18/03/15 16:35:09 INFO mapreduce.Job: Running job: job_local2133625459_0001
18/03/15 16:35:09 INFO output.FileOutputCommitter: File Output Committer Algorithm version is 1
18/03/15 16:35:09 INFO mapred.LocalJobRunner: OutputCommitter is org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter
18/03/15 16:35:09 INFO mapred.LocalJobRunner: Waiting for map tasks
18/03/15 16:35:09 INFO mapred.LocalJobRunner: Starting task: attempt_local2133625459_0001_m_000000_0
18/03/15 16:35:09 INFO output.FileOutputCommitter: File Output Committer Algorithm version is 1
18/03/15 16:35:09 INFO util.ProcfsBasedProcessTree: ProcfsBasedProcessTree currently is supported only on Linux.
18/03/15 16:35:09 INFO mapred.Task:  Using ResourceCalculatorProcessTree : null
18/03/15 16:35:09 INFO mapred.MapTask: Processing split: hdfs://localhost:8000/mrtest/joinInput/t_user.txt:0+137
18/03/15 16:35:09 INFO mapred.MapTask: (EQUATOR) 0 kvi 26214396(104857584)
18/03/15 16:35:09 INFO mapred.MapTask: mapreduce.task.io.sort.mb: 100
18/03/15 16:35:09 INFO mapred.MapTask: soft limit at 83886080
18/03/15 16:35:09 INFO mapred.MapTask: bufstart = 0; bufvoid = 104857600
18/03/15 16:35:09 INFO mapred.MapTask: kvstart = 26214396; length = 6553600
18/03/15 16:35:09 WARN mapred.MapTask: Unable to initialize MapOutputCollector org.apache.hadoop.mapred.MapTask$MapOutputBuffer
java.lang.NullPointerException
	at org.apache.hadoop.mapred.MapTask$MapOutputBuffer.init(MapTask.java:1011)
	at org.apache.hadoop.mapred.MapTask.createSortingCollector(MapTask.java:402)
	at org.apache.hadoop.mapred.MapTask.access$100(MapTask.java:81)
	at org.apache.hadoop.mapred.MapTask$NewOutputCollector.<init>(MapTask.java:698)
	at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:770)
	at org.apache.hadoop.mapred.MapTask.run(MapTask.java:341)
	at org.apache.hadoop.mapred.LocalJobRunner$Job$MapTaskRunnable.run(LocalJobRunner.java:243)
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	at java.lang.Thread.run(Thread.java:745)
18/03/15 16:35:09 INFO mapred.LocalJobRunner: map task executor complete.
18/03/15 16:35:09 WARN mapred.LocalJobRunner: job_local2133625459_0001
java.lang.Exception: java.io.IOException: Initialization of all the collectors failed. Error in last collector was :null
	at org.apache.hadoop.mapred.LocalJobRunner$Job.runTasks(LocalJobRunner.java:462)
	at org.apache.hadoop.mapred.LocalJobRunner$Job.run(LocalJobRunner.java:522)
Caused by: java.io.IOException: Initialization of all the collectors failed. Error in last collector was :null
	at org.apache.hadoop.mapred.MapTask.createSortingCollector(MapTask.java:415)
	at org.apache.hadoop.mapred.MapTask.access$100(MapTask.java:81)
	at org.apache.hadoop.mapred.MapTask$NewOutputCollector.<init>(MapTask.java:698)
	at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:770)
	at org.apache.hadoop.mapred.MapTask.run(MapTask.java:341)
	at org.apache.hadoop.mapred.LocalJobRunner$Job$MapTaskRunnable.run(LocalJobRunner.java:243)
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.NullPointerException
	at org.apache.hadoop.mapred.MapTask$MapOutputBuffer.init(MapTask.java:1011)
	at org.apache.hadoop.mapred.MapTask.createSortingCollector(MapTask.java:402)
	... 10 more
18/03/15 16:35:10 INFO mapreduce.Job: Job job_local2133625459_0001 running in uber mode : false
18/03/15 16:35:10 INFO mapreduce.Job: map 0% reduce 0%
18/03/15 16:35:10 INFO mapreduce.Job: Job job_local2133625459_0001 failed with state FAILED due to: NA
18/03/15 16:35:10 INFO mapreduce.Job: Counters: 0

I searched this error for a long time; it eventually turned out that JoinBean did not implement the Writable interface, so Hadoop could not find a serializer for the map output value.
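For reference, this is the minimal Writable contract a map output value type has to satisfy; MiniBean and its two fields are purely illustrative stand-ins for JoinBean:

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.io.Writable;

// Minimal sketch: without write()/readFields(), Hadoop finds no serializer
// for the map output value class and the collector fails as in the log above.
class MiniBean implements Writable {
    private String userName = "";
    private String idNumber = "";

    @Override
    public void write(DataOutput out) throws IOException {
        out.writeUTF(userName);   // fields must be written...
        out.writeUTF(idNumber);
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        userName = in.readUTF();  // ...and read back in the same order
        idNumber = in.readUTF();
    }
}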

_____________________________________________________________________________________________

Error 2:

java.lang.Exception: java.lang.NullPointerException
	at org.apache.hadoop.mapred.LocalJobRunner$Job.runTasks(LocalJobRunner.java:462)
	at org.apache.hadoop.mapred.LocalJobRunner$Job.run(LocalJobRunner.java:522)
Caused by: java.lang.NullPointerException
	at java.io.DataOutputStream.writeUTF(DataOutputStream.java:347)
	at java.io.DataOutputStream.writeUTF(DataOutputStream.java:323)
	at com.chenjun.MRstudy.join.JoinBean.write(MrJoinTest.java:199)
	at org.apache.hadoop.io.serializer.WritableSerialization$WritableSerializer.serialize(WritableSerialization.java:98)
	at org.apache.hadoop.io.serializer.WritableSerialization$WritableSerializer.serialize(WritableSerialization.java:82)
	at org.apache.hadoop.mapred.MapTask$MapOutputBuffer.collect(MapTask.java:1157)
	at org.apache.hadoop.mapred.MapTask$NewOutputCollector.write(MapTask.java:715)
	at org.apache.hadoop.mapreduce.task.TaskInputOutputContextImpl.write(TaskInputOutputContextImpl.java:89)
	at org.apache.hadoop.mapreduce.lib.map.WrappedMapper$Context.write(WrappedMapper.java:112)
	at com.chenjun.MRstudy.join.MrJoinTest$MyMapper.map(MrJoinTest.java:76)
	at com.chenjun.MRstudy.join.MrJoinTest$MyMapper.map(MrJoinTest.java:29)
	at org.apache.hadoop.mapreduce.Mapper.run(Mapper.java:146)
	at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:787)
	at org.apache.hadoop.mapred.MapTask.run(MapTask.java:341)
	at org.apache.hadoop.mapred.LocalJobRunner$Job$MapTaskRunnable.run(LocalJobRunner.java:243)
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	at java.lang.Thread.run(Thread.java:745)


This one is a NullPointerException thrown from the bean's write() / readFields() methods: writeUTF() fails when a field was never set and is still null.

Solution:

Initialize every private String field with a default value at declaration time.
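A minimal before/after sketch of that fix (the field name matches JoinBean; the "before" declaration is the hypothetical un-initialized version):

// Before: a t_user.txt record never calls setIdNumber(), so the field stays
// null and DataOutputStream.writeUTF(null) throws the NPE shown above.
private String IdNumber;

// After: give every String field a default value at declaration time,
// so write() always has something to serialize.
private String IdNumber = "";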


Final run results (note that machao only appears in t_user_card.txt, so the t_user.txt fields are empty in his row):

________________________________________________________________________________

5 chenyangxiu 男 设计 363654678906542785

2 hanmei 男 机械 362781100207300033

4 hanmeixiu 女 通信 362202199697652519

3 lilei 女 法学 36201100207100033

machao 370875468820186543

1 wangming 男 计算机 360781100207100033

6 yangxiuping 男 英语 360876187618971008


Reposted from: https://my.oschina.net/u/2338224/blog/1635735
