DBILITY

독거 가능성 100% 노후에 라면값이라도 하게 센스를 발휘합시다!😅
Please click on the ad so that I can pay for ramen in my old age!
点击一下广告,让老后吃个泡面钱吧!
老後にラーメン代だけでもするように広告を一回クリックしてください。

hadoop partial sort exercise ( 부분 정렬 실습 ) 본문

bigdata/hadoop

hadoop partial sort exercise ( 부분 정렬 실습 )

DBILITY 2017. 2. 16. 20:12
반응형

부분정렬(Partial Sort)은 매퍼출력을 MapFile로 변경해 데이터를 검색하는 방법이다.

맵태스크 실행 시 파티셔너는 매퍼의 출력 데이터가 전달될 리듀서를 결정하고, 파티셔닝된 데이터는 키에 따라 정렬된다.

특정 키에 대한 데이터 검색 시 키가 저장된 맵파일에 접근해 조회한다.

 

부분정렬 3단계는 아래와 같다.

1.입력데이터를 시퀀스파일로 생성

2.시퀀스파일을 맵파일로 변경

3.맵파일에서 데이터 검색

 

※ org.apache.hadoop.mapred 패키지를 사용해 작성해야 함.

다음은 윈도우 환경 Eclipse상에서 테스트 되었습니다.

  1. SequenceFileCreateDriver.java
    다운로드
    package com.dbility.hadoop.execise;
    
    import java.io.IOException;
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.conf.Configured;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.LocalFileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.fs.permission.FsPermission;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.SequenceFile.CompressionType;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.io.compress.DefaultCodec;
    import org.apache.hadoop.mapred.FileInputFormat;
    import org.apache.hadoop.mapred.FileOutputFormat;
    import org.apache.hadoop.mapred.JobClient;
    import org.apache.hadoop.mapred.JobConf;
    import org.apache.hadoop.mapred.MapReduceBase;
    import org.apache.hadoop.mapred.Mapper;
    import org.apache.hadoop.mapred.OutputCollector;
    import org.apache.hadoop.mapred.Reporter;
    import org.apache.hadoop.mapred.SequenceFileOutputFormat;
    import org.apache.hadoop.mapred.TextInputFormat;
    import org.apache.hadoop.util.Tool;
    import org.apache.hadoop.util.ToolRunner;
    
    /**
     * Step 1 of the partial-sort exercise: a map-only job that reads text (CSV)
     * input and writes block-compressed SequenceFiles whose key is the record's
     * distance column and whose value is the unchanged input line.
     *
     * <p>The job is pinned to the local file system and the local job runner
     * ({@code fs.default.name=file:///}, {@code mapred.job.tracker=local}).
     */
    public class SequenceFileCreateDriver extends Configured implements Tool {
    
    	/** Paths used when the program is started without arguments (the original test setup). */
    	private static final String[] DEFAULT_ARGS = { "d:/hadoop_test/*.csv", "d:/hadoop_test/output" };
    
    	/**
    	 * Configures and runs the SequenceFile creation job.
    	 *
    	 * @param args {@code args[0]} is the input path (for example a glob such as
    	 *             {@code d:/hadoop_test/*.csv}), {@code args[1]} is the output
    	 *             directory; an existing output directory is deleted first
    	 * @return 0 when the job completed, 2 when the arguments are missing
    	 * @throws Exception if the job cannot be submitted or fails while running
    	 */
    	public int run(String[] args) throws Exception {
    
    		if (args == null || args.length < 2) {
    			System.err.println("Usage: SequenceFileCreateDriver <input path> <output path>");
    			return 2;
    		}
    
    		// Start from the configuration handed over by ToolRunner so that generic
    		// options (-D, -conf, ...) are honoured instead of being silently dropped.
    		Configuration base = getConf() != null ? getConf() : new Configuration();
    		JobConf conf = new JobConf(base, SequenceFileCreateDriver.class);
    		conf.setJobName("SequenceFileCreateDriver");
    		conf.set("fs.default.name", "file:///");
    		conf.set("mapred.job.tracker", "local");
    		conf.set(
    				"io.serializations",
    				"org.apache.hadoop.io.serializer.JavaSerialization,org.apache.hadoop.io.serializer.WritableSerialization");
    		// Resolves to com.dbility.hadoop.execise.SequenceFileCreateDriver$WindowsLocalFileSystem.
    		conf.set("fs.file.impl", WindowsLocalFileSystem.class.getName());
    
    		// Map-only job: the mapper output is written straight to the SequenceFile.
    		conf.setMapperClass(PartialMapper.class);
    		conf.setNumReduceTasks(0);
    
    		conf.setInputFormat(TextInputFormat.class);
    		conf.setOutputFormat(SequenceFileOutputFormat.class);
    		conf.setOutputKeyClass(IntWritable.class);
    		conf.setOutputValueClass(Text.class);
    
    		SequenceFileOutputFormat.setCompressOutput(conf, true);
    		SequenceFileOutputFormat.setOutputCompressorClass(conf, DefaultCodec.class);
    		SequenceFileOutputFormat.setOutputCompressionType(conf, CompressionType.BLOCK);
    
    		Path outputPath = new Path(args[1]);
    		FileInputFormat.addInputPath(conf, new Path(args[0]));
    		FileOutputFormat.setOutputPath(conf, outputPath);
    
    		// Remove a previous run's output so the job can be re-run.
    		FileSystem fs = FileSystem.get(conf);
    		if (fs.exists(outputPath)) {
    			fs.delete(outputPath, true);
    		}
    
    		JobClient.runJob(conf);
    
    		return 0;
    	}
    
    	/**
    	 * Program entry point.
    	 *
    	 * @param args input path and output path; when no arguments are given the
    	 *             built-in test paths are used
    	 * @throws Exception if the job fails
    	 */
    	public static void main(String[] args) throws Exception {
    
    		// Fall back to the test paths only when nothing was supplied, instead of
    		// discarding whatever the caller passed on the command line.
    		String[] effectiveArgs = (args == null || args.length == 0) ? DEFAULT_ARGS.clone() : args;
    
    		int result = ToolRunner.run(new Configuration(), new SequenceFileCreateDriver(), effectiveArgs);
    
    		System.exit(result);
    	}
    
    	/**
    	 * Local file system whose {@link #setPermission(Path, FsPermission)} reports
    	 * an {@link IOException} on standard error instead of propagating it.
    	 *
    	 * <p>NOTE(review): presumably a workaround for permission-setting failures
    	 * of the local file system on Windows - confirm.
    	 */
    	public static class WindowsLocalFileSystem extends LocalFileSystem {
    
    		/** Public no-argument constructor; the class is registered by name under {@code fs.file.impl}. */
    		public WindowsLocalFileSystem() {
    			super();
    		}
    
    		/**
    		 * Creates the directory with the single-argument {@code mkdirs} and then
    		 * applies the permission through the tolerant {@code setPermission}.
    		 *
    		 * @return the result of the underlying directory creation
    		 */
    		@Override
    		public boolean mkdirs(final Path path, final FsPermission permission)
    				throws IOException {
    			final boolean created = super.mkdirs(path);
    			setPermission(path, permission);
    			return created;
    		}
    
    		/**
    		 * Sets the permission; a failure is printed to standard error and
    		 * otherwise ignored (deliberate best effort).
    		 */
    		@Override
    		public void setPermission(final Path path, final FsPermission permission)
    				throws IOException {
    			try {
    				super.setPermission(path, permission);
    			} catch ( final IOException ioe ) {
    				System.err.println(ioe.getMessage());
    			}
    		}
    	}
    
    	/**
    	 * Parses one comma-separated input record. Columns used: 0 = year,
    	 * 1 = month, 14 = arrival delay, 18 = distance. The literal {@code "NA"}
    	 * marks a missing arrival delay or distance.
    	 */
    	public static class Parser {
    
    		private static final String NOT_AVAILABLE = "NA";
    		private static final int YEAR_COLUMN = 0;
    		private static final int MONTH_COLUMN = 1;
    		private static final int ARRIVAL_DELAY_COLUMN = 14;
    		private static final int DISTANCE_COLUMN = 18;
    
    		private final String year;
    		private final int month;
    		private final int distance;
    		private final int arrivalDelayTime;
    		private final boolean arrivalDelayAvailable;
    		private final boolean distanceAvailable;
    
    		/**
    		 * @param value one input line
    		 * @throws NumberFormatException          if month, arrival delay or distance is not an integer
    		 * @throws ArrayIndexOutOfBoundsException if the line has too few columns
    		 */
    		public Parser(Text value) {
    			String[] columns = value.toString().split(",");
    
    			this.year = columns[YEAR_COLUMN];
    			this.month = Integer.parseInt(columns[MONTH_COLUMN]);
    
    			String arrivalDelay = columns[ARRIVAL_DELAY_COLUMN];
    			this.arrivalDelayAvailable = !NOT_AVAILABLE.equals(arrivalDelay);
    			this.arrivalDelayTime = this.arrivalDelayAvailable ? Integer.parseInt(arrivalDelay) : 0;
    
    			String distanceText = columns[DISTANCE_COLUMN];
    			this.distanceAvailable = !NOT_AVAILABLE.equals(distanceText);
    			this.distance = this.distanceAvailable ? Integer.parseInt(distanceText) : 0;
    		}
    
    		public String getYear() {
    			return year;
    		}
    
    		public int getMonth() {
    			return month;
    		}
    
    		/** @return the arrival delay, or 0 when it is not available */
    		public int getArrivalDelayTime() {
    			return arrivalDelayTime;
    		}
    
    		public boolean isArrivalDelayAvailable() {
    			return arrivalDelayAvailable;
    		}
    
    		/** @return the distance, or 0 when it is not available */
    		public int getDistance() {
    			return distance;
    		}
    
    		public boolean isDistanceAvailable() {
    			return distanceAvailable;
    		}
    	}
    
    	/**
    	 * Emits {@code (distance, original line)} for every record whose distance
    	 * is available. Records that cannot be parsed are emitted under key 0;
    	 * records whose distance is {@code "NA"} are skipped.
    	 */
    	public static class PartialMapper extends MapReduceBase implements Mapper<LongWritable, Text, IntWritable, Text> {
    
    		/** Key under which unparseable records are collected. */
    		private static final int UNPARSEABLE_KEY = 0;
    
    		private final IntWritable outputKey = new IntWritable();
    
    		public void map(LongWritable key, Text value,
    				OutputCollector<IntWritable, Text> collector, Reporter reporter)
    				throws IOException {
    			int distance;
    			try {
    				Parser parser = new Parser(value);
    				if ( !parser.isDistanceAvailable() ) {
    					return;
    				}
    				distance = parser.getDistance();
    			} catch ( RuntimeException e ) {
    				// Only parse failures are handled here. The collect call is kept
    				// outside the try block so that an output IOException propagates
    				// instead of being swallowed and the record re-collected under key 0.
    				e.printStackTrace();
    				distance = UNPARSEABLE_KEY;
    			}
    
    			outputKey.set(distance);
    			collector.collect(outputKey, value);
    		}
    	}
    }
  2. MapFileCreateDriver.java
    다운로드
    package com.dbility.hadoop.execise;
    
    import java.io.IOException;
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.conf.Configured;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.LocalFileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.fs.permission.FsPermission;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.SequenceFile.CompressionType;
    import org.apache.hadoop.io.compress.DefaultCodec;
    import org.apache.hadoop.mapred.FileInputFormat;
    import org.apache.hadoop.mapred.FileOutputFormat;
    import org.apache.hadoop.mapred.JobClient;
    import org.apache.hadoop.mapred.JobConf;
    import org.apache.hadoop.mapred.MapFileOutputFormat;
    import org.apache.hadoop.mapred.SequenceFileInputFormat;
    import org.apache.hadoop.mapred.SequenceFileOutputFormat;
    import org.apache.hadoop.util.Tool;
    import org.apache.hadoop.util.ToolRunner;
    
    public class MapFileCreateDriver extends Configured implements Tool {
    
    	public int run(String[] args) throws Exception {
    		JobConf conf = new JobConf(SequenceFileCreateDriver.class);
    		conf.setJobName("MapFileCreateDriver");
    		conf.set("fs.default.name", "file:///");
    		conf.set("mapred.job.tracker", "local");
    		conf.set(
    				"io.serializations",
    				"org.apache.hadoop.io.serializer.JavaSerialization,org.apache.hadoop.io.serializer.WritableSerialization");
    		conf.set("fs.file.impl",
    				"com.dbility.hadoop.execise.MapFileCreateDriver$WindowsLocalFileSystem");
    
    		conf.setInputFormat(SequenceFileInputFormat.class);
    		conf.setOutputFormat(MapFileOutputFormat.class);
    		conf.setOutputKeyClass(IntWritable.class);
    
    		SequenceFileOutputFormat.setCompressOutput(conf, true);
    		SequenceFileOutputFormat.setOutputCompressorClass(conf, DefaultCodec.class);
    		SequenceFileOutputFormat.setOutputCompressionType(conf, CompressionType.BLOCK);
    
    
    		FileInputFormat.addInputPath(conf, new Path(args[0]));
    		FileOutputFormat.setOutputPath(conf, new Path(args[1]));
    
    		FileSystem hdfs = FileSystem.get(conf);
    		Path path = new Path(args[1]);
    
    		if (hdfs.exists(path)) hdfs.delete(path, true);
    
    		JobClient.runJob(conf);
    
    		return 0;
    	}
    
    	public static void main(String[] args) throws Exception {
    		Configuration conf = new Configuration();
    
    		args = new String[] { "d:/hadoop_test/output/","d:/hadoop_test/mapoutput" };
    
    		int res = ToolRunner.run(conf, new MapFileCreateDriver(), args);
    
    		System.exit(res);
    	}
    
    	public static class WindowsLocalFileSystem extends LocalFileSystem {
    
    		public WindowsLocalFileSystem() {
    			super();
    		}
    
    
    		public boolean mkdirs(final Path path, final FsPermission permission)
    				throws IOException {
    			final boolean result = super.mkdirs(path);
    			this.setPermission(path, permission);
    			return result;
    		}
    
    		public void setPermission(final Path path, final FsPermission permission)
    				throws IOException {
    			try {
    				super.setPermission(path, permission);
    			} catch ( final IOException ioe ) {
    				System.err.println(ioe.getMessage());
    			}
    		}
    	}
    }
  3. SearchValueList.java
    다운로드
    package com.dbility.hadoop.execise;
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.conf.Configured;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.io.MapFile.Reader;
    import org.apache.hadoop.io.Writable;
    import org.apache.hadoop.mapred.MapFileOutputFormat;
    import org.apache.hadoop.mapred.Partitioner;
    import org.apache.hadoop.mapred.lib.HashPartitioner;
    import org.apache.hadoop.util.GenericOptionsParser;
    import org.apache.hadoop.util.Tool;
    import org.apache.hadoop.util.ToolRunner;
    
    /**
     * Step 3 of the partial-sort exercise: looks up an integer key in a
     * directory of MapFile partitions and prints every value stored under it.
     */
    public class SearchValueList extends Configured implements Tool {
    
    	/** MapFile directory and key used when the program is started without arguments (the original test setup). */
    	private static final String[] DEFAULT_ARGS = { "d:/hadoop_test/mapoutput/", "100" };
    
    	/**
    	 * Prints all values stored under the requested key.
    	 *
    	 * @param args after generic Hadoop options: {@code [0]} the MapFile output
    	 *             directory, {@code [1]} the integer key to search for
    	 * @return 0 when the lookup ran (whether or not the key exists), 1 when the
    	 *         directory contains no MapFile partitions, 2 on invalid arguments
    	 * @throws Exception if the MapFiles cannot be opened or read
    	 */
    	public int run(String[] args) throws Exception {
    		Configuration conf = getConf() != null ? getConf() : new Configuration();
    		String[] remainArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
    
    		if (remainArgs.length < 2) {
    			System.err.println("Usage: SearchValueList <map file directory> <integer key>");
    			return 2;
    		}
    
    		int requestedKey;
    		try {
    			requestedKey = Integer.parseInt(remainArgs[1]);
    		} catch (NumberFormatException e) {
    			System.err.println("The search key must be an integer: " + remainArgs[1]);
    			return 2;
    		}
    
    		Path path = new Path(remainArgs[0]);
    		FileSystem hdfs = FileSystem.get(conf);
    
    		Reader[] readers = MapFileOutputFormat.getReaders(hdfs, path, conf);
    		try {
    			if (readers.length == 0) {
    				// Partitioning over zero readers would divide by zero.
    				System.err.println("No MapFile partitions were found under " + path);
    				return 1;
    			}
    
    			IntWritable key = new IntWritable(requestedKey);
    			Text value = new Text();
    
    			// Select the partition that holds the key; this has to match the
    			// partitioner that was in effect when the MapFiles were written.
    			Partitioner<IntWritable, Text> partitioner = new HashPartitioner<IntWritable, Text>();
    			Reader reader = readers[partitioner.getPartition(key, value, readers.length)];
    
    			Writable entry = reader.get(key, value);
    			if ( entry == null ){
    				System.out.println("The requested key was not found.");
    				// Stop here: there is nothing to print and nothing to scan.
    				return 0;
    			}
    
    			// get() positioned the reader on the first match; keep reading while
    			// the following entries carry the same key.
    			IntWritable nextKey = new IntWritable();
    			do {
    				System.out.println(value.toString());
    			} while (reader.next(nextKey, value) && key.equals(nextKey));
    
    			return 0;
    		} finally {
    			closeReaders(readers);
    		}
    	}
    
    	/** Closes every reader, reporting (not propagating) individual close failures. */
    	private static void closeReaders(Reader[] readers) {
    		for (Reader reader : readers) {
    			try {
    				reader.close();
    			} catch (java.io.IOException e) {
    				System.err.println("Failed to close MapFile reader: " + e.getMessage());
    			}
    		}
    	}
    
    	/**
    	 * Program entry point.
    	 *
    	 * @param args MapFile directory and integer key; when no arguments are
    	 *             given the built-in test values are used
    	 * @throws Exception if the lookup fails
    	 */
    	public static void main(String[] args) throws Exception {
    
    		// Use the test values only as a fallback instead of overwriting the
    		// caller's command-line arguments.
    		String[] effectiveArgs = (args == null || args.length == 0) ? DEFAULT_ARGS.clone() : args;
    
    		int res = ToolRunner.run(new Configuration(), new SearchValueList(), effectiveArgs);
    		System.exit(res);
    	}
    }

참고 서적 : 시작하세요! 하둡프로그래밍 개정 2판(위키북스) - 정재화 지음

 

반응형
Comments