Creating ASA flight statistics with hadoop MultipleOutputs

This job counts delayed flights in the ASA airline on-time data by year and month, using MultipleOutputs to write departure-delay counts and arrival-delay counts to separate output files from a single MapReduce pass.
- pom.xml
```xml
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <groupId>com.dbility.hadoop</groupId>
  <artifactId>asa-flight-statistics</artifactId>
  <version>1.0.0</version>

  <properties>
    <scp.user>hadoop</scp.user>
    <scp.password>hadoop</scp.password>
    <scp.host>big-master</scp.host>
    <scp.copy2dir>home/hadoop/</scp.copy2dir>
    <java.version>1.6</java.version>
  </properties>

  <dependencies>
    <dependency>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-core</artifactId>
      <version>1.2.1</version>
      <scope>provided</scope>
    </dependency>
    <dependency>
      <groupId>org.slf4j</groupId>
      <artifactId>slf4j-api</artifactId>
      <version>1.7.20</version>
      <scope>provided</scope>
    </dependency>
    <dependency>
      <groupId>org.slf4j</groupId>
      <artifactId>jcl-over-slf4j</artifactId>
      <version>1.7.20</version>
      <scope>provided</scope>
    </dependency>
    <dependency>
      <groupId>ch.qos.logback</groupId>
      <artifactId>logback-classic</artifactId>
      <version>1.1.7</version>
      <scope>provided</scope>
    </dependency>
  </dependencies>

  <build>
    <finalName>${project.artifactId}</finalName>
    <plugins>
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-jar-plugin</artifactId>
        <version>2.3.2</version>
        <configuration>
          <archive>
            <manifest>
              <mainClass>com.dbility.hadoop.multioutput.AsaDelayDriver</mainClass>
            </manifest>
          </archive>
          <excludes>
            <exclude>**/property/**</exclude>
          </excludes>
        </configuration>
        <executions>
          <execution>
            <id>default-jar</id>
            <phase>package</phase>
            <goals>
              <goal>jar</goal>
            </goals>
          </execution>
        </executions>
      </plugin>
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-compiler-plugin</artifactId>
        <version>2.3.2</version>
        <configuration>
          <source>${java.version}</source>
          <target>${java.version}</target>
        </configuration>
      </plugin>
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-shade-plugin</artifactId>
        <version>2.4.1</version>
        <executions>
          <execution>
            <phase>package</phase>
            <goals>
              <goal>shade</goal>
            </goals>
            <configuration>
              <artifactSet>
                <includes>
                  <include>${project.groupId}:${project.artifactId}</include>
                </includes>
              </artifactSet>
              <filters>
                <filter>
                  <artifact>*:*</artifact>
                  <excludes>
                    <exclude>META-INF/maven/**</exclude>
                  </excludes>
                </filter>
              </filters>
            </configuration>
          </execution>
        </executions>
      </plugin>
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-antrun-plugin</artifactId>
        <version>1.8</version>
        <dependencies>
          <dependency>
            <groupId>org.apache.ant</groupId>
            <artifactId>ant-jsch</artifactId>
            <version>1.9.7</version>
          </dependency>
          <dependency>
            <groupId>com.jcraft</groupId>
            <artifactId>jsch</artifactId>
            <version>0.1.54</version>
          </dependency>
        </dependencies>
        <executions>
          <execution>
            <phase>install</phase>
            <goals>
              <goal>run</goal>
            </goals>
            <configuration>
              <tasks>
                <scp file="${project.basedir}/target/${project.artifactId}.jar"
                  todir="${scp.user}:${scp.password}@${scp.host}:/${scp.copy2dir}" trust="true" />
              </tasks>
            </configuration>
          </execution>
        </executions>
      </plugin>
    </plugins>
  </build>
</project>
```
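With this build, `mvn clean install` does the whole round trip: the jar plugin records `com.dbility.hadoop.multioutput.AsaDelayDriver` as the manifest main class, the shade plugin strips the final jar down to this project's own classes, and during the `install` phase the antrun plugin scp's `target/asa-flight-statistics.jar` to `hadoop@big-master:/home/hadoop/` using the host and credentials from the `<properties>` block. Since `hadoop-core` and the logging libraries are `provided` scope, they are expected on the cluster's classpath rather than bundled into the jar.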
- logback.xml
```xml
<?xml version="1.0" encoding="UTF-8"?>
<configuration>
  <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
    <encoder>
      <pattern>%d{HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n</pattern>
    </encoder>
  </appender>
  <logger name="org.apache.hadoop.mapred.JobClient" additivity="false">
    <level value="info" />
    <appender-ref ref="STDOUT" />
  </logger>
  <root level="warn">
    <appender-ref ref="STDOUT" />
  </root>
</configuration>
```
- Mapper Class
```java
package com.dbility.hadoop.multioutput;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import com.dbility.hadoop.util.AsaPerformParser;

/**
 * Emits ("D,year,month", 1) for each departure delay and
 * ("A,year,month", 1) for each arrival delay in the ASA flight data.
 *
 * @author hyperrookie@gmail.com
 * @version 1.0.0
 * @date 2016. 11. 12.
 */
public class AsaDelayMapper extends Mapper<LongWritable, Text, Text, IntWritable> {

	private Text outKey = new Text();
	private static final IntWritable outValue = new IntWritable(1);

	@Override
	protected void map(LongWritable key, Text value, Context context)
			throws IOException, InterruptedException {

		AsaPerformParser parser = new AsaPerformParser(value);

		// departure delay: key prefix "D"
		if (parser.isDepDelayAvailable() && parser.getDepDelayTime() > 0) {
			outKey.set("D," + parser.getYear() + "," + parser.getMonth());
			context.write(outKey, outValue);
		}

		// arrival delay: key prefix "A"
		if (parser.isArrDelayAvailable() && parser.getArrDelayTime() > 0) {
			outKey.set("A," + parser.getYear() + "," + parser.getMonth());
			context.write(outKey, outValue);
		}
	}
}
```
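`AsaPerformParser` (in `com.dbility.hadoop.util`) is not listed in this post. For reference, here is a minimal sketch of what the mapper relies on, assuming the standard ASA (Data Expo 2009) on-time CSV layout with Year and Month in columns 0 and 1 and ArrDelay and DepDelay in columns 14 and 15, and "NA" marking a missing value. The column indices and the header-row guard are assumptions, not the book's exact code:

```java
package com.dbility.hadoop.util;

import org.apache.hadoop.io.Text;

/**
 * Hypothetical sketch of the parser used by AsaDelayMapper.
 * Assumed layout: ASA on-time CSV, 0-based columns Year=0, Month=1,
 * ArrDelay=14, DepDelay=15; "NA" marks a missing value.
 */
public class AsaPerformParser {

	private final String[] columns;

	public AsaPerformParser(Text value) {
		this.columns = value.toString().split(",");
	}

	public String getYear()  { return columns[0]; }
	public String getMonth() { return columns[1]; }

	// treat "NA" (and the CSV header row) as unavailable
	public boolean isArrDelayAvailable() {
		return !"NA".equals(columns[14]) && !"ArrDelay".equals(columns[14]);
	}

	public boolean isDepDelayAvailable() {
		return !"NA".equals(columns[15]) && !"DepDelay".equals(columns[15]);
	}

	public int getArrDelayTime() { return Integer.parseInt(columns[14]); }
	public int getDepDelayTime() { return Integer.parseInt(columns[15]); }
}
```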
- Reducer Class
```java
package com.dbility.hadoop.multioutput;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;

/**
 * Sums delay counts per (year, month) and routes departure totals and
 * arrival totals to separate named outputs via MultipleOutputs.
 *
 * @author hyperrookie@gmail.com
 * @version 1.0.0
 * @date 2016. 11. 12.
 */
public class AsaDelayReducer extends Reducer<Text, IntWritable, Text, IntWritable> {

	private Text outKey = new Text();
	private IntWritable outValue = new IntWritable();
	private MultipleOutputs<Text, IntWritable> mos;

	@Override
	protected void setup(Context context) throws IOException, InterruptedException {
		mos = new MultipleOutputs<Text, IntWritable>(context);
	}

	@Override
	protected void reduce(Text key, Iterable<IntWritable> values, Context context)
			throws IOException, InterruptedException {

		// key format: "D,year,month" (departure) or "A,year,month" (arrival)
		String[] columns = key.toString().split(",");
		int sum;

		if (columns[0].equals("D")) {
			sum = 0;
			for (IntWritable value : values) {
				sum += value.get();
			}
			outKey.set(columns[1] + "," + columns[2]);
			outValue.set(sum);
			mos.write("departure", outKey, outValue);
		}

		if (columns[0].equals("A")) {
			sum = 0;
			for (IntWritable value : values) {
				sum += value.get();
			}
			outKey.set(columns[1] + "," + columns[2]);
			outValue.set(sum);
			mos.write("arrival", outKey, outValue);
		}
	}

	@Override
	protected void cleanup(Context context) throws IOException, InterruptedException {
		// MultipleOutputs must be closed, or the named output files may be incomplete
		mos.close();
	}
}
```
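The two `if` branches are identical except for the named output they write to, so the reduce body could equally map the key prefix straight to the output name. An equivalent, deduplicated sketch of the same method:

```java
@Override
protected void reduce(Text key, Iterable<IntWritable> values, Context context)
		throws IOException, InterruptedException {

	String[] columns = key.toString().split(",");

	// the key prefix picks the named output: "D" -> departure, "A" -> arrival
	String namedOutput = columns[0].equals("D") ? "departure" : "arrival";

	int sum = 0;
	for (IntWritable value : values) {
		sum += value.get();
	}

	outKey.set(columns[1] + "," + columns[2]);
	outValue.set(sum);
	mos.write(namedOutput, outKey, outValue);
}
```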
- Driver Class
```java
package com.dbility.hadoop.multioutput;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

/**
 * Job driver: registers the "departure" and "arrival" named outputs
 * and runs the delay-count job.
 *
 * @author hyperrookie@gmail.com
 * @version 1.0.0
 * @date 2016. 11. 12.
 */
public class AsaDelayDriver extends Configured implements Tool {

	@Override
	public int run(String[] args) throws Exception {

		String[] remainArgs = new GenericOptionsParser(getConf(), args).getRemainingArgs();

		if (remainArgs.length != 2) {
			System.out.println("Usage : hadoop jar JarFile mainClass <input> <output>");
			return -1;
		}

		Job job = new Job(getConf(), "asaPerformAnalysis");
		job.setJarByClass(AsaDelayDriver.class);
		job.setMapperClass(AsaDelayMapper.class);
		job.setReducerClass(AsaDelayReducer.class);
		job.setInputFormatClass(TextInputFormat.class);
		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(IntWritable.class);

		// register the two named outputs the reducer writes to via mos.write()
		MultipleOutputs.addNamedOutput(job, "departure", TextOutputFormat.class, Text.class, IntWritable.class);
		MultipleOutputs.addNamedOutput(job, "arrival", TextOutputFormat.class, Text.class, IntWritable.class);

		FileInputFormat.addInputPath(job, new Path(remainArgs[0]));
		FileOutputFormat.setOutputPath(job, new Path(remainArgs[1]));

		// remove a pre-existing output directory so the job can be rerun
		FileSystem hdfs = FileSystem.get(getConf());
		Path path = new Path(remainArgs[1]);
		if (hdfs.exists(path))
			hdfs.delete(path, true);

		return job.waitForCompletion(true) ? 0 : -2;
	}

	public static void main(String[] args) throws Exception {

		// local (Windows) test configuration; on a cluster, drop these overrides
		Configuration conf = new Configuration();
		conf.set("fs.default.name", "file:///");
		conf.set("mapred.job.tracker", "local");
		conf.set("fs.file.impl", "com.dbility.hadoop.util.WindowsLocalFileSystem");
		conf.set("io.serializations",
				"org.apache.hadoop.io.serializer.JavaSerialization,"
						+ "org.apache.hadoop.io.serializer.WritableSerialization");

		args = new String[] { "d:/hadoop_test/2008.csv", "d:/hadoop_test/output2008" };

		Runtime.getRuntime().exit(ToolRunner.run(conf, new AsaDelayDriver(), args));
	}
}
```
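Each named output produces its own files in the job output directory (departure-r-00000, arrival-r-00000, ...), while the default part-r-00000 files stay empty because the reducer only writes through `mos`. Note that `main()` as written is wired for local Windows testing; for a cluster run you would remove the `conf.set(...)` overrides and the hard-coded `args` and invoke `hadoop jar asa-flight-statistics.jar <input> <output>` as the usage message says. If the empty default files are unwanted, the `LazyOutputFormat` that ships with Hadoop's new API can suppress them; an optional one-liner for `run()`, assuming the class is available in your Hadoop version:

```java
// optional, inside run(): create the default part-r-* files only when
// something is written to them, leaving just departure-* and arrival-*
org.apache.hadoop.mapreduce.lib.output.LazyOutputFormat
		.setOutputFormatClass(job, TextOutputFormat.class);
```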
Reference: 시작하세요! 하둡 프로그래밍, revised 2nd edition (Wikibooks), by 정재화.