DBILITY

hadoop MultipleOutputs을 이용한 ASA 통계 작성 본문

bigdata/hadoop

hadoop MultipleOutputs을 이용한 ASA 통계 작성

DBILITY 2016. 11. 12. 18:32
반응형

https://hadoop.apache.org/docs/r1.2.1/api/org/apache/hadoop/mapreduce/lib/output/MultipleOutputs.html

 

MultipleOutputs (Hadoop 1.2.1 API)

static void addNamedOutput(Job job, String namedOutput, Class&lt;? extends OutputFormat&gt; outputFormatClass, Class&lt;?&gt; keyClass, Class&lt;?&gt; valueClass) — Adds a named output for the job.

hadoop.apache.org

  1. pom.xml
    <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    	<modelVersion>4.0.0</modelVersion>
    	<groupId>com.dbility.hadoop</groupId>
    	<artifactId>asa-flight-statistics</artifactId>
    	<version>1.0.0</version>
    	<!-- scp.* properties feed the antrun deploy step below; java.version pins the compiler level -->
    	<properties>
    		<scp.user>hadoop</scp.user>
    		<scp.password>hadoop</scp.password>
    		<scp.host>big-master</scp.host>
    		<scp.copy2dir>home/hadoop/</scp.copy2dir>
    		<java.version>1.6</java.version>
    	</properties>
    	<!-- all dependencies are provided: the Hadoop runtime supplies them on the cluster -->
    	<dependencies>
    		<dependency>
    			<groupId>org.apache.hadoop</groupId>
    			<artifactId>hadoop-core</artifactId>
    			<version>1.2.1</version>
    			<scope>provided</scope>
    		</dependency>
    		<dependency>
    			<groupId>org.slf4j</groupId>
    			<artifactId>slf4j-api</artifactId>
    			<version>1.7.20</version>
    			<scope>provided</scope>
    		</dependency>
    		<dependency>
    			<groupId>org.slf4j</groupId>
    			<artifactId>jcl-over-slf4j</artifactId>
    			<version>1.7.20</version>
    			<scope>provided</scope>
    		</dependency>
    		<dependency>
    			<groupId>ch.qos.logback</groupId>
    			<artifactId>logback-classic</artifactId>
    			<version>1.1.7</version>
    			<scope>provided</scope>
    		</dependency>
    	</dependencies>
    
    	<build>
    		<finalName>${project.artifactId}</finalName>
    		<plugins>
    			<!-- jar with the driver as Main-Class so the jar is runnable via "hadoop jar" -->
    			<plugin>
    				<groupId>org.apache.maven.plugins</groupId>
    				<artifactId>maven-jar-plugin</artifactId>
    				<version>2.3.2</version>
    				<configuration>
    					<archive>
    						<manifest>
    							<mainClass>com.dbility.hadoop.multioutput.AsaDelayDriver</mainClass>
    						</manifest>
    					</archive>
    					<excludes>
    						<exclude>**/property/**</exclude>
    					</excludes>
    				</configuration>
    				<executions>
    					<execution>
    						<id>default-jar</id>
    						<phase>package</phase>
    						<goals>
    							<goal>jar</goal>
    						</goals>
    					</execution>
    				</executions>
    			</plugin>
    			<plugin>
    				<groupId>org.apache.maven.plugins</groupId>
    				<artifactId>maven-compiler-plugin</artifactId>
    				<version>2.3.2</version>
    				<configuration>
    					<source>${java.version}</source>
    					<target>${java.version}</target>
    				</configuration>
    			</plugin>
    			<!-- shade keeps only this project's classes; META-INF/maven noise stripped -->
    			<plugin>
    				<groupId>org.apache.maven.plugins</groupId>
    				<artifactId>maven-shade-plugin</artifactId>
    				<version>2.4.1</version>
    				<executions>
    					<execution>
    						<phase>package</phase>
    						<goals>
    							<goal>shade</goal>
    						</goals>
    						<configuration>
    							<artifactSet>
    								<includes>
    									<include>${project.groupId}:${project.artifactId}</include>
    								</includes>
    							</artifactSet>
    							<filters>
    								<filter>
    									<artifact>*:*</artifact>
    									<excludes>
    										<exclude>META-INF/maven/**</exclude>
    									</excludes>
    								</filter>
    							</filters>
    						</configuration>
    					</execution>
    				</executions>
    			</plugin>
    			<!-- copies the built jar to the cluster host over scp on "mvn install" -->
    			<plugin>
    				<groupId>org.apache.maven.plugins</groupId>
    				<artifactId>maven-antrun-plugin</artifactId>
    				<version>1.8</version>
    				<dependencies>
    					<dependency>
    						<groupId>org.apache.ant</groupId>
    						<artifactId>ant-jsch</artifactId>
    						<version>1.9.7</version>
    					</dependency>
    					<dependency>
    						<groupId>com.jcraft</groupId>
    						<artifactId>jsch</artifactId>
    						<version>0.1.54</version>
    					</dependency>
    				</dependencies>
    				<executions>
    					<execution>
    						<phase>install</phase>
    						<goals>
    							<goal>run</goal>
    						</goals>
    						<configuration>
    							<!-- <tasks> is deprecated since antrun 1.5; <target> is the supported element -->
    							<target>
    								<scp file="${project.basedir}/target/${project.artifactId}.jar"
    									todir="${scp.user}:${scp.password}@${scp.host}:/${scp.copy2dir}"
    									trust="true">
    								</scp>
    							</target>
    						</configuration>
    					</execution>
    				</executions>
    			</plugin>
    		</plugins>
    	</build>
    </project>
  2. logback.xml
    <?xml version="1.0" encoding="UTF-8"?>
    <configuration>
    
    	<!-- Console appender: time, thread, level, logger (abbreviated to 36 chars), message -->
    	<appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
    		<encoder>
    			<pattern>%d{HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n
    			</pattern>
    		</encoder>
    	</appender>
    
    	<!-- Keep MapReduce job progress visible at INFO even though root is WARN;
    	     additivity=false prevents double logging through the root appender. -->
    	<logger name="org.apache.hadoop.mapred.JobClient" additivity="false">
    		<level value="info" />
    		<appender-ref ref="STDOUT" />
    	</logger>
    
    	<root level="warn">
    		<appender-ref ref="STDOUT" />
    	</root>
    </configuration>
  3. Mapper Class

    package com.dbility.hadoop.multioutput;
     
    import java.io.IOException;
     
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Mapper;
     
    import com.dbility.hadoop.util.AsaPerformParser;
     
    /**
     *
     * Description
     *
     *
     * @author hyperrookie@gmail.com
     *
     * @version 1.0.0
     * @date 2016. 11. 12.
     */
    public class AsaDelayMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
     
        private Text outKey = new Text();
        private static final IntWritable outValue = new IntWritable(1);
        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
     
            AsaPerformParser parser = new AsaPerformParser(value);
     
            if ( parser.isDepDelayAvailable() && parser.getDepDelayTime() > 0 ){
                outKey.set("D,"+parser.getYear()+","+parser.getMonth());
                context.write(outKey, outValue);
            }
     
            if ( parser.isArrDelayAvailable() && parser.getArrDelayTime() > 0 ){
                outKey.set("A,"+parser.getYear()+","+parser.getMonth());
                context.write(outKey, outValue);
            }
        }
    }​
  4. Reducer Class

    package com.dbility.hadoop.multioutput;
     
    import java.io.IOException;
     
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Reducer;
    import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;
     
    /**
     *
     * Description
     *
     *
     * @author hyperrookie@gmail.com
     *
     * @version 1.0.0
     * @date 2016. 11. 12.
     */
    public class AsaDelayReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
     
     
        private Text outKey = new Text();
        private IntWritable outValue = new IntWritable();
        private MultipleOutputs<Text, IntWritable> mos;
     
        @Override
        protected void setup(Context context) throws IOException, InterruptedException {
            mos = new MultipleOutputs<Text, IntWritable>(context);
        }
     
        @Override
        protected void reduce(Text key, Iterable<IntWritable> values, Context context)
                throws IOException, InterruptedException {
     
            String[] columns = key.toString().split(",");
            int sum;
            if ( columns[0].equals("D") ){
                sum = 0;
     
                for (IntWritable value : values) {
                    sum+=value.get();
                }
                outKey.set(columns[1]+","+columns[2]);
                outValue.set(sum);
                mos.write("departure", outKey, outValue);
            }
     
            if ( columns[0].equals("A") ){
                sum = 0;
     
                for (IntWritable value : values) {
                    sum+=value.get();
                }
                outKey.set(columns[1]+","+columns[2]);
                outValue.set(sum);
                mos.write("arrival", outKey, outValue);
            }
        }
     
        @Override
        protected void cleanup(Context context) throws IOException,
                InterruptedException {
            mos.close();
        }
    }​
  5. Driver Class
    package com.dbility.hadoop.multioutput;
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.conf.Configured;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;
    import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
    import org.apache.hadoop.util.GenericOptionsParser;
    import org.apache.hadoop.util.Tool;
    import org.apache.hadoop.util.ToolRunner;
    
    /**
     * 
     * Description
     * 
     * 
     * @author hyperrookie@gmail.com
     * 
     * @version 1.0.0
     * @date 2016. 11. 12.
     */
    public class AsaDelayDriver extends Configured implements Tool {
    
    	/**
    	 * Configures and runs the delay-count job: TextInputFormat in,
    	 * two named TextOutputFormat outputs ("departure", "arrival") out.
    	 *
    	 * @param args generic Hadoop options followed by &lt;input&gt; &lt;output&gt;
    	 * @return 0 on success, -1 on bad usage, -2 on job failure
    	 */
    	@Override
    	public int run(String[] args) throws Exception {
    
    		String[] remainArgs = new GenericOptionsParser(getConf(), args).getRemainingArgs();
    
    		if ( remainArgs.length != 2 ) {
    			System.out.println("Usage : hadoop jar JarFile mainClass <input> <output>");
    			return -1;
    		}
    
    		Job job = new Job(getConf(), "asaPerformAnalysis");
    
    		job.setJarByClass(AsaDelayDriver.class);
    		job.setMapperClass(AsaDelayMapper.class);
    		job.setReducerClass(AsaDelayReducer.class);
    
    		job.setInputFormatClass(TextInputFormat.class);
    		job.setOutputKeyClass(Text.class);
    		job.setOutputValueClass(IntWritable.class);
    		// Named outputs consumed by AsaDelayReducer via MultipleOutputs.write()
    		MultipleOutputs.addNamedOutput(job, "departure", TextOutputFormat.class, Text.class, IntWritable.class);
    		MultipleOutputs.addNamedOutput(job, "arrival", TextOutputFormat.class, Text.class, IntWritable.class);
    
    		FileInputFormat.addInputPath(job, new Path(remainArgs[0]));
    		FileOutputFormat.setOutputPath(job, new Path(remainArgs[1]));
    
    		// Remove a stale output directory so the job does not fail on startup.
    		FileSystem hdfs = FileSystem.get(getConf());
    		Path path = new Path(remainArgs[1]);
    		if (hdfs.exists(path)) hdfs.delete(path,true);
    
    		return job.waitForCompletion(true) ? 0 : -2;
    	}
    
    	public static void main(String[] args) throws Exception {
    		Configuration conf = new Configuration();
    
    		// Local, single-JVM test configuration (Windows local file system shim).
    		conf.set("fs.default.name", "file:///");
    		conf.set("mapred.job.tracker", "local");
    		conf.set("fs.file.impl", "com.dbility.hadoop.util.WindowsLocalFileSystem");
    		conf.set("io.serializations", "org.apache.hadoop.io.serializer.JavaSerialization,org.apache.hadoop.io.serializer.WritableSerialization");
    
    		// Fall back to the local test paths only when no arguments were given.
    		// Previously args were ALWAYS overwritten, which made the Usage message
    		// dead code and ignored any paths passed on the command line.
    		if (args == null || args.length == 0) {
    			args = new String[] {"d:/hadoop_test/2008.csv","d:/hadoop_test/output2008"};
    		}
    
    		Runtime.getRuntime().exit(ToolRunner.run(conf, new AsaDelayDriver(), args));
    	}
    
    }

asa-flight-statistics.zip
다운로드

참고서적 : 시작하세요! 하둡프로그래밍 개정 2판(위키북스) - 정재화 지음

반응형

'bigdata > hadoop' 카테고리의 다른 글

hadoop ChainMapper, Reducer exercise  (0) 2016.11.14
하둡 정리 2  (0) 2016.11.13
output Key,Value Class 미지정시 오류  (0) 2016.11.11
hadoop WordCount GenericOptionParser 적용 예제  (0) 2016.11.06
하둡 정리 1  (0) 2016.10.11
Comments