M/R Charting US Commercial Flight Departure Delay Statistics

DBILITY 2016. 10. 3. 21:09
  1. What to analyze with MapReduce:
    Monthly departure delay statistics for 2008. The 2008 data is downloaded from the ASA Data Expo.
  2. To understand the structure of the target data, the Variable Descriptions on the ASA Data Expo '09 site were checked:
    in the CSV file, field 1 is Year, field 2 is Month, and field 16 is the departure delay in minutes.
    So in the Mapper the input value is split on commas and indices 1 and 15 of the resulting array are used (a quick field-position sketch follows below).
    The month value at index 1 becomes the key. The delay time at index 15 may contain the string "NA",
    which would cause an Integer conversion error, and negative values also occur, so a condition is added to count positive values only.
    The file size is about 657 MB.
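    As a quick sanity check of the field positions before writing the Mapper, here is a minimal standalone sketch; the record and class below are made up for illustration and are not part of the project:
    // FieldPositionCheck.java - hypothetical helper, for illustration only
    public class FieldPositionCheck {
        public static void main(String[] args) {
            // a sample record in the 2008.csv layout: index 1 = Month, index 15 = DepDelay
            String line = "2008,1,3,4,2003,1955,2211,2225,WN,335,N712SW,128,150,116,-14,8,IAD,TPA,810,4,8,0,,0,NA,NA,NA,NA,NA";
            String[] columns = line.split(",");
            String month = columns[1];      // "1" (January)
            String depDelay = columns[15];  // "8"; in real data this can also be "NA" or negative
            if (!depDelay.equals("NA") && Integer.parseInt(depDelay) > 0) {
                System.out.println("month=" + month + " -> counted as a delayed departure");
            }
        }
    }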
  3. Create the project and implement the Mapper, Reducer, and Driver (Client) classes.
    Since the job will only be run from Eclipse on Windows to obtain the results, build-related configuration is left out.
    pom.xml
    <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
      <modelVersion>4.0.0</modelVersion>
      <groupId>com.dbility.hadoop.apps</groupId>
      <artifactId>departure_delay_2008</artifactId>
      <version>0.0.1-SNAPSHOT</version>
      <properties>
      	<java.version>1.7</java.version>
      	<hadoop.version>1.2.1</hadoop.version>
      	<slf4j.version>1.7.20</slf4j.version>
      	<logback.version>1.1.7</logback.version>
      </properties>
      <dependencies>
      	<dependency>
      		<groupId>org.apache.hadoop</groupId>
      		<artifactId>hadoop-core</artifactId>
      		<version>${hadoop.version}</version>
      	</dependency>
      	<dependency>
      		<groupId>org.slf4j</groupId>
      		<artifactId>slf4j-api</artifactId>
      		<version>${slf4j.version}</version>
      	</dependency>
      	<dependency>
      		<groupId>org.slf4j</groupId>
      		<artifactId>jcl-over-slf4j</artifactId>
      		<version>${slf4j.version}</version>
      	</dependency>
      	<dependency>
      		<groupId>org.slf4j</groupId>
      		<artifactId>log4j-over-slf4j</artifactId>
      		<version>${slf4j.version}</version>
      	</dependency>
      	<dependency>
      		<groupId>org.slf4j</groupId>
      		<artifactId>jul-to-slf4j</artifactId>
      		<version>${slf4j.version}</version>
      	</dependency>
      	<dependency>
      		<groupId>ch.qos.logback</groupId>
      		<artifactId>logback-classic</artifactId>
      		<version>${logback.version}</version>
      	</dependency>
      	<dependency>
      		<groupId>ch.qos.logback</groupId>
      		<artifactId>logback-core</artifactId>
      		<version>${logback.version}</version>
      	</dependency>
      </dependencies>
    </project>
    logback.xml
    <?xml version="1.0" encoding="UTF-8"?>
    <configuration>
        <appender name="console" class="ch.qos.logback.core.ConsoleAppender">
            <!-- encoders are assigned the type ch.qos.logback.classic.encoder.PatternLayoutEncoder
                by default -->
            <encoder>
                <pattern>%d{HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n</pattern>
            </encoder>
        </appender>
     
        <logger name="org.apache.hadoop.mapred.JobClient" additivity="false">
            <level value="info" />
            <appender-ref ref="console" />
        </logger>
     
        <root level="warn">
            <appender-ref ref="console" />
        </root>
    </configuration>
    DepartureDelay2008Mapper.java
    package com.dbility.hadoop.apps;
     
    import java.io.IOException;
     
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Mapper;
     
    public class DepartureDelay2008Mapper extends
            Mapper<LongWritable, Text, Text, IntWritable> {
     
        private Text outputKey = new Text();
        private static final IntWritable one = new IntWritable(1);
        @Override
        protected void map(LongWritable key, Text value,
                Context context)
                throws IOException, InterruptedException {
     
            String[] columns = value.toString().split(",");
     
            // columns[1] = Month, columns[15] = DepDelay in minutes ("NA" when missing);
            // count only flights whose departure was actually delayed (DepDelay > 0)
            if ( !columns[15].equals("NA") && Integer.parseInt(columns[15]) > 0 ){
                outputKey.set(columns[1]);
                context.write(outputKey, one);
            }
        }
     
    }

    DepartureDelay2008Reducer.java
    package com.dbility.hadoop.apps;
     
    import java.io.IOException;
     
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Reducer;
     
    public class DepartureDelay2008Reducer extends
            Reducer<Text, IntWritable, Text, IntWritable> {
     
        IntWritable count = new IntWritable();
        @Override
        protected void reduce(Text key, Iterable<IntWritable> values,
                Context context)
                throws IOException, InterruptedException {
            int sum = 0;
     
            for (IntWritable value : values) {
                sum+=value.get();
            }
            count.set(sum);
            context.write(key, count);
        }
     
    }

    WindowsLocalFileSystem.java
    package com.dbility.hadoop.apps;
    
    import java.io.IOException;
    
    import org.apache.hadoop.fs.LocalFileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.fs.permission.FsPermission;
    
    /**
     * LocalFileSystem wrapper for running Hadoop 1.x locally on Windows:
     * permission-setting failures (see the "Failed to set permissions" messages
     * in the execution log) are printed and ignored instead of aborting the job.
     */
    public class WindowsLocalFileSystem extends LocalFileSystem {
    
    	public WindowsLocalFileSystem() {
    		super();
    	}
    
    	//@Override
    	public boolean mkdirs(final Path f, final FsPermission permission) throws IOException {
    		final boolean result = super.mkdirs(f);
    		this.setPermission(f, permission);
    		return result;
    	}
    
    	public void setPermission(final Path f, final FsPermission permission ) {
    		try {
    			super.setPermission(f, permission);
    		} catch (final IOException e) {
    			System.err.println(e.getMessage());
    		}
    	}
    
    }
    DepartureDelay2008Client.java
    package com.dbility.hadoop.apps;
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    
    public class DepartureDelay2008Client {
    
    	private static final Logger log = LoggerFactory.getLogger(DepartureDelay2008Client.class);
    
    	public static void main(String[] args) throws Exception {
    
    		if ( args.length != 2 ){
    			log.info("Usage : <input> <output>");
    			Runtime.getRuntime().exit(1);
    		}
    
    		Configuration conf = new Configuration();
    
    		conf.set("fs.default.name", "file:///");
    		conf.set("mapred.job.tracker", "local");
    		conf.set("fs.file.impl", "com.dbility.hadoop.apps.WindowsLocalFileSystem");
    		conf.set("io.serializations", "org.apache.hadoop.io.serializer.JavaSerialization,org.apache.hadoop.io.serializer.WritableSerialization");
    		conf.set("io.sort.mb","512");
    		conf.set("mapred.used.genericoptionsparser", "true");
    
    		Job job = new Job(conf,"DepartureDelay2008MapReduce");
    
    		job.setJarByClass(DepartureDelay2008Client.class);
    		job.setMapperClass(DepartureDelay2008Mapper.class);
    		job.setReducerClass(DepartureDelay2008Reducer.class);
    
    		job.setInputFormatClass(TextInputFormat.class);
    		job.setOutputFormatClass(TextOutputFormat.class);
    		job.setOutputKeyClass(Text.class);
    		job.setOutputValueClass(IntWritable.class);
    
    		FileInputFormat.addInputPath(job, new Path(args[0]));
    		FileOutputFormat.setOutputPath(job, new Path(args[1]));
    
    		FileSystem hdfs = FileSystem.get(conf);
    		Path path = new Path(args[1]);
    		if ( hdfs.exists(path) ) hdfs.delete(path, true);
    
    		Runtime.getRuntime().exit(job.waitForCompletion(true) ? 0 : 1);
    
    	}
    
    }
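    Since the Reducer only sums counts and its input and output types are identical, it could also be registered as a combiner to cut down the data spilled and shuffled between map and reduce. This is an optional tweak, not used in the run below; it would be one extra line in the driver:
    // optional: reuse the sum Reducer as a combiner
    job.setCombinerClass(DepartureDelay2008Reducer.class);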
    Execution log
    23:58:25.283 [main] WARN  o.a.hadoop.util.NativeCodeLoader - Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
    Failed to set permissions of path: \tmp\hadoop-ROOKIE\mapred\staging\ROOKIE1086790840\.staging to 0700
    Failed to set permissions of path: \tmp\hadoop-ROOKIE\mapred\staging\ROOKIE1086790840\.staging\job_local1086790840_0001 to 0700
    23:58:25.298 [main] WARN  org.apache.hadoop.mapred.JobClient - No job jar file set.  User classes may not be found. See JobConf(Class) or JobConf#setJar(String).
    23:58:25.314 [main] WARN  o.a.h.io.compress.snappy.LoadSnappy - Snappy native library not loaded
    Failed to set permissions of path: \tmp\hadoop-ROOKIE\mapred\staging\ROOKIE1086790840\.staging\job_local1086790840_0001\job.split to 0644
    Failed to set permissions of path: \tmp\hadoop-ROOKIE\mapred\staging\ROOKIE1086790840\.staging\job_local1086790840_0001\job.splitmetainfo to 0644
    Failed to set permissions of path: \tmp\hadoop-ROOKIE\mapred\staging\ROOKIE1086790840\.staging\job_local1086790840_0001\job.xml to 0644
    23:58:25.486 [main] INFO  org.apache.hadoop.mapred.JobClient - Running job: job_local1086790840_0001
    23:58:26.487 [main] INFO  org.apache.hadoop.mapred.JobClient -  map 0% reduce 0%
    23:58:27.489 [main] INFO  org.apache.hadoop.mapred.JobClient -  map 4% reduce 0%
    23:58:28.489 [main] INFO  org.apache.hadoop.mapred.JobClient -  map 9% reduce 0%
    23:58:29.489 [main] INFO  org.apache.hadoop.mapred.JobClient -  map 19% reduce 0%
    23:58:30.497 [main] INFO  org.apache.hadoop.mapred.JobClient -  map 23% reduce 0%
    23:58:31.512 [main] INFO  org.apache.hadoop.mapred.JobClient -  map 33% reduce 0%
    23:58:32.512 [main] INFO  org.apache.hadoop.mapred.JobClient -  map 38% reduce 0%
    23:58:33.513 [main] INFO  org.apache.hadoop.mapred.JobClient -  map 47% reduce 0%
    23:58:34.513 [main] INFO  org.apache.hadoop.mapred.JobClient -  map 52% reduce 0%
    23:58:35.520 [main] INFO  org.apache.hadoop.mapred.JobClient -  map 61% reduce 0%
    23:58:36.521 [main] INFO  org.apache.hadoop.mapred.JobClient -  map 66% reduce 0%
    23:58:37.525 [main] INFO  org.apache.hadoop.mapred.JobClient -  map 76% reduce 0%
    23:58:38.553 [main] INFO  org.apache.hadoop.mapred.JobClient -  map 85% reduce 0%
    23:58:39.556 [main] INFO  org.apache.hadoop.mapred.JobClient -  map 90% reduce 0%
    23:58:40.559 [main] INFO  org.apache.hadoop.mapred.JobClient -  map 100% reduce 0%
    23:58:41.561 [main] INFO  org.apache.hadoop.mapred.JobClient -  map 100% reduce 100%
    23:58:41.562 [main] INFO  org.apache.hadoop.mapred.JobClient - Job complete: job_local1086790840_0001
    23:58:41.583 [main] INFO  org.apache.hadoop.mapred.JobClient - Counters: 17
    23:58:41.583 [main] INFO  org.apache.hadoop.mapred.JobClient -   File Output Format Counters 
    23:58:41.583 [main] INFO  org.apache.hadoop.mapred.JobClient -     Bytes Written=123
    23:58:41.583 [main] INFO  org.apache.hadoop.mapred.JobClient -   File Input Format Counters 
    23:58:41.584 [main] INFO  org.apache.hadoop.mapred.JobClient -     Bytes Read=689494964
    23:58:41.584 [main] INFO  org.apache.hadoop.mapred.JobClient -   FileSystemCounters
    23:58:41.584 [main] INFO  org.apache.hadoop.mapred.JobClient -     FILE_BYTES_READ=8461331084
    23:58:41.584 [main] INFO  org.apache.hadoop.mapred.JobClient -     FILE_BYTES_WRITTEN=289237537
    23:58:41.584 [main] INFO  org.apache.hadoop.mapred.JobClient -   Map-Reduce Framework
    23:58:41.584 [main] INFO  org.apache.hadoop.mapred.JobClient -     Reduce input groups=12
    23:58:41.584 [main] INFO  org.apache.hadoop.mapred.JobClient -     Map output materialized bytes=22191676
    23:58:41.584 [main] INFO  org.apache.hadoop.mapred.JobClient -     Combine output records=0
    23:58:41.584 [main] INFO  org.apache.hadoop.mapred.JobClient -     Map input records=7009728
    23:58:41.584 [main] INFO  org.apache.hadoop.mapred.JobClient -     Reduce shuffle bytes=0
    23:58:41.584 [main] INFO  org.apache.hadoop.mapred.JobClient -     Reduce output records=12
    23:58:41.584 [main] INFO  org.apache.hadoop.mapred.JobClient -     Spilled Records=6906730
    23:58:41.584 [main] INFO  org.apache.hadoop.mapred.JobClient -     Map output bytes=16789602
    23:58:41.584 [main] INFO  org.apache.hadoop.mapred.JobClient -     Total committed heap usage (bytes)=23908057088
    23:58:41.584 [main] INFO  org.apache.hadoop.mapred.JobClient -     SPLIT_RAW_BYTES=1974
    23:58:41.584 [main] INFO  org.apache.hadoop.mapred.JobClient -     Map output records=2700974
    23:58:41.584 [main] INFO  org.apache.hadoop.mapred.JobClient -     Combine input records=0
    23:58:41.584 [main] INFO  org.apache.hadoop.mapred.JobClient -     Reduce input records=2700974
    Execution result: part-r-00000
    1   247948
    10  162531
    11  157278
    12  263949
    2   252765
    3   271969
    4   220864
    5   220614
    6   271014
    7   253632
    8   231349
    9   147061
  4. Based on the results of the run, create an Excel pivot chart to insert into the report.
    Something seems off, though. Maybe that is just the US for you?! The numbers are very large. Whether the results are accurate will have to be checked by importing the CSV data into an RDBMS!
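    Before going to an RDBMS, one quick cross-check is possible: since the Reducer only sums the 1s emitted by the Mapper (and no combiner is set), the twelve per-month counts should add up to the Map output records counter from the log above (2,700,974). A throwaway sketch (hypothetical helper, not part of the post) that totals the second column of part-r-00000:
    import java.io.BufferedReader;
    import java.io.FileReader;

    // SumCheck.java - hypothetical helper: totals the counts in a part-r-00000 file
    public class SumCheck {
        public static void main(String[] args) throws Exception {
            long total = 0;
            try (BufferedReader reader = new BufferedReader(new FileReader(args[0]))) {
                String line;
                while ((line = reader.readLine()) != null) {
                    // TextOutputFormat writes "<month>\t<count>" per line
                    total += Long.parseLong(line.split("\t")[1]);
                }
            }
            System.out.println("total delayed departures = " + total);
        }
    }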



    Next, since there are separate fields for each departure delay cause, I tried analyzing those as well by modifying the condition each time, but I cannot be sure this is the right approach.
    There must be concepts analogous to a Quartz JobChain or Spring Batch job steps, but I have not studied them yet, so the job was simply edited and rerun for each condition.
    (As I found out later, there are ChainMapper, ChainReducer, and MultipleOutputs. Ignorance really does make you sin.)
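    A single-pass alternative to rerunning the job with a modified condition for each cause would be a Mapper that emits a composite "cause_month" key for every delay-cause field; the existing sum Reducer then aggregates all causes and months in one run. The sketch below is only illustrative: the column indices 24-28 for CarrierDelay, WeatherDelay, NASDelay, SecurityDelay and LateAircraftDelay are assumptions based on the ASA Data Expo variable descriptions and should be verified against the actual file.
    package com.dbility.hadoop.apps;

    import java.io.IOException;

    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Mapper;

    // Illustrative sketch only: per delay cause and month, counts the flights whose
    // cause-specific delay field is positive. Column indices 24-28 are assumptions.
    public class DelayCause2008Mapper extends
            Mapper<LongWritable, Text, Text, IntWritable> {

        private static final String[] CAUSES =
                { "carrier", "weather", "nas", "security", "lateAircraft" };
        private static final int FIRST_CAUSE_INDEX = 24; // assumed CarrierDelay..LateAircraftDelay

        private Text outputKey = new Text();
        private static final IntWritable one = new IntWritable(1);

        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {

            String[] columns = value.toString().split(",");

            // skip records too short to contain the delay-cause fields
            if (columns.length < FIRST_CAUSE_INDEX + CAUSES.length) {
                return;
            }

            for (int i = 0; i < CAUSES.length; i++) {
                String delay = columns[FIRST_CAUSE_INDEX + i];
                if (!delay.equals("NA") && Integer.parseInt(delay) > 0) {
                    outputKey.set(CAUSES[i] + "_" + columns[1]); // e.g. "weather_7"
                    context.write(outputKey, one);
                }
            }
        }
    }
    The driver would only need job.setMapperClass(DelayCause2008Mapper.class); the existing DepartureDelay2008Reducer works unchanged, and the output could then be pivoted in Excel by cause and month.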

 
