Matrix Multiplication with MapReduce + DistributedCache


1. Result
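
For the sample matrices prepared in the Implementation section below, step 2 writes the final 5×5 product to /user/matrix/output in the same row<TAB>col_value format. Hand-computed from that data, it should look like this (the order of col_value pairs within a line may vary):

1	1_2,2_7,3_1,4_0,5_-9
2	1_9,2_16,3_31,4_-7,5_-10
3	1_-6,2_2,3_7,4_-3,5_16
4	1_-1,2_27,3_4,4_7,5_-16
5	1_-2,2_-2,3_12,4_-10,5_14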

2. Matrix transpose

Transposing the right matrix first lets each row of the left matrix be multiplied directly against a row of the transposed right matrix, as the identity below shows.
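
In symbols: since $(B^T)_{jk} = B_{kj}$, element $(i,j)$ of the product $AB$ is

$$(AB)_{ij} = \sum_k A_{ik} B_{kj} = \sum_k A_{ik} (B^T)_{jk},$$

i.e. the dot product of row $i$ of the left matrix with row $j$ of the transposed right matrix.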

3. Approach

Step 1 transposes the right matrix with a MapReduce job; step 2 distributes the transposed matrix to every mapper via DistributedCache and, for each row of the left matrix, computes its dot product with every row of the transposed matrix.

Implementation

1. Prepare two data files and upload them to HDFS

Each line encodes one matrix row as row<TAB>col_value,col_value,...: the row number, a tab, then comma-separated column_value pairs.

matrix1.txt (the 5×4 left matrix), uploaded to /user/matrix/step2_input/:

1	1_1,2_2,3_-2,4_0
2	1_3,2_3,3_4,4_-3
3	1_-2,2_0,3_2,4_3
4	1_5,2_3,3_-1,4_2
5	1_-4,2_2,3_0,4_2

matrix2.txt (the 4×5 right matrix), uploaded to /user/matrix/step1_input/:

1	1_0,2_3,3_-1,4_2,5_-3
2	1_1,2_3,3_5,4_-2,5_-1
3	1_0,2_1,3_4,4_-1,5_2
4	1_-2,2_2,3_-1,4_1,5_2

2. Write the Java code

Mapper1.java, Reducer1.java, and Mr1.java in the step1 package:

package step1;

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class Mapper1 extends Mapper<LongWritable, Text, Text, Text> {
	private Text outkey = new Text();
	private Text outvalue = new Text();

	/**
	 * Input: one line of the right matrix.
	 * key: byte offset of the line; value: "row<TAB>col_value,...",
	 * e.g. "1	1_0,2_3,3_-1,4_2,5_-3"
	 */
	@Override
	protected void map(LongWritable key, Text value, Context context)
			throws IOException, InterruptedException {
		String[] rowAndLine = value.toString().split("\t");// split into row number and col_value pairs
		String row = rowAndLine[0];// row number
		String[] lines = rowAndLine[1].split(",");// the col_value pairs
		// lines: ["1_0","2_3","3_-1","4_2","5_-3"]
		for (int i = 0; i < lines.length; i++) {
			String column = lines[i].split("_")[0];// column number
			String valueStr = lines[i].split("_")[1];// the value
			// emit key: column number, value: rowNumber_value
			outkey.set(column);
			outvalue.set(row + "_" + valueStr);
			context.write(outkey, outvalue);
		}
	}
}
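
For example, the first line of matrix2.txt, "1	1_0,2_3,3_-1,4_2,5_-3", makes Mapper1 emit (1, 1_0), (2, 1_3), (3, 1_-1), (4, 1_2), (5, 1_-3). Shuffle then groups the pairs from all input lines by key (the original column number), so each reduce call receives exactly the values of one row of the transposed matrix.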

package step1;

import java.io.IOException;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class Reducer1 extends Reducer<Text, Text, Text, Text> {

	private Text outkey = new Text();
	private Text outvalue = new Text();

	// key: column number  value: [rowNumber_value, rowNumber_value, ...]
	@Override
	protected void reduce(Text key, Iterable<Text> values, Context context)
			throws IOException, InterruptedException {
		StringBuilder sb = new StringBuilder();
		for (Text text : values) {
			// text: rowNumber_value
			sb.append(text + ",");
		}
		String line = null;
		if (sb.toString().endsWith(",")) {
			line = sb.substring(0, sb.length() - 1);// drop the trailing comma
		}
		outkey.set(key);
		outvalue.set(line);
		context.write(outkey, outvalue);
	}
}

package step1;

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class Mr1 {
	// input file path on HDFS
	private static String inPath = "/user/matrix/step1_input/matrix2.txt";
	// output directory on HDFS
	private static String outPath = "/user/matrix/step1_output";
	// HDFS address
	private static String hdfs = "hdfs://192.168.80.90:9000";

	public int run() {
		try {
			// create the job configuration
			Configuration conf = new Configuration();

			// set the HDFS address
			conf.set("fs.defaultFS", hdfs);

			// create the job instance
			Job job = Job.getInstance(conf, "step1");

			// set the job's main class
			job.setJarByClass(Mr1.class);

			// set the job's Mapper and Reducer classes
			job.setMapperClass(Mapper1.class);
			job.setReducerClass(Reducer1.class);

			// set the Mapper output types
			job.setMapOutputKeyClass(Text.class);
			job.setMapOutputValueClass(Text.class);

			// set the Reducer output types
			job.setOutputKeyClass(Text.class);
			job.setOutputValueClass(Text.class);

			FileSystem fs = FileSystem.get(conf);
			// set the input and output paths
			Path inputpath = new Path(inPath);
			if (fs.exists(inputpath)) {
				// only add the input path if it exists; a nonexistent input path
				// would cause an exception
				FileInputFormat.addInputPath(job, inputpath);
			}
			Path outputpath = new Path(outPath);
			// the output directory must not already exist, or the job fails;
			// delete() removes it if present and is a no-op otherwise
			fs.delete(outputpath, true);
			FileOutputFormat.setOutputPath(job, outputpath);

			// return the job status: 1 on success, -1 on failure
			return job.waitForCompletion(true) ? 1 : -1;
		} catch (IOException e) {
			e.printStackTrace();
		} catch (ClassNotFoundException e) {
			e.printStackTrace();
		} catch (InterruptedException e) {
			e.printStackTrace();
		}
		return -1;
	}

	public static void main(String[] args) {
		int result = new Mr1().run();
		if (result == 1) {
			System.out.println("step1 finished successfully.....");
		} else if (result == -1) {
			System.out.println("step1 failed.....");
		}
	}
}

Mapper2.java, Reducer2.java, and Mr2.java in the step2 package:

package step2;

import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class Mapper2 extends Mapper<LongWritable, Text, Text, Text> {

	private Text outkey = new Text();
	private Text outvalue = new Text();

	private List<String> cacheList = new ArrayList<String>();

	@Override
	protected void setup(Context context) throws IOException, InterruptedException {
		super.setup(context);
		// read the transposed right matrix from the distributed cache into a List<String>;
		// "matrix2" is the symlink name given to the cache file in Mr2
		FileReader fr = new FileReader("matrix2");
		BufferedReader br = new BufferedReader(fr);

		String line = null;
		while ((line = br.readLine()) != null) {
			cacheList.add(line);
		}
		br.close();// closing the BufferedReader also closes the underlying FileReader
	}

	/**
	 * key: byte offset of the line
	 * value: one row of the left matrix, "row<TAB>col_value,col_value,..."
	 */
	@Override
	protected void map(LongWritable key, Text value, Context context)
			throws IOException, InterruptedException {
		// row number
		String row_matrix1 = value.toString().split("\t")[0];
		// array of col_value pairs
		String[] column_value_array_matrix1 = value.toString().split("\t")[1].split(",");

		for (String line : cacheList) {
			// one line of the transposed right matrix,
			// format: "row<TAB>col_value,col_value,..."
			String row_matrix2 = line.split("\t")[0];
			String[] column_value_array_matrix2 = line.split("\t")[1].split(",");

			// dot product of the two rows
			int result = 0;
			// iterate over every column of this row of the left matrix
			for (String column_value_matrix1 : column_value_array_matrix1) {
				String column_matrix1 = column_value_matrix1.split("_")[0];
				String value_matrix1 = column_value_matrix1.split("_")[1];
				// iterate over every column of this row of the right matrix
				for (String column_value_matrix2 : column_value_array_matrix2) {
					// when the left column number matches the right column number
					if (column_value_matrix2.startsWith(column_matrix1 + "_")) {
						String value_matrix2 = column_value_matrix2.split("_")[1];
						// multiply the two values and accumulate
						result += Integer.valueOf(value_matrix1) * Integer.valueOf(value_matrix2);
					}
				}
			}
			// result is one element of the product matrix, at
			// row: row_matrix1, column: row_matrix2 (since the right matrix is transposed)
			outkey.set(row_matrix1);
			outvalue.set(row_matrix2 + "_" + result);
			// output format key: row, value: column_value
			context.write(outkey, outvalue);
		}
	}
}
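
As a concrete check: for left-matrix row 1, "1	1_1,2_2,3_-2,4_0", and the first line of the transposed matrix produced by step 1, "1	1_0,2_1,3_0,4_-2" (column 1 of matrix2), the dot product is 1*0 + 2*1 + (-2)*0 + 0*(-2) = 2, so the mapper writes key "1" and value "1_2": element (1,1) of the product is 2.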

package step2;

import java.io.IOException;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class Reducer2 extends Reducer<Text, Text, Text, Text> {

	private Text outkey = new Text();
	private Text outvalue = new Text();

	// key: row number  value: [columnNumber_value, columnNumber_value, ...]
	@Override
	protected void reduce(Text key, Iterable<Text> values, Context context)
			throws IOException, InterruptedException {
		StringBuilder sb = new StringBuilder();
		for (Text text : values) {
			// text: columnNumber_value
			sb.append(text + ",");
		}
		String line = null;
		if (sb.toString().endsWith(",")) {
			line = sb.substring(0, sb.length() - 1);// drop the trailing comma
		}
		outkey.set(key);
		outvalue.set(line);
		context.write(outkey, outvalue);
	}
}

package step2;

import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class Mr2 {
	// input file path on HDFS
	private static String inPath = "/user/matrix/step2_input/matrix1.txt";
	// output directory on HDFS
	private static String outPath = "/user/matrix/output";

	// the transposed matrix produced by step1, distributed to tasks as a cache file
	private static String cache = "/user/matrix/step1_output/part-r-00000";

	// HDFS address
	private static String hdfs = "hdfs://192.168.80.90:9000";

	public int run() {
		try {
			// create the job configuration
			Configuration conf = new Configuration();

			// set the HDFS address
			conf.set("fs.defaultFS", hdfs);

			// create the job instance
			Job job = Job.getInstance(conf, "step2");

			// add the cache file to the job; the "#matrix2" fragment is the
			// symlink name tasks use to open the file locally
			job.addCacheFile(new URI(cache + "#matrix2"));

			// set the job's main class
			job.setJarByClass(Mr2.class);

			// set the job's Mapper and Reducer classes
			job.setMapperClass(Mapper2.class);
			job.setReducerClass(Reducer2.class);

			// set the Mapper output types
			job.setMapOutputKeyClass(Text.class);
			job.setMapOutputValueClass(Text.class);

			// set the Reducer output types
			job.setOutputKeyClass(Text.class);
			job.setOutputValueClass(Text.class);

			FileSystem fs = FileSystem.get(conf);
			// set the input and output paths
			Path inputpath = new Path(inPath);
			if (fs.exists(inputpath)) {
				// only add the input path if it exists; a nonexistent input path
				// would cause an exception
				FileInputFormat.addInputPath(job, inputpath);
			}
			Path outputpath = new Path(outPath);
			// the output directory must not already exist, or the job fails;
			// delete() removes it if present and is a no-op otherwise
			fs.delete(outputpath, true);
			FileOutputFormat.setOutputPath(job, outputpath);

			// return the job status: 1 on success, -1 on failure
			return job.waitForCompletion(true) ? 1 : -1;
		} catch (IOException e) {
			e.printStackTrace();
		} catch (ClassNotFoundException e) {
			e.printStackTrace();
		} catch (InterruptedException e) {
			e.printStackTrace();
		} catch (URISyntaxException e) {
			e.printStackTrace();
		}
		return -1;
	}

	public static void main(String[] args) {
		int result = new Mr2().run();
		if (result == 1) {
			System.out.println("step2 finished successfully.....");
		} else if (result == -1) {
			System.out.println("step2 failed.....");
		}
	}
}

3. How to run

First run Mr1.java to transpose the right matrix, then run Mr2.java to compute the matrix product, as in the sketch below.
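
A minimal sketch of launching the two steps from the command line, assuming the classes are packaged into a jar (the jar name matrix-mr.jar is hypothetical):

hadoop jar matrix-mr.jar step1.Mr1
hadoop jar matrix-mr.jar step2.Mr2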

4. Program analysis

【1】 How Mr1.java transforms its input: in the Map phase, Mapper1.java turns each line into (column, rowNumber_value) pairs, and the Shuffle phase then merges the data by key. At that point the matrix has effectively been transposed, because all values sharing the same key (an original column number) are grouped into a single reduce call. (For the mechanics of this grouping, see the Shuffle phase.)
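
For the sample matrix2.txt above, the transposed output in /user/matrix/step1_output/part-r-00000 should look like this (within a line, the order of rowNumber_value pairs depends on the order in which values reach the reducer and may vary):

1	1_0,2_1,3_0,4_-2
2	1_3,2_3,3_1,4_2
3	1_-1,2_5,3_4,4_-1
4	1_2,2_-2,3_-1,4_1
5	1_-3,2_-1,3_2,4_2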

【2】 When Mr2.java submits its job, it first registers the transposed matrix from the previous step as a cache file (using DistributedCache):

// add the cache file to the job
job.addCacheFile(new URI(cache+"#matrix2"));
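
(Note: the DistributedCache class itself is deprecated in Hadoop 2.x; Job.addCacheFile(...), used here, is the replacement API for the same mechanism.)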

The #matrix2 fragment makes the framework create a symlink named matrix2 in each task's working directory, which is why the mapper can open the file with new FileReader("matrix2"). In the Map phase, Mapper2.java reads the cache into a List before doing any computation (the setup method runs first, and only once per task):

@Override
protected void setup(Context context)throws IOException, InterruptedException {
	super.setup(context);
	// read the transposed right matrix from the distributed cache into a List<String>
	FileReader fr=new FileReader("matrix2");
	BufferedReader br=new BufferedReader(fr);
	String line=null;
	while((line=br.readLine())!=null) {
		cacheList.add(line);
	}
	br.close();// closing the BufferedReader also closes the underlying FileReader
}

The map method then runs for every row of the left matrix and performs the actual multiplication.

/**
 * key: byte offset of the line
 * value: one row of the left matrix, "row<TAB>col_value,col_value,..."
 */
@Override
protected void map(LongWritable key, Text value,Context context)
		throws IOException, InterruptedException {
	// row number
	String row_matrix1=value.toString().split("\t")[0];
	// array of col_value pairs
	String[] column_value_array_matrix1=value.toString().split("\t")[1].split(",");
	for(String line:cacheList) {
		// one line of the transposed right matrix,
		// format: "row<TAB>col_value,col_value,..."
		String row_matrix2=line.split("\t")[0];
		String[] column_value_array_matrix2=line.split("\t")[1].split(",");
		// dot product of the two rows
		int result=0;
		// iterate over every column of this row of the left matrix
		for(String column_value_matrix1:column_value_array_matrix1) {
			String column_matrix1=column_value_matrix1.split("_")[0];
			String value_matrix1=column_value_matrix1.split("_")[1];
			// iterate over every column of this row of the right matrix
			for(String column_value_matrix2:column_value_array_matrix2) {
				// when the left column number matches the right column number
				if(column_value_matrix2.startsWith(column_matrix1+"_")){
					String value_matrix2=column_value_matrix2.split("_")[1];
					// multiply the two values and accumulate
					result+=Integer.valueOf(value_matrix1)*Integer.valueOf(value_matrix2);
				}
			}
		}
		// result is one element of the product matrix, at
		// row: row_matrix1, column: row_matrix2 (since the right matrix is transposed)
		outkey.set(row_matrix1);
		outvalue.set(row_matrix2+"_"+result);
		// output format key: row, value: column_value
		context.write(outkey, outvalue);
	}
}