DBILITY

hadoop secondary sort 4 ( 보조정렬 실습 ) 본문

bigdata/hadoop

hadoop secondary sort 4 ( 보조정렬 실습 )

DBILITY 2017. 3. 12. 23:06
반응형

ASA 항공 운항 기록에서 연/월별 도착 지연 건수 통계를 정렬한 뒤 SequenceFile로 저장하고, 저장된 파일을 다시 읽어 출력해 봄.

SecondarySortDriver.java
다운로드

package com.dbility.hadoop.execise;

import java.io.BufferedInputStream;
import java.io.BufferedReader;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.io.InputStream;
import java.io.StringWriter;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.BufferedFSInputStream;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocalFileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.SequenceFile.CompressionType;
import org.apache.hadoop.io.SequenceFile.Reader;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;
import org.apache.hadoop.io.WritableUtils;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionCodecFactory;
import org.apache.hadoop.io.compress.DefaultCodec;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Partitioner;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class SecondarySortDriver extends Configured implements Tool {

	/**
	 * Builds and runs the "SecondarySort" job.
	 *
	 * <p>After generic Hadoop options are stripped, exactly two arguments are
	 * expected: the input path and the output path. Any existing output path is
	 * deleted recursively before the job is submitted. The job output is a
	 * block-compressed SequenceFile written by a single reduce task.
	 *
	 * @param args command-line arguments, possibly including generic options
	 * @return 0 when the job succeeds, -1 when the argument count is wrong,
	 *         -2 when the job fails
	 * @throws Exception if job setup, file system access or job execution fails
	 */
	@Override
	public int run(String[] args) throws Exception {
		final String[] paths = new GenericOptionsParser(getConf(), args)
				.getRemainingArgs();

		// Guard clause: input and output locations are both mandatory.
		if (paths.length != 2) {
			System.out
					.println("Usage : hadoop jar jarName [mainClass] <input_path> <output_path>");
			return -1;
		}

		final Job sortJob = new Job(getConf());
		sortJob.setJarByClass(SecondarySortDriver.class);
		sortJob.setJobName("SecondarySort");

		// Map side: text lines in, (year, month) composite key with a count out.
		sortJob.setInputFormatClass(TextInputFormat.class);
		sortJob.setMapperClass(SecMapper.class);
		sortJob.setMapOutputKeyClass(CompositeKey.class);
		sortJob.setMapOutputValueClass(IntWritable.class);

		sortJob.setCombinerClass(SecCombiner.class);
		sortJob.setNumReduceTasks(1);

		// Shuffle: partitioner, grouping comparator and sort comparator.
		sortJob.setPartitionerClass(SecParitioner.class);
		sortJob.setGroupingComparatorClass(SecGroupingComparator.class);
		sortJob.setSortComparatorClass(SecSortComparator.class);

		// Reduce side: results are stored as a SequenceFile.
		sortJob.setReducerClass(SecReducer.class);
		sortJob.setOutputFormatClass(SequenceFileOutputFormat.class);
		sortJob.setOutputKeyClass(CompositeKey.class);
		sortJob.setOutputValueClass(IntWritable.class);

		SequenceFileOutputFormat.setCompressOutput(sortJob, true);
		SequenceFileOutputFormat.setOutputCompressorClass(sortJob, DefaultCodec.class);
		SequenceFileOutputFormat.setOutputCompressionType(sortJob, CompressionType.BLOCK);

		final Path source = new Path(paths[0]);
		final Path target = new Path(paths[1]);

		final FileSystem fileSystem = FileSystem.get(getConf());

		FileInputFormat.addInputPath(sortJob, source);
		FileOutputFormat.setOutputPath(sortJob, target);

		// Remove the output of a previous run before submitting the job.
		if (fileSystem.exists(target)) {
			fileSystem.delete(target, true);
		}

		if (sortJob.waitForCompletion(true)) {
			return 0;
		}
		return -2;
	}

	/**
	 * Runs the job against the local file system with hard-coded paths and, on
	 * success, prints every key/value pair of the resulting SequenceFile.
	 *
	 * <p>The supplied {@code args} are replaced by fixed local directories, so
	 * command-line arguments are currently ignored. The JVM exits with the job
	 * result, or with -2 when the expected output file is missing.
	 *
	 * @param args ignored; overwritten with the hard-coded input/output paths
	 * @throws Exception if the job or reading the output fails
	 */
	public static void main(String[] args) throws Exception {

		Configuration conf = new Configuration();

		conf.set("fs.default.name", "file:///");
		conf.set("mapred.job.tracker", "local");
		// The key was previously misspelled ("io.serializatilons"), so this
		// setting never reached Hadoop's serialization factory.
		conf.set(
				"io.serializations",
				"org.apache.hadoop.io.serializer.JavaSerialization,"
						+ "org.apache.hadoop.io.serializer.WritableSerialization");
		conf.set("fs.file.impl",
				"com.dbility.hadoop.execise.SecondarySortDriver$WindowsLocalFileSystem");

		args = new String[] { "d:/hadoop_test/input_data/",
				"d:/hadoop_test/sec_data/" };

		int result = ToolRunner.run(conf, new SecondarySortDriver(), args);

		if ( result == 0 ) {

			FileSystem hdfs = FileSystem.get(conf);
			// Resolve children against the output directory so the result does
			// not depend on args[1] ending with a slash.
			Path output_dir = new Path(args[1]);
			Path read_path = new Path(output_dir, "part-r-00000");

			if (!hdfs.exists(read_path)) {
				System.out.println("requested path is not found!!");
				result = -2;
			} else {
				hdfs.delete(new Path(output_dir, "_SUCCESS"), true);

				SequenceFile.Reader reader = null;
				try {
					reader = new SequenceFile.Reader(hdfs, read_path, conf);
					Writable key = (Writable)ReflectionUtils.newInstance(reader.getKeyClass(), conf);
					Writable value = (Writable)ReflectionUtils.newInstance(reader.getValueClass(), conf);

					while(reader.next(key, value)) {
						System.out.printf("%s\t%s\n",key.toString(),value.toString());
					}
				} finally {
					// Always release the reader, even when reading fails midway.
					IOUtils.closeStream(reader);
				}
			}
		}

		System.exit(result);
	}

	/**
	 * {@link LocalFileSystem} variant in which a failing permission change is
	 * reported on standard error instead of being propagated.
	 *
	 * <p>NOTE(review): judging by the class name this is a workaround for
	 * running Hadoop locally on Windows; confirm before reusing elsewhere.
	 */
	public static class WindowsLocalFileSystem extends LocalFileSystem {

		public WindowsLocalFileSystem() {
			super();
		}

		/**
		 * Creates the directory via the single-argument {@code mkdirs} of the
		 * parent class and then applies the permission on a best-effort basis.
		 *
		 * @param f directory to create
		 * @param permission permission to apply after creation
		 * @return the result of the parent's {@code mkdirs(f)}
		 * @throws IOException if directory creation fails
		 */
		@Override
		public boolean mkdirs(final Path f, final FsPermission permission)
				throws IOException {
			final boolean created = super.mkdirs(f);
			setPermission(f, permission);
			return created;
		}

		/**
		 * Delegates to the parent implementation; an {@link IOException} is
		 * written to standard error and otherwise ignored.
		 *
		 * @param p path whose permission should change
		 * @param permission permission to apply
		 */
		@Override
		public void setPermission(final Path p, final FsPermission permission)
				throws IOException {
			try {
				super.setPermission(p, permission);
			} catch (final IOException failure) {
				// Deliberately best-effort: report the problem and carry on.
				System.err.println(failure.getMessage());
			}
		}
	}

	/**
	 * Extracts year, month and arrival delay from one comma-separated record.
	 *
	 * <p>Column 0 is read as the year, column 1 as the month and column 14 as
	 * the arrival delay, where the literal {@code NA} means "no value". A
	 * record counts as delayed only when the arrival delay is greater than zero.
	 *
	 * <p>Records that cannot be parsed (too few columns, or a non-numeric month
	 * or delay such as a header line) no longer abort the task: they are
	 * reported on standard error and treated as not delayed. For such records
	 * {@link #getYear()} and {@link #getMonth()} return {@code null}.
	 */
	public static class Parser {

		private static final int YEAR_COLUMN = 0;
		private static final int MONTH_COLUMN = 1;
		private static final int ARRIVAL_DELAY_COLUMN = 14;
		private static final String NOT_AVAILABLE = "NA";

		private String year;
		private Integer month;
		private Integer arrivalDelayTime = 0;
		private boolean arrivalDelayAvailable = false;

		/**
		 * Parses a single input line.
		 *
		 * @param value one line of comma-separated input
		 */
		public Parser(Text value) {

			String[] columns = value.toString().split(",");

			// split() drops trailing empty fields, so short and blank lines
			// show up here as arrays with too few elements.
			if (columns.length <= ARRIVAL_DELAY_COLUMN) {
				System.err.println("Skipping record with too few columns: " + value);
				return;
			}

			try {
				// Parse into locals first so a failure leaves no half-set state.
				int parsedMonth = Integer.parseInt(columns[MONTH_COLUMN]);
				int parsedDelay = 0;
				if (!NOT_AVAILABLE.equals(columns[ARRIVAL_DELAY_COLUMN])) {
					parsedDelay = Integer.parseInt(columns[ARRIVAL_DELAY_COLUMN]);
				}

				this.year = columns[YEAR_COLUMN];
				this.month = parsedMonth;
				this.arrivalDelayTime = parsedDelay;
				this.arrivalDelayAvailable = parsedDelay > 0;
			} catch (NumberFormatException e) {
				System.err.println("Skipping record with non-numeric month or arrival delay: "
						+ value);
			}
		}

		/** @return the year column, or {@code null} for an unparsable record */
		public String getYear() {
			return year;
		}

		/** @return the month, or {@code null} for an unparsable record */
		public Integer getMonth() {
			return month;
		}

		/** @return the arrival delay; 0 when it was {@code NA} or unparsable */
		public Integer getArrivalDelayTime() {
			return arrivalDelayTime;
		}

		/** @return {@code true} only when the arrival delay is greater than zero */
		public boolean isArrivalDelayAvailable() {
			return arrivalDelayAvailable;
		}
	}

	/**
	 * Composite map-output key made of a year and a month.
	 *
	 * <p>Natural ordering is by year (string comparison) and then by month.
	 * {@link #equals(Object)} and {@link #hashCode()} are defined on the same
	 * two fields so they agree with {@link #compareTo(CompositeKey)} and give
	 * the same hash in every JVM.
	 */
	public static class CompositeKey implements
			WritableComparable<CompositeKey> {

		private String year;
		private Integer month;

		/** No-argument constructor; fields are filled by setters or {@link #readFields}. */
		public CompositeKey() {

		}

		public String getYear() {
			return year;
		}

		public void setYear(String year) {
			this.year = year;
		}

		public Integer getMonth() {
			return month;
		}

		public void setMonth(Integer month) {
			this.month = month;
		}

		/** Reads the year as a string followed by the month as an int. */
		@Override
		public void readFields(DataInput in) throws IOException {
			this.year = WritableUtils.readString(in);
			this.month = in.readInt();
		}

		/** Writes the year as a string followed by the month as an int. */
		@Override
		public void write(DataOutput out) throws IOException {
			WritableUtils.writeString(out, year);
			out.writeInt(month);
		}

		/** Orders by year first and by month when the years are equal. */
		@Override
		public int compareTo(CompositeKey o) {
			int result = this.year.compareTo(o.getYear());

			if (result == 0)
				result = this.month.compareTo(o.getMonth());
			return result;
		}

		/** Two keys are equal when both year and month are equal. */
		@Override
		public boolean equals(Object obj) {
			if (this == obj) {
				return true;
			}
			if (!(obj instanceof CompositeKey)) {
				return false;
			}
			CompositeKey other = (CompositeKey) obj;
			boolean sameYear = (year == null) ? other.year == null
					: year.equals(other.year);
			boolean sameMonth = (month == null) ? other.month == null
					: month.equals(other.month);
			return sameYear && sameMonth;
		}

		/** Hash derived only from the year and month values. */
		@Override
		public int hashCode() {
			int hash = (year == null) ? 0 : year.hashCode();
			return 31 * hash + ((month == null) ? 0 : month.hashCode());
		}

		/** Renders the key as {@code year<TAB>month}. */
		@Override
		public String toString() {
			return (new StringBuilder()).append(year).append("\t").append(month)
					.toString();
		}
	}

	/**
	 * Partitions map output by the year part of the composite key, so every
	 * month of one year is sent to the same partition.
	 */
	public static class SecParitioner extends
			Partitioner<CompositeKey, IntWritable> {

		/**
		 * @param key composite key whose year selects the partition
		 * @param value unused
		 * @param numReduceTasks number of partitions
		 * @return a partition index in {@code [0, numReduceTasks)}
		 */
		@Override
		public int getPartition(CompositeKey key, IntWritable value,
				int numReduceTasks) {
			// String.hashCode() may be negative and % keeps the sign; clearing
			// the sign bit guarantees a non-negative partition index.
			return (key.getYear().hashCode() & Integer.MAX_VALUE) % numReduceTasks;
		}
	}

	/**
	 * Grouping comparator that looks at the year only, so keys that differ
	 * just in their month compare as equal.
	 */
	public static class SecGroupingComparator extends WritableComparator {

		public SecGroupingComparator() {
			super(CompositeKey.class, true);
		}

		/**
		 * @param a first key, expected to be a {@link CompositeKey}
		 * @param b second key, expected to be a {@link CompositeKey}
		 * @return the string comparison of the two years
		 */
		@SuppressWarnings("rawtypes")
		@Override
		public int compare(WritableComparable a, WritableComparable b) {
			final String leftYear = ((CompositeKey) a).getYear();
			final String rightYear = ((CompositeKey) b).getYear();
			return leftYear.compareTo(rightYear);
		}
	}

	/**
	 * Sort comparator ordering composite keys by year and then by month, both
	 * ascending.
	 */
	public static class SecSortComparator extends WritableComparator {

		public SecSortComparator() {
			super(CompositeKey.class, true);
		}

		/**
		 * @param a first key, expected to be a {@link CompositeKey}
		 * @param b second key, expected to be a {@link CompositeKey}
		 * @return negative, zero or positive as {@code a} sorts before, with or
		 *         after {@code b}
		 */
		@SuppressWarnings("rawtypes")
		@Override
		public int compare(WritableComparable a, WritableComparable b) {

			CompositeKey k1 = (CompositeKey) a;
			CompositeKey k2 = (CompositeKey) b;

			int cmp = k1.getYear().compareTo(k2.getYear());

			if (cmp != 0)
				return cmp;

			// getMonth() returns a boxed Integer: compare by value, never with
			// ==, which tests object identity and only appeared to work for
			// values inside the Integer cache.
			return k1.getMonth().compareTo(k2.getMonth());
		}
	}

	/**
	 * Emits {@code ((year, month), 1)} for every input line whose parsed
	 * arrival delay is flagged as available by {@link Parser}.
	 */
	public static class SecMapper extends
			Mapper<LongWritable, Text, CompositeKey, IntWritable> {

		private CompositeKey outputKey = new CompositeKey();
		private final static IntWritable outputValue = new IntWritable(1);

		/**
		 * @param key input key supplied by the input format (unused)
		 * @param value one line of input text
		 * @param context sink for the emitted key/value pair
		 */
		@Override
		protected void map(LongWritable key, Text value, Context context)
				throws IOException, InterruptedException {
			final Parser record = new Parser(value);

			// Lines without a usable arrival delay contribute nothing.
			if (!record.isArrivalDelayAvailable()) {
				return;
			}

			outputKey.setYear(record.getYear());
			outputKey.setMonth(record.getMonth());
			context.write(outputKey, outputValue);
		}

	}

	/**
	 * Combiner that adds up the counts it receives for a key and writes the
	 * total back under that same key.
	 */
	public static class SecCombiner extends
			Reducer<CompositeKey, IntWritable, CompositeKey, IntWritable> {

		private IntWritable outputValue = new IntWritable();

		/**
		 * @param key key the counts belong to; written back unchanged
		 * @param values partial counts to add up
		 * @param context sink for the combined count
		 */
		@Override
		protected void reduce(CompositeKey key, Iterable<IntWritable> values,
				Context context) throws IOException, InterruptedException {

			int total = 0;
			for (IntWritable partial : values) {
				total += partial.get();
			}

			outputValue.set(total);
			context.write(key, outputValue);
		}
	}

	/**
	 * Reducer that writes one {@code (year, month) -> total} record per month
	 * found while iterating the values of a group.
	 *
	 * <p>The month is re-read from {@code key} on every iteration, which
	 * presumes the framework refreshes the key object as the value iterator
	 * advances within a group (groups are formed by the job's grouping
	 * comparator).
	 */
	public static class SecReducer extends
			Reducer<CompositeKey, IntWritable, CompositeKey, IntWritable> {

		private CompositeKey outputKey = new CompositeKey();
		private IntWritable outputValue = new IntWritable();

		/**
		 * @param key current composite key of the group
		 * @param values counts belonging to the group
		 * @param context sink for the per-month totals
		 */
		@Override
		protected void reduce(CompositeKey key, Iterable<IntWritable> values,
				Context context) throws IOException, InterruptedException {

			int monthTotal = 0;
			int activeMonth = key.getMonth();

			for (IntWritable count : values) {
				final int observedMonth = key.getMonth();

				// A month change closes the running total of the previous month.
				if (observedMonth != activeMonth) {
					emit(context, key.getYear(), activeMonth, monthTotal);
					monthTotal = 0;
					activeMonth = observedMonth;
				}

				monthTotal += count.get();
			}

			// The month still being accumulated is always written at the end.
			emit(context, key.getYear(), activeMonth, monthTotal);
		}

		/** Writes a single {@code (year, month) -> total} record. */
		private void emit(Context context, String year, int month, int total)
				throws IOException, InterruptedException {
			outputKey.setYear(year);
			outputKey.setMonth(month);
			outputValue.set(total);
			context.write(outputKey, outputValue);
		}
	}

}

 

참고서적 : 시작하세요! 하둡프로그래밍 개정 2판(위키북스) - 정재화 지음

반응형
Comments