성별, 연령대별 최고 연봉 구하기
3개의 Partition으로 나누면 리듀서 3개가 각각 출력 파일을 만들어 output이 3개 생성되므로, FileUtil.copyMerge를 사용해 결과를 하나의 파일로 합쳐 보았다.
물론, 이것이 정상적인 방법인지는 공부를 더 해 봐야 확인할 수 있겠다. (참고로 FileUtil.copyMerge는 Hadoop 3.0에서 제거되었으므로, Hadoop 3 이상에서는 다른 방법으로 결과를 합쳐야 한다.)
package com.dbility.hadoop.partitioner; import; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configured; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FileUtil; import org.apache.hadoop.fs.Path; import; import; import; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Partitioner; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; import org.apache.hadoop.util.GenericOptionsParser; import org.apache.hadoop.util.Tool; import org.apache.hadoop.util.ToolRunner; public class GenderDistinctMaxSalaryDriver extends Configured implements Tool { public static class GenderDistinctMaxSalaryMapper extends Mapper<LongWritable, Text, Text, Text> { @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { String[] cols = value.toString().split("\t"); int age = Integer.parseInt(cols[2]); String ages; if ( age <= 20){ ages = "0-20"; } else if ( age > 20 && age <= 30 ) { ages = "21-30"; } else { ages = "31~"; } context.write(new Text(cols[3]+"\t"+ages), new Text(value)); } } public static class GenderDistinctMaxSalaryPartitioner extends Partitioner<Text, Text> { @Override public int getPartition(Text key, Text value, int numReduceTasks) { String[] cols = value.toString().split("\t"); int age = Integer.parseInt(cols[2]); if ( numReduceTasks == 0) return 0; if ( age <= 20){ return 0; } else if ( age > 20 && age <= 30 ) { return 1 % numReduceTasks; } else { return 2 % numReduceTasks; } } } public static class GenderDistinctMaxSalaryReducer extends Reducer<Text, Text, Text, IntWritable> { int max = -1; @Override protected void reduce(Text key, Iterable<Text> values, 
Context context) throws IOException, InterruptedException { max = -1; for (Text value : values) { String[] cols = value.toString().split("\t"); if ( Integer.parseInt(cols[4]) > max) max = Integer.parseInt(cols[4]); } context.write(key, new IntWritable(max)); } } public int run(String[] args) throws Exception { String[] remainArgs = new GenericOptionsParser(getConf(), args).getRemainingArgs(); if (remainArgs.length != 2) { System.out.println("Usage : hadoop jar JobJarFile [mainClass] <input> <output>"); return -1; } Job job = new Job(getConf(), "GenderAgesDistinctMaxSalaryDriver"); job.setJarByClass(GenderDistinctMaxSalaryDriver.class); job.setMapperClass(GenderDistinctMaxSalaryMapper.class); job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(Text.class); job.setPartitionerClass(GenderDistinctMaxSalaryPartitioner.class); job.setReducerClass(GenderDistinctMaxSalaryReducer.class); job.setNumReduceTasks(3); job.setInputFormatClass(TextInputFormat.class); job.setOutputFormatClass(TextOutputFormat.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(IntWritable.class); FileInputFormat.addInputPath(job, new Path(remainArgs[0])); FileOutputFormat.setOutputPath(job, new Path(remainArgs[1])); FileSystem hdfs = FileSystem.get(getConf()); Path path = new Path(remainArgs[1]); if ( hdfs.exists(path) ) hdfs.delete(path, true); return job.waitForCompletion(true) ? 0 : -2; } public static void main(String[] args) throws Exception { Configuration conf = new Configuration(); int ret =, new GenderDistinctMaxSalaryDriver(), args); if ( ret == 0) { FileSystem hdfs = FileSystem.get(conf); Path path = new Path(args[1]+"_result/out.txt"); if ( hdfs.exists(path) ) hdfs.delete(path, true); FileUtil.copyMerge(hdfs, new Path(args[1]), hdfs, path, true, conf, null); } System.exit(ret); } }
참조 :
MapReduce - Partitioner
MapReduce - Partitioner
