hadoop partial sort exercise include search (Partial Sort Exercise 3)

DBILITY 2017. 3. 12. 00:35

The odd characters at the head of the result file from the second exercise disappeared once the first line was written with a PrintWriter; a sketch of the likely cause follows below.
This run was done in Eclipse on Windows.
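
A minimal sketch of the likely cause, assuming exercise 2 wrote that first line with FSDataOutputStream.writeUTF: DataOutput.writeUTF prepends a two-byte big-endian length before the string bytes, which shows up as garbage at the start of a text file, while PrintWriter writes only the characters.

    import java.io.ByteArrayOutputStream;
    import java.io.DataOutputStream;
    import java.io.PrintWriter;
    import java.util.Arrays;

    public class WriteUtfDemo {
    	public static void main(String[] args) throws Exception {
    		// writeUTF (inherited by FSDataOutputStream) prepends a 2-byte length
    		ByteArrayOutputStream utf = new ByteArrayOutputStream();
    		new DataOutputStream(utf).writeUTF("2008,1,100");
    		System.out.println(Arrays.toString(utf.toByteArray())); // [0, 10, 50, 48, ...]

    		// PrintWriter writes only the character data, so the file starts cleanly
    		ByteArrayOutputStream plain = new ByteArrayOutputStream();
    		PrintWriter writer = new PrintWriter(plain);
    		writer.write("2008,1,100");
    		writer.flush();
    		System.out.println(Arrays.toString(plain.toByteArray())); // [50, 48, 48, 56, ...]
    	}
    }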

For fully distributed mode on Linux, I changed the codec to GzipCodec and tested with the Configuration property settings in main() commented out; the change is sketched below.
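
A minimal sketch of that variant, with the compression settings pulled into a helper (the GzipConfig class and useGzip name are illustrative, not from the original code):

    import org.apache.hadoop.io.SequenceFile.CompressionType;
    import org.apache.hadoop.io.compress.GzipCodec;
    import org.apache.hadoop.mapred.JobConf;
    import org.apache.hadoop.mapred.SequenceFileOutputFormat;

    public class GzipConfig {
    	// distributed-mode variant: same block compression, but with gzip
    	public static void useGzip(JobConf jobConf) {
    		SequenceFileOutputFormat.setCompressOutput(jobConf, true);
    		SequenceFileOutputFormat.setOutputCompressorClass(jobConf, GzipCodec.class);
    		SequenceFileOutputFormat.setOutputCompressionType(jobConf, CompressionType.BLOCK);
    	}
    }

The local-only overrides in main() (fs.default.name, mapred.job.tracker, fs.file.impl) are what get commented out for the cluster run.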

With hadoop3 on the way, I plan to study up through the hadoop2 environment; after that I should also be able to use tajo, a SQL-on-Hadoop engine.

By then I would like to go as far as listing HDFS data in an RIA UI through the tajo jdbc driver in a spring+mybatis environment.

 

  1. pom.xml
    <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    	<modelVersion>4.0.0</modelVersion>
    	<groupId>com.dbility.hadoop</groupId>
    	<artifactId>execise</artifactId>
    	<version>1.0.0</version>
    	<properties>
    		<java.version>1.7</java.version>
    		<encoding.charset>UTF-8</encoding.charset>
    		<scp.user>hadoop</scp.user>
    		<scp.password>hadoop</scp.password>
    		<scp.host>big-master</scp.host>
    		<scp.copy2dir>home/hadoop</scp.copy2dir>
    	</properties>
    	<dependencies>
    		<dependency>
    			<groupId>org.apache.hadoop</groupId>
    			<artifactId>hadoop-core</artifactId>
    			<version>1.2.1</version>
    			<scope>provided</scope>
    		</dependency>
    		<dependency>
    			<groupId>org.slf4j</groupId>
    			<artifactId>slf4j-api</artifactId>
    			<version>1.7.21</version>
    			<scope>provided</scope>
    		</dependency>
    		<dependency>
    			<groupId>org.slf4j</groupId>
    			<artifactId>jcl-over-slf4j</artifactId>
    			<version>1.7.21</version>
    			<scope>provided</scope>
    			<exclusions>
    				<exclusion>
    					<artifactId>slf4j-api</artifactId>
    					<groupId>org.slf4j</groupId>
    				</exclusion>
    			</exclusions>
    		</dependency>
    		<dependency>
    			<groupId>ch.qos.logback</groupId>
    			<artifactId>logback-classic</artifactId>
    			<version>1.1.7</version>
    			<scope>provided</scope>
    			<exclusions>
    				<exclusion>
    					<artifactId>slf4j-api</artifactId>
    					<groupId>org.slf4j</groupId>
    				</exclusion>
    			</exclusions>
    		</dependency>
    	</dependencies>
    	<build>
    		<finalName>${project.artifactId}</finalName>
    		<plugins>
    			<plugin>
    				<groupId>org.apache.maven.plugins</groupId>
    				<artifactId>maven-compiler-plugin</artifactId>
    				<version>2.3.2</version>
    				<configuration>
    					<source>${java.version}</source>
    					<target>${java.version}</target>
    				</configuration>
    			</plugin>
    			<plugin>
    				<groupId>org.apache.maven.plugins</groupId>
    				<artifactId>maven-jar-plugin</artifactId>
    				<version>2.3.2</version>
    				<configuration>
    					<archive>
    						<manifest>
    							<mainClass>com.dbility.hadoop.execise.PartialSortDriver</mainClass>
    						</manifest>
    					</archive>
    				</configuration>
    				<executions>
    					<execution>
    						<id>default-jar</id>
    						<phase>package</phase>
    						<goals>
    							<goal>jar</goal>
    						</goals>
    					</execution>
    				</executions>
    			</plugin>
    			<plugin>
    				<groupId>org.apache.maven.plugins</groupId>
    				<artifactId>maven-shade-plugin</artifactId>
    				<version>2.4.1</version>
    				<executions>
    					<execution>
    						<phase>package</phase>
    						<goals>
    							<goal>shade</goal>
    						</goals>
    						<configuration>
    							<artifactSet>
    								<includes>
    									<include>${project.groupId}:${project.artifactId}</include>
    								</includes>
    							</artifactSet>
    							<filters>
    								<filter>
    									<artifact>*:*</artifact>
    									<excludes>
    										<exclude>META-INF/maven/**</exclude>
    									</excludes>
    								</filter>
    							</filters>
    						</configuration>
    					</execution>
    				</executions>
    			</plugin>
    			<plugin>
    				<groupId>org.apache.maven.plugins</groupId>
    				<artifactId>maven-antrun-plugin</artifactId>
    				<version>1.8</version>
    				<dependencies>
    					<dependency>
    						<groupId>org.apache.ant</groupId>
    						<artifactId>ant-jsch</artifactId>
    						<version>1.9.7</version>
    					</dependency>
    					<dependency>
    						<groupId>com.jcraft</groupId>
    						<artifactId>jsch</artifactId>
    						<version>0.1.54</version>
    					</dependency>
    				</dependencies>
    				<executions>
    					<execution>
    						<phase>install</phase>
    						<goals>
    							<goal>run</goal>
    						</goals>
    						<configuration>
    							<tasks>
    								<scp file="${project.basedir}/target/${project.artifactId}.jar"
    									todir="${scp.user}:${scp.password}@${scp.host}:/${scp.copy2dir}"
    									trust="true">
    								</scp>
    							</tasks>
    						</configuration>
    					</execution>
    				</executions>
    			</plugin>
    		</plugins>
    	</build>
    </project>
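    Running mvn install builds the shaded execise.jar and the antrun plugin then scps it to /home/hadoop on big-master, using the scp.* properties defined above.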
  2. logback.xml
    <?xml version="1.0" encoding="UTF-8"?>
    <configuration>
    
    	<appender name="console" class="ch.qos.logback.core.ConsoleAppender">
    		<encoder>
    			<pattern>%-4relative [%thread] %-5level %logger{35} - %msg %n</pattern>
    		</encoder>
    	</appender>
    
    	<logger name="org.apache.hadoop.mapred.JobClient" level="info" additivity="false">
    		<appender-ref ref="console" />
    	</logger>
    
    	<root level="warn" >
    		<appender-ref ref="console" />
    	</root>
    
    </configuration>
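    The root level is warn, with only org.apache.hadoop.mapred.JobClient raised to info, so the console shows job progress and counters without the rest of Hadoop's chatter.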
  3. PartialSortDriver.java
    package com.dbility.hadoop.execise;
    
    import java.io.IOException;
    import java.io.PrintWriter;
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.conf.Configured;
    import org.apache.hadoop.fs.FSDataOutputStream;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.LocalFileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.fs.permission.FsPermission;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.MapFile.Reader;
    import org.apache.hadoop.io.SequenceFile.CompressionType;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.io.Writable;
    import org.apache.hadoop.io.compress.DefaultCodec;
    import org.apache.hadoop.io.compress.GzipCodec;
    import org.apache.hadoop.mapred.FileInputFormat;
    import org.apache.hadoop.mapred.FileOutputFormat;
    import org.apache.hadoop.mapred.JobClient;
    import org.apache.hadoop.mapred.JobConf;
    import org.apache.hadoop.mapred.MapFileOutputFormat;
    import org.apache.hadoop.mapred.MapReduceBase;
    import org.apache.hadoop.mapred.Mapper;
    import org.apache.hadoop.mapred.OutputCollector;
    import org.apache.hadoop.mapred.Partitioner;
    import org.apache.hadoop.mapred.Reporter;
    import org.apache.hadoop.mapred.RunningJob;
    import org.apache.hadoop.mapred.SequenceFileInputFormat;
    import org.apache.hadoop.mapred.SequenceFileOutputFormat;
    import org.apache.hadoop.mapred.TextInputFormat;
    import org.apache.hadoop.mapred.lib.HashPartitioner;
    import org.apache.hadoop.util.GenericOptionsParser;
    import org.apache.hadoop.util.Tool;
    import org.apache.hadoop.util.ToolRunner;
    
    /**
     * Description : partial sort with search - converts text input to a
     * SequenceFile, rebuilds it as a MapFile, then looks up records by
     * distance key and saves the matches to a text file
     *
     * @author hyperrookie@gmail.com
     *
     * @version 1.0.0
     * @date 2017. 3. 11.
     */
    public class PartialSortDriver extends Configured implements Tool {
    
    	public int run(String[] args) throws Exception {
    
    		String[] remainArgs = new GenericOptionsParser(getConf(), args)
    				.getRemainingArgs();
    
    		if (remainArgs.length != 5) {
    			System.out
    					.println("Usage : hadoop jar jarName [mainClass] <input_path> <sequence_path> <map_path> <save_path> <search_miles>");
    			return -1;
    		}
    
    		Path input_path = new Path(remainArgs[0]);
    		Path sequence_path = new Path(remainArgs[1]);
    		Path map_path = new Path(remainArgs[2]);
    		Path save_path = new Path(remainArgs[3] + "/search_miles.txt");
    		IntWritable search_miles = new IntWritable(
    				Integer.parseInt(remainArgs[4]));
    		FileSystem hdfs = FileSystem.get(getConf());
    
    		// pass getConf() so the driver Configuration (including the
    		// local-mode overrides set in main) reaches this job as well
    		JobConf jobConf01 = new JobConf(getConf(), PartialSortDriver.class);
    		jobConf01.setJobName("Text2SequenceFileConverter");
    
    		jobConf01.setInputFormat(TextInputFormat.class);
    
    		jobConf01.setMapperClass(Text2SequenceFileMapper.class);
    
    		jobConf01.setNumReduceTasks(0);
    
    		jobConf01.setOutputFormat(SequenceFileOutputFormat.class);
    		jobConf01.setOutputKeyClass(IntWritable.class);
    		jobConf01.setOutputValueClass(Text.class);
    
    		SequenceFileOutputFormat.setCompressOutput(jobConf01, true);
    		SequenceFileOutputFormat.setOutputCompressorClass(jobConf01,
    				DefaultCodec.class);
    		SequenceFileOutputFormat.setOutputCompressionType(jobConf01,
    				CompressionType.BLOCK);
    
    		FileInputFormat.addInputPath(jobConf01, input_path);
    		FileOutputFormat.setOutputPath(jobConf01, sequence_path);
    
    		if (!hdfs.exists(input_path)) {
    			System.out.printf("%s is not found!", input_path.getName());
    			return -2;
    		}
    
    		if (hdfs.exists(sequence_path)) {
    			hdfs.delete(sequence_path, true);
    		}
    
    		RunningJob job01 = JobClient.runJob(jobConf01);
    
    		if (job01.getJobState() != 2) { // JobStatus.SUCCEEDED == 2
    			System.out.printf("%s failed!", job01.getJobName());
    			return -3;
    		}
    
    		JobConf jobConf02 = new JobConf(getConf(), PartialSortDriver.class);
    		jobConf02.setJobName("SequenceFile2MapFileConverter");
    
    		jobConf02.setInputFormat(SequenceFileInputFormat.class);
    		jobConf02.setOutputFormat(MapFileOutputFormat.class);
    		jobConf02.setOutputKeyClass(IntWritable.class);
    
    		SequenceFileOutputFormat.setCompressOutput(jobConf02, true);
    		SequenceFileOutputFormat.setOutputCompressorClass(jobConf02,
    				DefaultCodec.class);
    		SequenceFileOutputFormat.setOutputCompressionType(jobConf02,
    				CompressionType.BLOCK);
    
    		FileInputFormat.addInputPath(jobConf02, sequence_path);
    		FileOutputFormat.setOutputPath(jobConf02, map_path);
    
    		if (!hdfs.exists(sequence_path)) {
    			System.out.printf("%s is not found!", sequence_path.getName());
    			return -4;
    		} else {
    			hdfs.delete(new Path(remainArgs[1]+"/_SUCCESS"), true);
    			hdfs.delete(new Path(remainArgs[1]+"/_logs"), true);
    		}
    
    		if (hdfs.exists(map_path)) {
    			hdfs.delete(map_path, true);
    		}
    
    		RunningJob job02 = JobClient.runJob(jobConf02);
    
    		if (job02.getJobState() != 2) { // JobStatus.SUCCEEDED == 2
    			System.out.printf("%s failed!", job02.getJobName());
    			return -5;
    		}
    
    		hdfs.delete(new Path(remainArgs[2]+"/_SUCCESS"), true);
    		hdfs.delete(new Path(remainArgs[2]+"/_logs"), true);
    
    		if (hdfs.exists(save_path)) {
    			hdfs.delete(save_path, true);
    		}
    
    
    
    		// open one MapFile.Reader per partition, then use the same partitioner
    		// the jobs used to find which partition holds the requested key
    		Reader[] readers = MapFileOutputFormat.getReaders(hdfs, map_path, getConf());
    		Partitioner<IntWritable, Text> partitioner = new HashPartitioner<IntWritable, Text>();
    
    		IntWritable nextKey = new IntWritable();
    		Text value = new Text();
    
    		Reader reader = readers[partitioner.getPartition(search_miles, value, readers.length)];

    		// get() positions the reader at the first record with the search key
    		// and fills value; it returns null when the key is absent
    		Writable entry = reader.get(search_miles, value);
    		if (entry == null) {
    			System.out.printf("requested key [%s] is not found!", search_miles);
    			return -6;
    		}
    
    		FSDataOutputStream outputStream = hdfs.create(save_path, true);
    		PrintWriter writer = new PrintWriter(outputStream);
    		// write the matched record, then keep reading while the key still matches
    		do {
    			System.out.println(value);
    			writer.write(value.toString() + "\n");
    		} while (reader.next(nextKey, value) && search_miles.equals(nextKey));
    
    		writer.close();
    		outputStream.close();
    
    		return 0;
    	}
    
    	public static void main(String[] args) throws Exception {
    
    		Configuration conf = new Configuration();
    
    		conf.set("fs.default.name", "file:///");
    		conf.set("mapred.job.tracker", "local");
    		conf.set(
    				"io.serializations",
    				"org.apache.hadoop.io.serializer.JavaSerialization,"
    						+ "org.apache.hadoop.io.serializer.WritableSerialization");
    		conf.set("fs.file.impl",
    				"com.dbility.hadoop.execise.PartialSortDriver$WindowsLocalFileSystem");
    		conf.set("io.sort.mb", "512");
    
    		// hard-coded arguments for the local run in Eclipse on Windows
    		args = new String[] { "d:\\hadoop_test\\input_data",
    				"d:\\hadoop_test\\sequence_data", "d:\\hadoop_test\\map_data",
    				"d:\\hadoop_test\\",
    				"100" };
    
    		Runtime.getRuntime().exit(
    				ToolRunner.run(conf, new PartialSortDriver(), args));
    
    	}
    
    	public static class WindowsLocalFileSystem extends LocalFileSystem {
    
    		public WindowsLocalFileSystem() {
    			super();
    		}
    
    		@Override
    		public boolean mkdirs(final Path f, final FsPermission permission)
    				throws IOException {
    			final boolean result = super.mkdirs(f);
    			return result;
    		}
    
    		@Override
    		public void setPermission(final Path p, final FsPermission permission)
    				throws IOException {
    			try {
    				super.setPermission(p, permission);
    			} catch (final IOException e) {
    				System.err.println(e.getMessage());
    			}
    		}
    	}
    
    	// parses one comma-separated record; the distance is expected in column 18
    	public static class Parser {
    
    		private String year;
    		private Integer month;
    		private Integer distance;
    		private boolean distanceAvailable = false;
    
    		public Parser(Text value) {
    			String[] columns = value.toString().split(",");
    
    			this.year = columns[0];
    			this.month = Integer.parseInt(columns[1]);
    
    			if (!"NA".equals(columns[18])) {
    				this.distance = Integer.parseInt(columns[18]);
    				if (this.distance > 0)
    					this.distanceAvailable = true;
    			}
    		}
    
    		public String getYear() {
    			return year;
    		}
    
    		public Integer getMonth() {
    			return month;
    		}
    
    		public Integer getDistance() {
    			return distance;
    		}
    
    		public boolean isDistanceAvailable() {
    			return distanceAvailable;
    		}
    
    	}
    
    	public static class Text2SequenceFileMapper extends MapReduceBase implements
    			Mapper<LongWritable, Text, IntWritable, Text> {
    
    		private IntWritable outputKey = new IntWritable();
    
    		public void map(LongWritable key, Text value,
    				OutputCollector<IntWritable, Text> collector, Reporter reporter)
    				throws IOException {
    
    			try {
    				Parser parser = new Parser(value);
    
    				if (parser.isDistanceAvailable()) {
    					outputKey.set(parser.getDistance());
    					collector.collect(outputKey, value);
    				}
    			} catch (Exception e) {
    				// unparsable lines (e.g. a header row) are collected under key 0
    				outputKey.set(0);
    				collector.collect(outputKey, value);
    			}
    
    		}
    	}
    
    }
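    For the cluster run, the jar manifest and shade plugin already set com.dbility.hadoop.execise.PartialSortDriver as Main-Class, so an invocation along the lines of hadoop jar execise.jar <input_path> <sequence_path> <map_path> <save_path> 100 should work without naming the main class (paths depend on your HDFS layout).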

 
