Notice
Recent Posts
Recent Comments
Link
일 | 월 | 화 | 수 | 목 | 금 | 토 |
---|---|---|---|---|---|---|
1 | 2 | |||||
3 | 4 | 5 | 6 | 7 | 8 | 9 |
10 | 11 | 12 | 13 | 14 | 15 | 16 |
17 | 18 | 19 | 20 | 21 | 22 | 23 |
24 | 25 | 26 | 27 | 28 | 29 | 30 |
Tags
- R
- Express
- Sqoop
- IntelliJ
- window
- 공정능력
- SPC
- Kotlin
- react
- es6
- SQL
- Eclipse
- JavaScript
- MSSQL
- vaadin
- hadoop
- plugin
- Android
- xPlatform
- Python
- NPM
- 보조정렬
- mybatis
- SSL
- Java
- mapreduce
- tomcat
- Spring
- table
- GIT
Archives
- Today
- Total
DBILITY
hadoop secondary sort exercise ( 보조 정렬 실습 ) 본문
반응형
ASA운항기록 데이터(약 12G) 대상으로,보조정렬을 실습해 보았습니다.
지연도착시간을 30분/1시간 이내,1시간이상으로 구분하였으며,파일레이아웃은 [ 연 / 월 / 구분 / 횟수 ]형태로 출력됩니다.
- pom.xml
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>com.dbility.hadoop</groupId> <artifactId>asa-analysis</artifactId> <version>1.0.0</version> <properties> <java.version>1.7</java.version> <scp.user>hadoop</scp.user> <scp.password>hadoop</scp.password> <scp.host>big-master</scp.host> <scp.copy2dir>/home/hadoop</scp.copy2dir> </properties> <dependencies> <dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-core</artifactId> <version>1.2.1</version> <scope>provided</scope> </dependency> <dependency> <groupId>org.slf4j</groupId> <artifactId>slf4j-api</artifactId> <version>1.7.20</version> <scope>provided</scope> </dependency> <dependency> <groupId>org.slf4j</groupId> <artifactId>jcl-over-slf4j</artifactId> <version>1.7.20</version> <scope>provided</scope> </dependency> <dependency> <groupId>org.slf4j</groupId> <artifactId>log4j-over-slf4j</artifactId> <version>1.7.20</version> <scope>provided</scope> </dependency> <dependency> <groupId>ch.qos.logback</groupId> <artifactId>logback-classic</artifactId> <version>1.1.7</version> <scope>provided</scope> </dependency> <dependency> <groupId>ch.qos.logback</groupId> <artifactId>logback-access</artifactId> <version>1.1.7</version> <scope>provided</scope> </dependency> </dependencies> <build> <finalName>${project.artifactId}</finalName> <plugins> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-compiler-plugin</artifactId> <version>2.3.2</version> <configuration> <source>${java.version}</source> <target>${java.version}</target> </configuration> </plugin> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-jar-plugin</artifactId> <version>2.3.2</version> <configuration> <archive> <manifest> <mainClass> com.dbility.hadoop.sort.secondary.AsaDriver </mainClass> </manifest> </archive> </configuration> <executions> <execution> <id>default-jar</id> <phase>package</phase> <goals> <goal>jar</goal> </goals> </execution> </executions> </plugin> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-antrun-plugin</artifactId> <version>1.8</version> <dependencies> <dependency> <groupId>org.apache.ant</groupId> <artifactId>ant-jsch</artifactId> <version>1.9.7</version> </dependency> <dependency> <groupId>com.jcraft</groupId> <artifactId>jsch</artifactId> <version>0.1.54</version> </dependency> </dependencies> <executions> <execution> <phase>install</phase> <goals> <goal>run</goal> </goals> <configuration> <tasks> <scp file="${project.basedir}/target/${project.artifactId}.jar" todir="${scp.user}:${scp.password}@${scp.host}:/${scp.copy2dir}" trust="true"> </scp> </tasks> </configuration> </execution> </executions> </plugin> </plugins> </build> </project>
- logback.xml
<?xml version="1.0" encoding="UTF-8"?> <configuration> <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender"> <encoder> <pattern>%d{HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n </pattern> </encoder> </appender> <logger name="org.apache.hadoop.mapred.JobClient" level="info" additivity="false"> <appender-ref ref="STDOUT" /> </logger> <root level="warn"> <appender-ref ref="STDOUT" /> </root> </configuration>
- WindowsLocalFileSystem.java
package com.dbility.hadoop.util; import java.io.IOException; import org.apache.hadoop.fs.LocalFileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.permission.FsPermission; public class WindowsLocalFileSystem extends LocalFileSystem { public WindowsLocalFileSystem() { super(); } @Override public boolean mkdirs(final Path f, final FsPermission permission) throws IOException { final boolean result = super.mkdirs(f); this.setPermission(f, permission); return result; } public void setPermission(final Path p, final FsPermission permission) throws IOException { try { super.setPermission(p, permission); } catch (final IOException ioe) { System.err.println(ioe.getMessage()); } } }
- PerformParser.java
package com.dbility.hadoop.util; import org.apache.hadoop.io.Text; public class PerformParser { private String year; private int month; private String division; private int arrivalDelayTime = 0; private boolean arrivalDelay = false; public PerformParser(Text value) { String[] cols = value.toString().split(","); year = cols[0]; month = Integer.parseInt(cols[1]); if ( !"NA".equals(cols[14]) ){ arrivalDelay = true; arrivalDelayTime = Integer.parseInt(cols[14]); if ( arrivalDelayTime > 0 ) { if ( arrivalDelayTime <= 30 ) { division = "1"; } else if ( arrivalDelayTime <= 60 ) { division = "2"; } else { division = "3"; } } } } public String getYear() { return year; } public int getMonth() { return month; } public String getDivision() { return division; } public boolean isArrivalDelay() { return arrivalDelay; } public int getArrivalDelayTime() { return arrivalDelayTime; } }
- CompositeKey.java
package com.dbility.hadoop.sort.secondary; import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; import org.apache.hadoop.io.WritableComparable; import org.apache.hadoop.io.WritableUtils; public class CompositeKey implements WritableComparable<CompositeKey> { private String year; private Integer month; private String division; public CompositeKey() { } public CompositeKey(String year, int month, String division) { this.year = year; this.month = month; this.division = division; } public String getYear() { return year; } public void setYear(String year) { this.year = year; } public int getMonth() { return month; } public void setMonth(int month) { this.month = month; } public String getDivision() { return division; } public void setDivision(String division) { this.division = division; } @Override public void readFields(DataInput in) throws IOException { year = WritableUtils.readString(in); month = in.readInt(); division = WritableUtils.readString(in); } @Override public void write(DataOutput out) throws IOException { WritableUtils.writeString(out, year); out.writeInt(month); WritableUtils.writeString(out, division); } @Override public int compareTo(CompositeKey o) { int result = year.compareTo(o.getYear()); if (result == 0) result = month == o.getMonth() ? 0 : (month < o.getMonth() ? -1 : 1); if (result == 0) result = division.compareTo(o.getDivision()); return result; } @Override public String toString() { return (new StringBuilder()).append(year).append("\t").append(month) .append("\t").append(division).toString(); } }
- AsaMapper.java
package com.dbility.hadoop.sort.secondary; import java.io.IOException; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Mapper; import com.dbility.hadoop.util.PerformParser; public class AsaMapper extends Mapper<LongWritable, Text, CompositeKey, IntWritable> { private CompositeKey outputKey = new CompositeKey(); private final static IntWritable outputValue = new IntWritable(1); @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { PerformParser parser = new PerformParser(value); if ( parser.isArrivalDelay() ) { if ( parser.getArrivalDelayTime() > 0 ) { outputKey.setYear(parser.getYear()); outputKey.setMonth(parser.getMonth()); outputKey.setDivision(parser.getDivision()); context.write(outputKey, outputValue); } } } }
- AsaPartitioner.java
package com.dbility.hadoop.sort.secondary; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.mapreduce.Partitioner; public class AsaPartitioner extends Partitioner<CompositeKey, IntWritable> { @Override public int getPartition(CompositeKey key, IntWritable value, int numReduceTask) { return key.getYear().hashCode() % numReduceTask; /*int result = 1; if ( Integer.parseInt(key.getYear()) < 2000 ) result = 0; return result;*/ } }
- AsaGroupingComparator.java
package com.dbility.hadoop.sort.secondary; import org.apache.hadoop.io.WritableComparable; import org.apache.hadoop.io.WritableComparator; public class AsaGroupingComparator extends WritableComparator { public AsaGroupingComparator() { super(CompositeKey.class,true); } @Override public int compare(WritableComparable a, WritableComparable b) { CompositeKey k1 = (CompositeKey)a; CompositeKey k2 = (CompositeKey)b; return k1.getYear().compareTo(k2.getYear()); } }
- AsaSortComparator.java
package com.dbility.hadoop.sort.secondary; import org.apache.hadoop.io.WritableComparable; import org.apache.hadoop.io.WritableComparator; public class AsaSortComparator extends WritableComparator { public AsaSortComparator() { super(CompositeKey.class,true); } @Override public int compare(WritableComparable a, WritableComparable b) { CompositeKey k1 = (CompositeKey)a; CompositeKey k2 = (CompositeKey)b; int result = k1.getYear().compareTo(k2.getYear()); if ( result != 0 ) return result; result = k1.getMonth() == k2.getMonth() ? 0 : ( k1.getMonth() < k2.getMonth() ? -1 : 1) ; if ( result != 0 ) return result; return k1.getDivision().compareTo(k2.getDivision()); } }
- AsaReducer.java
package com.dbility.hadoop.sort.secondary; import java.io.IOException; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.mapreduce.Reducer; public class AsaReducer extends Reducer<CompositeKey, IntWritable, CompositeKey, IntWritable> { private CompositeKey outputKey = new CompositeKey(); private IntWritable outputValue = new IntWritable(); @Override protected void reduce(CompositeKey key, Iterable values, Context context) throws IOException, InterruptedException { int sum = 0; int preMonth = key.getMonth(); String preDivision = key.getDivision(); for (IntWritable value : values) { if ( key.getMonth() != preMonth || !key.getDivision().equals(preDivision) ) { outputKey.setYear(key.getYear()); outputKey.setMonth(preMonth); outputKey.setDivision(preDivision); outputValue.set(sum); context.write(outputKey, outputValue); sum = 0; } sum+=value.get(); preMonth = key.getMonth(); preDivision = key.getDivision(); } if ( key.getMonth() == preMonth && key.getDivision().equals(preDivision) ) { outputKey.setYear(key.getYear()); outputKey.setMonth(preMonth); outputKey.setDivision(preDivision); outputValue.set(sum); context.write(outputKey, outputValue); } } }
- AsaDriver.java
package com.dbility.hadoop.sort.secondary; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configured; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; import org.apache.hadoop.util.GenericOptionsParser; import org.apache.hadoop.util.Tool; import org.apache.hadoop.util.ToolRunner; import org.slf4j.Logger; import org.slf4j.LoggerFactory; public class AsaDriver extends Configured implements Tool { private static final Logger log = LoggerFactory.getLogger(AsaDriver.class); public int run(String[] args) throws Exception { String[] remainArgs = new GenericOptionsParser(getConf(), args).getRemainingArgs(); if ( remainArgs.length != 2 ) { System.out.println("Usage : <input> <output>"); return -1; } Job job = new Job(getConf(), "Test"); job.setJarByClass(AsaDriver.class); job.setInputFormatClass(TextInputFormat.class); job.setMapperClass(AsaMapper.class); job.setMapOutputKeyClass(CompositeKey.class); job.setMapOutputValueClass(IntWritable.class); job.setCombinerClass(AsaReducer.class); job.setNumReduceTasks(2); job.setPartitionerClass(AsaPartitioner.class); job.setSortComparatorClass(AsaSortComparator.class); job.setGroupingComparatorClass(AsaGroupingComparator.class); job.setReducerClass(AsaReducer.class); job.setOutputFormatClass(TextOutputFormat.class); job.setOutputKeyClass(CompositeKey.class); job.setOutputValueClass(IntWritable.class); FileInputFormat.addInputPath(job, new Path(remainArgs[0])); FileOutputFormat.setOutputPath(job, new Path(remainArgs[1])); FileSystem hdfs = FileSystem.get(getConf()); Path path = new Path(remainArgs[1]); if ( hdfs.exists(path) ) hdfs.delete(path, true); return job.waitForCompletion(true) ? 0 : -2; } public static void main(String[] args) throws Exception { Configuration conf = new Configuration(); /*conf.set("defalut.fs.name", "file:///"); conf.set("mapred.job.tracker", "local"); conf.set("fs.file.impl", "com.dbility.hadoop.util.WindowsLocalFileSystem"); conf.set("io.serializations", "org.apache.hadoop.io.serializer.JavaSerialization," +"org.apache.hadoop.io.serializer.WritableSerialization"); conf.set("mapred.map.child.java.opts", "-Xms1024M -Xmx1024M"); conf.set("io.sort.mb", "512"); args = new String[] {"d:/hadoop_test/2008.csv","d:/hadoop_test/ouput"};*/ Runtime.getRuntime().exit(ToolRunner.run(conf, new AsaDriver(), args)); } }
참고 서적 : 시작하세요! 하둡프로그래밍 개정2판(위키북스) - 정재화지음
반응형
'bigdata > hadoop' 카테고리의 다른 글
hadoop partial sort exercise ( 부분 정렬 실습 ) (0) | 2017.02.16 |
---|---|
hadoop secondary sort and multiple outputs exercise (0) | 2017.02.08 |
hadoop Secondary Sort ( 보조 정렬 ) (0) | 2016.12.06 |
입력한 공항을 도착지로 년도별 최대 지연 도착 항공편 구하기 (0) | 2016.11.30 |
hadoop job list, kill (0) | 2016.11.30 |
Comments