hadoop Text -> SequenceFile -> MapFile conversion

DBILITY 2017. 3. 7. 22:05

I just tried this for practice.

I'd read in a book that MapFiles are used for partial sort, so I gave it a shot, and surprisingly it actually runs.. haha
By next week I should write out the secondary sort (a user-defined sort) and this file format conversion five times each.

Since my head isn't great...

Everything is unified on the old mapred package, and key lookup is not implemented - I couldn't remember how (a rough sketch of what it might look like follows the listing below).
This was tested only in Eclipse on Windows.
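
For reference, on a real cluster the driver would be launched the way the usage string in run() expects; the jar name below is just a placeholder, and note that main() overwrites args with hard-coded Windows paths, so that line would have to be removed first:

hadoop jar mapfile-creater.jar com.dbility.hadoop.execise.MapFileCreater <input_path> <sequence_path> <map_path>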

 

MapFileCreater.java

package com.dbility.hadoop.execise;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocalFileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.SequenceFile.CompressionType;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.DefaultCodec;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapFileOutputFormat;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.RunningJob;
import org.apache.hadoop.mapred.SequenceFileInputFormat;
import org.apache.hadoop.mapred.SequenceFileOutputFormat;
import org.apache.hadoop.mapred.TextInputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

/**
 *
 * Converts plain text input into a block-compressed SequenceFile,
 * then converts that SequenceFile into a MapFile, as two chained
 * jobs on the old mapred API.
 *
 * @author hyperrookie@gmail.com
 *
 * @version 1.0.0
 * @date 2017. 3. 7.
 */
public class MapFileCreater extends Configured implements Tool {

	public int run(String[] args) throws Exception {

		String[] remainArgs = new GenericOptionsParser(getConf(), args).getRemainingArgs();

		if ( remainArgs.length != 3 ) {
			System.out.println("Usage : hadoop jar jarPath [mainClass] <input_path> <sequence_path> <map_path>");
			return -1;
		}

		JobConf jobConf1 = new JobConf(getConf());

		jobConf1.setJarByClass(MapFileCreater.class);
		jobConf1.setJobName("Text2SequenceFileConvertJob");

		jobConf1.setInputFormat(TextInputFormat.class);
		jobConf1.setMapperClass(SequenceMapper.class);

		// map-only job; the SequenceFile stage needs no sorting
		jobConf1.setNumReduceTasks(0);

		jobConf1.setOutputFormat(SequenceFileOutputFormat.class);
		jobConf1.setOutputKeyClass(IntWritable.class);
		jobConf1.setOutputValueClass(Text.class);

		// BLOCK compression packs many records per compressed block,
		// which usually compresses better than per-record compression
		SequenceFileOutputFormat.setCompressOutput(jobConf1, true);
		SequenceFileOutputFormat.setOutputCompressorClass(jobConf1, DefaultCodec.class);
		SequenceFileOutputFormat.setOutputCompressionType(jobConf1, CompressionType.BLOCK);

		FileInputFormat.addInputPath(jobConf1, new Path(remainArgs[0]));
		FileOutputFormat.setOutputPath(jobConf1, new Path(remainArgs[1]));

		FileSystem hdfs = FileSystem.get(getConf());

		Path sequenceFilePath = new Path(remainArgs[1]);
		Path mapFilePath = new Path(remainArgs[2]);

		if ( hdfs.exists(sequenceFilePath) ) {
			hdfs.delete(sequenceFilePath, true);
		}

		// runJob blocks until the job completes
		RunningJob job1 = JobClient.runJob(jobConf1);

		if ( !job1.isSuccessful() ) {
			System.out.printf("%s failure!!\n",job1.getJobName());
			return -2;
		}

		JobConf jobConf2 = new JobConf(getConf());
		jobConf2.setJarByClass(MapFileCreater.class);
		jobConf2.setJobName("SequenceFile2MapFileConverter");

		// no mapper/reducer set: the identity mapper plus a single identity
		// reducer sorts records by key, which MapFileOutputFormat requires
		jobConf2.setInputFormat(SequenceFileInputFormat.class);
		jobConf2.setOutputFormat(MapFileOutputFormat.class);
		jobConf2.setOutputKeyClass(IntWritable.class);
		jobConf2.setOutputValueClass(Text.class);

		SequenceFileOutputFormat.setCompressOutput(jobConf2, true);
		SequenceFileOutputFormat.setOutputCompressorClass(jobConf2, DefaultCodec.class);
		SequenceFileOutputFormat.setOutputCompressionType(jobConf2, CompressionType.BLOCK);

		FileInputFormat.addInputPath(jobConf2, sequenceFilePath);
		FileOutputFormat.setOutputPath(jobConf2, mapFilePath);

		if ( hdfs.exists(mapFilePath) ) {
			hdfs.delete(mapFilePath, true);
		}

		RunningJob job2 = JobClient.runJob(jobConf2);

		return job2.isSuccessful() ? 0 : -3;
	}

	public static void main(String[] args) throws Exception {

		Configuration conf = new Configuration();

		// run in local standalone mode (local filesystem, local job runner)
		conf.set("fs.default.name", "file:///");
		conf.set("mapred.job.tracker", "local");
		conf.set(
				"io.serializations",
				"org.apache.hadoop.io.serializer.JavaSerialization,"
						+ "org.apache.hadoop.io.serializer.WritableSerialization");
		// work around permission errors on the Windows local filesystem
		conf.set("fs.file.impl",
				"com.dbility.hadoop.execise.MapFileCreater$WindowsLocalFileSystem");
		conf.set("io.sort.mb", "512");

		// hard-coded paths for testing in Eclipse on Windows
		args = new String[] {"d:/hadoop_test/input_data/","d:/hadoop_test/sequence_data/","d:/hadoop_test/map_data/"};

		int result = ToolRunner.run(conf, new MapFileCreater(), args);

		if ( result == 0 ) {

			FileSystem hdfs = FileSystem.get(conf);

			// remove the _SUCCESS marker files; a MapFile directory
			// should only contain its data and index files
			Path sequenceFilePath = new Path(args[1]+"_SUCCESS");
			Path mapFilePath = new Path(args[2]+"_SUCCESS");

			if ( hdfs.exists(sequenceFilePath) ) {
				hdfs.delete(sequenceFilePath, true);
			}

			if ( hdfs.exists(mapFilePath) ) {
				hdfs.delete(mapFilePath, true);
			}
		}

		Runtime.getRuntime().exit(result);
	}

	/**
	 * LocalFileSystem subclass that ignores permission failures,
	 * which otherwise break local runs on Windows.
	 */
	public static class WindowsLocalFileSystem extends LocalFileSystem {

		public WindowsLocalFileSystem() {
			super();
		}

		@Override
		public boolean mkdirs(final Path f, final FsPermission permission)
				throws IOException {
			final boolean result = super.mkdirs(f);

			return result;
		}

		@Override
		public void setPermission(final Path p, final FsPermission permission)
				throws IOException {
			try {
				super.setPermission(p, permission);
			} catch ( final IOException ioe ) {
				System.err.println(ioe.getMessage());
			}
		}

	}

	/**
	 * Parses one comma-separated line of the airline on-time data
	 * used in the reference book: column 0 = year, column 1 = month,
	 * column 18 = flight distance.
	 */
	public static class Parser {

		private String year;
		private Integer month;
		private Integer distance;
		private boolean distanceAvailable = false;

		public Parser(Text value) {

			String[] columns = value.toString().split(",");

			this.year = columns[0];
			this.month = Integer.parseInt(columns[1]);

			// the distance column may be "NA" when not recorded
			if ( !"NA".equals(columns[18]) ) {
				this.distance = Integer.parseInt(columns[18]);
				this.distanceAvailable = true;
			}

		}

		public String getYear() {
			return year;
		}

		public void setYear(String year) {
			this.year = year;
		}

		public Integer getMonth() {
			return month;
		}

		public void setMonth(Integer month) {
			this.month = month;
		}

		public Integer getDistance() {
			return distance;
		}

		public void setDistance(Integer distance) {
			this.distance = distance;
		}

		public boolean isDistanceAvailable() {
			return distanceAvailable;
		}

		public void setDistanceAvailable(boolean distanceAvailable) {
			this.distanceAvailable = distanceAvailable;
		}
	}

	/**
	 * Map-only stage: emits the flight distance as the key and the raw
	 * line as the value, so the SequenceFile is keyed by distance.
	 */
	public static class SequenceMapper extends MapReduceBase implements Mapper<LongWritable, Text, IntWritable, Text> {

		private IntWritable outputKey = new IntWritable();

		public void map(LongWritable key, Text value,
				OutputCollector<IntWritable, Text> collector,
				Reporter reporter) throws IOException {

			try {

				Parser parser = new Parser(value);

				if ( parser.isDistanceAvailable() && parser.getDistance() > 0 ) {

					outputKey.set(parser.getDistance());
					collector.collect(outputKey, value);
				}

			} catch ( Exception e ) {
				// unparseable lines (e.g. a header) are collected under key 0
				outputKey.set(0);
				collector.collect(outputKey, value);
				e.printStackTrace();
			}

		}

	}

}
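
The key lookup I left out could look roughly like the sketch below, built on the old mapred API's MapFileOutputFormat.getReaders/getEntry. The class name, the local map_data path, and the distance 1000 are just examples, not part of the original code.

MapFileLookup.java

package com.dbility.hadoop.execise;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.MapFile;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapred.MapFileOutputFormat;
import org.apache.hadoop.mapred.lib.HashPartitioner;

public class MapFileLookup {

	public static void main(String[] args) throws Exception {

		Configuration conf = new Configuration();
		conf.set("fs.default.name", "file:///");

		FileSystem fs = FileSystem.get(conf);
		Path mapFilePath = new Path("d:/hadoop_test/map_data/");

		// one MapFile.Reader per partition directory (part-00000, ...)
		MapFile.Reader[] readers = MapFileOutputFormat.getReaders(fs, mapFilePath, conf);

		IntWritable key = new IntWritable(1000); // example distance to look up
		Text value = new Text();

		// the default HashPartitioner routes the key to the same
		// partition the conversion job wrote it to
		Writable entry = MapFileOutputFormat.getEntry(readers,
				new HashPartitioner<IntWritable, Text>(), key, value);

		if ( entry != null ) {
			System.out.println("first match : " + value.toString());
		} else {
			System.out.println("no entry for distance " + key.get());
		}

		for (MapFile.Reader reader : readers) {
			reader.close();
		}
	}
}

Since the conversion job ran with the default single reducer there is only one partition, but the same code should work unchanged with more reducers as long as the lookup uses the same partitioner the job did. It also depends on the _SUCCESS marker having been deleted, since getReaders tries to open everything in the output directory as a MapFile.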

Reference book: 시작하세요! 하둡 프로그래밍, 2nd revised edition (위키북스) - by 정재화
