hadoop partial sort exercise
Partial sort converts the mapper output into a MapFile so that records can be looked up by key.
When a map task runs, the partitioner decides which reducer each mapper output record will be sent to, and the partitioned data is sorted by key.
To retrieve the data for a specific key, you open the MapFile that stores that key and query it.
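Before looking at the MapReduce jobs, it may help to see what a MapFile itself does. The sketch below is not part of the original exercise; the MapFileDemo class name and the d:/hadoop_test/mapfile_demo path are placeholders. It writes a few key/value pairs with MapFile.Writer, which requires keys to be appended in non-decreasing order, and then uses the index to look one of them up.

package com.dbility.hadoop.execise;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.MapFile;
import org.apache.hadoop.io.Text;

public class MapFileDemo {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        conf.set("fs.default.name", "file:///");      // run against the local file system
        FileSystem fs = FileSystem.get(conf);
        String dir = "d:/hadoop_test/mapfile_demo";   // placeholder path
        fs.delete(new Path(dir), true);               // make the demo rerunnable

        // a MapFile is a directory holding a key-sorted "data" file plus an "index" file
        MapFile.Writer writer = new MapFile.Writer(conf, fs, dir, IntWritable.class, Text.class);
        writer.append(new IntWritable(100), new Text("first record with distance 100"));
        writer.append(new IntWritable(100), new Text("second record with distance 100"));
        writer.append(new IntWritable(250), new Text("record with distance 250")); // keys must not decrease
        writer.close();

        // the index lets get() jump close to the key instead of scanning the whole file
        MapFile.Reader reader = new MapFile.Reader(fs, dir, conf);
        Text value = new Text();
        reader.get(new IntWritable(100), value);      // positions the reader at the first match
        System.out.println(value);                    // -> first record with distance 100
        reader.close();
    }
}

This is the same structure the step 2 job builds from the mapper output, one MapFile per reducer.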
A partial sort consists of three steps:
1. Create a SequenceFile from the input data
2. Convert the SequenceFile into a MapFile
3. Search the MapFile for data
※ The code must be written with the org.apache.hadoop.mapred package (the old MapReduce API).
The following was tested in Eclipse on Windows.
- SequenceFileCreateDriver.java (step 1: create a SequenceFile from the input CSV files)
package com.dbility.hadoop.execise;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocalFileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.SequenceFile.CompressionType;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.DefaultCodec;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.SequenceFileOutputFormat;
import org.apache.hadoop.mapred.TextInputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

/**
 * Step 1: read the airline CSV files and write them out as a
 * block-compressed SequenceFile keyed by distance (column 19).
 */
public class SequenceFileCreateDriver extends Configured implements Tool {

    public int run(String[] args) throws Exception {

        JobConf conf = new JobConf(SequenceFileCreateDriver.class);
        conf.setJobName("SequenceFileCreateDriver");
        conf.set("fs.default.name", "file:///");
        conf.set("mapred.job.tracker", "local");
        conf.set("io.serializations",
                "org.apache.hadoop.io.serializer.JavaSerialization,org.apache.hadoop.io.serializer.WritableSerialization");
        conf.set("fs.file.impl",
                "com.dbility.hadoop.execise.SequenceFileCreateDriver$WindowsLocalFileSystem");

        conf.setMapperClass(PartialMapper.class);
        conf.setNumReduceTasks(0); // map-only job

        conf.setInputFormat(TextInputFormat.class);
        conf.setOutputFormat(SequenceFileOutputFormat.class);
        conf.setOutputKeyClass(IntWritable.class);
        conf.setOutputValueClass(Text.class);

        SequenceFileOutputFormat.setCompressOutput(conf, true);
        SequenceFileOutputFormat.setOutputCompressorClass(conf, DefaultCodec.class);
        SequenceFileOutputFormat.setOutputCompressionType(conf, CompressionType.BLOCK);
        //SequenceFileOutputFormat.setOutputCompressorClass(conf, GzipCodec.class);
        //SequenceFileOutputFormat.setOutputCompressionType(conf, CompressionType.BLOCK);

        FileInputFormat.addInputPath(conf, new Path(args[0]));
        FileOutputFormat.setOutputPath(conf, new Path(args[1]));

        // remove a previous output directory so the job can rerun
        FileSystem hdfs = FileSystem.get(conf);
        Path path = new Path(args[1]);
        if (hdfs.exists(path))
            hdfs.delete(path, true);

        JobClient.runJob(conf);

        return 0;
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        args = new String[] { "d:/hadoop_test/*.csv", "d:/hadoop_test/output" };
        int result = ToolRunner.run(conf, new SequenceFileCreateDriver(), args);
        System.exit(result);
    }

    /** Works around permission errors when using the local file system on Windows. */
    public static class WindowsLocalFileSystem extends LocalFileSystem {

        public WindowsLocalFileSystem() {
            super();
        }

        public boolean mkdirs(final Path path, final FsPermission permission) throws IOException {
            final boolean result = super.mkdirs(path);
            this.setPermission(path, permission);
            return result;
        }

        public void setPermission(final Path path, final FsPermission permission) throws IOException {
            try {
                super.setPermission(path, permission);
            } catch (final IOException ioe) {
                System.err.println(ioe.getMessage());
            }
        }
    }

    /** Parses one CSV line; columns 15 and 19 (arrival delay, distance) may be "NA". */
    public static class Parser {

        private String year;
        private int month;
        private int distance;
        private int arrivalDelayTime = 0;
        private boolean arrivalDelayAvailable = false;
        private boolean distanceAvailable = false;

        public Parser(Text value) {
            String[] columns = value.toString().split(",");
            this.year = columns[0];
            this.month = Integer.parseInt(columns[1]);
            if (!"NA".equals(columns[14])) {
                this.arrivalDelayAvailable = true;
                this.arrivalDelayTime = Integer.parseInt(columns[14]);
            }
            if (!"NA".equals(columns[18])) {
                this.distanceAvailable = true;
                this.distance = Integer.parseInt(columns[18]);
            }
        }

        public String getYear() { return year; }
        public int getMonth() { return month; }
        public int getArrivalDelayTime() { return arrivalDelayTime; }
        public boolean isArrivalDelayAvailable() { return arrivalDelayAvailable; }
        public int getDistance() { return distance; }
        public boolean isDistanceAvailable() { return distanceAvailable; }
    }

    /** Emits (distance, original line) so the distance becomes the lookup key. */
    public static class PartialMapper extends MapReduceBase implements
            Mapper<LongWritable, Text, IntWritable, Text> {

        private IntWritable outputKey = new IntWritable();

        public void map(LongWritable key, Text value,
                OutputCollector<IntWritable, Text> collector, Reporter reporter)
                throws IOException {
            try {
                Parser parser = new Parser(value);
                if (parser.isDistanceAvailable()) {
                    outputKey.set(parser.getDistance());
                    collector.collect(outputKey, value);
                }
            } catch (Exception e) {
                outputKey.set(0); // unparseable lines are collected under key 0
                collector.collect(outputKey, value);
                e.printStackTrace();
            }
        }
    }
}
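To verify what step 1 produced, the SequenceFile can be read back directly. A minimal sketch, assuming the job wrote a file named part-00000 under d:/hadoop_test/output (the actual file names depend on how many map tasks ran); the SequenceFileDump class name is only a placeholder.

package com.dbility.hadoop.execise;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;

public class SequenceFileDump {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        conf.set("fs.default.name", "file:///");
        Path path = new Path("d:/hadoop_test/output/part-00000"); // assumed output file name
        SequenceFile.Reader reader = new SequenceFile.Reader(FileSystem.get(conf), path, conf);
        IntWritable key = new IntWritable();
        Text value = new Text();
        int printed = 0;
        // block compression is handled transparently by the reader
        while (reader.next(key, value) && printed++ < 10) { // print only the first 10 records
            System.out.println(key.get() + "\t" + value.toString());
        }
        reader.close();
    }
}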
- MapFileCreateDriver.java (step 2: convert the SequenceFile into a MapFile)
package com.dbility.hadoop.execise;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocalFileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.SequenceFile.CompressionType;
import org.apache.hadoop.io.compress.DefaultCodec;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapFileOutputFormat;
import org.apache.hadoop.mapred.SequenceFileInputFormat;
import org.apache.hadoop.mapred.SequenceFileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

/**
 * Step 2: convert the SequenceFile from step 1 into a MapFile.
 * No mapper or reducer is set, so the identity mapper/reducer run and the
 * single reduce task sorts the records by key before MapFileOutputFormat
 * writes the data and index files.
 */
public class MapFileCreateDriver extends Configured implements Tool {

    public int run(String[] args) throws Exception {

        JobConf conf = new JobConf(MapFileCreateDriver.class);
        conf.setJobName("MapFileCreateDriver");
        conf.set("fs.default.name", "file:///");
        conf.set("mapred.job.tracker", "local");
        conf.set("io.serializations",
                "org.apache.hadoop.io.serializer.JavaSerialization,org.apache.hadoop.io.serializer.WritableSerialization");
        conf.set("fs.file.impl",
                "com.dbility.hadoop.execise.MapFileCreateDriver$WindowsLocalFileSystem");

        conf.setInputFormat(SequenceFileInputFormat.class);
        conf.setOutputFormat(MapFileOutputFormat.class);
        conf.setOutputKeyClass(IntWritable.class);

        SequenceFileOutputFormat.setCompressOutput(conf, true);
        SequenceFileOutputFormat.setOutputCompressorClass(conf, DefaultCodec.class);
        SequenceFileOutputFormat.setOutputCompressionType(conf, CompressionType.BLOCK);

        FileInputFormat.addInputPath(conf, new Path(args[0]));
        FileOutputFormat.setOutputPath(conf, new Path(args[1]));

        // remove a previous output directory so the job can rerun
        FileSystem hdfs = FileSystem.get(conf);
        Path path = new Path(args[1]);
        if (hdfs.exists(path))
            hdfs.delete(path, true);

        JobClient.runJob(conf);

        return 0;
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        args = new String[] { "d:/hadoop_test/output/", "d:/hadoop_test/mapoutput" };
        int res = ToolRunner.run(conf, new MapFileCreateDriver(), args);
        System.exit(res);
    }

    /** Works around permission errors when using the local file system on Windows. */
    public static class WindowsLocalFileSystem extends LocalFileSystem {

        public WindowsLocalFileSystem() {
            super();
        }

        public boolean mkdirs(final Path path, final FsPermission permission) throws IOException {
            final boolean result = super.mkdirs(path);
            this.setPermission(path, permission);
            return result;
        }

        public void setPermission(final Path path, final FsPermission permission) throws IOException {
            try {
                super.setPermission(path, permission);
            } catch (final IOException ioe) {
                System.err.println(ioe.getMessage());
            }
        }
    }
}
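MapFileOutputFormat writes one MapFile per reduce task, so with the single default reducer the d:/hadoop_test/mapoutput directory should contain a part-00000 subdirectory holding a data file and an index file. As a quick sanity check that partition can be opened directly, as in the sketch below (the part-00000 name and the SingleMapFileLookup class are assumptions); SearchValueList in step 3 does the same lookup but uses the partitioner to choose the right reader when there are several partitions.

package com.dbility.hadoop.execise;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.MapFile;
import org.apache.hadoop.io.Text;

public class SingleMapFileLookup {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        conf.set("fs.default.name", "file:///");
        FileSystem fs = FileSystem.get(conf);
        // MapFile.Reader expects the directory that holds the "data" and "index" files
        MapFile.Reader reader = new MapFile.Reader(fs, "d:/hadoop_test/mapoutput/part-00000", conf);
        IntWritable key = new IntWritable(100);      // distance to look up
        Text value = new Text();
        Text found = (Text) reader.get(key, value);  // returns null when the key is absent
        System.out.println(found == null ? "not found" : found.toString());
        reader.close();
    }
}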
- SearchValueList.java (step 3: look up a key in the MapFile)
package com.dbility.hadoop.execise;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.MapFile.Reader;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapred.MapFileOutputFormat;
import org.apache.hadoop.mapred.Partitioner;
import org.apache.hadoop.mapred.lib.HashPartitioner;
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

/**
 * Step 3: look up all records stored under a given distance key.
 * HashPartitioner picks the same partition the MapReduce job used,
 * so only one MapFile reader has to be searched.
 */
public class SearchValueList extends Configured implements Tool {

    public int run(String[] args) throws Exception {

        String[] remainArgs = new GenericOptionsParser(getConf(), args).getRemainingArgs();

        Path path = new Path(remainArgs[0]);
        FileSystem hdfs = FileSystem.get(getConf());

        // one reader per part-xxxxx MapFile in the output directory
        Reader[] readers = MapFileOutputFormat.getReaders(hdfs, path, getConf());

        IntWritable key = new IntWritable();
        key.set(Integer.parseInt(remainArgs[1]));
        Text value = new Text();

        Partitioner<IntWritable, Text> partitioner = new HashPartitioner<IntWritable, Text>();
        Reader reader = readers[partitioner.getPartition(key, value, readers.length)];

        Writable entry = reader.get(key, value); // positions the reader at the first match
        if (entry == null) {
            System.out.println("The requested key was not found.");
            return 1;
        }

        // print the matched record, then keep reading while the key stays the same
        IntWritable nextKey = new IntWritable();
        do {
            System.out.println(value.toString());
        } while (reader.next(nextKey, value) && key.equals(nextKey));

        return 0;
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        args = new String[] { "d:/hadoop_test/mapoutput/", "100" };
        int res = ToolRunner.run(conf, new SearchValueList(), args);
        System.exit(res);
    }
}
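To run the whole exercise end to end, the three drivers can simply be executed in order. A minimal chaining sketch, assuming the three classes above are compiled in one project and the sample CSV files sit under d:/hadoop_test as in each hard-coded main method (the PartialSortPipeline class name is a placeholder):

package com.dbility.hadoop.execise;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.util.ToolRunner;

public class PartialSortPipeline {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // step 1: raw CSV -> SequenceFile keyed by distance
        ToolRunner.run(conf, new SequenceFileCreateDriver(),
                new String[] { "d:/hadoop_test/*.csv", "d:/hadoop_test/output" });
        // step 2: SequenceFile -> MapFile (sorted by key, with an index)
        ToolRunner.run(conf, new MapFileCreateDriver(),
                new String[] { "d:/hadoop_test/output/", "d:/hadoop_test/mapoutput" });
        // step 3: look up the records stored under distance 100
        ToolRunner.run(conf, new SearchValueList(),
                new String[] { "d:/hadoop_test/mapoutput/", "100" });
    }
}

With the key 100, SearchValueList prints the records stored under distance 100 and stops as soon as the next key read from the MapFile differs.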
Reference book: 시작하세요! 하둡 프로그래밍 (Beginning Hadoop Programming), revised 2nd edition, Wikibooks, by 정재화 (Jung Jae-hwa).