DBILITY

hadoop partial sort exercise 2 (Partial Sort Exercise 2)

DBILITY 2017. 3. 8. 23:07

I tested this in Eclipse on Windows,

and the very beginning of the result file looks odd.. why?
The likely culprit is DataOutputStream.writeUTF(), which prepends a two-byte length before every string it writes, so those raw length bytes end up at the head of the file; writing the raw UTF-8 bytes instead avoids it.

Since I typed this in without the book(?), I suppose I should be satisfied ^^;
I ought to test it with MRUnit... though I barely remember how...
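
A minimal MRUnit sketch for SequenceMapper might look like the following (assumptions: MRUnit 1.x with the old mapred API and JUnit 4 on the classpath; the sample row comes from the airline on-time dataset, where field 18 is the distance, here 810):

package com.dbility.hadoop.execise;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mrunit.MapDriver;
import org.junit.Test;

public class SequenceMapperTest {

	@Test
	public void emitsDistanceAsKey() throws Exception {
		Text row = new Text("2008,1,3,4,2003,1955,2211,2225,WN,335,"
				+ "N712SW,128,150,116,-14,8,IAD,TPA,810,4,8,0,,0,NA,NA,NA,NA,NA");

		// SequenceMapper should emit (distance, whole line)
		new MapDriver<LongWritable, Text, IntWritable, Text>()
				.withMapper(new PartialSortDriver.SequenceMapper())
				.withInput(new LongWritable(0), row)
				.withOutput(new IntWritable(810), row)
				.runTest();
	}
}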

 

PartialSortDriver.java

package com.dbility.hadoop.execise;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocalFileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.MapFile.Reader;
import org.apache.hadoop.io.SequenceFile.CompressionType;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.compress.DefaultCodec;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.JobStatus;
import org.apache.hadoop.mapred.MapFileOutputFormat;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Partitioner;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.RunningJob;
import org.apache.hadoop.mapred.SequenceFileInputFormat;
import org.apache.hadoop.mapred.SequenceFileOutputFormat;
import org.apache.hadoop.mapred.TextInputFormat;
import org.apache.hadoop.mapred.lib.HashPartitioner;
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

/**
 *
 * Partial sort exercise: job 1 converts airline on-time CSV data into a
 * SequenceFile keyed by flight distance, job 2 turns that into a sorted
 * MapFile, and the driver then looks up every record for a given distance.
 *
 * @author hyperrookie@gmail.com
 *
 * @version 1.0.0
 * @date 2017. 3. 8.
 */
public class PartialSortDriver extends Configured implements Tool {

	public int run(String[] args) throws Exception {

		String[] remainArgs = new GenericOptionsParser(getConf(), args)
				.getRemainingArgs();

		if (remainArgs.length != 4) {
			System.out
					.println("Usage : hadoop jar jarName "
							+ "[mainClass] <input_path> <sequence_path> "
							+ "<map_path> <search_miles>");
			return -1;
		}

		FileSystem hdfs = FileSystem.get(getConf());
		Path input_path = new Path(remainArgs[0]);
		Path sequence_path = new Path(remainArgs[1]);
		Path map_path = new Path(remainArgs[2]);
		IntWritable searchMiles = new IntWritable(
				Integer.parseInt(remainArgs[3]));

		JobConf jobConf1 = new JobConf(getConf(), PartialSortDriver.class);
		jobConf1.setJobName("SequenceFileCreator");

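		// Job 1 is map-only (zero reduce tasks): it parses each CSV line and
		// writes a block-compressed SequenceFile keyed by flight distance.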
		jobConf1.setMapperClass(SequenceMapper.class);
		jobConf1.setNumReduceTasks(0);

		jobConf1.setInputFormat(TextInputFormat.class);
		jobConf1.setOutputFormat(SequenceFileOutputFormat.class);
		jobConf1.setOutputKeyClass(IntWritable.class);
		jobConf1.setOutputValueClass(Text.class);

		SequenceFileOutputFormat.setCompressOutput(jobConf1, true);
		SequenceFileOutputFormat.setOutputCompressorClass(jobConf1,
				DefaultCodec.class);
		SequenceFileOutputFormat.setOutputCompressionType(jobConf1,
				CompressionType.BLOCK);

		FileInputFormat.addInputPath(jobConf1, input_path);
		FileOutputFormat.setOutputPath(jobConf1, sequence_path);

		if (!hdfs.exists(input_path)) {
			System.err.println("input path is not found!");
			return -2;
		}

		if (hdfs.exists(sequence_path)) {
			hdfs.delete(sequence_path, true);
		}

		RunningJob job1 = JobClient.runJob(jobConf1);

		if (job1.getJobState() != JobStatus.SUCCEEDED) {
			System.err.printf("%s failed!\n", job1.getJobName());
			return -3;
		}

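		// Job 2 sets no mapper or reducer, so the identity map/reduce runs;
		// the shuffle sorts records by distance key and MapFileOutputFormat
		// writes a sorted MapFile (data + index) per reducer.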
		JobConf jobConf2 = new JobConf(getConf(), PartialSortDriver.class);
		jobConf2.setJobName("MapFileCreator");

		jobConf2.setInputFormat(SequenceFileInputFormat.class);
		jobConf2.setOutputFormat(MapFileOutputFormat.class);
		jobConf2.setOutputKeyClass(IntWritable.class);
		// jobConf2.setOutputValueClass(Text.class);

		SequenceFileOutputFormat.setCompressOutput(jobConf2, true);
		SequenceFileOutputFormat.setOutputCompressorClass(jobConf2,
				DefaultCodec.class);
		SequenceFileOutputFormat.setOutputCompressionType(jobConf2,
				CompressionType.BLOCK);

		FileInputFormat.addInputPath(jobConf2, sequence_path);
		FileOutputFormat.setOutputPath(jobConf2, map_path);

		if (!hdfs.exists(sequence_path)) {
			System.err.println("sequence path is not found!");
			return -4;
		}

		if (hdfs.exists(map_path)) {
			hdfs.delete(map_path, true);
		}

		RunningJob job2 = JobClient.runJob(jobConf2);

		if (job2.getJobState() != JobStatus.SUCCEEDED) {
			System.err.printf("%s failed!\n", job2.getJobName());
			return -5;
		}

		// delete the _SUCCESS marker so getReaders() below only sees MapFile dirs
		hdfs.delete(new Path(map_path, "_SUCCESS"), true);

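		// Partial-sort lookup: open a Reader per MapFile partition, ask the
		// same HashPartitioner the job used which partition holds the search
		// key, then read matching records from that single MapFile.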
		Reader[] readers = MapFileOutputFormat.getReaders(hdfs, map_path,
				getConf());
		Partitioner<IntWritable, Text> partition = new HashPartitioner<IntWritable, Text>();

		IntWritable nextKey = new IntWritable();
		Text value = new Text();

		Reader reader = readers[partition.getPartition(searchMiles, value,
				readers.length)];

		Writable entry = reader.get(searchMiles, value);

		if (entry == null) {
			System.out.println("the requested key is not found!");
			return -6;
		}

		FSDataOutputStream outputStream = hdfs.create(new Path(
				input_path.getParent(), "output.txt"));
		do {
			// writeUTF() prepends a two-byte length before every string, which
			// is what puts the odd bytes at the head of the file; write the
			// raw UTF-8 bytes instead
			outputStream.write((value.toString() + "\n").getBytes("UTF-8"));
		} while (reader.next(nextKey, value) && searchMiles.equals(nextKey));

		outputStream.close();

		return 0;
	}

	public static void main(String[] args) throws Exception {

		Configuration conf = new Configuration();

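		// Standalone test setup: local file system, local job runner, and a
		// custom FileSystem so permission calls don't fail on Windows.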
		conf.set("fs.default.name", "file:///");
		conf.set("mapred.job.tracker", "local");
		conf.set(
				"io.serializaitons",
				"org.apache.hadoop.io.serializer.JavaSerialization,"
						+ "org.apache.hadoop.io.serializer.WritableSerialization");
		conf.set("fs.file.impl",
				"com.dbility.hadoop.execise.PartialSortDriver$WindowsLocalFileSystem");
		conf.set("io.sort.mb", "512");

		args = new String[] { "d:/hadoop_test/input_data/",
				"d:/hadoop_test/sequence_data/", "d:/hadoop_test/map_data/",
				"100" };

		int result = ToolRunner.run(conf, new PartialSortDriver(), args);

		Runtime.getRuntime().exit(result);

	}

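	/**
	 * LocalFileSystem variant for Windows: ignores the POSIX permission
	 * operations that make local runs fail with chmod errors.
	 */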
	public static class WindowsLocalFileSystem extends LocalFileSystem {

		public WindowsLocalFileSystem() {
			super();
		}

		public boolean mkdirs(final Path f, final FsPermission permission)
				throws IOException {
			final boolean result = super.mkdirs(f);
			return result;
		}

		public void setPermission(final Path p, final FsPermission permission)
				throws IOException {
			try {
				super.setPermission(p, permission);
			} catch (final IOException ioe) {
				System.err.println(ioe.getMessage());
			}
		}

	}

	public static class Parser {

		private String year;
		private Integer month;
		private Integer distance;
		private boolean distanceAvailable = false;

		public Parser(Text value) {

			String[] columns = value.toString().split(",");

			this.year = columns[0];
			this.month = Integer.parseInt(columns[1]);

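			// column 18 (zero-based) of the airline on-time data is Distance;
			// it may be "NA" when the value is missing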
			if (!"NA".equals(columns[18])) {
				this.distance = Integer.parseInt(columns[18]);
				if (this.distance > 0)
					this.distanceAvailable = true;
			}

		}

		public String getYear() {
			return year;
		}

		public Integer getMonth() {
			return month;
		}

		public Integer getDistance() {
			return distance;
		}

		public boolean isDistanceAvailable() {
			return distanceAvailable;
		}

	}

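	/**
	 * Map-only stage: emits (distance, original CSV line). Rows that fail to
	 * parse are collected under key 0 instead of being dropped.
	 */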
	public static class SequenceMapper extends MapReduceBase implements
			Mapper<LongWritable, Text, IntWritable, Text> {

		private IntWritable outputKey = new IntWritable();

		public void map(LongWritable key, Text value,
				OutputCollector<IntWritable, Text> collector, Reporter reporter)
				throws IOException {
			try {
				Parser parser = new Parser(value);

				if (parser.isDistanceAvailable()) {
					outputKey.set(parser.getDistance());
					collector.collect(outputKey, value);
				}

			} catch (Exception ex) {
				outputKey.set(0);
				collector.collect(outputKey, value);
			}

		}

	}

}
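
To run this outside Eclipse, you would drop the hard-coded args and the local-mode settings in main() and submit the jar as the usage message describes, along the lines of (jar name assumed):

hadoop jar partial-sort.jar com.dbility.hadoop.execise.PartialSortDriver <input_path> <sequence_path> <map_path> <search_miles>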

Reference: 시작하세요! 하둡프로그래밍 (Beginning Hadoop Programming), Revised 2nd Edition, by 정재화 (Jaehwa Jung)
