
MapReduce实现倒排索引(多路径输入)

Original post · Hadoop · Author: yanke_shanghai · 2016-06-14 11:16:43



This example uses the MultipleInputs class to build an inverted index over multiple input paths.


package test0820; 
import java.io.IOException; 
import java.lang.reflect.Method; 
import org.apache.hadoop.conf.Configuration; 
import org.apache.hadoop.fs.Path; 
import org.apache.hadoop.io.LongWritable; 
import org.apache.hadoop.io.Text; 
import org.apache.hadoop.mapreduce.InputSplit; 
import org.apache.hadoop.mapreduce.Job; 
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer; 
import org.apache.hadoop.mapreduce.lib.input.FileSplit; 
import org.apache.hadoop.mapreduce.lib.input.MultipleInputs; 
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; 
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; 
public class WC0826 { 
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);
        job.setJarByClass(WC0826.class);      

        job.setMapperClass(IIMapper.class);
        job.setCombinerClass(IICombiner.class);
        job.setReducerClass(IIReducer.class);

        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

        // use MultipleInputs to register more than one input path
        MultipleInputs.addInputPath(job, new Path(args[0]), TextInputFormat.class);
        MultipleInputs.addInputPath(job, new Path(args[1]), TextInputFormat.class);
        FileOutputFormat.setOutputPath(job, new Path(args[2]));

        System.exit(job.waitForCompletion(true)? 0:1);
    } 
 
    //map 
    public static class IIMapper extends Mapper<LongWritable, Text, Text, Text>{

        String fileName; 
        /**
         * Helper needed to recover the file name of the current split
         * when the job reads its input through MultipleInputs.
         */
        private Path getFilePath(Context context) throws IOException {
            
            InputSplit split = context.getInputSplit();
            Class<? extends InputSplit> splitClass = split.getClass();
            
            FileSplit fileSplit = null;
            if (splitClass.equals(FileSplit.class)) {
                fileSplit = (FileSplit) split;
            } else if (splitClass.getName().equals("org.apache.hadoop.mapreduce.lib.input.TaggedInputSplit")) {
                // begin reflection hackery... 
                try {
                    Method getInputSplitMethod = splitClass.getDeclaredMethod("getInputSplit");
                    getInputSplitMethod.setAccessible(true);
                    fileSplit = (FileSplit) getInputSplitMethod.invoke(split);
                } catch (Exception e) { 
                    // wrap and re-throw error 
                    throw new IOException(e);
                } 
                // end reflection hackery  
            } 
            return fileSplit.getPath();
        }

        @Override 
        protected void setup(Context context) throws IOException, InterruptedException {
            // cache the source file name once per split
            fileName = getFilePath(context).getName();
        }

        @Override 
        protected void map(LongWritable key, Text value,Context context) throws IOException, InterruptedException {

            // each input line holds tab-separated words; emit <word@fileName, 1>
            String[] splited = value.toString().split("\t");
            for (String word : splited) {
                Text word_fileName = new Text(word + "@" + fileName);
                context.write(word_fileName, new Text("1"));
            }
        }
    } 
 
    //combiner 
    public static class IICombiner extends Reducer<Text, Text, Text, Text>{
        @Override 
        protected void reduce(Text key, Iterable<Text> v2s, Context context) throws IOException, InterruptedException {

            // key is "word@fileName"; sum the 1s, then move the file name into
            // the value so that the reduce-side key becomes the bare word
            long sum = 0L;
            String[] splited = key.toString().split("@");
            for (Text vl : v2s) {
                sum += Long.parseLong(vl.toString());
            }
            String value = splited[1] + "@" + sum;
            context.write(new Text(splited[0]), new Text(value));
        }
    } 
 
    //reduce 
    public static class IIReducer extends Reducer<Text, Text, Text, Text>{
        @Override 
        protected void reduce(Text key, Iterable<Text> v2s, Context context) throws IOException, InterruptedException {

            // fold all "fileName@count" entries for this word into one line
            String value = "";
            for (Text text : v2s) {
                value = text.toString() + ":" + value;
            }
            // strip the trailing ":"
            context.write(key, new Text(value.substring(0, value.length() - 1)));
        }
    }
}
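
To make the data flow concrete, here is a hypothetical run (the file names and contents below are invented for illustration). Say the first input path holds a.txt containing the tab-separated line "hello	world" and the second holds b.txt containing "hello". The mappers emit <hello@a.txt, 1>, <world@a.txt, 1> and <hello@b.txt, 1>; each combiner sums the 1s and moves the file name into the value; the reducer then joins the per-file counts with ":", so the final output looks roughly like this (the order within the file list depends on iteration order):

hello	b.txt@1:a.txt@1
world	a.txt@1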

Problem 01: when the input paths are registered through MultipleInputs, calling getInputSplit() inside setup() to recover the current split's file name throws a ClassCastException:

Error: java.lang.ClassCastException: org.apache.hadoop.mapreduce.lib.input.TaggedInputSplit cannot be cast to org.apache.hadoop.mapreduce.lib.input.FileSplit

Cause 01: the FileSplit is actually the inputSplit member of TaggedInputSplit, and TaggedInputSplit itself is not public (it has default, package-private visibility), so the wrapped split cannot be accessed directly.

Solution 01:

  • Method 1: define your own TaggedInputSplit class in the current project and declare it public, shadowing the package-private original. The wrapped split can then be cast out directly (a sketch of the call site follows the code below):
(FileSplit) ((TaggedInputSplit) context.getInputSplit()).getInputSplit();
  • Method 2: use reflection, via the getFilePath() helper already shown in the full listing above:
/**
 * Reflection-based helper: required to recover the file name
 * when the job reads its input through MultipleInputs.
 */
private Path getFilePath(Context context) throws IOException {
    
    InputSplit split = context.getInputSplit();
    Class<? extends InputSplit> splitClass = split.getClass();
    
    FileSplit fileSplit = null;
    if (splitClass.equals(FileSplit.class)) {
        fileSplit = (FileSplit) split;
    } else if (splitClass.getName().equals("org.apache.hadoop.mapreduce.lib.input.TaggedInputSplit")) {
        // begin reflection hackery...
        try {
            Method getInputSplitMethod = splitClass.getDeclaredMethod("getInputSplit");
            getInputSplitMethod.setAccessible(true);
            fileSplit = (FileSplit) getInputSplitMethod.invoke(split);
        } catch (Exception e) {
            // wrap and re-throw error
            throw new IOException(e);
        }
        // end reflection hackery
    }
    return fileSplit.getPath();
}
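
For completeness, a minimal sketch of what Method 1 looks like at the call site. This assumes you have copied the TaggedInputSplit source into your own project under the same org.apache.hadoop.mapreduce.lib.input package with only its visibility changed to public (the class body must be copied verbatim from the Hadoop source so the framework can still instantiate and deserialize it); fileName is the field from the listing above:

import java.io.IOException;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.input.TaggedInputSplit; // your public copy

// inside IIMapper:
@Override
protected void setup(Context context) throws IOException, InterruptedException {
    // the direct cast works only because the local public copy of
    // TaggedInputSplit shadows Hadoop's package-private one on the classpath
    FileSplit fileSplit =
            (FileSplit) ((TaggedInputSplit) context.getInputSplit()).getInputSplit();
    fileName = fileSplit.getPath().getName();
}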

Problem 02:

map<Object, Text, Text, IntWritable>
combiner<Text, IntWritable, Text, Text>
reduce<Text, Text, Text, Text>

With these signatures the job fails. A Combiner is just a Reducer run over the map output, so its input and output key/value types must stay consistent: both must equal the map output types, which are in turn the reduce input types. Here the combiner turns <Text, IntWritable> into <Text, Text>, which breaks that contract.
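
A minimal compilable sketch of the constraint (the class names here are illustrative, not from the original post): the Combiner's input and output types must both equal the map output types, which are also the reduce input types.

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;

public class TypeAlignment {
    // map emits <Text, Text> ...
    static class M extends Mapper<LongWritable, Text, Text, Text> { }
    // ... so the combiner must consume AND produce <Text, Text> ...
    static class C extends Reducer<Text, Text, Text, Text> { }
    // ... which are also the reduce input types.
    static class R extends Reducer<Text, Text, Text, Text> { }
}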

The Combiner aggregates values on the map side, shrinking the data shipped from map to reduce and speeding up the shuffle. Remember that a MapReduce job computing an average must not use a Combiner: an average of partial averages is not the overall average.
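
A quick worked example with made-up numbers: suppose for some key mapper A sees the values 1 and 2 while mapper B sees 6.

without a Combiner: reduce averages (1, 2, 6)  -> (1 + 2 + 6) / 3 = 3.0   (correct)
with a Combiner:    reduce averages (1.5, 6.0) -> (1.5 + 6.0) / 2 = 3.75  (wrong)

A sum-count pair, by contrast, combines safely, which is why pure counts (like the "1"s in this job) can use a Combiner while averages cannot.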

Source: ITPUB博客, http://blog.itpub.net/30316686/viewspace-2119947/
