
hadoop secondary sort exercise (secondary sort practice)

DBILITY 2017. 2. 6. 20:24

I practiced secondary sort against the ASA flight record dataset (about 12 GB).
Arrival delays are classified into three divisions (within 30 minutes, within 1 hour, and over 1 hour), and each output record has the layout [ year / month / division / count ].
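
Each output line is tab-separated: CompositeKey.toString() joins year, month, and division with tabs, and TextOutputFormat appends the count. A sketch of the layout with placeholder counts (illustrative, not actual results):

    1987	10	1	<count of delays within 30 minutes>
    1987	10	2	<count of delays within 1 hour>
    1987	10	3	<count of delays over 1 hour>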

  1. pom.xml
    <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    	<modelVersion>4.0.0</modelVersion>
    	<groupId>com.dbility.hadoop</groupId>
    	<artifactId>asa-analysis</artifactId>
    	<version>1.0.0</version>
    	<properties>
    		<java.version>1.7</java.version>
    		<scp.user>hadoop</scp.user>
    		<scp.password>hadoop</scp.password>
    		<scp.host>big-master</scp.host>
    		<scp.copy2dir>/home/hadoop</scp.copy2dir>
    	</properties>
    	<dependencies>
    		<dependency>
    			<groupId>org.apache.hadoop</groupId>
    			<artifactId>hadoop-core</artifactId>
    			<version>1.2.1</version>
    			<scope>provided</scope>
    		</dependency>
    		<dependency>
    			<groupId>org.slf4j</groupId>
    			<artifactId>slf4j-api</artifactId>
    			<version>1.7.20</version>
    			<scope>provided</scope>
    		</dependency>
    		<dependency>
    			<groupId>org.slf4j</groupId>
    			<artifactId>jcl-over-slf4j</artifactId>
    			<version>1.7.20</version>
    			<scope>provided</scope>
    		</dependency>
    		<dependency>
    			<groupId>org.slf4j</groupId>
    			<artifactId>log4j-over-slf4j</artifactId>
    			<version>1.7.20</version>
    			<scope>provided</scope>
    		</dependency>
    		<dependency>
    			<groupId>ch.qos.logback</groupId>
    			<artifactId>logback-classic</artifactId>
    			<version>1.1.7</version>
    			<scope>provided</scope>
    		</dependency>
    		<dependency>
    			<groupId>ch.qos.logback</groupId>
    			<artifactId>logback-access</artifactId>
    			<version>1.1.7</version>
    			<scope>provided</scope>
    		</dependency>
    	</dependencies>
    
    	<build>
    		<finalName>${project.artifactId}</finalName>
    		<plugins>
    			<plugin>
    				<groupId>org.apache.maven.plugins</groupId>
    				<artifactId>maven-compiler-plugin</artifactId>
    				<version>2.3.2</version>
    				<configuration>
    					<source>${java.version}</source>
    					<target>${java.version}</target>
    				</configuration>
    			</plugin>
    			<plugin>
    				<groupId>org.apache.maven.plugins</groupId>
    				<artifactId>maven-jar-plugin</artifactId>
    				<version>2.3.2</version>
    				<configuration>
    					<archive>
    						<manifest>
    							<mainClass>
    								com.dbility.hadoop.sort.secondary.AsaDriver
    							</mainClass>
    						</manifest>
    					</archive>
    				</configuration>
    				<executions>
    					<execution>
    						<id>default-jar</id>
    						<phase>package</phase>
    						<goals>
    							<goal>jar</goal>
    						</goals>
    					</execution>
    				</executions>
    			</plugin>
    			<plugin>
    				<groupId>org.apache.maven.plugins</groupId>
    				<artifactId>maven-antrun-plugin</artifactId>
    				<version>1.8</version>
    				<dependencies>
    					<dependency>
    						<groupId>org.apache.ant</groupId>
    						<artifactId>ant-jsch</artifactId>
    						<version>1.9.7</version>
    					</dependency>
    					<dependency>
    						<groupId>com.jcraft</groupId>
    						<artifactId>jsch</artifactId>
    						<version>0.1.54</version>
    					</dependency>
    				</dependencies>
    				<executions>
    					<execution>
    						<phase>install</phase>
    						<goals>
    							<goal>run</goal>
    						</goals>
    						<configuration>
    							<tasks>
    								<scp file="${project.basedir}/target/${project.artifactId}.jar"
    									todir="${scp.user}:${scp.password}@${scp.host}:/${scp.copy2dir}"
    									trust="true">
    								</scp>
    							</tasks>
    						</configuration>
    					</execution>
    				</executions>
    			</plugin>
    		</plugins>
    	</build>
    </project>
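    With this pom, a single command builds, packages, and ships the job: the maven-jar-plugin records com.dbility.hadoop.sort.secondary.AsaDriver as the manifest main class, and the maven-antrun-plugin's scp task runs in the install phase, copying target/asa-analysis.jar to /home/hadoop on big-master using the credentials from <properties>:

        mvn clean install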
  2. logback.xml
    <?xml version="1.0" encoding="UTF-8"?>
    <configuration>
    
    	<appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
    		<encoder>
    			<pattern>%d{HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n
    			</pattern>
    		</encoder>
    	</appender>
    
    	<logger name="org.apache.hadoop.mapred.JobClient" level="info"
    		additivity="false">
    		<appender-ref ref="STDOUT" />
    	</logger>
    
    	<root level="warn">
    		<appender-ref ref="STDOUT" />
    	</root>
    </configuration>
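    With the root logger at warn, console output stays quiet except for org.apache.hadoop.mapred.JobClient at info, so job progress and counters are still printed during local runs.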
  3. WindowsLocalFileSystem.java
    package com.dbility.hadoop.util;
    
    import java.io.IOException;
    
    import org.apache.hadoop.fs.LocalFileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.fs.permission.FsPermission;
    
    public class WindowsLocalFileSystem extends LocalFileSystem {
    
    	public WindowsLocalFileSystem() {
    		super();
    	}
    
    	@Override
    	public boolean mkdirs(final Path f, final FsPermission permission) throws IOException {
    		final boolean result = super.mkdirs(f);
    		this.setPermission(f, permission);
    		return result;
    	}
    
    	public void setPermission(final Path p, final FsPermission permission)
    			throws IOException {
    		try {
    			super.setPermission(p, permission);
    		} catch (final IOException ioe) {
    			System.err.println(ioe.getMessage());
    		}
    	}
    }
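    This class is a workaround for running Hadoop 1.x jobs locally on Windows: LocalFileSystem.setPermission shells out to chmod, which fails on Windows, so the exception is logged and swallowed here instead of killing the job. It takes effect when fs.file.impl points at this class, as in the commented-out block of AsaDriver.main() below.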
  4. PerformParser.java
    package com.dbility.hadoop.util;
    
    import org.apache.hadoop.io.Text;
    
    public class PerformParser {
    
    	private String year;
    	private int month;
    	private String division;
    
    	private int arrivalDelayTime = 0;
    	private boolean arrivalDelay = false;
    
    	/* value is one CSV record of the ASA on-time dataset;
    	   cols[0] = year, cols[1] = month, cols[14] = arrival delay in minutes ("NA" if unknown) */
    	public PerformParser(Text value) {
    		String[] cols = value.toString().split(",");
    
    		year = cols[0];
    		month = Integer.parseInt(cols[1]);
    
    		if ( !"NA".equals(cols[14]) ){
    			arrivalDelay = true;
    
    			arrivalDelayTime = Integer.parseInt(cols[14]);
    
    			if ( arrivalDelayTime > 0 ) {
    
    				if ( arrivalDelayTime <= 30 ) {
    					division = "1"; // delayed by up to 30 minutes
    				} else if ( arrivalDelayTime <= 60 ) {
    					division = "2"; // delayed by up to 1 hour
    				} else {
    					division = "3"; // delayed by more than 1 hour
    				}
    			}
    		}
    	}
    
    	public String getYear() {
    		return year;
    	}
    
    	public int getMonth() {
    		return month;
    	}
    
    	public String getDivision() {
    		return division;
    	}
    
    	public boolean isArrivalDelay() {
    		return arrivalDelay;
    	}
    
    	public int getArrivalDelayTime() {
    		return arrivalDelayTime;
    	}
    }
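    The parser is easy to sanity-check off-cluster. A minimal sketch, assuming a hypothetical record trimmed to 15 columns (real ASA rows carry more trailing columns, which split(",") simply leaves untouched; only cols[0], cols[1], and cols[14] matter here):

        // hypothetical row: year 2008, month 1, ArrDelay (cols[14]) = 45 minutes
        Text line = new Text("2008,1,3,4,2003,1955,2211,2225,WN,335,N712SW,128,150,116,45");
        PerformParser parser = new PerformParser(line);
        // prints "2008 1 2" since a 45-minute delay falls in division 2 (within 1 hour)
        System.out.println(parser.getYear() + " " + parser.getMonth() + " " + parser.getDivision());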
  5. CompositeKey.java
    package com.dbility.hadoop.sort.secondary;
    
    import java.io.DataInput;
    import java.io.DataOutput;
    import java.io.IOException;
    
    import org.apache.hadoop.io.WritableComparable;
    import org.apache.hadoop.io.WritableUtils;
    
    public class CompositeKey implements WritableComparable<CompositeKey> {
    
    	private String year;
    	private int month;
    	private String division;
    
    	public CompositeKey() {
    	}
    
    	public CompositeKey(String year, int month, String division) {
    		this.year = year;
    		this.month = month;
    		this.division = division;
    	}
    
    	public String getYear() {
    		return year;
    	}
    
    	public void setYear(String year) {
    		this.year = year;
    	}
    
    	public int getMonth() {
    		return month;
    	}
    
    	public void setMonth(int month) {
    		this.month = month;
    	}
    
    	public String getDivision() {
    		return division;
    	}
    
    	public void setDivision(String division) {
    		this.division = division;
    	}
    
    	@Override
    	public void readFields(DataInput in) throws IOException {
    		year = WritableUtils.readString(in);
    		month = in.readInt();
    		division = WritableUtils.readString(in);
    	}
    
    	@Override
    	public void write(DataOutput out) throws IOException {
    		WritableUtils.writeString(out, year);
    		out.writeInt(month);
    		WritableUtils.writeString(out, division);
    	}
    
    	@Override
    	public int compareTo(CompositeKey o) {
    
    		int result = year.compareTo(o.getYear());
    
    		if (result == 0)
    			result = month == o.getMonth() ? 0 : (month < o.getMonth() ? -1 : 1);
    
    		if (result == 0)
    			result = division.compareTo(o.getDivision());
    
    		return result;
    	}
    
    	@Override
    	public String toString() {
    		return (new StringBuilder()).append(year).append("\t").append(month)
    				.append("\t").append(division).toString();
    	}
    }
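    Note that write() and readFields() serialize the fields in the same order (year, month, division); the reduce side reconstructs the key purely from this byte stream, so the two methods must always stay symmetric.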
  6. AsaMapper.java
    package com.dbility.hadoop.sort.secondary;
    
    import java.io.IOException;
    
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Mapper;
    
    import com.dbility.hadoop.util.PerformParser;
    
    public class AsaMapper extends
    		Mapper<LongWritable, Text, CompositeKey, IntWritable> {
    
    	private CompositeKey outputKey = new CompositeKey();
    	private final static IntWritable outputValue = new IntWritable(1);
    
    	@Override
    	protected void map(LongWritable key, Text value, Context context)
    			throws IOException, InterruptedException {
    
    		PerformParser parser = new PerformParser(value);
    
    		if ( parser.isArrivalDelay() ) {
    
    			if ( parser.getArrivalDelayTime() > 0 ) {
    
    				outputKey.setYear(parser.getYear());
    				outputKey.setMonth(parser.getMonth());
    				outputKey.setDivision(parser.getDivision());
    
    				context.write(outputKey, outputValue);
    			}
    		}
    	}
    }
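    outputKey and outputValue are deliberately reused across map() calls; this is the standard Hadoop idiom and is safe because context.write() serializes the pair immediately rather than holding a reference.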
  7. AsaPartitioner.java
    package com.dbility.hadoop.sort.secondary;
    
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.mapreduce.Partitioner;
    
    public class AsaPartitioner extends Partitioner<CompositeKey, IntWritable> {
    
    	@Override
    	public int getPartition(CompositeKey key, IntWritable value, int numReduceTask) {
    		// mask the sign bit so a negative hashCode can never produce a negative partition
    		return (key.getYear().hashCode() & Integer.MAX_VALUE) % numReduceTask;
    		/*int result = 1;
    
    		if ( Integer.parseInt(key.getYear()) < 2000 ) result = 0;
    
    		return result;*/
    	}
    }
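    Partitioning on the year alone guarantees that every record of a given year lands on the same reducer, which is the precondition for the year-level grouping below.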
  8. AsaGroupingComparator.java
    package com.dbility.hadoop.sort.secondary;
    
    import org.apache.hadoop.io.WritableComparable;
    import org.apache.hadoop.io.WritableComparator;
    
    public class AsaGroupingComparator extends WritableComparator {
    
    	public AsaGroupingComparator() {
    		super(CompositeKey.class,true);
    	}
    
    	@Override
    	public int compare(WritableComparable a, WritableComparable b) {
    
    		CompositeKey k1 = (CompositeKey)a;
    		CompositeKey k2 = (CompositeKey)b;
    
    		return k1.getYear().compareTo(k2.getYear());
    	}
    }
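    Because the grouping comparator looks only at the year, all records for one year (every month and every division) are delivered to a single reduce() call, already ordered by AsaSortComparator. This is what lets AsaReducer detect month/division boundaries inside one call.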
  9. AsaSortComparator.java
    package com.dbility.hadoop.sort.secondary;
    
    import org.apache.hadoop.io.WritableComparable;
    import org.apache.hadoop.io.WritableComparator;
    
    public class AsaSortComparator extends WritableComparator {
    
    	public AsaSortComparator() {
    		super(CompositeKey.class,true);
    	}
    
    	@Override
    	public int compare(WritableComparable a, WritableComparable b) {
    
    		CompositeKey k1 = (CompositeKey)a;
    		CompositeKey k2 = (CompositeKey)b;
    
    		int result = k1.getYear().compareTo(k2.getYear());
    
    		if ( result != 0 ) return result;
    
    		result = k1.getMonth() == k2.getMonth() ? 0 : ( k1.getMonth() < k2.getMonth() ? -1 : 1) ;
    
    		if ( result != 0 ) return result;
    
    		return k1.getDivision().compareTo(k2.getDivision());
    	}
    }
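    Functionally this matches CompositeKey.compareTo(); registering it via setSortComparatorClass simply makes the shuffle order (year, then month, then division) explicit and independent of the key class.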
  10. AsaReducer.java
    package com.dbility.hadoop.sort.secondary;
    
    import java.io.IOException;
    
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.mapreduce.Reducer;
    
    public class AsaReducer extends Reducer<CompositeKey, IntWritable, CompositeKey, IntWritable> {
    
    	private CompositeKey outputKey = new CompositeKey();
    	private IntWritable outputValue = new IntWritable();
    
    	@Override
    	protected void reduce(CompositeKey key, Iterable<IntWritable> values,
    			Context context)
    			throws IOException, InterruptedException {
    
    		int sum = 0;
    		int preMonth = key.getMonth();
    		String preDivision = key.getDivision();
    
    		// grouping is by year only, so one reduce() call receives an entire year;
    		// the key instance mutates as the sorted values are consumed, so emit
    		// the running sum whenever the month or division changes
    		for (IntWritable value : values) {
    
    			if ( key.getMonth() != preMonth || !key.getDivision().equals(preDivision) ) {
    
    				outputKey.setYear(key.getYear());
    				outputKey.setMonth(preMonth);
    				outputKey.setDivision(preDivision);
    				outputValue.set(sum);
    				context.write(outputKey, outputValue);
    				sum = 0;
    			}
    
    			sum += value.get();
    			preMonth = key.getMonth();
    			preDivision = key.getDivision();
    		}
    
    		// flush the final (month, division) group of the year
    		if ( key.getMonth() == preMonth && key.getDivision().equals(preDivision) ) {
    
    			outputKey.setYear(key.getYear());
    			outputKey.setMonth(preMonth);
    			outputKey.setDivision(preDivision);
    			outputValue.set(sum);
    
    			context.write(outputKey, outputValue);
    		}
    	}
    }
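    As an illustration: if the values for 1987 arrive as three records of (1987, 10, 1) followed by one of (1987, 10, 2), the loop writes 1987 / 10 / 1 with sum 3 at the moment the division flips, and the trailing if writes 1987 / 10 / 2 with sum 1.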
  11. AsaDriver.java
    package com.dbility.hadoop.sort.secondary;
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.conf.Configured;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
    import org.apache.hadoop.util.GenericOptionsParser;
    import org.apache.hadoop.util.Tool;
    import org.apache.hadoop.util.ToolRunner;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    
    public class AsaDriver extends Configured implements Tool {
    
    	private static final Logger log = LoggerFactory.getLogger(AsaDriver.class);
    
    	public int run(String[] args) throws Exception {
    
    		String[] remainArgs = new GenericOptionsParser(getConf(), args).getRemainingArgs();
    
    		if ( remainArgs.length != 2 ) {
    			System.out.println("Usage : <input> <output>");
    			return -1;
    		}
    
    		Job job = new Job(getConf(), "Test");
    
    		job.setJarByClass(AsaDriver.class);
    		job.setInputFormatClass(TextInputFormat.class);
    
    		job.setMapperClass(AsaMapper.class);
    		job.setMapOutputKeyClass(CompositeKey.class);
    		job.setMapOutputValueClass(IntWritable.class);
    
    		job.setCombinerClass(AsaReducer.class);
    
    		job.setNumReduceTasks(2);
    		job.setPartitionerClass(AsaPartitioner.class);
    		job.setSortComparatorClass(AsaSortComparator.class);
    		job.setGroupingComparatorClass(AsaGroupingComparator.class);
    
    		job.setReducerClass(AsaReducer.class);
    		job.setOutputFormatClass(TextOutputFormat.class);
    		job.setOutputKeyClass(CompositeKey.class);
    		job.setOutputValueClass(IntWritable.class);
    
    		FileInputFormat.addInputPath(job, new Path(remainArgs[0]));
    		FileOutputFormat.setOutputPath(job, new Path(remainArgs[1]));
    
    		FileSystem hdfs = FileSystem.get(getConf());
    		Path path = new Path(remainArgs[1]);
    
    		if ( hdfs.exists(path) ) hdfs.delete(path, true);
    
    		return job.waitForCompletion(true) ? 0 : -2;
    	}
    
    	public static void main(String[] args) throws Exception {
    		Configuration conf = new Configuration();
    
    	    /*conf.set("fs.default.name", "file:///");
    	    conf.set("mapred.job.tracker", "local");
    	    conf.set("fs.file.impl", "com.dbility.hadoop.util.WindowsLocalFileSystem");
    	    conf.set("io.serializations",
    	         "org.apache.hadoop.io.serializer.JavaSerialization,"
    	        +"org.apache.hadoop.io.serializer.WritableSerialization");
    	    conf.set("mapred.map.child.java.opts", "-Xms1024M -Xmx1024M");
    	    conf.set("io.sort.mb", "512");
    
    		args = new String[] {"d:/hadoop_test/2008.csv","d:/hadoop_test/output"};*/
    
    		Runtime.getRuntime().exit(ToolRunner.run(conf, new AsaDriver(), args));
    	}
    }
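    Once the jar has been copied to the master node, the job runs in the usual hadoop jar form (the manifest already names AsaDriver as the main class). The HDFS paths below are assumptions; point them at wherever the ASA CSVs actually live:

        hadoop jar asa-analysis.jar /user/hadoop/input /user/hadoop/asa_output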

Attachment: asa-analysis.zip

Reference: 시작하세요! 하둡 프로그래밍 (Beginning Hadoop Programming), 2nd revised edition, WikiBooks, by 정재화 (Jung Jae-hwa)
