DBILITY

hadoop secondary sort and multiple outputs exercise 본문

bigdata/hadoop

hadoop secondary sort and multiple outputs exercise

DBILITY 2017. 2. 8. 20:42
반응형

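항공 지연 통계 데이터에 보조 정렬(secondary sort)과 다중 출력(MultipleOutputs)을 적용하는 실습을 해 보았습니다. 지연 시간이 0보다 큰 도착(A)·출발(D) 운항 건수를 연·월 순으로 집계해 arrival, departure 두 출력 파일로 나누어 저장합니다.

※ 아래 2·3번 출력 결과는 리듀서의 sum 필드가 reduce() 호출(그룹) 사이에 0으로 초기화되지 않은 상태에서 생성되었습니다. 그래서 각 그룹의 첫 달인 arrival 2008년 1월(611876), departure 2007년 1월(536270), departure 2008년 1월(551959) 값에는 직전 그룹의 12월 건수가 더해져 있습니다. 직전 그룹의 12월 값을 빼면 각각 279427, 255777, 247948입니다.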

  1. AsaDelaySortMultiouputsDriver.java
    다운로드
    package com.dbility.hadoop.sort.secondary;
    
    import java.io.DataInput;
    import java.io.DataOutput;
    import java.io.IOException;
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.conf.Configured;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.LocalFileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.fs.permission.FsPermission;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.io.WritableComparable;
    import org.apache.hadoop.io.WritableComparator;
    import org.apache.hadoop.io.WritableUtils;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.Partitioner;
    import org.apache.hadoop.mapreduce.Reducer;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;
    import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
    import org.apache.hadoop.util.GenericOptionsParser;
    import org.apache.hadoop.util.Tool;
    import org.apache.hadoop.util.ToolRunner;
    
    /**
     *
     * Description
     *
     *
     * @author hyperrookie@gmail.com
     *
     * @version 1.0.0
     * @date 2017. 2. 8.
     */
    public class AsaDelaySortMultiouputsDriver extends Configured implements Tool {
    
    	public static class WindowsLocalFileSystem extends LocalFileSystem {
    
    		public WindowsLocalFileSystem() {
    			super();
    		}
    
    		public boolean mkdirs(final Path f, final FsPermission permission) throws IOException {
    			final boolean result = super.mkdirs(f);
    			this.setPermission(f, permission);
    			return result;
    		}
    
    		public void setPermission(final Path p, final FsPermission permission)
    				throws IOException {
    			try {
    				super.setPermission(p, permission);
    			} catch (IOException ioe) {
    				System.err.println(ioe.getMessage());
    			}
    		}
    
    	}
    
    	/**
    	 * Parses one comma-separated flight record.
    	 * <p>
    	 * The parser reads the following columns:
    	 * <ul>
    	 * <li>column 0 as the year (kept as a string);</li>
    	 * <li>column 1 as the month;</li>
    	 * <li>column 14 as the arrival delay;</li>
    	 * <li>column 15 as the departure delay.</li>
    	 * </ul>
    	 * A delay column whose value is exactly "NA" is flagged as unavailable, and its time stays 0.
    	 * Malformed lines surface as runtime exceptions thrown by the constructor
    	 * (NumberFormatException, ArrayIndexOutOfBoundsException).
    	 */
    	public static class PerformParser {

    		private static final int YEAR_COLUMN = 0;
    		private static final int MONTH_COLUMN = 1;
    		private static final int ARRIVAL_DELAY_COLUMN = 14;
    		private static final int DEPARTURE_DELAY_COLUMN = 15;
    		/** Marker used in the data for a missing delay value. */
    		private static final String NOT_AVAILABLE = "NA";

    		private final String year;
    		private final int month;
    		private final int arrivalDelayTime;
    		private final int departureDelayTime;
    		private final boolean arrivalDelayAvailable;
    		private final boolean departureDelayAvailable;

    		/**
    		 * Splits {@code value} on commas and extracts the year, month and both delay columns.
    		 *
    		 * @param value one raw input line
    		 */
    		public PerformParser(Text value) {
    			final String[] fields = value.toString().split(",");

    			this.year = fields[YEAR_COLUMN];
    			this.month = Integer.parseInt(fields[MONTH_COLUMN]);

    			// Arrival is fully handled before departure is read, matching the original access order.
    			final String arrivalField = fields[ARRIVAL_DELAY_COLUMN];
    			this.arrivalDelayAvailable = !NOT_AVAILABLE.equals(arrivalField);
    			this.arrivalDelayTime = arrivalDelayAvailable ? Integer.parseInt(arrivalField) : 0;

    			final String departureField = fields[DEPARTURE_DELAY_COLUMN];
    			this.departureDelayAvailable = !NOT_AVAILABLE.equals(departureField);
    			this.departureDelayTime = departureDelayAvailable ? Integer.parseInt(departureField) : 0;
    		}

    		/** @return the year column, unparsed */
    		public String getYear() {
    			return year;
    		}

    		/** @return the month column parsed as an int */
    		public int getMonth() {
    			return month;
    		}

    		/** @return the arrival delay, or 0 when the column was "NA" */
    		public int getArrivalDelayTime() {
    			return arrivalDelayTime;
    		}

    		/** @return the departure delay, or 0 when the column was "NA" */
    		public int getDepartureDelayTime() {
    			return departureDelayTime;
    		}

    		/** @return whether the arrival delay column held a value other than "NA" */
    		public boolean isArrivalDelayAvailable() {
    			return arrivalDelayAvailable;
    		}

    		/** @return whether the departure delay column held a value other than "NA" */
    		public boolean isDepartureDelayAvailable() {
    			return departureDelayAvailable;
    		}
    	}
    
    	public static class CompositeKey implements WritableComparable<CompositeKey> {
    
    		private String year;
    		private Integer month;
    
    		public CompositeKey() {
    		}
    
    		public String getYear() {
    			return year;
    		}
    
    		public void setYear(String year) {
    			this.year = year;
    		}
    
    		public int getMonth() {
    			return month;
    		}
    
    		public void setMonth(int month) {
    			this.month = month;
    		}
    
    		public void readFields(DataInput in) throws IOException {
    			year = WritableUtils.readString(in);
    			month = in.readInt();
    		}
    
    		public void write(DataOutput out) throws IOException {
    			WritableUtils.writeString(out, year);
    			out.writeInt(month);
    		}
    
    		public int compareTo(CompositeKey obj) {
    
    			int result = year.compareTo(obj.getYear());
    
    			if ( result == 0)
    				result = month == obj.month ? 0 : ( month < obj.month ? -1 : 1 );
    			return result;
    		}
    
    		@Override
    		public String toString() {
    			return (new StringBuilder()).append(year).append("\t").append(month).toString();
    		}
    	}
    
    	/**
    	 * Routes keys to reducers by the delay-type prefix only (the text before the first comma of
    	 * {@link CompositeKey#getYear()}, e.g. "A" or "D"). As a result, every year and month of a type
    	 * lands on the same reducer.
    	 */
    	public static class AsaSortPartitioner extends Partitioner<CompositeKey, IntWritable> {
    		@Override
    		public int getPartition(CompositeKey key, IntWritable value, int numReduceTasks) {
    			String delayType = key.getYear().split(",")[0];
    			// String.hashCode() may be negative, and a negative partition number makes the map task
    			// fail. Masking the sign bit (as HashPartitioner does) keeps the result in
    			// [0, numReduceTasks).
    			return (delayType.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
    		}
    	}
    
    	/**
    	 * Grouping comparator for the reduce side. It compares only the type/year string of the
    	 * composite key and ignores the month. As a result, one reduce() call receives every month of a
    	 * single type/year combination.
    	 */
    	public static class AsaSortGroupingComparator extends WritableComparator {

    		/** Registers {@link CompositeKey} and asks the base class to create key instances. */
    		public AsaSortGroupingComparator() {
    			super(CompositeKey.class, true);
    		}

    		@SuppressWarnings("rawtypes")
    		@Override
    		public int compare(WritableComparable a, WritableComparable b) {
    			String leftGroup = ((CompositeKey) a).getYear();
    			String rightGroup = ((CompositeKey) b).getYear();
    			return leftGroup.compareTo(rightGroup);
    		}
    	}
    
    	/**
    	 * Sort comparator for the shuffle. It orders keys by the type/year string and then by month
    	 * ascending.
    	 * <p>
    	 * The ordering is defined on {@code compare(WritableComparable, WritableComparable)} because
    	 * WritableComparator's byte-level compare deserializes both keys and dispatches to that
    	 * overload. The previous override of {@code compare(Object, Object)} was bypassed on that path.
    	 * The inherited {@code compare(Object, Object)} still delegates here, so direct callers get the
    	 * same result.
    	 */
    	public static class AsaSortComparator extends WritableComparator {

    		/** Registers {@link CompositeKey} and asks the base class to create key instances. */
    		public AsaSortComparator() {
    			super(CompositeKey.class, true);
    		}

    		@SuppressWarnings("rawtypes")
    		@Override
    		public int compare(WritableComparable a, WritableComparable b) {

    			CompositeKey k1 = (CompositeKey) a;
    			CompositeKey k2 = (CompositeKey) b;

    			int cmp = k1.getYear().compareTo(k2.getYear());
    			if (cmp != 0) {
    				return cmp;
    			}

    			int m1 = k1.getMonth();
    			int m2 = k2.getMonth();
    			return m1 < m2 ? -1 : (m1 == m2 ? 0 : 1);
    		}
    	}
    
    	/**
    	 * Job counters that bucket positive delays (presumably in minutes; the thresholds are 30 and
    	 * 60) separately for arrivals and departures.
    	 */
    	public enum AsaCounter {
    		/** Arrival delay greater than 0 and below 30. */
    		ArrivalDelayThirtyUnder,
    		/** Arrival delay from 30 up to (but excluding) 60. */
    		ArrivalDelaySixtyUnder,
    		/** Arrival delay of 60 or more. */
    		ArrivalDelaySixtyAndOver,
    		/** Departure delay greater than 0 and below 30. */
    		DepartureDelayThirtyUnder,
    		/** Departure delay from 30 up to (but excluding) 60. */
    		DepartureDelaySixtyUnder,
    		/** Departure delay of 60 or more. */
    		DepartureDelaySixtyAndOver
    	}
    
    	/**
    	 * Mapper that parses each flight record and handles arrivals and departures independently.
    	 * <ul>
    	 * <li>A positive arrival delay is tallied into an Arrival* counter, and
    	 * ({@code "A,<year>"}, month) -&gt; 1 is emitted.</li>
    	 * <li>A positive departure delay is tallied into a Departure* counter, and
    	 * ({@code "D,<year>"}, month) -&gt; 1 is emitted.</li>
    	 * </ul>
    	 */
    	public static class AsaSortMapper extends Mapper<LongWritable, Text, CompositeKey, IntWritable> {

    		private final static IntWritable ONE = new IntWritable(1);
    		/** Reused for every write; context.write serializes it immediately. */
    		private CompositeKey compositeKey = new CompositeKey();

    		@Override
    		protected void map(LongWritable key, Text value, Context context)
    				throws IOException, InterruptedException {

    			PerformParser record = new PerformParser(value);

    			boolean arrivalDelayed = record.isArrivalDelayAvailable() && record.getArrivalDelayTime() > 0;
    			if (arrivalDelayed) {
    				countDelay(context, record.getArrivalDelayTime(), AsaCounter.ArrivalDelayThirtyUnder,
    						AsaCounter.ArrivalDelaySixtyUnder, AsaCounter.ArrivalDelaySixtyAndOver);
    				emit(context, "A," + record.getYear(), record.getMonth());
    			}

    			boolean departureDelayed = record.isDepartureDelayAvailable() && record.getDepartureDelayTime() > 0;
    			if (departureDelayed) {
    				countDelay(context, record.getDepartureDelayTime(), AsaCounter.DepartureDelayThirtyUnder,
    						AsaCounter.DepartureDelaySixtyUnder, AsaCounter.DepartureDelaySixtyAndOver);
    				emit(context, "D," + record.getYear(), record.getMonth());
    			}
    		}

    		/** Increments the counter for {@code delay}: below 30, below 60, or 60 and above. */
    		private void countDelay(Context context, int delay, AsaCounter underThirty,
    				AsaCounter underSixty, AsaCounter sixtyAndOver) {
    			final AsaCounter bucket;
    			if (delay < 30) {
    				bucket = underThirty;
    			} else if (delay < 60) {
    				bucket = underSixty;
    			} else {
    				bucket = sixtyAndOver;
    			}
    			context.getCounter(bucket).increment(1);
    		}

    		/** Writes (typeAndYear, month) -&gt; 1 using the reusable key object. */
    		private void emit(Context context, String typeAndYear, int month)
    				throws IOException, InterruptedException {
    			compositeKey.setYear(typeAndYear);
    			compositeKey.setMonth(month);
    			context.write(compositeKey, ONE);
    		}
    	}
    
    	/**
    	 * Map-side combiner. It adds up the counts in each group it receives and writes the total
    	 * under the same key.
    	 */
    	public static class AsaSortCombiner extends Reducer<CompositeKey, IntWritable, CompositeKey, IntWritable> {

    		/** Reused output value; context.write serializes it immediately. */
    		private IntWritable partialSum = new IntWritable();

    		@Override
    		protected void reduce(CompositeKey key, Iterable<IntWritable> values, Context context)
    				throws IOException, InterruptedException {
    			int total = 0;
    			for (IntWritable count : values) {
    				total += count.get();
    			}
    			partialSum.set(total);
    			context.write(key, partialSum);
    		}
    	}
    
    
    	/**
    	 * Reducer that turns each "type,year" group into one {@code year \t month -> count} record per
    	 * month. The secondary sort delivers the months in ascending order. Records go to the "arrival"
    	 * named output for type "A" and to "departure" for type "D"; any other type produces no output.
    	 */
    	public static class AsaSortReducer extends Reducer<CompositeKey, IntWritable, Text, IntWritable> {

    		private final Text outputKey = new Text();
    		private final IntWritable outputValue = new IntWritable();
    		private MultipleOutputs<Text, IntWritable> mos;

    		@Override
    		protected void setup(Context context) throws IOException, InterruptedException {
    			mos = new MultipleOutputs<Text, IntWritable>(context);
    		}

    		/**
    		 * Sums the counts per month within one "type,year" group and writes one record per month.
    		 * <p>
    		 * Hadoop re-populates {@code key} as the values iterator advances. Inside the loop,
    		 * {@code key.getMonth()} is therefore the month of the current value, and a change of month
    		 * ends the previous month's run.
    		 */
    		@Override
    		protected void reduce(CompositeKey key, Iterable<IntWritable> values, Context context)
    				throws IOException, InterruptedException {

    			String[] columns = key.getYear().split(",");
    			String namedOutput = namedOutputFor(columns[0]);
    			if (namedOutput == null) {
    				return; // unknown type prefix: nothing is written
    			}

    			String year = columns[1];
    			int currentMonth = key.getMonth();
    			// Local, so every group starts from zero. The previous instance field was never reset,
    			// so each group's first month included the previous group's last-month count.
    			int sum = 0;

    			for (IntWritable value : values) {
    				if (key.getMonth() != currentMonth) {
    					writeMonth(namedOutput, year, currentMonth, sum);
    					currentMonth = key.getMonth();
    					sum = 0;
    				}
    				sum += value.get();
    			}

    			// Flush the last month of the group (a group always has at least one value).
    			writeMonth(namedOutput, year, currentMonth, sum);
    		}

    		/** Maps a type prefix to its named output: "A" to "arrival", "D" to "departure", else null. */
    		private static String namedOutputFor(String type) {
    			if ("A".equals(type)) {
    				return "arrival";
    			}
    			if ("D".equals(type)) {
    				return "departure";
    			}
    			return null;
    		}

    		/** Writes one {@code year \t month -> count} record to the given named output. */
    		private void writeMonth(String namedOutput, String year, int month, int count)
    				throws IOException, InterruptedException {
    			outputKey.set(year + "\t" + month);
    			outputValue.set(count);
    			mos.write(namedOutput, outputKey, outputValue);
    		}

    		/** Closes the MultipleOutputs writers so the named output files are flushed. */
    		@Override
    		protected void cleanup(Context context) throws IOException, InterruptedException {
    			mos.close();
    		}
    	}
    
    	/**
    	 * Configures and runs the secondary-sort job.
    	 * <p>
    	 * Exactly two arguments must remain after generic-option parsing: the input path and the output
    	 * path. An existing output path is deleted recursively before the job is submitted. Monthly
    	 * counts are written through the "arrival" and "departure" named outputs.
    	 *
    	 * @param args command-line arguments; generic Hadoop options are parsed out first
    	 * @return 0 on success, -1 on a usage error, -2 if the job fails
    	 * @throws Exception if job setup or submission fails
    	 */
    	@Override
    	public int run(String[] args) throws Exception {

    		String[] remainArgs = new GenericOptionsParser(getConf(), args).getRemainingArgs();

    		if (remainArgs.length != 2) {
    			System.err.println("Usage : hadoop jar jarName.jar mainClass <input path> <output path>");
    			return -1;
    		}

    		Job job = new Job(getConf(), "AsaDelaySortMultipleOutputs");

    		job.setJarByClass(AsaDelaySortMultiouputsDriver.class);
    		job.setInputFormatClass(TextInputFormat.class);

    		job.setMapperClass(AsaSortMapper.class);
    		job.setMapOutputKeyClass(CompositeKey.class);
    		job.setMapOutputValueClass(IntWritable.class);

    		job.setCombinerClass(AsaSortCombiner.class);

    		job.setNumReduceTasks(2);
    		job.setPartitionerClass(AsaSortPartitioner.class);

    		// Secondary sort: full ordering by (type,year, month); grouping by type,year only.
    		job.setGroupingComparatorClass(AsaSortGroupingComparator.class);
    		job.setSortComparatorClass(AsaSortComparator.class);

    		job.setReducerClass(AsaSortReducer.class);
    		job.setOutputKeyClass(Text.class);
    		job.setOutputValueClass(IntWritable.class);

    		MultipleOutputs.addNamedOutput(job, "arrival", TextOutputFormat.class, Text.class, IntWritable.class);
    		MultipleOutputs.addNamedOutput(job, "departure", TextOutputFormat.class, Text.class, IntWritable.class);

    		Path inputPath = new Path(remainArgs[0]);
    		Path outputPath = new Path(remainArgs[1]);
    		FileInputFormat.addInputPath(job, inputPath);
    		FileOutputFormat.setOutputPath(job, outputPath);

    		// Resolve the file system from the output path itself. The default file system would
    		// reject a fully qualified path on another scheme ("Wrong FS").
    		FileSystem outputFs = outputPath.getFileSystem(job.getConfiguration());
    		if (outputFs.exists(outputPath)) {
    			outputFs.delete(outputPath, true);
    		}

    		return job.waitForCompletion(true) ? 0 : -2;
    	}
    
    	/**
    	 * Command-line entry point. Runs the driver through ToolRunner and terminates the JVM with the
    	 * returned status code.
    	 */
    	public static void main(String[] args) throws Exception {
    		Configuration conf = new Configuration();

    		// Disabled settings for a local run on a Windows machine. Uncomment the lines below to use
    		// them. They select the local file system and job runner, plug in WindowsLocalFileSystem,
    		// and hard-code local input/output paths.
    		// conf.set("fs.default.name", "file:///");
    		// conf.set("mapred.job.tracker", "local");
    		// conf.set("io.serializations", "org.apache.hadoop.io.serializer.JavaSerialization,org.apache.hadoop.io.serializer.WritableSerialization");
    		// conf.set("fs.file.impl", "com.dbility.hadoop.sort.secondary.AsaDelaySortMultiouputsDriver$WindowsLocalFileSystem");
    		// args = new String[] {"d:/hadoop_test/*","d:/hadoop_test/output/"};

    		int exitCode = ToolRunner.run(conf, new AsaDelaySortMultiouputsDriver(), args);
    		System.exit(exitCode);
    	}
    }
  2. arrival-r-00000
    2007	1	286334
    2007	2	284152
    2007	3	293360
    2007	4	273055
    2007	5	275332
    2007	6	326446
    2007	7	326559
    2007	8	317197
    2007	9	225751
    2007	10	270098
    2007	11	242722
    2007	12	332449
    2008	1	611876
    2008	2	278902
    2008	3	294556
    2008	4	256142
    2008	5	254673
    2008	6	295897
    2008	7	264630
    2008	8	239737
    2008	9	169959
    2008	10	183582
    2008	11	181506
    2008	12	280493
  3. departure-r-00000
    2007	1	536270
    2007	2	259288
    2007	3	276261
    2007	4	249097
    2007	5	241699
    2007	6	307986
    2007	7	307864
    2007	8	298530
    2007	9	195615
    2007	10	231129
    2007	11	217557
    2007	12	304011
    2008	1	551959
    2008	2	252765
    2008	3	271969
    2008	4	220864
    2008	5	220614
    2008	6	271014
    2008	7	253632
    2008	8	231349
    2008	9	147061
    2008	10	162531
    2008	11	157278
    2008	12	263949

참고 서적 : 시작하세요! 하둡 프로그래밍 개정 2판(위키북스) - 정재화 지음

 

반응형
Comments