
bigdata/hadoop

Finding the maximum salary by gender and age group

DBILITY 2016. 11. 29. 15:47

Because the job splits the data across 3 partitions, 3 output files are produced, so I used FileUtil.copyMerge to combine the results into a single file.
Of course, I'll need to study more before I can say whether this is the proper way to do it.
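
Since the attached input.txt may not be available, here is a hypothetical sample of the layout the mapper expects: tab-separated columns, with the age in the third column, the gender in the fourth, and the salary in the fifth (the first two columns, shown here as an id and a name, are assumed and are not used by the job).

    1	kim	19	M	1500
    2	lee	25	F	3200
    3	park	33	M	4100
    4	choi	28	F	2900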

  1. input.txt
    download
  2. package com.dbility.hadoop.partitioner;
    
    import java.io.IOException;
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.conf.Configured;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.FileUtil;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.Partitioner;
    import org.apache.hadoop.mapreduce.Reducer;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
    import org.apache.hadoop.util.GenericOptionsParser;
    import org.apache.hadoop.util.Tool;
    import org.apache.hadoop.util.ToolRunner;
    
    public class GenderDistinctMaxSalaryDriver extends Configured implements Tool {
    
    	// Mapper: keys each record by "gender<TAB>ageBracket" and passes the
    	// whole input line through as the value
    	public static class GenderDistinctMaxSalaryMapper extends Mapper<LongWritable, Text, Text, Text> {
    
    		@Override
    		protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
    
    			// tab-separated input; cols[2] = age, cols[3] = gender, cols[4] = salary
    			String[] cols = value.toString().split("\t");
    			int age = Integer.parseInt(cols[2]);
    			String ages;
    			if (age <= 20) {
    				ages = "0-20";
    			} else if (age <= 30) {
    				ages = "21-30";
    			} else {
    				ages = "31~";
    			}
    			context.write(new Text(cols[3] + "\t" + ages), new Text(value));
    		}
    	}
    
    	// Partitioner: routes each record to one of three reducers by age bracket
    	public static class GenderDistinctMaxSalaryPartitioner extends Partitioner<Text, Text> {
    
    		@Override
    		public int getPartition(Text key, Text value, int numReduceTasks) {
    
    			// the value is the full input line, so the age is re-parsed from it
    			String[] cols = value.toString().split("\t");
    			int age = Integer.parseInt(cols[2]);

    			// defensive guard: with no reduce tasks there is only one partition
    			if (numReduceTasks == 0)
    				return 0;

    			// one partition per age bracket; the modulo keeps the result valid
    			// even if fewer than three reducers are configured
    			if (age <= 20) {
    				return 0;
    			} else if (age <= 30) {
    				return 1 % numReduceTasks;
    			} else {
    				return 2 % numReduceTasks;
    			}
    		}
    	}
    
    	// Reducer: emits the maximum salary for each "gender<TAB>ageBracket" key
    	public static class GenderDistinctMaxSalaryReducer extends Reducer<Text, Text, Text, IntWritable> {

    		@Override
    		protected void reduce(Text key, Iterable<Text> values, Context context)
    				throws IOException, InterruptedException {
    			int max = -1;
    			for (Text value : values) {
    				String[] cols = value.toString().split("\t");
    				int salary = Integer.parseInt(cols[4]);
    				if (salary > max)
    					max = salary;
    			}
    			context.write(key, new IntWritable(max));
    		}
    	}
    
    	public int run(String[] args) throws Exception {
    
    		String[] remainArgs = new GenericOptionsParser(getConf(), args).getRemainingArgs();
    
    		if (remainArgs.length != 2) {
    			System.out.println("Usage : hadoop jar JobJarFile [mainClass] <input> <output>");
    			return -1;
    		}
    
    		Job job = Job.getInstance(getConf(), "GenderAgesDistinctMaxSalaryDriver");
    
    		job.setJarByClass(GenderDistinctMaxSalaryDriver.class);
    
    		job.setMapperClass(GenderDistinctMaxSalaryMapper.class);
    		job.setMapOutputKeyClass(Text.class);
    		job.setMapOutputValueClass(Text.class);
    
    		job.setPartitionerClass(GenderDistinctMaxSalaryPartitioner.class);
    		job.setReducerClass(GenderDistinctMaxSalaryReducer.class);
    		// one reducer per age bracket, routed by the custom partitioner
    		job.setNumReduceTasks(3);
    
    		job.setInputFormatClass(TextInputFormat.class);
    		job.setOutputFormatClass(TextOutputFormat.class);
    		job.setOutputKeyClass(Text.class);
    		job.setOutputValueClass(IntWritable.class);
    
    		FileInputFormat.addInputPath(job, new Path(remainArgs[0]));
    		FileOutputFormat.setOutputPath(job, new Path(remainArgs[1]));
    
    		// remove a pre-existing output directory so the job can be rerun
    		FileSystem hdfs = FileSystem.get(getConf());
    		Path path = new Path(remainArgs[1]);
    		if (hdfs.exists(path)) hdfs.delete(path, true);
    
    		return job.waitForCompletion(true) ? 0 : -2;
    	}
    
    	public static void main(String[] args) throws Exception {
    
    		Configuration conf = new Configuration();
    
    		int ret = ToolRunner.run(conf, new GenderDistinctMaxSalaryDriver(), args);
    		if (ret == 0) {

    			FileSystem hdfs = FileSystem.get(conf);

    			Path path = new Path(args[1] + "_result/out.txt");
    			if (hdfs.exists(path)) hdfs.delete(path, true);

    			// merge the part-r-0000x files into a single file; the "true"
    			// argument deletes the source output directory after merging
    			FileUtil.copyMerge(hdfs, new Path(args[1]), hdfs, path, true, conf, null);
    		}
    		System.exit(ret);
    	}
    }

    GenderDistinctMaxSalaryDriver.java
    download
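
For reference, the job can be run with something like hadoop jar max-salary.jar com.dbility.hadoop.partitioner.GenderDistinctMaxSalaryDriver input output (the jar name and paths here are placeholders). With the hypothetical input above, the merged output_result/out.txt would contain one maximum salary per gender and age bracket, with the brackets appearing in partition order:

    M	0-20	1500
    F	21-30	3200
    M	31~	4100

Note that FileUtil.copyMerge was deprecated in Hadoop 2.8 and removed in Hadoop 3, so on newer versions a similar merge to the local filesystem can be done with hdfs dfs -getmerge output out.txt instead.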

Reference: https://www.tutorialspoint.com/map_reduce/map_reduce_partitioner.htm

 
