Notice
Recent Posts
Recent Comments
Link
일 | 월 | 화 | 수 | 목 | 금 | 토 |
---|---|---|---|---|---|---|
1 | 2 | |||||
3 | 4 | 5 | 6 | 7 | 8 | 9 |
10 | 11 | 12 | 13 | 14 | 15 | 16 |
17 | 18 | 19 | 20 | 21 | 22 | 23 |
24 | 25 | 26 | 27 | 28 | 29 | 30 |
Tags
- vaadin
- JavaScript
- Java
- Kotlin
- IntelliJ
- SPC
- Android
- xPlatform
- tomcat
- hadoop
- 보조정렬
- MSSQL
- SQL
- Eclipse
- plugin
- react
- Spring
- mapreduce
- Python
- table
- window
- GIT
- Express
- SSL
- NPM
- es6
- 공정능력
- Sqoop
- mybatis
- R
Archives
- Today
- Total
DBILITY
hadoop secondary sort 4 ( 보조정렬 실습 ) 본문
반응형
ASA항공운항 기록에서 년/월별 지연도착 통계를 정렬 후 SequenceFile로 저장하고, 읽는 걸 해봄.
package com.dbility.hadoop.execise;
import java.io.BufferedInputStream;
import java.io.BufferedReader;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.io.InputStream;
import java.io.StringWriter;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.BufferedFSInputStream;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocalFileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.SequenceFile.CompressionType;
import org.apache.hadoop.io.SequenceFile.Reader;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;
import org.apache.hadoop.io.WritableUtils;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionCodecFactory;
import org.apache.hadoop.io.compress.DefaultCodec;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Partitioner;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
public class SecondarySortDriver extends Configured implements Tool {
@Override
public int run(String[] args) throws Exception {
    // Strip generic Hadoop options (-D, -files, ...) and keep application args.
    String[] remainArgs = new GenericOptionsParser(getConf(), args)
            .getRemainingArgs();
    if (remainArgs.length != 2) {
        System.out
                .println("Usage : hadoop jar jarName [mainClass] <input_path> <output_path>");
        return -1;
    }

    // Job.getInstance replaces the deprecated Job(Configuration) constructor.
    Job job = Job.getInstance(getConf());
    job.setJarByClass(SecondarySortDriver.class);
    job.setJobName("SecondarySort");

    job.setInputFormatClass(TextInputFormat.class);
    job.setMapperClass(SecMapper.class);
    job.setMapOutputKeyClass(CompositeKey.class);
    job.setMapOutputValueClass(IntWritable.class);
    job.setCombinerClass(SecCombiner.class);

    job.setNumReduceTasks(1);
    job.setPartitionerClass(SecParitioner.class);                // partition by year
    job.setGroupingComparatorClass(SecGroupingComparator.class); // one reduce() per year
    job.setSortComparatorClass(SecSortComparator.class);         // sort by (year, month)

    job.setReducerClass(SecReducer.class);
    job.setOutputFormatClass(SequenceFileOutputFormat.class);
    job.setOutputKeyClass(CompositeKey.class);
    job.setOutputValueClass(IntWritable.class);
    SequenceFileOutputFormat.setCompressOutput(job, true);
    SequenceFileOutputFormat.setOutputCompressorClass(job, DefaultCodec.class);
    SequenceFileOutputFormat.setOutputCompressionType(job, CompressionType.BLOCK);

    Path inputPath = new Path(remainArgs[0]);
    Path outputPath = new Path(remainArgs[1]);
    FileInputFormat.addInputPath(job, inputPath);
    FileOutputFormat.setOutputPath(job, outputPath);

    // Remove a stale output directory so the job does not fail at startup.
    FileSystem hdfs = FileSystem.get(getConf());
    if (hdfs.exists(outputPath)) {
        hdfs.delete(outputPath, true);
    }

    return job.waitForCompletion(true) ? 0 : -2;
}
/**
 * Local-mode entry point: runs the job against the local file system and,
 * on success, dumps the resulting compressed SequenceFile to stdout.
 */
public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    // Force the local (non-HDFS) file system and the local job runner.
    conf.set("fs.default.name", "file:///");
    conf.set("mapred.job.tracker", "local");
    // Fixed typo: the property name is "io.serializations"
    // (was "io.serializatilons", which Hadoop silently ignored).
    conf.set(
            "io.serializations",
            "org.apache.hadoop.io.serializer.JavaSerialization,"
                    + "org.apache.hadoop.io.serializer.WritableSerialization");
    conf.set("fs.file.impl",
            "com.dbility.hadoop.execise.SecondarySortDriver$WindowsLocalFileSystem");

    // NOTE(review): command-line arguments are deliberately overwritten with
    // fixed demo paths for this exercise.
    args = new String[] { "d:/hadoop_test/input_data/",
            "d:/hadoop_test/sec_data/" };

    int result = ToolRunner.run(conf, new SecondarySortDriver(), args);

