Requirement:

Table 4-4 Order data table t_order

id      pid    amount
1001    01     1
1002    02     2
1003    03     3
1004    01     4
1005    02     5
1006    03     6
Table 4-5 Product information table t_product

pid    pname
01     小米
02     华为
03     格力
Merge the data from the product information table into the order data table according to the product pid.
Table 4-6 Final data form

id      pname    amount
1001    小米     1
1004    小米     4
1002    华为     2
1005    华为     5
1003    格力     3
1006    格力     6
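The code below assumes the two tables exist on disk as tab-separated text files named order.txt and pd.txt under F:/input (the file names and paths come from the driver and its cache URI; the exact column layout is inferred from the split("\t") calls in the mapper), roughly:

order.txt  (id <tab> pid <tab> amount)
1001    01    1
1002    02    2
1003    03    3
1004    01    4
1005    02    5
1006    03    6

pd.txt  (pid <tab> pname)
01    小米
02    华为
03    格力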
OrderBean class:
package com.atguigu.bean;

import org.apache.hadoop.io.WritableComparable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

public class OrderBean implements WritableComparable<OrderBean> {
    private String id;
    private String pid;
    private int amount;
    private String pname;

    @Override
    public String toString() {
        return id + "\t" + pname + "\t" + amount;
    }

    public String getId() {
        return id;
    }

    public void setId(String id) {
        this.id = id;
    }

    public String getPid() {
        return pid;
    }

    public void setPid(String pid) {
        this.pid = pid;
    }

    public int getAmount() {
        return amount;
    }

    public void setAmount(int amount) {
        this.amount = amount;
    }

    public String getPname() {
        return pname;
    }

    public void setPname(String pname) {
        this.pname = pname;
    }

    @Override
    public void write(DataOutput dataOutput) throws IOException {
        dataOutput.writeUTF(id);
        dataOutput.writeUTF(pid);
        dataOutput.writeInt(amount);
        dataOutput.writeUTF(pname);
    }

    @Override
    public void readFields(DataInput dataInput) throws IOException {
        this.id = dataInput.readUTF();
        this.pid = dataInput.readUTF();
        this.amount = dataInput.readInt();
        this.pname = dataInput.readUTF();
    }

    @Override
    public int compareTo(OrderBean o) {
        int compare = this.pid.compareTo(o.pid); // first group by pid
        if (compare == 0) {
            // then sort by pname in descending order, so records whose pname is empty come last
            return o.pname.compareTo(this.pname);
        } else {
            return compare;
        }
    }
}
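Note that OrderBean, with its pid/pname ordering, belongs to the reduce-side join variant of this requirement; the map-join classes below never use it. Its compareTo only pays off when it is paired with a grouping comparator that puts every record with the same pid into one reduce group, so that the product record (sorted first thanks to the descending pname order) is read ahead of the orders. The class below is not part of the original text; it is a minimal sketch of that companion piece, and the name OrderGroupingComparator is hypothetical:

package com.atguigu.bean;

import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;

// Hypothetical grouping comparator for a reduce-side join with OrderBean:
// records that share a pid are treated as one reduce group.
public class OrderGroupingComparator extends WritableComparator {

    protected OrderGroupingComparator() {
        super(OrderBean.class, true); // create bean instances so compare() receives deserialized keys
    }

    @Override
    public int compare(WritableComparable a, WritableComparable b) {
        OrderBean oa = (OrderBean) a;
        OrderBean ob = (OrderBean) b;
        return oa.getPid().compareTo(ob.getPid()); // group by pid only
    }
}

In that reduce-join driver it would be registered with job.setGroupingComparatorClass(OrderGroupingComparator.class).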
MJMapper class:
package com.atguigu.mapjoin;

import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import java.net.URI;
import java.util.HashMap;
import java.util.Map;

public class MJMapper extends Mapper<LongWritable, Text, Text, NullWritable> {

    private Map<String, String> pMap = new HashMap<>(); // holds the contents of pd.txt (pid -> pname)
    private Text k = new Text();

    /**
     * The cached file must be read into memory before map() runs.
     * @param context
     * @throws IOException
     * @throws InterruptedException
     */
    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
        URI[] cacheFiles = context.getCacheFiles();
        String path = cacheFiles[0].getPath();
        BufferedReader bufferedReader = new BufferedReader(new FileReader(path));
        String line;
        while (StringUtils.isNotEmpty(line = bufferedReader.readLine())) {
            String[] fields = line.split("\t");
            pMap.put(fields[0], fields[1]); // pid -> pname
        }
        IOUtils.closeStream(bufferedReader);
    }

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        String[] fields = value.toString().split("\t");
        String pname = pMap.get(fields[1]); // look up pname by pid
        if (pname == null) {
            pname = "NULL";
        }
        k.set(fields[0] + "\t" + pname + "\t" + fields[2]); // id  pname  amount
        context.write(k, NullWritable.get());
    }
}
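As a quick trace of map(): with pMap loaded from pd.txt, the order line 1001<tab>01<tab>1 splits into fields[0] = 1001, fields[1] = 01 and fields[2] = 1, the lookup pMap.get("01") returns 小米, and the mapper emits the key 1001<tab>小米<tab>1 with a NullWritable value. An order whose pid has no match in pd.txt is emitted with NULL as its pname.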
MJDriver class:
package com.atguigu.mapjoin;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;
import java.net.URI;

public class MJDriver {
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        Job job = Job.getInstance();
        job.setJarByClass(MJDriver.class);
        job.setMapperClass(MJMapper.class);

        // with 0 ReduceTasks the data skips shuffle/reduce and goes straight to the OutputFormat
        job.setNumReduceTasks(0);

        // cache the small table so every MapTask can load it into memory
        job.addCacheFile(URI.create("file:///F:/input/pd.txt"));

        FileInputFormat.setInputPaths(job, new Path("F:/input/order.txt"));
        FileOutputFormat.setOutputPath(job, new Path("F:/output"));

        boolean b = job.waitForCompletion(true);
        System.exit(b ? 0 : 1);
    }
}
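Because the join finishes in the map phase and there is no shuffle, the output files are named part-m-xxxxx and the rows come out in the order of order.txt instead of being grouped by product as in Table 4-6. For the sample data above, F:/output/part-m-00000 would contain roughly:

1001    小米    1
1002    华为    2
1003    格力    3
1004    小米    4
1005    华为    5
1006    格力    6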