Implementing an Inverted Index

#### 1. The Project

Implement an inverted index.

#### 2. Requirements

Data sources: a.txt, b.txt, c.txt

a.txt

hello tom hello jerry hello tom
hello jerry hello mike mike tom
jerry hello mike hello tom jerry

b.txt

hello marry hello tom
hello jerry mike tom

c.txt

hello tom hello jerry mike
hello mike hello tom tom

Apply inverted-index rules to the data sources: for every word, count how many times it appears in each file. The final result should look like this (files listed per word with their counts in descending order):

hello a.txt->7,c.txt->4,b.txt->3
tom a.txt->4,c.txt->3,b.txt->2
.....

#### 3. Analysis

1. The first mapper reads each file and splits every line on spaces, then emits key = `word-filename` and value = 1. That way, all occurrences of the same word in the same file are grouped under one key.

2. The first reducer iterates over a key's value list; the total is the number of times that word appears in that file.

3. After the first MapReduce job finishes, its output gives the number of times each word appears in each file:

hello-a.txt 7
hello-b.txt 3
hello-c.txt 4
jerry-a.txt 4
jerry-b.txt 1
jerry-c.txt 1
marry-b.txt 1
mike-a.txt 3
mike-b.txt 1
mike-c.txt 2
tom-a.txt 4
tom-b.txt 2
tom-c.txt 3


4. The second mapper reads the first job's output one line at a time and splits it on `-`: the key becomes the word, and the value becomes the file name together with its count. Equal keys are merged again, so each reduce call sees the full collection of file-name/count pairs for one word.

5. The second reducer builds a map with the file name as key and the count as value, sorts the map's entries by count in descending order, then walks the sorted entries to produce the required format and writes it out. A standalone sketch of this sort follows below.
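
To make step 5 concrete, here is a minimal standalone sketch of that sort, using toy counts taken from the example above (plain Java, independent of Hadoop):

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.TreeMap;

public class SortByCountDemo {
	public static void main(String[] args) {
		// "file -> count" pairs for one word, e.g. "hello"
		Map<String, Integer> counts = new TreeMap<String, Integer>();
		counts.put("a.txt", 7);
		counts.put("b.txt", 3);
		counts.put("c.txt", 4);
		// Sort the entries by count, descending: compare o2 against o1
		List<Entry<String, Integer>> list = new ArrayList<Entry<String, Integer>>(counts.entrySet());
		Collections.sort(list, new Comparator<Entry<String, Integer>>() {
			public int compare(Entry<String, Integer> o1, Entry<String, Integer> o2) {
				return o2.getValue().compareTo(o1.getValue());
			}
		});
		// Join as "file->count,..." and drop the trailing comma
		StringBuilder sb = new StringBuilder();
		for (Entry<String, Integer> e : list) {
			sb.append(e.getKey()).append("->").append(e.getValue()).append(",");
		}
		System.out.println(sb.substring(0, sb.length() - 1)); // prints: a.txt->7,c.txt->4,b.txt->3
	}
}
```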

#### 4. Code

First, create a sort directory in the HDFS root directory, then create an input directory under sort, and upload the source data into the input directory.
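
For reference, here is a minimal sketch of that setup using the Hadoop FileSystem API. This helper is not part of the original code, and the local source paths are assumptions; adjust them to wherever a.txt, b.txt, and c.txt actually live:

```java
package wordsort.util;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class PrepareInput {
	public static void main(String[] args) throws Exception {
		Configuration conf = new Configuration();
		conf.set("fs.defaultFS", "hdfs://192.168.80.90:9000");
		FileSystem fs = FileSystem.get(conf);
		// mkdirs creates /sort and /sort/input in one call
		fs.mkdirs(new Path("/sort/input"));
		// Upload the three source files (local paths are assumed)
		for (String name : new String[] { "a.txt", "b.txt", "c.txt" }) {
			fs.copyFromLocalFile(new Path("/home/hadoop/" + name), new Path("/sort/input/" + name));
		}
		fs.close();
	}
}
```

The same can be done from the shell with `hdfs dfs -mkdir -p /sort/input` followed by `hdfs dfs -put a.txt b.txt c.txt /sort/input`.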

> First mapper code

```java
package wordsort.mapper;

import java.io.IOException;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;

public class Map1 extends Mapper<LongWritable, Text, Text, IntWritable> {
	String pathName = null;

	@Override
	protected void setup(Context context) throws IOException, InterruptedException {
		// Record the name of the file this split comes from, once per map task
		FileSplit fileSplit = (FileSplit) context.getInputSplit();
		pathName = fileSplit.getPath().getName();
	}

	@Override
	protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
		// Emit "word-filename" -> 1 for every word on the line
		String line = value.toString();
		String[] fields = StringUtils.split(line, " ");
		for (String s : fields) {
			context.write(new Text(s + "-" + pathName), new IntWritable(1));
		}
	}
}
```

> First reducer code

```java
package wordsort.reducer;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class Red1 extends Reducer<Text, IntWritable, Text, IntWritable> {
	@Override
	protected void reduce(Text key, Iterable<IntWritable> values, Context context)
			throws IOException, InterruptedException {
		// Sum the counts for this word-filename key; summing (rather than
		// counting elements) also stays correct if a combiner is added later
		int sum = 0;
		for (IntWritable value : values) {
			sum += value.get();
		}
		context.write(key, new IntWritable(sum));
	}
}
```

> First job code

```java
package wordsort.runner;

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import wordsort.mapper.Map1;
import wordsort.reducer.Red1;
public class JobMr1 {
	// Input path on HDFS
	private static String inPath = "/sort/input/";
	// Output path on HDFS
	private static String outPath = "/sort/output1";
	// HDFS address
	private static String hdfs = "hdfs://192.168.80.90:9000";

	public int run() {
		try {
			// Create the job configuration
			Configuration conf = new Configuration();

			// Set the HDFS address
			conf.set("fs.defaultFS", hdfs);

			// Create the job instance
			Job job = Job.getInstance(conf, "mr");

			// Set the job's main class
			job.setJarByClass(JobMr1.class);

			// Set the job's Mapper and Reducer classes
			job.setMapperClass(Map1.class);
			job.setReducerClass(Red1.class);

			// Specify the mapper's output key/value types
			job.setMapOutputKeyClass(Text.class);
			job.setMapOutputValueClass(IntWritable.class);

			// Specify the reducer's output key/value types
			job.setOutputKeyClass(Text.class);
			job.setOutputValueClass(IntWritable.class);

			FileSystem fs = FileSystem.get(conf);
			// Set the input and output paths
			Path inputpath = new Path(inPath);
			if (fs.exists(inputpath)) {
				// Only add the input path if it exists; adding a
				// missing path would make the job fail
				FileInputFormat.addInputPath(job, inputpath);
			}
			Path outputpath = new Path(outPath);
			// The output directory must not already exist, or the job fails;
			// delete() is a no-op if the path is absent
			fs.delete(outputpath, true);
			FileOutputFormat.setOutputPath(job, outputpath);

			// Return the job status: 1 on success, -1 on failure
			return job.waitForCompletion(true) ? 1 : -1;
		} catch (IOException e) {
			e.printStackTrace();
		} catch (ClassNotFoundException e) {
			e.printStackTrace();
		} catch (InterruptedException e) {
			e.printStackTrace();
		}
		return -1;
	}

	public static void main(String[] args) {
		int result = new JobMr1().run();
		if (result == 1) {
			System.out.println("step1 ran successfully.....");
		} else {
			System.out.println("step1 failed....");
		}
	}
}
```
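
Since Red1 just sums its input counts, it can optionally double as a combiner, so that per-file word counts are pre-aggregated on the map side and less data crosses the network during the shuffle. This is not in the original job; it would be a one-line addition to JobMr1's run():

```java
// Optional addition (not in the original code): pre-aggregate map-side.
// Safe here because Red1's summation is associative and commutative.
job.setCombinerClass(Red1.class);
```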

> Second mapper code

```java
package wordsort.mapper;

import java.io.IOException;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class Map2 extends Mapper<LongWritable, Text, Text, Text> {
	@Override
	protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
		// Each line of the first job's output looks like "hello-a.txt\t7";
		// splitting on "-" leaves the word in fields[0] and "a.txt\t7" in fields[1]
		String line = value.toString();
		String[] fields = StringUtils.split(line, "-");
		context.write(new Text(fields[0]), new Text(fields[1]));
	}
}
```

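A detail worth noting: the first job writes each record as key, tab, value, so after Map2 splits on "-", the tab and the count remain inside fields[1], and Red2 later separates them on the tab. A minimal sketch of that parsing (not part of the job code):

```java
import org.apache.commons.lang.StringUtils;

public class SplitDemo {
	public static void main(String[] args) {
		// One line exactly as job 1 writes it: key, tab, count
		String line = "hello-a.txt\t7";
		String[] fields = StringUtils.split(line, "-");
		System.out.println(fields[0]); // hello  -> becomes the new key
		System.out.println(fields[1]); // a.txt<TAB>7 -> becomes the new value
		// Red2 then separates file name and count on the tab
		String[] filevalue = fields[1].split("\t");
		System.out.println(filevalue[0] + " appears " + filevalue[1] + " times");
	}
}
```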

> Second reducer code

```java
package wordsort.reducer;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.TreeMap;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class Red2 extends Reducer<Text, Text, Text, Text> {
	@Override
	protected void reduce(Text key, Iterable<Text> values, Context context)
			throws IOException, InterruptedException {
		// Collect the "filename -> count" pairs for this word
		Map<String, Integer> map = new TreeMap<String, Integer>();
		for (Text tx : values) {
			String[] filevalue = tx.toString().split("\t");
			map.put(filevalue[0], Integer.parseInt(filevalue[1]));
		}
		// Sort the entries by count, descending
		List<Entry<String, Integer>> list = new ArrayList<Entry<String, Integer>>(map.entrySet());
		Collections.sort(list, new Comparator<Map.Entry<String, Integer>>() {
			public int compare(Entry<String, Integer> o1, Entry<String, Integer> o2) {
				return o2.getValue().compareTo(o1.getValue());
			}
		});
		// Format as "file->count,file->count,..." and drop the trailing comma
		StringBuilder result = new StringBuilder();
		for (Entry<String, Integer> e : list) {
			result.append(e.getKey()).append("->").append(e.getValue()).append(",");
		}
		context.write(key, new Text(result.substring(0, result.length() - 1)));
	}
}
```

> Second job code

```java
package wordsort.runner;

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import wordsort.mapper.Map2;
import wordsort.reducer.Red2;

public class JobMr2 {
	// Input path on HDFS: the first job's output file
	private static String inPath = "/sort/output1/part-r-00000";
	// Output path on HDFS
	private static String outPath = "/sort/output2";
	// HDFS address
	private static String hdfs = "hdfs://192.168.80.90:9000";

	public int run() {
		try {
			// Create the job configuration
			Configuration conf = new Configuration();

			// Set the HDFS address
			conf.set("fs.defaultFS", hdfs);

			// Create the job instance
			Job job = Job.getInstance(conf, "mr");

			// Set the job's main class
			job.setJarByClass(JobMr2.class);

			// Set the job's Mapper and Reducer classes
			job.setMapperClass(Map2.class);
			job.setReducerClass(Red2.class);

			// Specify the mapper's output key/value types
			job.setMapOutputKeyClass(Text.class);
			job.setMapOutputValueClass(Text.class);

			// Specify the reducer's output key/value types
			job.setOutputKeyClass(Text.class);
			job.setOutputValueClass(Text.class);

			FileSystem fs = FileSystem.get(conf);
			// Set the input and output paths
			Path inputpath = new Path(inPath);
			if (fs.exists(inputpath)) {
				// Only add the input path if it exists; adding a
				// missing path would make the job fail
				FileInputFormat.addInputPath(job, inputpath);
			}
			Path outputpath = new Path(outPath);
			// The output directory must not already exist, or the job fails;
			// delete() is a no-op if the path is absent
			fs.delete(outputpath, true);
			FileOutputFormat.setOutputPath(job, outputpath);

			// Return the job status: 1 on success, -1 on failure
			return job.waitForCompletion(true) ? 1 : -1;
		} catch (IOException e) {
			e.printStackTrace();
		} catch (ClassNotFoundException e) {
			e.printStackTrace();
		} catch (InterruptedException e) {
			e.printStackTrace();
		}
		return -1;
	}

	public static void main(String[] args) {
		int result = new JobMr2().run();
		if (result == 1) {
			System.out.println("step2 ran successfully.....");
		} else {
			System.out.println("step2 failed....");
		}
	}
}
```
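
With the sample data above, the second job's result in /sort/output2/part-r-00000 should come out as:

hello a.txt->7,c.txt->4,b.txt->3
jerry a.txt->4,b.txt->1,c.txt->1
marry b.txt->1
mike a.txt->3,c.txt->2,b.txt->1
tom a.txt->4,c.txt->3,b.txt->2

Finally, a minimal driver sketch (not part of the original post) that chains the two jobs, so step 2 only runs after step 1 succeeds:

```java
package wordsort.runner;

public class JobChain {
	public static void main(String[] args) {
		// Run the two jobs back to back; stop if the first one fails
		if (new JobMr1().run() != 1) {
			System.out.println("step1 failed; aborting");
			return;
		}
		if (new JobMr2().run() == 1) {
			System.out.println("step1 and step2 ran successfully");
		} else {
			System.out.println("step2 failed");
		}
	}
}
```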