hadoop secondary sort exercise 3
I'm doing this out of the unfounded belief that practicing once a week will keep me from forgetting. I also have to relearn math I last saw in the first half of my life, which is hard. Since this was tested in Eclipse on Windows, there is a single reducer. The job counts delayed arrivals per (year, month): map output uses a composite (year, month) key, partitioning and grouping use the year only, and the sort comparator orders by year then month, so each reduce() call sees one year's records in month order.
```java
package com.dbility.hadoop.execise;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.LocalFileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;
import org.apache.hadoop.io.WritableUtils;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Partitioner;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
/**
 *
 * Description : counts delayed arrival flights per (year, month) using a
 * composite key with secondary sort (partition/group by year, sort by month)
 *
 * @author hyperrookie@gmail.com
 *
 * @version 1.0.0
 * @date 2017. 2. 28.
 */
public class Analysis extends Configured implements Tool {
	public int run(String[] args) throws Exception {
		String[] remainArgs = new GenericOptionsParser(getConf(), args).getRemainingArgs();
		if ( remainArgs.length != 2 ) {
			System.out.println("Usage : hadoop jar jarName [mainClass] <input> <output>");
			return -2;
		}
		Job job = new Job(getConf());
		job.setJobName("Analysis");
		job.setJarByClass(Analysis.class);
		job.setInputFormatClass(TextInputFormat.class);
		job.setMapperClass(AsaMapper.class);
		job.setMapOutputKeyClass(CompositeKey.class);
		job.setMapOutputValueClass(IntWritable.class);
		job.setCombinerClass(AsaCombiner.class);
		job.setNumReduceTasks(1);
		job.setPartitionerClass(AsaPartitioner.class);
		job.setGroupingComparatorClass(AsaGroupingComparator.class);
		job.setSortComparatorClass(AsaSortComparator.class);
		job.setReducerClass(AsaReducer.class);
		job.setOutputKeyClass(CompositeKey.class);
		job.setOutputValueClass(IntWritable.class);
		job.setOutputFormatClass(TextOutputFormat.class);
		FileInputFormat.addInputPath(job, new Path(remainArgs[0]));
		FileOutputFormat.setOutputPath(job, new Path(remainArgs[1]));
		FileSystem hdfs = FileSystem.get(getConf());
		Path path = new Path(remainArgs[1]);
		if ( hdfs.exists(path) ) hdfs.delete(path, true);
		return job.waitForCompletion(true) ? 0 : -2;
	}
	public static void main(String[] args) throws Exception {
		Configuration conf = new Configuration();
		conf.set("fs.default.name", "file:///");
		conf.set("mapred.job.tracker", "local");
		conf.set("io.serializations", "org.apache.hadoop.io.serializer.JavaSerialization,org.apache.hadoop.io.serializer.WritableSerialization");
		conf.set("fs.file.impl", "com.dbility.hadoop.execise.Analysis$WindowsLocalFileSystem");
		conf.set("io.sort.mb", "512");
		args = new String[] {"d:/hadoop_test/*.csv","d:/hadoop_test/output"};
		int result = ToolRunner.run(conf, new Analysis(), args);
		if ( result == 0 ) {
			FileSystem hdfs = FileSystem.get(conf);
			Path org_path = new Path(args[1]);
			Path new_path = new Path(args[1]+"_result/merge.txt");
			if ( hdfs.exists(org_path) ) {
				if ( hdfs.exists(new_path) ){
					hdfs.delete(new_path, true);
				}
				FileUtil.copyMerge(hdfs, org_path, hdfs, new_path, true, conf, null);
			}
		}
		Runtime.getRuntime().exit(result);
	}
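	// LocalFileSystem variant for Windows: setPermission failures are logged and
	// ignored so the job can run against the local filesystem from Eclipse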
	public static class WindowsLocalFileSystem extends LocalFileSystem {
		public WindowsLocalFileSystem() {
			super();
		}
		public boolean mkdirs(final Path path, final FsPermission permission)
				throws IOException {
			final boolean result = super.mkdirs(path);
			return result;
		}
		public void setPermission(final Path path, final FsPermission permission)
				throws IOException {
			try {
				super.setPermission(path, permission);
			} catch ( final IOException ioe ) {
				System.err.println(ioe.getMessage());
			}
		}
	}
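	// Parses one CSV record of the airline on-time dataset:
	// column 0 = year, column 1 = month, column 14 = arrival delay ("NA" when unavailable)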
	public static class Parser {
		private String year;
		private int month;
		private int arrivalDelayTime = 0;
		private boolean arrivalDelayAvailable = false;
		public Parser(Text value) {
			String[] columns = value.toString().split(",");
			this.year = columns[0];
			this.month = Integer.parseInt(columns[1]);
			if ( !"NA".equals(columns[14]) ) {
				this.arrivalDelayAvailable = true;
				this.arrivalDelayTime = Integer.parseInt(columns[14]);
			}
		}
		public String getYear() {
			return year;
		}
		public int getMonth() {
			return month;
		}
		public int getArrivalDelayTime() {
			return arrivalDelayTime;
		}
		public boolean isArrivalDelayAvailable() {
			return arrivalDelayAvailable;
		}
	}
	public static class CompositeKey implements WritableComparable<CompositeKey> {
		private String year;
		private Integer month;
		public CompositeKey() {
		}
		public String getYear() {
			return year;
		}
		public void setYear(String year) {
			this.year = year;
		}
		public Integer getMonth() {
			return month;
		}
		public void setMonth(Integer month) {
			this.month = month;
		}
		public void readFields(DataInput in) throws IOException {
			this.year = WritableUtils.readString(in);
			this.month = in.readInt();
		}
		public void write(DataOutput out) throws IOException {
			WritableUtils.writeString(out, year);
			out.writeInt(month);
		}
		public int compareTo(CompositeKey o) {
			int cmp = this.year.compareTo(o.getYear());
			if ( cmp == 0 ) {
				cmp = this.month.compareTo(o.getMonth());
			}
			return cmp;
		}
		@Override
		public String toString() {
			return (new StringBuilder()).append(year).append("\t").append(month).toString();
		}
	}
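	// Partitions by year only, so all records of a year reach the same reducer regardless of month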
	public static class AsaPartitioner extends Partitioner<CompositeKey, IntWritable> {
		@Override
		public int getPartition(CompositeKey key, IntWritable value,
				int numReduceTasks) {
			// mask the sign bit: String.hashCode() can be negative, which would produce an invalid partition index
			return (key.getYear().hashCode() & Integer.MAX_VALUE) % numReduceTasks;
		}
	}
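	// Groups reduce input by year only, so one reduce() call receives every month of a year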
	public static class AsaGroupingComparator extends WritableComparator {
		public AsaGroupingComparator() {
			super(CompositeKey.class,true);
		}
		@SuppressWarnings("rawtypes")
		@Override
		public int compare(WritableComparable a, WritableComparable b) {
			CompositeKey k1 = (CompositeKey)a;
			CompositeKey k2 = (CompositeKey)b;
			return k1.getYear().compareTo(k2.getYear());
		}
	}
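	// Sorts by year, then month, so values arrive at the reducer in month order within each year group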
	public static class AsaSortComparator extends WritableComparator {
		public AsaSortComparator() {
			super(CompositeKey.class,true);
		}
		@SuppressWarnings("rawtypes")
		@Override
		public int compare(WritableComparable a, WritableComparable b) {
			CompositeKey k1 = (CompositeKey)a;
			CompositeKey k2 = (CompositeKey)b;
			int cmp = k1.getYear().compareTo(k2.getYear());
			if ( cmp != 0 ){
				return cmp;
			}
			// use compareTo(): == on boxed Integer compares object identity, not numeric value
			return k1.getMonth().compareTo(k2.getMonth());
		}
	}
	public static class AsaMapper extends Mapper<LongWritable, Text, CompositeKey, IntWritable> {
		private CompositeKey outputKey = new CompositeKey();
		private static IntWritable outputValue = new IntWritable(1);
		@Override
		protected void map(LongWritable key, Text value,Context context)
				throws IOException, InterruptedException {
			Parser parser = new Parser(value);
			if ( parser.isArrivalDelayAvailable() && parser.getArrivalDelayTime() > 0 ) {
				outputKey.setYear(parser.getYear());
				outputKey.setMonth(parser.getMonth());
				context.write(outputKey, outputValue);
			}
		}
	}
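	// Grouping is by year, so one reduce() call covers a whole year while key.getMonth()
	// changes as the value iterator advances; preMonth marks month boundaries so that a
	// separate count is written for each (year, month)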
	public static class AsaReducer extends Reducer<CompositeKey, IntWritable, CompositeKey, IntWritable> {
		private CompositeKey outputKey = new CompositeKey();
		private IntWritable outputValue = new IntWritable();
		private int sum;
		@Override
		protected void reduce(CompositeKey key, Iterable<IntWritable> values,
				Context context)
				throws IOException, InterruptedException {
			sum = 0;
			int preMonth = key.getMonth();
			for (IntWritable value : values) {
				if ( preMonth != key.getMonth()) {
					outputKey.setYear(key.getYear());
					outputKey.setMonth(preMonth);
					outputValue.set(sum);
					context.write(outputKey, outputValue);
					sum = 0;
				}
				sum+=value.get();
				preMonth = key.getMonth();
			}
			if ( preMonth == key.getMonth()) {
				outputKey.setYear(key.getYear());
				outputKey.setMonth(preMonth);
				outputValue.set(sum);
				context.write(outputKey, outputValue);
			}
		}
	}
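	// Map-side combiner: pre-sums counts per (year, month) key to reduce shuffle volume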
	public static class AsaCombiner extends Reducer<CompositeKey, IntWritable, CompositeKey, IntWritable> {
		private IntWritable outputValue = new IntWritable();
		private int sum;
		@Override
		protected void reduce(CompositeKey key, Iterable<IntWritable> values,
				Context context)
				throws IOException, InterruptedException {
			sum = 0;
			for (IntWritable value : values) {
				sum+=value.get();
			}
			outputValue.set(sum);
			context.write(key, outputValue);
		}
	}
}
```
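Each line of the merged result (`d:/hadoop_test/output_result/merge.txt`) has the form `year<TAB>month<TAB>count`, because `TextOutputFormat` writes `CompositeKey.toString()` (year, tab, month) followed by a tab and the summed count. To run on an actual cluster rather than from Eclipse, the hard-coded `args` assignment and the local-filesystem settings in `main()` would need to be removed; after that, a launch matching the job's own usage message should work (the jar name here is only a placeholder):

```
hadoop jar analysis.jar com.dbility.hadoop.execise.Analysis <input> <output>
```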