일 | 월 | 화 | 수 | 목 | 금 | 토 |
---|---|---|---|---|---|---|
1 | 2 | 3 | 4 | |||
5 | 6 | 7 | 8 | 9 | 10 | 11 |
12 | 13 | 14 | 15 | 16 | 17 | 18 |
19 | 20 | 21 | 22 | 23 | 24 | 25 |
26 | 27 | 28 | 29 | 30 | 31 |
- plugin
- Express
- SSL
- SPC
- 공정능력
- MSSQL
- hadoop
- Python
- window
- Android
- IntelliJ
- mybatis
- R
- NPM
- react
- 보조정렬
- es6
- xPlatform
- Kotlin
- vaadin
- Eclipse
- JavaScript
- tomcat
- Java
- table
- Spring
- Sqoop
- SQL
- mapreduce
- GIT
- Today
- Total
DBILITY
하둡 정리 2 본문
Mapper Class는 org.apache.hadoop.mapreduce.Mapper를 상속하여 구현한다.
map을 구현한다. 사용자정의 옵션을 입력받을 경우(-D property=value) setup을 구현하며,
context에서 해당 설정값을 추출하여 사용한다.public class AsaDelayMapper extends Mapper<LongWritable, Text, Text, IntWritable> { private String workType; private Text outKey = new Text(); private static final IntWritable outValue = new IntWritable(1); @Override protected void setup(Context context) throws IOException, InterruptedException { workType = context.getConfiguration().get("workType"); } @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { AsaPerformParser parser = new AsaPerformParser(value); if ( workType.equals("departure") ){ if (parser.isDepDelayAvailable() && parser.getDepDelayTime() > 0 ){ outKey.set(parser.getYear()+","+parser.getMonth()); context.write(outKey, outValue); } } else if ( workType.equals("arrival") ) { if (parser.isArrDelayAvailable() && parser.getArrDelayTime() > 0) { outKey.set(parser.getYear()+","+parser.getMonth()); context.write(outKey, outValue); } } } }
Reducer Class는 org.apache.hadoop.mapreduce.Reducer를 상속하여 구현한다.
reduce를 구현한다. 사용자정의 옵션을 입력받을 경우(-D property=value) setup을 구현하며,
context에서 해당 설정값을 추출하여 사용한다.
MultipleOuputs을 사용할 경우 cleanup을 구현하여 MultipleOuputs 객체를 닫아 준다.public class AsaDelayReducer extends Reducer<Text, IntWritable, Text, IntWritable> { private String workType; private IntWritable outValue = new IntWritable(); @Override protected void setup(Context context) throws IOException, InterruptedException { workType = context.getConfiguration().get("workType"); } @Override protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException { int sum = 0; if(workType.equals("departure")){ for (IntWritable value : values) { sum+=value.get(); } outValue.set(sum); context.write(key, outValue); } else if (workType.equals("arrival")) { for (IntWritable value : values) { sum+=value.get(); } outValue.set(sum); context.write(key, outValue); } } }
- Driver Class는 Configured를 상속받고,Tool interface를 구현한다.
public class AsaDelayDriver extends Configured implements Tool { @Override public int run(String[] args) throws Exception { ..... } public static void main(String[] args) throws Exception { ..... } }
이때, main method에 ToolRunner.run method를 실행하며, hadoop configuration에 옵션을 추가할 수 있다.
참고로, 4~11라인까지는 윈도우환경 Eclipse에서 local debugging시 필요한 사항이다.public static void main(String[] args) throws Exception { Configuration conf = new Configuration(); conf.set("fs.default.name", "file:///"); conf.set("mapred.job.tracker", "local"); conf.set("fs.file.impl", "com.dbility.hadoop.util.WindowsLocalFileSystem"); conf.set("io.serializations", "org.apache.hadoop.io.serializer.JavaSerialization," +"org.apache.hadoop.io.serializer.WritableSerialization"); args = new String[] { "d:/hadoop_test/2008.csv", "d:/hadoop_test/output2008" }; Runtime.getRuntime().exit(ToolRunner.run(conf, new AsaDelayDriver(), args)); }
run method를 구현하여 mapper,reducer등 실행에 필요한 작업을 기술한다.
이때,GenericOptionParser를 통해 parsing을 해줘야 하고, 남은 args는 input과 output이 된다.@Override public int run(String[] args) throws Exception { String[] remainArgs = new GenericOptionsParser(getConf(), args).getRemainingArgs(); if (remainArgs.length != 2) { System.out.println("Usage : hadoop jar JarFile <mainclass> <input> <output>"); return -1; } Job job = new Job(getConf(), "asaPerformAnlaysis"); job.setJarByClass(AsaDelayDriver.class); job.setMapperClass(AsaDelayMapper.class); job.setReducerClass(AsaDelayReducer.class); job.setInputFormatClass(TextInputFormat.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(IntWritable.class); MultipleOutputs.addNamedOutput(job, "departure", TextOutputFormat.class, Text.class, IntWritable.class); MultipleOutputs.addNamedOutput(job, "arrival", TextOutputFormat.class, Text.class, IntWritable.class); FileInputFormat.addInputPath(job, new Path(remainArgs[0])); FileOutputFormat.setOutputPath(job, new Path(remainArgs[1])); FileSystem hdfs = FileSystem.get(getConf()); Path path = new Path(remainArgs[1]); if (hdfs.exists(path)) hdfs.delete(path, true); return job.waitForCompletion(true) ? 0 : -2; }
Generic Options
The supported generic options are:
-conf <configuration file> specify a configuration file
-D <property=value> use value for given property
-fs <local|namenode:port> specify a namenode
-jt <local|jobtracker:port> specify a job tracker
-files <comma separated list of files> specify comma separated
files to be copied to the map reduce cluster
-libjars <comma separated list of jars> specify comma separated
jar files to include in the classpath.
-archives <comma separated list of archives> specify comma
separated archives to be unarchived on the compute machines.
The general command line syntax is: bin/hadoop command [genericOptions] [commandOptions] Generic command line arguments might modify Configuration objects, given to constructors. The functionality is implemented using Commons CLI. Examples: $ bin/hadoop dfs -fs darwin:8020 -ls /data list /data directory in dfs with namenode darwin:8020 $ bin/hadoop dfs -D fs.default.name=darwin:8020 -ls /data list /data directory in dfs with namenode darwin:8020 $ bin/hadoop dfs -conf hadoop-site.xml -ls /data list /data directory in dfs with conf specified in hadoop-site.xml $ bin/hadoop job -D mapred.job.tracker=darwin:50020 -submit job.xml submit a job to job tracker darwin:50020 $ bin/hadoop job -jt darwin:50020 -submit job.xml submit a job to job tracker darwin:50020 $ bin/hadoop job -jt local -submit job.xml submit a job to local runner $ bin/hadoop jar -libjars testlib.jar -archives test.tgz -files file.txt inputjar args job submission with libjars, files and archives
하둡에서 제공하는 기본 카운터에 사용자정의 카운터를 추가하여 Logger를 대신하여 Map/Reduce의 동작을 체크할 수 있다.
enum클래스를 이용해 구현하며, 클래스명은 카운터그룹명이 되고,필드는 카운터이름이 된다.
context객체에서 getCounter 메서드를 통해 다룰 수 있다.public enum AsaDelayCounters { not_available_arrival, // 도착지연시간이 NA scheduled_arrival, // 정시도착 early_arrival, // 조기도착 not_available_departure,// 출발지연시간이 NA scheduled_departure, // 정시출발 early_departure, // 조기출발 }
다수의 파일출력이 필요할 경우
MultipleOutputs를 이용해 여러 개의 OutputCollectors를 생성하고,
AddNamedOutput 메서드를 통해 출력경로,포맷,키와 값 유형을 설정한다.public class AsaDelayDriver extends Configured implements Tool { @Override public int run(String[] args) throws Exception { .... MultipleOutputs.addNamedOutput(job, "departure", TextOutputFormat.class, Text.class, IntWritable.class); MultipleOutputs.addNamedOutput(job, "arrival", TextOutputFormat.class, Text.class, IntWritable.class); .... } .... } public class AsaDelayReducer extends Reducer<Text, IntWritable, Text, IntWritable> { .... private MultipleOutputs<Text, IntWritable> mos; @Override protected void setup(Context context) throws IOException, InterruptedException { mos = new MultipleOutputs<Text, IntWritable>(context); } @Override protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException { .... mos.write("departure", outKey, outValue); .... mos.write("arrival", outKey, outValue); .... } @Override protected void cleanup(Context context) throws IOException, InterruptedException { mos.close(); } }
'bigdata > hadoop' 카테고리의 다른 글
hadoop MRUnit 사용하기 (0) | 2016.11.17 |
---|---|
hadoop ChainMapper, Reducer exercise (0) | 2016.11.14 |
hadoop MultipleOutputs을 이용한 ASA 통계 작성 (0) | 2016.11.12 |
output Key,Value Class 미지정시 오류 (0) | 2016.11.11 |
hadoop WordCount GenericOptionParser 적용 예제 (0) | 2016.11.06 |