Notice
Recent Posts
Recent Comments
Link
일 | 월 | 화 | 수 | 목 | 금 | 토 |
---|---|---|---|---|---|---|
1 | 2 | 3 | 4 | |||
5 | 6 | 7 | 8 | 9 | 10 | 11 |
12 | 13 | 14 | 15 | 16 | 17 | 18 |
19 | 20 | 21 | 22 | 23 | 24 | 25 |
26 | 27 | 28 | 29 | 30 | 31 |
Tags
- tomcat
- mapreduce
- vaadin
- mybatis
- Kotlin
- Python
- Eclipse
- Express
- IntelliJ
- NPM
- R
- Sqoop
- table
- window
- Android
- SQL
- SPC
- GIT
- Java
- JavaScript
- 보조정렬
- xPlatform
- plugin
- SSL
- Spring
- hadoop
- react
- 공정능력
- es6
- MSSQL
Archives
- Today
- Total
DBILITY
hadoop partial sort exercise 2 ( 부분 정렬 실습 2 ) 본문
반응형
윈도 환경 eclipse에서 테스트했는데,
결과 파일 맨 앞이 이상하다... 왜일까? (DataOutputStream.writeUTF는 문자열마다 앞에 2바이트 길이 정보를 붙여 기록하므로, 레코드 앞에 보이는 이상한 바이트는 이 길이 접두어로 보인다.)
책 없이 타이핑(?)했다는 데 만족해야 하나 보다^^;
MRUnit으로 테스트를 해봐야겠군... 기억이 안나지만...
package com.dbility.hadoop.execise;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocalFileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.MapFile.Reader;
import org.apache.hadoop.io.SequenceFile.CompressionType;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.compress.DefaultCodec;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapFileOutputFormat;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Partitioner;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.RunningJob;
import org.apache.hadoop.mapred.SequenceFileInputFormat;
import org.apache.hadoop.mapred.SequenceFileOutputFormat;
import org.apache.hadoop.mapred.TextInputFormat;
import org.apache.hadoop.mapred.lib.HashPartitioner;
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
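/**
 * Partial-sort exercise driver written against the old
 * {@code org.apache.hadoop.mapred} API.
 * <p>
 * Converts comma-separated text input into a SequenceFile keyed by the
 * integer found in column index 18, rewrites that SequenceFile as a MapFile,
 * then looks up a single key and writes the matching records to
 * {@code output.txt}.
 *
 * @author hyperrookie@gmail.com
 *
 * @version 1.0.0
 * @date 2017. 3. 8.
 */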
public class PartialSortDriver extends Configured implements Tool {
/**
 * Builds a MapFile from the text input with two chained jobs and then
 * extracts every record whose key equals the requested distance.
 * <p>
 * Job 1 ("SequenceFileCreator") is map-only: {@link SequenceMapper} emits
 * (distance, line) pairs into a block-compressed SequenceFile. Job 2
 * ("MapFileCreater") reads that SequenceFile and writes it out through
 * {@link MapFileOutputFormat}. Finally the reader chosen by a
 * {@link HashPartitioner} is searched for {@code search_miles} and all
 * matching lines are written to {@code output.txt} in the parent directory
 * of the input path.
 *
 * @param args generic Hadoop options followed by
 *             {@code <input_path> <sequence_path> <map_path> <search_miles>}
 * @return 0 on success; -1 for bad arguments; -2 when the input path is
 *         missing; -3 when the first job fails; -4 when the sequence path
 *         is missing; -5 when the second job fails; -6 when the requested
 *         key is not found
 * @throws Exception if a file-system or job operation fails
 */
@Override
public int run(String[] args) throws Exception {
    String[] remainArgs = new GenericOptionsParser(getConf(), args)
            .getRemainingArgs();

    if (remainArgs.length != 4) {
        printUsage();
        return -1;
    }

    // Reject a non-numeric key with a readable message instead of a stack trace.
    IntWritable searchMiles;
    try {
        searchMiles = new IntWritable(Integer.parseInt(remainArgs[3]));
    } catch (NumberFormatException nfe) {
        System.err.println("search_miles must be an integer : "
                + remainArgs[3]);
        printUsage();
        return -1;
    }

    FileSystem hdfs = FileSystem.get(getConf());

    Path input_path = new Path(remainArgs[0]);
    Path sequence_path = new Path(remainArgs[1]);
    Path map_path = new Path(remainArgs[2]);

    // ---- Job 1 : text -> block-compressed SequenceFile (map only) ----
    JobConf jobConf1 = new JobConf(getConf(), PartialSortDriver.class);
    jobConf1.setJobName("SequenceFileCreator");
    jobConf1.setMapperClass(SequenceMapper.class);
    jobConf1.setNumReduceTasks(0);
    jobConf1.setInputFormat(TextInputFormat.class);
    jobConf1.setOutputFormat(SequenceFileOutputFormat.class);
    jobConf1.setOutputKeyClass(IntWritable.class);
    jobConf1.setOutputValueClass(Text.class);

    SequenceFileOutputFormat.setCompressOutput(jobConf1, true);
    SequenceFileOutputFormat.setOutputCompressorClass(jobConf1,
            DefaultCodec.class);
    SequenceFileOutputFormat.setOutputCompressionType(jobConf1,
            CompressionType.BLOCK);

    FileInputFormat.addInputPath(jobConf1, input_path);
    FileOutputFormat.setOutputPath(jobConf1, sequence_path);

    if (!hdfs.exists(input_path)) {
        System.err.println("input path is not found!");
        return -2;
    }

    // Remove stale output so the job can recreate the directory.
    if (hdfs.exists(sequence_path)) {
        hdfs.delete(sequence_path, true);
    }

    RunningJob job1 = JobClient.runJob(jobConf1);

    if (!job1.isSuccessful()) {
        System.err.printf("%s is failure!\n", job1.getJobName());
        return -3;
    }

    // ---- Job 2 : SequenceFile -> MapFile (no mapper/reducer configured) ----
    JobConf jobConf2 = new JobConf(getConf(), PartialSortDriver.class);
    jobConf2.setJobName("MapFileCreater");
    jobConf2.setInputFormat(SequenceFileInputFormat.class);
    jobConf2.setOutputFormat(MapFileOutputFormat.class);
    jobConf2.setOutputKeyClass(IntWritable.class);
    // Spelled out for symmetry with job 1; the records read back below are Text.
    jobConf2.setOutputValueClass(Text.class);

    SequenceFileOutputFormat.setCompressOutput(jobConf2, true);
    SequenceFileOutputFormat.setOutputCompressorClass(jobConf2,
            DefaultCodec.class);
    SequenceFileOutputFormat.setOutputCompressionType(jobConf2,
            CompressionType.BLOCK);

    FileInputFormat.addInputPath(jobConf2, sequence_path);
    FileOutputFormat.setOutputPath(jobConf2, map_path);

    if (!hdfs.exists(sequence_path)) {
        System.err.println("sequence path is not found!");
        return -4;
    }

    if (hdfs.exists(map_path)) {
        hdfs.delete(map_path, true);
    }

    RunningJob job2 = JobClient.runJob(jobConf2);

    if (!job2.isSuccessful()) {
        System.err.printf("%s is failure!\n", job2.getJobName());
        return -5;
    }

    // Resolve the marker as a child of map_path so it is found whether or
    // not the argument ends with a separator. NOTE(review): presumably the
    // marker is removed so getReaders() does not try to open it as a
    // MapFile part - confirm against the Hadoop version in use.
    hdfs.delete(new Path(map_path, "_SUCCESS"), true);

    // ---- Lookup : find the partition holding the key and dump its records ----
    Reader[] readers = MapFileOutputFormat.getReaders(hdfs, map_path,
            getConf());
    try {
        if (readers.length == 0) {
            // No part to search; also avoids a modulo-by-zero in getPartition.
            System.out.println("the requested key is not found!");
            return -6;
        }

        Partitioner<IntWritable, Text> partition = new HashPartitioner<IntWritable, Text>();
        IntWritable nextKey = new IntWritable();
        Text value = new Text();

        Reader reader = readers[partition.getPartition(searchMiles, value,
                readers.length)];

        Writable entry = reader.get(searchMiles, value);

        if (entry == null) {
            System.out.println("the requested key is not found!");
            return -6;
        }

        Path outputDir = input_path.getParent();
        if (outputDir == null) {
            // The input path has no parent; fall back to the path itself.
            outputDir = input_path;
        }

        // Path(parent, child) keeps the separator portable instead of a
        // hard-coded backslash.
        FSDataOutputStream outputStream = hdfs.create(new Path(outputDir,
                "output.txt"));
        try {
            do {
                // Write the raw UTF-8 bytes. writeUTF() would prepend a
                // two-byte length to every record, which shows up as
                // garbage at the start of each line of the result file.
                outputStream.write(value.getBytes(), 0, value.getLength());
                outputStream.write('\n');
            } while (reader.next(nextKey, value)
                    && searchMiles.equals(nextKey));
        } finally {
            outputStream.close();
        }
    } finally {
        closeReaders(readers);
    }

    return 0;
}

/** Prints the command-line synopsis to standard output. */
private static void printUsage() {
    System.out.println("Usage : hadoop jar jarName "
            + "[mainClass] <input_path> <sequence_path> "
            + "<map_path> <search_miles>");
}

/** Closes every MapFile reader that was opened for the lookup. */
private static void closeReaders(Reader[] readers) throws IOException {
    for (Reader each : readers) {
        if (each != null) {
            each.close();
        }
    }
}
/**
 * Launches the driver on the local file system with the local job runner.
 * <p>
 * When no command-line arguments are given, the sample paths under
 * {@code d:/hadoop_test} and the search key {@code 100} are used; otherwise
 * the supplied arguments are passed through to {@link #run(String[])}.
 * The process exits with the code returned by the tool.
 *
 * @param args optional generic Hadoop options followed by
 *             {@code <input_path> <sequence_path> <map_path> <search_miles>}
 * @throws Exception if the tool fails with an exception
 */
public static void main(String[] args) throws Exception {

    Configuration conf = new Configuration();

    conf.set("fs.default.name", "file:///");
    conf.set("mapred.job.tracker", "local");
    // The key was previously misspelled ("io.serializaitons"), so this
    // setting was never picked up.
    conf.set(
            "io.serializations",
            "org.apache.hadoop.io.serializer.JavaSerialization,"
                    + "org.apache.hadoop.io.serializer.WritableSerialization");
    // Derive the binary name from the class so it cannot drift from the
    // actual package or class name.
    conf.set("fs.file.impl", WindowsLocalFileSystem.class.getName());
    conf.set("io.sort.mb", "512");

    // Fall back to the sample locations only when the caller passed nothing;
    // explicit arguments are no longer overwritten.
    if (args == null || args.length == 0) {
        args = new String[] { "d:/hadoop_test/input_data/",
                "d:/hadoop_test/sequence_data/", "d:/hadoop_test/map_data/",
                "100" };
    }

    int result = ToolRunner.run(conf, new PartialSortDriver(), args);

    System.exit(result);
}
public static class WindowsLocalFileSystem extends LocalFileSystem {
public WindowsLocalFileSystem() {
super();
}
public boolean mkdirs(final Path f, final FsPermission permission)
throws IOException {
final boolean result = super.mkdirs(f);
return result;
}
public void setPermission(final Path p, final FsPermission permission)
throws IOException {
try {
super.setPermission(p, permission);
} catch (final IOException ioe) {
System.err.println(ioe.getMessage());
}
}
}
/**
 * Splits one comma-separated record and exposes the fields this exercise
 * uses: column 0 as the year, column 1 as the month and column 18 as the
 * distance.
 * <p>
 * A distance of {@code "NA"} is left unset ({@code null}); the distance is
 * flagged as available only when it parses to a value greater than zero.
 * The constructor throws an unchecked exception when the record has too few
 * columns or a numeric column cannot be parsed.
 */
public static class Parser {

    private static final int YEAR_COLUMN = 0;
    private static final int MONTH_COLUMN = 1;
    private static final int DISTANCE_COLUMN = 18;
    private static final String NOT_AVAILABLE = "NA";

    private final String year;
    private final Integer month;
    private final Integer distance;
    private final boolean distanceAvailable;

    /**
     * Parses a single input line.
     *
     * @param value the raw comma-separated record
     */
    public Parser(Text value) {
        final String[] fields = value.toString().split(",");

        this.year = fields[YEAR_COLUMN];
        this.month = Integer.valueOf(fields[MONTH_COLUMN]);

        final String rawDistance = fields[DISTANCE_COLUMN];
        if (NOT_AVAILABLE.equals(rawDistance)) {
            this.distance = null;
            this.distanceAvailable = false;
        } else {
            this.distance = Integer.valueOf(rawDistance);
            this.distanceAvailable = this.distance.intValue() > 0;
        }
    }

    /** @return the text of column 0 */
    public String getYear() {
        return this.year;
    }

    /** @return column 1 parsed as an integer */
    public Integer getMonth() {
        return this.month;
    }

    /** @return column 18 parsed as an integer, or {@code null} when it was "NA" */
    public Integer getDistance() {
        return this.distance;
    }

    /** @return {@code true} when a distance greater than zero was parsed */
    public boolean isDistanceAvailable() {
        return this.distanceAvailable;
    }
}
public static class SequenceMapper extends MapReduceBase implements
Mapper<LongWritable, Text, IntWritable, Text> {
private IntWritable outputKey = new IntWritable();
public void map(LongWritable key, Text value,
OutputCollector<IntWritable, Text> collector, Reporter reporter)
throws IOException {
try {
Parser parser = new Parser(value);
if (parser.isDistanceAvailable()) {
outputKey.set(parser.getDistance());
collector.collect(outputKey, value);
}
} catch (Exception ex) {
outputKey.set(0);
collector.collect(outputKey, value);
}
}
}
}
참고 서적 : 시작하세요! 하둡프로그래밍 개정 2판 - 정재화 지음
반응형
'bigdata > hadoop' 카테고리의 다른 글
hadoop total sort exercise ( 전체 정렬 실습 ) (0) | 2017.03.12 |
---|---|
hadoop partial sort exercise include search ( 부분 정렬 실습 3 ) (0) | 2017.03.12 |
hadoop Text -> SequenceFile -> MapFile로 변환 (0) | 2017.03.07 |
hadoop Mapper이용 text파일을 SequenceFileFormat으로 저장 (0) | 2017.03.06 |
hadoop total sort ( 전체 정렬 ) (0) | 2017.03.01 |
Comments