Finding the Maximum Salary by Gender and Age Group
Because the job splits its output across three partitions, three output files are produced, so I used FileUtil.copyMerge to combine the results into one file.
Of course, I'll need to study further before I can confirm whether this is the proper way to do it.
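For reference, here is the merge step in isolation, as a minimal sketch (the paths are hypothetical). FileUtil.copyMerge concatenates every file under a source directory into a single destination file; note that it was later removed in Hadoop 3, where `hadoop fs -getmerge` is the usual replacement:

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;

// Standalone sketch of the merge step; the paths here are hypothetical.
// copyMerge concatenates every file under srcDir (the three part-r-0000x
// files in this post) into a single dstFile.
public class MergeSketch {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        FileSystem hdfs = FileSystem.get(conf);
        FileUtil.copyMerge(hdfs, new Path("/user/hadoop/output"),            // job output dir
                           hdfs, new Path("/user/hadoop/output_result/out.txt"), // merged file
                           true,   // delete the source dir after merging
                           conf,
                           null);  // no separator string inserted between merged files
    }
}
```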
- input.txt (download)
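The exact contents of input.txt aren't reproduced here, but from the column indices the code below reads, each record must be tab-separated with age in the third field, gender in the fourth, and salary in the fifth. A made-up record in that layout might look like:

```
1	hong	25	M	3500
```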
```java
package com.dbility.hadoop.partitioner;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Partitioner;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class GenderDistinctMaxSalaryDriver extends Configured implements Tool {

	/* Emits (gender \t ageBand, full record); age is column 2, gender column 3. */
	public static class GenderDistinctMaxSalaryMapper extends Mapper<LongWritable, Text, Text, Text> {

		@Override
		protected void map(LongWritable key, Text value, Context context)
				throws IOException, InterruptedException {

			String[] cols = value.toString().split("\t");
			int age = Integer.parseInt(cols[2]);
			String ages;

			if (age <= 20) {
				ages = "0-20";
			} else if (age > 20 && age <= 30) {
				ages = "21-30";
			} else {
				ages = "31~";
			}

			context.write(new Text(cols[3] + "\t" + ages), new Text(value));
		}
	}

	/* Routes each record to a reducer by age band, so every band
	   ends up in its own output file. */
	public static class GenderDistinctMaxSalaryPartitioner extends Partitioner<Text, Text> {

		@Override
		public int getPartition(Text key, Text value, int numReduceTasks) {

			String[] cols = value.toString().split("\t");
			int age = Integer.parseInt(cols[2]);

			if (numReduceTasks == 0)
				return 0;

			if (age <= 20) {
				return 0;
			} else if (age > 20 && age <= 30) {
				return 1 % numReduceTasks;
			} else {
				return 2 % numReduceTasks;
			}
		}
	}

	/* Keeps the maximum salary (column 4) seen for each (gender, ageBand) key. */
	public static class GenderDistinctMaxSalaryReducer extends Reducer<Text, Text, Text, IntWritable> {

		int max = -1;

		@Override
		protected void reduce(Text key, Iterable<Text> values, Context context)
				throws IOException, InterruptedException {

			max = -1;

			for (Text value : values) {
				String[] cols = value.toString().split("\t");
				if (Integer.parseInt(cols[4]) > max)
					max = Integer.parseInt(cols[4]);
			}

			context.write(key, new IntWritable(max));
		}
	}

	public int run(String[] args) throws Exception {

		String[] remainArgs = new GenericOptionsParser(getConf(), args).getRemainingArgs();

		if (remainArgs.length != 2) {
			System.out.println("Usage : hadoop jar JobJarFile [mainClass] <input> <output>");
			return -1;
		}

		Job job = Job.getInstance(getConf(), "GenderAgesDistinctMaxSalaryDriver");

		job.setJarByClass(GenderDistinctMaxSalaryDriver.class);

		job.setMapperClass(GenderDistinctMaxSalaryMapper.class);
		job.setMapOutputKeyClass(Text.class);
		job.setMapOutputValueClass(Text.class);

		job.setPartitionerClass(GenderDistinctMaxSalaryPartitioner.class);

		job.setReducerClass(GenderDistinctMaxSalaryReducer.class);
		job.setNumReduceTasks(3); // one reducer per age band -> three part files

		job.setInputFormatClass(TextInputFormat.class);
		job.setOutputFormatClass(TextOutputFormat.class);
		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(IntWritable.class);

		FileInputFormat.addInputPath(job, new Path(remainArgs[0]));
		FileOutputFormat.setOutputPath(job, new Path(remainArgs[1]));

		// Remove a stale output directory so the job can be rerun.
		FileSystem hdfs = FileSystem.get(getConf());
		Path path = new Path(remainArgs[1]);
		if (hdfs.exists(path))
			hdfs.delete(path, true);

		return job.waitForCompletion(true) ? 0 : -2;
	}

	public static void main(String[] args) throws Exception {

		Configuration conf = new Configuration();
		int ret = ToolRunner.run(conf, new GenderDistinctMaxSalaryDriver(), args);

		// On success, merge the three partition outputs into a single file.
		if (ret == 0) {
			FileSystem hdfs = FileSystem.get(conf);
			Path path = new Path(args[1] + "_result/out.txt");
			if (hdfs.exists(path))
				hdfs.delete(path, true);
			FileUtil.copyMerge(hdfs, new Path(args[1]), hdfs, path, true, conf, null);
		}

		System.exit(ret);
	}
}
```
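The job would be launched with something like `hadoop jar <jar-file> com.dbility.hadoop.partitioner.GenderDistinctMaxSalaryDriver <input> <output>`; with three reducers it writes part-r-00000 through part-r-00002 under `<output>`, which main() then merges into `<output>_result/out.txt`. As a quick sanity check of the routing logic, here is a hypothetical snippet (the PartitionerCheck class and the sample records are mine, not part of the post) that feeds one record per age band through the partitioner:

```java
package com.dbility.hadoop.partitioner;

import org.apache.hadoop.io.Text;

// Hypothetical sanity check: one sample record per age band should land
// in partitions 0, 1 and 2 respectively when numReduceTasks == 3.
public class PartitionerCheck {
    public static void main(String[] args) {
        GenderDistinctMaxSalaryDriver.GenderDistinctMaxSalaryPartitioner p =
                new GenderDistinctMaxSalaryDriver.GenderDistinctMaxSalaryPartitioner();

        // Records follow the layout the mapper assumes:
        // age in column 2, gender in column 3, salary in column 4.
        System.out.println(p.getPartition(new Text("M\t0-20"),  new Text("1\thong\t18\tM\t2000"), 3)); // 0
        System.out.println(p.getPartition(new Text("F\t21-30"), new Text("2\tkim\t25\tF\t3500"),  3)); // 1
        System.out.println(p.getPartition(new Text("M\t31~"),   new Text("3\tlee\t42\tM\t7000"),  3)); // 2
    }
}
```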
- GenderDistinctMaxSalaryDriver.java (download)
Reference: https://www.tutorialspoint.com/map_reduce/map_reduce_partitioner.htm (MapReduce - Partitioner). In short: the partition phase runs after the map phase and before the reduce phase, and the number of partitions equals the number of reducers.
'bigdata > hadoop' 카테고리의 다른 글
입력한 공항을 도착지로 년도별 최대 지연 도착 항공편 구하기 (0) | 2016.11.30 |
---|---|
hadoop job list, kill (0) | 2016.11.30 |
hadoop MapReduce 프로그래밍 요소 (0) | 2016.11.24 |
hadoop MRUnit 사용하기 (0) | 2016.11.17 |
hadoop ChainMapper, Reducer exercise (0) | 2016.11.14 |