博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
05Hadoop-左外连接
阅读量:6990 次
发布时间:2019-06-27

本文共 6835 字,大约阅读时间需要 22 分钟。

场景:有两张表,一张用户表(user),交易表(transactions)。两张表的字段如下:

两份表数据做个左连接,查询出(商品名,地址)这种格式。

 这样就是相当于交易表是左表,不管怎么样数据都要保留,然后从右边里面查出来弥补左表。

效果如下:

 

 

 思路:写两个map,把两个表的数据都读进来,在reduce端进行连接,然后按照格式要求写出去。

(1)map1:读取transaction文件,封装为:

protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, PairOfStrings, PairOfStrings>.Context context)

throws IOException, InterruptedException {

              String lines=value.toString();

              String[] args=lines.split(" ");

              String productID=args[1];

              String userID=args[2];

    //把outPutKey加了一个2,这么做的目的是,后续在reduce端,聚合时,这个数据能够晚点到达。

              outPutKey.set(userID, "2");

              outPutValue.set("P", productID);

              context.write(outPutKey, outPutValue);

}

 

 (2)map2:读取user文件,封装为:

static class map2 extends Mapper<LongWritable, Text,PairOfStrings,PairOfStrings>

{

PairOfStrings outPutKey=new PairOfStrings();

PairOfStrings outPutvalue=new PairOfStrings();

@Override

protected void map(LongWritable key, Text value,

Mapper<LongWritable, Text, PairOfStrings, PairOfStrings>.Context context)

throws IOException, InterruptedException {

String line=value.toString();

String[] args=line.split(" ");

String userID=args[0];

String locationID=args[1];

 //把outPutKey加了一个1,这么做的目的是,后续在reduce端,聚合时,这个数据能够早于transaction文件里面的数据到达。

outPutKey.set(userID, "1");

outPutvalue.set("L", locationID);

context.write(outPutKey, outPutvalue);

}

 

(3)reduce:把map端的数据要根据用户ID分区,相同的用户ID写入到同一个分区,进而写入到同一个Reduce分区,然后在Reduce中根据PairOfStrings这个类的自己的排序规则对数据排序。因为前面对key做了处理(加了1,2),所以是用户的地址这些信息先到达reduce。,然后根据不同的分组,把数据写出来。 

 

 






总的代码结构:

LeftCmain:

package com.guigu.left;import java.io.IOException;import java.util.Iterator;import org.apache.hadoop.conf.Configuration;import org.apache.hadoop.fs.Path;import org.apache.hadoop.io.LongWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Job;import org.apache.hadoop.mapreduce.Mapper;import org.apache.hadoop.mapreduce.Reducer;import org.apache.hadoop.mapreduce.lib.input.MultipleInputs;import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;import edu.umd.cloud9.io.pair.PairOfStrings;public class LeftCmain {        //读取transaction文件    static class map1 extends Mapper
{ PairOfStrings outPutKey=new PairOfStrings(); PairOfStrings outPutValue=new PairOfStrings(); @Override protected void map(LongWritable key, Text value, Mapper
.Context context) throws IOException, InterruptedException { String lines=value.toString(); String[] args=lines.split(" "); String productID=args[1]; String userID=args[2]; outPutKey.set(userID, "2"); outPutValue.set("P", productID); context.write(outPutKey, outPutValue); } } //读取user文件 static class map2 extends Mapper
{ PairOfStrings outPutKey=new PairOfStrings(); PairOfStrings outPutvalue=new PairOfStrings(); @Override protected void map(LongWritable key, Text value, Mapper
.Context context) throws IOException, InterruptedException { String line=value.toString(); String[] args=line.split(" "); String userID=args[0]; String locationID=args[1]; outPutKey.set(userID, "1"); outPutvalue.set("L", locationID); context.write(outPutKey, outPutvalue); } } /** * 这个的关键点在于,取出的数据:要求先取出地址的数据。 * @author Sxq * */ static class reduce1 extends Reducer
{ Text produceID=new Text(); Text localID=new Text("undefine"); @Override protected void reduce(PairOfStrings arg0, Iterable
Iterator1, Reducer
.Context context) throws IOException, InterruptedException { Iterator
iterator=Iterator1.iterator(); //由于做了二次排序,可以保证先得到的是地址的数据。 if(iterator.hasNext()) { PairOfStrings fisrPair=iterator.next(); // System.out.println("firstPair="+fisrPair.toString()); //如果是地址的信息,那就把他直接放出来 if(fisrPair.getLeftElement().equals("L")) { localID.set(fisrPair.getRightElement()); } } while(iterator.hasNext()) { PairOfStrings pairOfStrings=iterator.next(); //System.out.println(pairOfStrings.toString()); produceID.set(pairOfStrings.getRightElement()); System.out.println("prdouct:"+produceID.toString()+"localId:"+localID.toString()); //System.out.println(); context.write(produceID, localID); } } } public static void main(String[] args) throws Exception { Configuration conf = new Configuration(); Job job = Job.getInstance(conf); job.setJarByClass(LeftCmain.class); job.setMapperClass(map1.class); job.setReducerClass(reduce1.class); job.setMapOutputKeyClass(PairOfStrings.class); job.setMapOutputValueClass(PairOfStrings.class); job.setOutputValueClass(Text.class); job.setOutputKeyClass(Text.class); job.setSortComparatorClass(PairOfStrings.Comparator.class); // 在Reduce端设置分组,使得同一个用户在同一个组,然后做拼接。 job.setGroupingComparatorClass(SecondarySortGroupComparator.class); // 设置分区 job.setPartitionerClass(SecondarySortParitioner.class); // job.setOutputFormatClass(SequenceFileOutputFormat.class); Path transactions=new Path("/Users/mac/Desktop/transactions.txt"); MultipleInputs.addInputPath(job,transactions,TextInputFormat.class,map1.class); MultipleInputs.addInputPath(job,new Path("/Users/mac/Desktop/user.txt"), TextInputFormat.class,map2.class); FileOutputFormat.setOutputPath(job, new Path("/Users/mac/Desktop/flowresort")); boolean result = job.waitForCompletion(true); System.exit(result ? 0 : 1); } }

 

 

SecondarySortGroupComparator:

package com.guigu.left;import org.apache.hadoop.io.DataInputBuffer;import org.apache.hadoop.io.WritableComparable;import org.apache.hadoop.io.WritableComparator;import com.book.test1.CompositeKey;import edu.umd.cloud9.io.pair.PairOfStrings;/** * 不同分区的组聚合时,可以按照我们要的顺序来排列 * @author Sxq *WritableComparator */public class SecondarySortGroupComparator extends WritableComparator {    public  SecondarySortGroupComparator() {         super(PairOfStrings.class,true);    }        @Override    public int compare(WritableComparable a, WritableComparable b) {                PairOfStrings v1=(PairOfStrings)a;        PairOfStrings v2=(PairOfStrings)b;        return v1.getLeftElement().compareTo(v2.getLeftElement());    }}

SecondarySortParitioner:

package com.guigu.left;import org.apache.hadoop.mapreduce.Partitioner;import edu.umd.cloud9.io.pair.PairOfStrings;/** *  * @author Sxq * */public class SecondarySortParitioner extends Partitioner
{ @Override public int getPartition(PairOfStrings key, Object value, int numPartitions) { return (key.getLeftElement().hashCode()&Integer.MAX_VALUE)%numPartitions; }}

 

 

运行结果:

 

 

 

 

 

 

 

转载地址:http://iobvl.baihongyu.com/

你可能感兴趣的文章
MVC Contoller 控制器的返回类型
查看>>
基于python语言的tensorflow的‘端到端’的字符型验证码识别源码整理(github源码分享)...
查看>>
tab-选项卡-[data-class]
查看>>
批量修改文件名re_name.py
查看>>
Linux 可以SSH,但ping不通
查看>>
APT***简述
查看>>
shell批量操作循环
查看>>
Gitlab omnibus 8.15.1 升级到 9.5.+
查看>>
PHP configure: error: mcrypt.h not found. Please reinstall libmcrypt.(转)
查看>>
awk命令——报告生成工具
查看>>
Linux开机启动流程描述
查看>>
“两只小熊队”Alpha版本展示博客
查看>>
创建django的不同环境
查看>>
Top 10 command-line commands for managing Windows 7 desktops
查看>>
CentOS5.4安装samba服务
查看>>
学习笔记之简单工厂设计模式
查看>>
Spring+SpringMVC+MyBatis+Maven框架整合
查看>>
MFC读写文件
查看>>
linux优化
查看>>
手动制作mini linux详细步骤—之一
查看>>