hadoop total sort exercise

DBILITY 2017. 3. 12. 17:54

This was tested in fully distributed mode on Linux.
It is not the proper approach, but since Eclipse on Windows is not a distributed environment, I commented out the DistributedCache part, changed the codec to DefaultCodec, and tested it that way (a sketch of that local-mode variation follows the full listing below).

package com.dbility.hadoop.execise;

import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.SequenceFile.CompressionType;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RunningJob;
import org.apache.hadoop.mapred.SequenceFileInputFormat;
import org.apache.hadoop.mapred.SequenceFileOutputFormat;
import org.apache.hadoop.mapred.lib.InputSampler;
import org.apache.hadoop.mapred.lib.TotalOrderPartitioner;
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class TotalSortDriver extends Configured implements Tool {

	@Override
	public int run(String[] args) throws Exception {

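		// strip generic Hadoop options (-D, -fs, -jt, -files, ...) and keep only the job arguments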
		String[] remainArgs = new GenericOptionsParser(getConf(), args)
				.getRemainingArgs();

		if (remainArgs.length != 2) {
			System.out
					.println("Usage : hadoop jar jarName [mainClass] <input_path> <output_path>");
			return -1;
		}

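		// sort job using the old mapred (Hadoop 1.x) API; no mapper/reducer is set, so the
		// identity map/reduce is used and global ordering comes from the shuffle sort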
		JobConf jobConf01 = new JobConf(getConf(), TotalSortDriver.class);

		jobConf01.setJobName("TotalSortTest");

		jobConf01.setInputFormat(SequenceFileInputFormat.class);
		jobConf01.setOutputFormat(SequenceFileOutputFormat.class);
		jobConf01.setOutputKeyClass(IntWritable.class);
		jobConf01.setOutputValueClass(Text.class);

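		// compress the SequenceFile output with gzip, one block at a time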
		SequenceFileOutputFormat.setCompressOutput(jobConf01, true);
		SequenceFileOutputFormat.setOutputCompressorClass(jobConf01,
				GzipCodec.class);
		SequenceFileOutputFormat.setOutputCompressionType(jobConf01,
				CompressionType.BLOCK);

		Path input_path = new Path(remainArgs[0]);
		Path output_path = new Path(remainArgs[1]);

		FileInputFormat.addInputPath(jobConf01, input_path);
		FileOutputFormat.setOutputPath(jobConf01, output_path);

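		// fully qualify the input directory with its FileSystem scheme and authority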
		Path input_dir = FileInputFormat.getInputPaths(jobConf01)[0];
		input_dir = input_dir.makeQualified(input_dir.getFileSystem(jobConf01));

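		// the sampled boundary keys are written to a _partitions file next to the input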
		Path partitionFile = new Path(input_dir, "_partitions");
		TotalOrderPartitioner.setPartitionFile(jobConf01, partitionFile);
		// required so the job actually uses the sampled boundaries instead of the default HashPartitioner
		jobConf01.setPartitionerClass(TotalOrderPartitioner.class);

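		// randomly sample the input (10% of records, up to 1000 samples from at most 10 splits)
		// to choose the reducer boundary keys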
		InputSampler.Sampler<IntWritable, Text> sampler = new InputSampler.RandomSampler<IntWritable, Text>(
				0.1, 1000, 10);
		InputSampler.writePartitionFile(jobConf01, sampler);

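		// ship the partition file to every task via DistributedCache, symlinked as _partitions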
		URI partitionUri = new URI(partitionFile.toString()+"#_partitions");
		DistributedCache.addCacheFile(partitionUri, jobConf01);
		DistributedCache.createSymlink(jobConf01);

		RunningJob job01 = JobClient.runJob(jobConf01);

		return job01.isSuccessful() ? 0 : -2;
	}

	public static void main(String[] args) throws Exception {

		Configuration conf = new Configuration();
		Runtime.getRuntime().exit(
				ToolRunner.run(conf, new TotalSortDriver(), args));

	}

}
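
For the Windows Eclipse test mentioned at the top, only two things change inside run(): the codec and the DistributedCache calls. Below is a minimal sketch of that local-mode variation, reusing the same variable names as the listing above (the rest of the method stays unchanged); one common reason for swapping the codec is that block-compressed SequenceFiles with GzipCodec need the native Hadoop libraries, which are usually unavailable on Windows.

		// local (non-distributed) variant: fall back to the pure-Java DefaultCodec
		SequenceFileOutputFormat.setOutputCompressorClass(jobConf01,
				org.apache.hadoop.io.compress.DefaultCodec.class);

		// DistributedCache is unnecessary in local mode; TotalOrderPartitioner
		// reads the partition file directly from the path set above, so the
		// cache-related lines are simply commented out
		// URI partitionUri = new URI(partitionFile.toString() + "#_partitions");
		// DistributedCache.addCacheFile(partitionUri, jobConf01);
		// DistributedCache.createSymlink(jobConf01);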

Reference book: 시작하세요! 하둡 프로그래밍, revised 2nd edition (위키북스), by 정재화

 
