DBILITY
Finding the Highest Salary by Gender and Age Group
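Because the job uses three partitions, it produces three output files, so I tried merging the results with FileUtil.copyMerge.
Of course, I need to study further before I can say whether this is the proper way to do it.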
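The three output files come from routing each record to a reducer by age group. As a minimal sketch of that routing rule, detached from Hadoop so it can be run on its own, the class below mirrors the thresholds used in the job. The class name `AgePartitionSketch` and the sample ages are made up for illustration; only the thresholds and the modulo fallback are taken from the job code.

```java
public class AgePartitionSketch {

    // Age-group label, using the same thresholds as the job's mapper.
    static String ageGroup(int age) {
        if (age <= 20) return "0-20";
        if (age <= 30) return "21-30";
        return "31~";
    }

    // Partition index, using the same rule as the job's partitioner.
    // The modulo keeps the index valid when fewer than three reducers run.
    static int partitionFor(int age, int numReduceTasks) {
        if (numReduceTasks == 0) return 0;
        if (age <= 20) return 0;
        if (age <= 30) return 1 % numReduceTasks;
        return 2 % numReduceTasks;
    }

    public static void main(String[] args) {
        int[] sampleAges = {18, 25, 47}; // hypothetical sample values
        for (int age : sampleAges) {
            System.out.println(age + " -> group " + ageGroup(age)
                    + ", partition " + partitionFor(age, 3));
        }
    }
}
```

With three reducers each age group gets its own partition, hence one `part-r-*` file per group. With fewer reducers the modulo folds groups together instead of returning an out-of-range index.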
    package com.dbility.hadoop.partitioner;

    import java.io.IOException;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.conf.Configured;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.FileUtil;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.Partitioner;
    import org.apache.hadoop.mapreduce.Reducer;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
    import org.apache.hadoop.util.GenericOptionsParser;
    import org.apache.hadoop.util.Tool;
    import org.apache.hadoop.util.ToolRunner;

    public class GenderDistinctMaxSalaryDriver extends Configured implements Tool {

        // Output directory as parsed in run(), so main() does not depend on raw args.
        private String outputDir;

        public static class GenderDistinctMaxSalaryMapper extends Mapper<LongWritable, Text, Text, Text> {

            @Override
            protected void map(LongWritable key, Text value, Context context)
                    throws IOException, InterruptedException {
                // Tab-separated input: cols[2] is parsed as the age, cols[3] forms the gender part of the key.
                String[] cols = value.toString().split("\t");
                int age = Integer.parseInt(cols[2]);

                String ages;
                if (age <= 20) {
                    ages = "0-20";
                } else if (age <= 30) {
                    ages = "21-30";
                } else {
                    ages = "31~";
                }

                // Key: gender + age group. Value: the whole input line.
                context.write(new Text(cols[3] + "\t" + ages), new Text(value));
            }
        }

        public static class GenderDistinctMaxSalaryPartitioner extends Partitioner<Text, Text> {

            @Override
            public int getPartition(Text key, Text value, int numReduceTasks) {
                if (numReduceTasks == 0) {
                    return 0;
                }

                String[] cols = value.toString().split("\t");
                int age = Integer.parseInt(cols[2]);

                // One partition per age group; the modulo keeps the index in range with fewer reducers.
                if (age <= 20) {
                    return 0;
                } else if (age <= 30) {
                    return 1 % numReduceTasks;
                } else {
                    return 2 % numReduceTasks;
                }
            }
        }

        public static class GenderDistinctMaxSalaryReducer extends Reducer<Text, Text, Text, IntWritable> {

            @Override
            protected void reduce(Text key, Iterable<Text> values, Context context)
                    throws IOException, InterruptedException {
                // cols[4] is parsed as the salary; keep the largest one seen for this key.
                int max = -1;
                for (Text value : values) {
                    String[] cols = value.toString().split("\t");
                    int salary = Integer.parseInt(cols[4]);
                    if (salary > max) {
                        max = salary;
                    }
                }
                context.write(key, new IntWritable(max));
            }
        }

        public int run(String[] args) throws Exception {
            String[] remainArgs = new GenericOptionsParser(getConf(), args).getRemainingArgs();

            if (remainArgs.length != 2) {
                System.out.println("Usage : hadoop jar JobJarFile [mainClass] <input> <output>");
                return -1;
            }
            outputDir = remainArgs[1];

            // Note: this constructor is deprecated in Hadoop 2.x in favor of Job.getInstance(...).
            Job job = new Job(getConf(), "GenderAgesDistinctMaxSalaryDriver");
            job.setJarByClass(GenderDistinctMaxSalaryDriver.class);

            job.setMapperClass(GenderDistinctMaxSalaryMapper.class);
            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(Text.class);

            job.setPartitionerClass(GenderDistinctMaxSalaryPartitioner.class);
            job.setReducerClass(GenderDistinctMaxSalaryReducer.class);
            job.setNumReduceTasks(3);

            job.setInputFormatClass(TextInputFormat.class);
            job.setOutputFormatClass(TextOutputFormat.class);
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(IntWritable.class);

            FileInputFormat.addInputPath(job, new Path(remainArgs[0]));
            FileOutputFormat.setOutputPath(job, new Path(outputDir));

            // Remove a previous output directory so the job does not fail on startup.
            FileSystem hdfs = FileSystem.get(getConf());
            Path path = new Path(outputDir);
            if (hdfs.exists(path)) {
                hdfs.delete(path, true);
            }

            return job.waitForCompletion(true) ? 0 : -2;
        }

        public static void main(String[] args) throws Exception {
            Configuration conf = new Configuration();
            GenderDistinctMaxSalaryDriver driver = new GenderDistinctMaxSalaryDriver();

            int ret = ToolRunner.run(conf, driver, args);

            if (ret == 0) {
                // Merge the three part files into one file; the source directory is deleted afterwards.
                FileSystem hdfs = FileSystem.get(conf);
                Path merged = new Path(driver.outputDir + "_result/out.txt");
                if (hdfs.exists(merged)) {
                    hdfs.delete(merged, true);
                }
                FileUtil.copyMerge(hdfs, new Path(driver.outputDir), hdfs, merged, true, conf, null);
            }

            System.exit(ret);
        }
    }
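On whether copyMerge is the right approach: as far as I know, `FileUtil.copyMerge` is no longer available in Hadoop 3.x, and the usual command-line route is `hadoop fs -getmerge`, which concatenates the part files into a single local file. To show what such a merge amounts to, here is a small sketch that does the same thing on the local filesystem with plain `java.nio`. It is only an illustration of the idea, not a Hadoop API; the class name `LocalMergeSketch`, the `part-*` file pattern, and the sample rows are assumptions made for the example.

```java
import java.io.IOException;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

public class LocalMergeSketch {

    // Concatenates every "part-*" file in dir into target, in file-name order.
    // Marker files such as _SUCCESS do not match the pattern and are skipped.
    static void mergeParts(Path dir, Path target) throws IOException {
        List<Path> parts = new ArrayList<>();
        try (DirectoryStream<Path> stream = Files.newDirectoryStream(dir, "part-*")) {
            for (Path p : stream) {
                parts.add(p);
            }
        }
        Collections.sort(parts); // part-r-00000, part-r-00001, ...

        if (target.getParent() != null) {
            Files.createDirectories(target.getParent());
        }
        try (OutputStream out = Files.newOutputStream(target)) {
            for (Path p : parts) {
                Files.copy(p, out); // append this part's bytes to the merged file
            }
        }
    }

    public static void main(String[] args) throws IOException {
        // Hypothetical reducer outputs, written to a temp directory for the demo.
        Path dir = Files.createTempDirectory("job-output");
        Files.write(dir.resolve("part-r-00000"), "F\t0-20\t100\n".getBytes(StandardCharsets.UTF_8));
        Files.write(dir.resolve("part-r-00001"), "F\t21-30\t200\n".getBytes(StandardCharsets.UTF_8));
        Files.write(dir.resolve("part-r-00002"), "F\t31~\t300\n".getBytes(StandardCharsets.UTF_8));

        Path merged = dir.resolve("result").resolve("out.txt");
        mergeParts(dir, merged);
        System.out.print(new String(Files.readAllBytes(merged), StandardCharsets.UTF_8));
    }
}
```

Sorting by file name keeps the merged file in partition order, so the age groups appear in the order the partitioner assigned them.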
Reference: https://www.tutorialspoint.com/map_reduce/map_reduce_partitioner.htm