    if (result == 0) {
        FileSystem hdfs = FileSystem.get(conf);
        Path readPath = new Path(args[1] + "part-r-00000");
        if (!hdfs.exists(readPath)) {
            System.out.println("requested path is not found!!");
            result = -2;
        } else {
            hdfs.delete(new Path(args[1] + "_SUCCESS"), true);
            SequenceFile.Reader reader = new SequenceFile.Reader(hdfs, readPath, conf);
            try {
                Writable key = (Writable) ReflectionUtils.newInstance(reader.getKeyClass(), conf);
                Writable value = (Writable) ReflectionUtils.newInstance(reader.getValueClass(), conf);
                while (reader.next(key, value)) {
                    System.out.printf("%s\t%s\n", key.toString(), value.toString());
                }
            } finally {
                // Close the reader even if iteration throws.
                IOUtils.closeStream(reader);
            }
        }
    }
    System.exit(result);
}
// Workaround for running Hadoop's LocalFileSystem on Windows, where
// setting POSIX permissions can fail (e.g. no winutils/chmod support).
public static class WindowsLocalFileSystem extends LocalFileSystem {
public WindowsLocalFileSystem() {
super();
}
// Creates the directory first, then applies the permission separately so
// that a permission failure cannot prevent the directory from being made.
@Override
public boolean mkdirs(final Path f, final FsPermission permission)
throws IOException {
final boolean result = super.mkdirs(f);
this.setPermission(f, permission);
return result;
}
// Deliberate best-effort: on Windows the permission call may throw, so the
// IOException is logged to stderr and swallowed instead of failing the job.
@Override
public void setPermission(final Path p, final FsPermission permission)
throws IOException {
try {
super.setPermission(p, permission);
} catch (final IOException e) {
System.err.println(e.getMessage());
}
}
}
/**
 * Parses one ASA airline on-time CSV record.
 * Column 0 = year, column 1 = month, column 14 = arrival delay minutes
 * ("NA" when not recorded). A record counts as delayed only when the
 * arrival delay is a positive number.
 */
public static class Parser {
    private String year;
    private Integer month;
    private Integer arrivalDelayTime = 0;
    private boolean arrivalDelayAvailable = false;

    public Parser(Text value) {
        String[] columns = value.toString().split(",");
        this.year = columns[0];
        this.month = Integer.parseInt(columns[1]);
        // Length guard added: the original indexed columns[14] unconditionally
        // and threw ArrayIndexOutOfBoundsException on short/truncated records.
        if (columns.length > 14 && !"NA".equals(columns[14])) {
            this.arrivalDelayTime = Integer.parseInt(columns[14]);
            if (this.arrivalDelayTime > 0) {
                this.arrivalDelayAvailable = true;
            }
        }
    }

    public String getYear() {
        return year;
    }

    public Integer getMonth() {
        return month;
    }

    public Integer getArrivalDelayTime() {
        return arrivalDelayTime;
    }

    /** True only when column 14 is a number greater than zero. */
    public boolean isArrivalDelayAvailable() {
        return arrivalDelayAvailable;
    }
}
/**
 * Composite map-output key (year, month); natural order is year ascending,
 * then month ascending. equals/hashCode are defined consistently with
 * compareTo, as the Comparable contract recommends.
 */
public static class CompositeKey implements
        WritableComparable<CompositeKey> {
    private String year;
    private Integer month;

    public CompositeKey() {
    }

    public String getYear() {
        return year;
    }

    public void setYear(String year) {
        this.year = year;
    }

    public Integer getMonth() {
        return month;
    }

    public void setMonth(Integer month) {
        this.month = month;
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        this.year = WritableUtils.readString(in);
        this.month = in.readInt();
    }

    @Override
    public void write(DataOutput out) throws IOException {
        WritableUtils.writeString(out, year);
        out.writeInt(month);
    }

    @Override
    public int compareTo(CompositeKey o) {
        int result = this.year.compareTo(o.getYear());
        if (result == 0)
            result = this.month.compareTo(o.getMonth());
        return result;
    }

    @Override
    public boolean equals(Object obj) {
        if (this == obj)
            return true;
        if (!(obj instanceof CompositeKey))
            return false;
        CompositeKey other = (CompositeKey) obj;
        return (year == null ? other.year == null : year.equals(other.year))
                && (month == null ? other.month == null : month.equals(other.month));
    }

    @Override
    public int hashCode() {
        int h = (year == null ? 0 : year.hashCode());
        return 31 * h + (month == null ? 0 : month.hashCode());
    }

    @Override
    public String toString() {
        return (new StringBuilder()).append(year).append("\t").append(month)
                .toString();
    }
}
/**
 * Routes records by year only, so every month of a year reaches the same
 * reducer (required for the grouping comparator to see the whole year).
 */
public static class SecParitioner extends
        Partitioner<CompositeKey, IntWritable> {
    @Override
    public int getPartition(CompositeKey key, IntWritable value,
            int numReduceTasks) {
        // Mask the sign bit (HashPartitioner idiom): String.hashCode() can be
        // negative, and a negative partition index fails the job.
        return (key.getYear().hashCode() & Integer.MAX_VALUE) % numReduceTasks;
    }
}
/**
 * Grouping comparator: keys with the same year are fed to a single
 * reduce() call; the month component is intentionally ignored.
 */
