hadoop ChainMapper, Reducer exercise

DBILITY 2016. 11. 14. 20:59

Hadoop provides ChainMapper and ChainReducer so that multiple Mappers and a Reducer can run within a single MapReduce job.
In the 1.x.x API, you extend MapReduceBase under org.apache.hadoop.mapred and implement the Mapper and Reducer interfaces.
Below, I chained them in the order Mapper -> Mapper -> Reducer.

  1. TokenizerMapper
    package com.dbility.hadoop.chain;
    
    import java.io.IOException;
    import java.util.StringTokenizer;
    
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapred.MapReduceBase;
    import org.apache.hadoop.mapred.Mapper;
    import org.apache.hadoop.mapred.OutputCollector;
    import org.apache.hadoop.mapred.Reporter;
    
    import com.dbility.hadoop.util.AsaPerformParser;
    
    public class TokenizerMapper extends MapReduceBase implements Mapper<LongWritable, Text, Text, IntWritable> {
    
    	@Override
    	public void map(LongWritable key, Text value, OutputCollector<Text, IntWritable> collector, Reporter reporter)
    			throws IOException {
    		/* Original word-count version, kept for reference:
    		StringTokenizer stk = new StringTokenizer(value.toString(), " ");
    		while (stk.hasMoreElements()) {
    			String word = (String) stk.nextElement();
    			collector.collect(new Text(word), new IntWritable(1));
    		}*/
    		// AsaPerformParser is a helper class from com.dbility.hadoop.util (see the sketch after the listings);
    		// it parses one ASA flight record so we can emit (unique carrier code, 1)
    		AsaPerformParser parser = new AsaPerformParser(value);
    		collector.collect(new Text(parser.getUniqueCarrier()), new IntWritable(1));
    	}
    }
  2. UpperCaseMapper
    package com.dbility.hadoop.chain;
    
    import java.io.IOException;
    
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapred.MapReduceBase;
    import org.apache.hadoop.mapred.Mapper;
    import org.apache.hadoop.mapred.OutputCollector;
    import org.apache.hadoop.mapred.Reporter;
    
    public class UpperCaseMapper extends MapReduceBase implements Mapper<Text, IntWritable, Text, IntWritable> {
    
    	@Override
    	public void map(Text key, IntWritable value, OutputCollector<Text, IntWritable> collector, Reporter reporter)
    			throws IOException {
    		// Upper-case the carrier code produced by the previous mapper in the chain
    		collector.collect(new Text(key.toString().toUpperCase()), value);
    	}
    }
  3. WordCountReducer
    package com.dbility.hadoop.chain;
    
    import java.io.IOException;
    import java.util.Iterator;
    
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapred.MapReduceBase;
    import org.apache.hadoop.mapred.OutputCollector;
    import org.apache.hadoop.mapred.Reducer;
    import org.apache.hadoop.mapred.Reporter;
    
    public class WordCountReducer extends MapReduceBase implements Reducer<Text, IntWritable, Text, IntWritable> {
    
    	@Override
    	public void reduce(Text key, Iterator<IntWritable> values, OutputCollector<Text, IntWritable> collector, Reporter reporter)
    			throws IOException {
    		// Sum the occurrence counts for each carrier key
    		int sum = 0;
    		while (values.hasNext()) {
    			sum += values.next().get();
    		}
    		collector.collect(key, new IntWritable(sum));
    	}
    }
  4. WordCountChainDriver
    package com.dbility.hadoop.chain;
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.conf.Configured;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapred.FileInputFormat;
    import org.apache.hadoop.mapred.FileOutputFormat;
    import org.apache.hadoop.mapred.JobClient;
    import org.apache.hadoop.mapred.JobConf;
    import org.apache.hadoop.mapred.RunningJob;
    import org.apache.hadoop.mapred.TextInputFormat;
    import org.apache.hadoop.mapred.TextOutputFormat;
    import org.apache.hadoop.mapred.lib.ChainMapper;
    import org.apache.hadoop.mapred.lib.ChainReducer;
    import org.apache.hadoop.util.GenericOptionsParser;
    import org.apache.hadoop.util.Tool;
    import org.apache.hadoop.util.ToolRunner;
    
    public class WordCountChainDriver extends Configured implements Tool {
    
    	@Override
    	public int run(String[] args) throws Exception {
    
    		String[] remainArgs = new GenericOptionsParser(getConf(), args).getRemainingArgs();
    
    		if ( remainArgs.length != 2 ) {
    			System.out.println("Usage : <input> <output>");
    			return -1;
    		}
    
    		JobConf conf = new JobConf(getConf(), WordCountChainDriver.class);
    		conf.setJobName("chainTest");
    
    		conf.setInputFormat(TextInputFormat.class);
    		conf.setOutputFormat(TextOutputFormat.class);
    		conf.setOutputKeyClass(Text.class);
    		conf.setOutputValueClass(IntWritable.class);
    
    		FileInputFormat.addInputPath(conf, new Path(remainArgs[0]));
    		FileOutputFormat.setOutputPath(conf, new Path(remainArgs[1]));
    
    		// Delete the output directory if it already exists
    		FileSystem hdfs = FileSystem.get(getConf());
    		Path path = new Path(remainArgs[1]);
    		if (hdfs.exists(path)) hdfs.delete(path, true);
    
    		// First mapper in the chain: (LongWritable, Text) -> (Text, IntWritable)
    		JobConf tmConf = new JobConf(false);
    		ChainMapper.addMapper(conf, TokenizerMapper.class, LongWritable.class, Text.class, Text.class, IntWritable.class, true, tmConf);

    		// Second mapper consumes the first mapper's output: (Text, IntWritable) -> (Text, IntWritable)
    		JobConf umConf = new JobConf(false);
    		ChainMapper.addMapper(conf, UpperCaseMapper.class, Text.class, IntWritable.class, Text.class, IntWritable.class, true, umConf);

    		// Single reducer at the end of the chain; byValue=true passes key/value copies between stages
    		JobConf wrConf = new JobConf(false);
    		ChainReducer.setReducer(conf, WordCountReducer.class, Text.class, IntWritable.class, Text.class, IntWritable.class, true, wrConf);
    
    		/*JobClient client = new JobClient(conf);
    		RunningJob job = client.submitJob(conf);
    		client.monitorAndPrintJob(conf, job);*/
    
    		RunningJob job = JobClient.runJob(conf);

    		// JobStatus.SUCCEEDED == 2
    		return job.getJobState() == 2 ? 0 : -2;
    	}
    
    	public static void main(String[] args) throws Exception {
    		Configuration conf = new Configuration();

    		// Standalone run against the local file system with a local job tracker
    		conf.set("fs.default.name", "file:///");
    		conf.set("mapred.job.tracker", "local");
    		// Custom local FileSystem that tolerates Windows permission errors (see the sketch after the listings)
    		conf.set("fs.file.impl", "com.dbility.hadoop.util.WindowsLocalFileSystem");
    		conf.set("io.serializations",
    				"org.apache.hadoop.io.serializer.JavaSerialization,"
    				+ "org.apache.hadoop.io.serializer.WritableSerialization");

    		// Input file and output directory for this local test
    		args = new String[] {"d:/hadoop_test/2008.csv","d:/hadoop_test/output2008"};
    
    		Runtime.getRuntime().exit(ToolRunner.run(conf, new WordCountChainDriver(), args));
    	}
    }
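
The TokenizerMapper above relies on AsaPerformParser from com.dbility.hadoop.util, which is not listed in this post. A minimal sketch, assuming each line of 2008.csv is a comma-separated ASA on-time record with the UniqueCarrier code at column index 8, might look like the following; the author's actual class may parse additional fields.

    package com.dbility.hadoop.util;

    import org.apache.hadoop.io.Text;

    /*
     * Hypothetical sketch only: extracts the UniqueCarrier code from a
     * comma-separated ASA on-time record (column index 8 is assumed).
     */
    public class AsaPerformParser {

        private final String uniqueCarrier;

        public AsaPerformParser(Text value) {
            String[] columns = value.toString().split(",");
            this.uniqueCarrier = columns[8];
        }

        public String getUniqueCarrier() {
            return uniqueCarrier;
        }
    }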
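
Likewise, com.dbility.hadoop.util.WindowsLocalFileSystem, registered as fs.file.impl in main(), is not listed here. A minimal sketch, assuming it exists only so a local Hadoop 1.x run on Windows can ignore chmod/permission errors, could be:

    package com.dbility.hadoop.util;

    import java.io.IOException;

    import org.apache.hadoop.fs.LocalFileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.fs.permission.FsPermission;

    /*
     * Hypothetical sketch only: a LocalFileSystem that tolerates the permission
     * errors Hadoop 1.x raises when running jobs on a Windows workstation.
     */
    public class WindowsLocalFileSystem extends LocalFileSystem {

        @Override
        public boolean mkdirs(Path path, FsPermission permission) throws IOException {
            boolean result = super.mkdirs(path);
            this.setPermission(path, permission);
            return result;
        }

        @Override
        public void setPermission(Path path, FsPermission permission) throws IOException {
            try {
                super.setPermission(path, permission);
            } catch (IOException e) {
                // ignore chmod failures on Windows
            }
        }
    }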

Attachment: asa-flight-statistics.zip (download)

Reference book: 시작하세요! 하둡 프로그래밍 개정 2판 (위키북스) - written by 정재화

