hadoop ChainMapper, Reducer exercise
Hadoop provides ChainMapper and ChainReducer so that several Mappers and a Reducer can run inside a single MapReduce job.
In the 1.x.x versions, each class must extend MapReduceBase under org.apache.hadoop.mapred and implement the Mapper or Reducer interface.
Here the chain is applied in the order Mapper -> Mapper -> Reducer.
- TokenizerMapper
package com.dbility.hadoop.chain;

import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;

import com.dbility.hadoop.util.AsaPerformParser;

public class TokenizerMapper extends MapReduceBase implements
        Mapper<LongWritable, Text, Text, IntWritable> {

    // First mapper in the chain: emits (carrier, 1) for every input line.
    // AsaPerformParser is a helper class that is not listed in this post;
    // a sketch of it is given after the driver code below.
    @Override
    public void map(LongWritable key, Text value,
            OutputCollector<Text, IntWritable> collector, Reporter reporter)
            throws IOException {

        /*StringTokenizer stk = new StringTokenizer(value.toString(), " ");

        while (stk.hasMoreElements()) {
            String word = (String) stk.nextElement();
            collector.collect(new Text(word), new IntWritable(1));
        }*/

        AsaPerformParser parser = new AsaPerformParser(value);

        collector.collect(new Text(parser.getUniqueCarrier()),
                new IntWritable(1));
    }
}
- UpperCaseMapper
package com.dbility.hadoop.chain;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;

public class UpperCaseMapper extends MapReduceBase implements
        Mapper<Text, IntWritable, Text, IntWritable> {

    @Override
    public void map(Text key, IntWritable value,
            OutputCollector<Text, IntWritable> collector, Reporter reporter)
            throws IOException {

        collector.collect(new Text(key.toString().toUpperCase()), value);
    }
}
- WordCountReducer
package com.dbility.hadoop.chain;

import java.io.IOException;
import java.util.Iterator;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;

public class WordCountReducer extends MapReduceBase implements
        Reducer<Text, IntWritable, Text, IntWritable> {

    @Override
    public void reduce(Text key, Iterator<IntWritable> values,
            OutputCollector<Text, IntWritable> collector, Reporter reporter)
            throws IOException {

        int sum = 0;

        while (values.hasNext()) {
            sum += values.next().get();
        }

        collector.collect(key, new IntWritable(sum));
    }
}
- WordCountChainDriver
package com.dbility.hadoop.chain;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RunningJob;
import org.apache.hadoop.mapred.TextInputFormat;
import org.apache.hadoop.mapred.TextOutputFormat;
import org.apache.hadoop.mapred.lib.ChainMapper;
import org.apache.hadoop.mapred.lib.ChainReducer;
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class WordCountChainDriver extends Configured implements Tool {

    @Override
    public int run(String[] args) throws Exception {

        String[] remainArgs = new GenericOptionsParser(getConf(), args)
                .getRemainingArgs();

        if (remainArgs.length != 2) {
            System.out.println("Usage : <input> <output>");
            return -1;
        }

        JobConf conf = new JobConf(getConf(), WordCountChainDriver.class);
        conf.setJobName("chainTest");

        conf.setInputFormat(TextInputFormat.class);
        conf.setOutputFormat(TextOutputFormat.class);
        conf.setOutputKeyClass(Text.class);
        conf.setOutputValueClass(IntWritable.class);

        FileInputFormat.addInputPath(conf, new Path(remainArgs[0]));
        FileOutputFormat.setOutputPath(conf, new Path(remainArgs[1]));

        // delete the output directory if it already exists
        FileSystem hdfs = FileSystem.get(getConf());
        Path path = new Path(remainArgs[1]);
        if (hdfs.exists(path))
            hdfs.delete(path, true);

        // chain : TokenizerMapper -> UpperCaseMapper -> WordCountReducer
        JobConf tmConf = new JobConf(false);
        ChainMapper.addMapper(conf, TokenizerMapper.class, LongWritable.class,
                Text.class, Text.class, IntWritable.class, true, tmConf);

        JobConf umConf = new JobConf(false);
        ChainMapper.addMapper(conf, UpperCaseMapper.class, Text.class,
                IntWritable.class, Text.class, IntWritable.class, true, umConf);

        JobConf wrConf = new JobConf(false);
        ChainReducer.setReducer(conf, WordCountReducer.class, Text.class,
                IntWritable.class, Text.class, IntWritable.class, true, wrConf);

        /*JobClient client = new JobClient(conf);
        RunningJob job = client.submitJob(conf);
        client.monitorAndPrintJob(conf, job);*/

        RunningJob job = JobClient.runJob(conf);

        // 2 corresponds to JobStatus.SUCCEEDED in the old mapred API
        return job.getJobState() == 2 ? 0 : -2;
    }

    public static void main(String[] args) throws Exception {

        // local standalone run on Windows; WindowsLocalFileSystem is a helper
        // class that is not listed in this post (a sketch is given below)
        Configuration conf = new Configuration();
        conf.set("fs.default.name", "file:///");
        conf.set("mapred.job.tracker", "local");
        conf.set("fs.file.impl", "com.dbility.hadoop.util.WindowsLocalFileSystem");
        conf.set("io.serializations",
                "org.apache.hadoop.io.serializer.JavaSerialization,"
                        + "org.apache.hadoop.io.serializer.WritableSerialization");

        args = new String[] { "d:/hadoop_test/2008.csv",
                "d:/hadoop_test/output2008" };

        Runtime.getRuntime().exit(
                ToolRunner.run(conf, new WordCountChainDriver(), args));
    }
}
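For reference, AsaPerformParser used by TokenizerMapper comes from earlier posts and is not listed here. A minimal sketch of what it is assumed to do, based on the ASA airline on-time dataset (2008.csv) where UniqueCarrier is the 9th column, would look roughly like this; the field index and constructor are assumptions, not the original class.

package com.dbility.hadoop.util;

import org.apache.hadoop.io.Text;

/* Sketch (assumed) of the parser used by TokenizerMapper: splits one CSV line
   of the ASA on-time data and exposes the UniqueCarrier column (assumed index 8). */
public class AsaPerformParser {

    private final String uniqueCarrier;

    public AsaPerformParser(Text value) {
        String[] columns = value.toString().split(",");
        this.uniqueCarrier = columns[8];
    }

    public String getUniqueCarrier() {
        return uniqueCarrier;
    }
}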
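The driver also registers com.dbility.hadoop.util.WindowsLocalFileSystem as fs.file.impl so the job can run locally on Windows. That class is not shown in this post either; a commonly used sketch, assuming it simply ignores the chmod error that Hadoop 1.x raises on the Windows local file system, is:

package com.dbility.hadoop.util;

import java.io.IOException;

import org.apache.hadoop.fs.LocalFileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;

/* Sketch (assumed implementation): a LocalFileSystem that swallows the
   permission errors thrown when running Hadoop 1.x locally on Windows. */
public class WindowsLocalFileSystem extends LocalFileSystem {

    @Override
    public boolean mkdirs(Path path, FsPermission permission) throws IOException {
        boolean result = super.mkdirs(path);
        setPermission(path, permission);
        return result;
    }

    @Override
    public void setPermission(Path path, FsPermission permission) throws IOException {
        try {
            super.setPermission(path, permission);
        } catch (IOException e) {
            // ignore chmod failures on the Windows local file system
            System.err.println("Ignored setPermission failure : " + e.getMessage());
        }
    }
}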
Reference : 시작하세요! 하둡 프로그래밍, revised 2nd edition (위키북스), written by 정재화