public static class SecGroupingComparator extends WritableComparator {
    public SecGroupingComparator() {
        // true -> ask WritableComparator to create key instances for us.
        super(CompositeKey.class, true);
    }

    @SuppressWarnings("rawtypes")
    @Override
    public int compare(WritableComparable a, WritableComparable b) {
        String yearA = ((CompositeKey) a).getYear();
        String yearB = ((CompositeKey) b).getYear();
        return yearA.compareTo(yearB);
    }
}
/**
 * Sort comparator: orders keys by year ascending, then month ascending,
 * which is the order values arrive inside each reduce() group.
 */
public static class SecSortComparator extends WritableComparator {
    public SecSortComparator() {
        super(CompositeKey.class, true);
    }

    @SuppressWarnings("rawtypes")
    @Override
    public int compare(WritableComparable a, WritableComparable b) {
        CompositeKey k1 = (CompositeKey) a;
        CompositeKey k2 = (CompositeKey) b;
        int cmp = k1.getYear().compareTo(k2.getYear());
        if (cmp != 0)
            return cmp;
        // Integer.compareTo instead of ==/< on boxed Integers: "==" compares
        // references and only happened to work for small cached values.
        return k1.getMonth().compareTo(k2.getMonth());
    }
}
/**
 * Mapper: for each record with a positive arrival delay, emits
 * ((year, month), 1) so the reducer can count delayed arrivals per month.
 */
public static class SecMapper extends
        Mapper<LongWritable, Text, CompositeKey, IntWritable> {
    // Reused across map() calls to avoid per-record allocation.
    private CompositeKey outputKey = new CompositeKey();
    private final static IntWritable outputValue = new IntWritable(1);

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        Parser record = new Parser(value);
        if (!record.isArrivalDelayAvailable()) {
            return; // no positive arrival delay -> nothing to count
        }
        outputKey.setYear(record.getYear());
        outputKey.setMonth(record.getMonth());
        context.write(outputKey, outputValue);
    }
}
/**
 * Map-side combiner: pre-aggregates the 1-counts per (year, month) key to
 * shrink the data shuffled to the reducer.
 */
public static class SecCombiner extends
        Reducer<CompositeKey, IntWritable, CompositeKey, IntWritable> {
    private IntWritable outputValue = new IntWritable();
    private int sum;

    @Override
    protected void reduce(CompositeKey key, Iterable<IntWritable> values,
            Context context) throws IOException, InterruptedException {
        int total = 0;
        for (IntWritable count : values) {
            total += count.get();
        }
        sum = total;
        outputValue.set(sum);
        context.write(key, outputValue);
    }
}
// Reducer: receives every (year, month) key of one year in a single reduce()
// call (SecGroupingComparator groups by year) with values sorted by month
// (SecSortComparator), and emits one (year, month) -> total per month.
public static class SecReducer extends
Reducer<CompositeKey, IntWritable, CompositeKey, IntWritable> {
private CompositeKey outputKey = new CompositeKey();
private IntWritable outputValue = new IntWritable();
private int sum;
@Override
protected void reduce(CompositeKey key, Iterable<IntWritable> values,
Context context) throws IOException, InterruptedException {
sum = 0;
// NOTE(review): this loop relies on Hadoop reusing the same CompositeKey
// instance while iterating `values` — key.getMonth() advances to the month
// of the value currently being consumed. Do not reorder these statements.
int preMonth = key.getMonth();
for (IntWritable value : values) {
// Month boundary: flush the accumulated count for the previous month
// before starting a new running sum.
if (preMonth != key.getMonth()) {
outputKey.setYear(key.getYear());
outputKey.setMonth(preMonth);
outputValue.set(sum);
context.write(outputKey, outputValue);
sum = 0;
}
sum += value.get();
preMonth = key.getMonth();
}
// Flush the last month's total (condition holds after a non-empty group,
// since preMonth was just taken from the final key state).
if (preMonth == key.getMonth()) {
outputKey.setYear(key.getYear());
outputKey.setMonth(preMonth);
outputValue.set(sum);
context.write(outputKey, outputValue);
}
}
}
}
참고서적 : 시작하세요! 하둡프로그래밍 개정 2판(위키북스) - 정재화 지음
반응형
'bigdata > hadoop' 카테고리의 다른 글
hadoop 2.x winutils (0) | 2017.03.27 |
---|---|
hadoop 2.6.x eclipse plugin (0) | 2017.03.23 |
hadoop total sort exercise ( 전체 정렬 실습 ) (0) | 2017.03.12 |
hadoop partial sort exercise include search ( 부분 정렬 실습 3 ) (0) | 2017.03.12 |
hadoop partial sort exercise 2 ( 부분 정렬 실습 2 ) (0) | 2017.03.08 |
Comments