DBILITY

하둡 정리 2 본문

bigdata/hadoop

하둡 정리 2

DBILITY 2016. 11. 13. 09:20
반응형
  • Mapper Class는 org.apache.hadoop.mapreduce.Mapper를 상속하여 구현한다.
    map을 구현한다. 사용자정의 옵션을 입력받을 경우(-D property=value) setup을 구현하며,
    context에서 해당 설정값을 추출하여 사용한다.

    public class AsaDelayMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
    
    	private String workType;
    	private Text outKey = new Text();
    	private static final IntWritable outValue = new IntWritable(1);
    
    	@Override
    	protected void setup(Context context) throws IOException, InterruptedException {
    		workType = context.getConfiguration().get("workType");
    	}
    
    	@Override
    	protected void map(LongWritable key, Text value, Context context)
    			throws IOException, InterruptedException {
    
    		AsaPerformParser parser = new AsaPerformParser(value);
    
    		if ( workType.equals("departure") ){
    			if (parser.isDepDelayAvailable() && parser.getDepDelayTime() > 0 ){
    				outKey.set(parser.getYear()+","+parser.getMonth());
    				context.write(outKey, outValue);
    			}
    		} else if ( workType.equals("arrival") ) {
    			if (parser.isArrDelayAvailable() && parser.getArrDelayTime() > 0) {
    				outKey.set(parser.getYear()+","+parser.getMonth());
    				context.write(outKey, outValue);
    			}
    		}
    	}
    }
    
  • Reducer Class는 org.apache.hadoop.mapreduce.Reducer를 상속하여 구현한다.
    reduce를 구현한다. 사용자정의 옵션을 입력받을 경우(-D property=value) setup을 구현하며,
    context에서 해당 설정값을 추출하여 사용한다.
    MultipleOuputs을 사용할 경우 cleanup을 구현하여 MultipleOuputs 객체를 닫아 준다.

    public class AsaDelayReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
    
    	private String workType;
    	private IntWritable outValue = new IntWritable();
    
    	@Override
    	protected void setup(Context context) throws IOException, InterruptedException {
    		workType = context.getConfiguration().get("workType");
    	}
    
    	@Override
    	protected void reduce(Text key, Iterable<IntWritable> values, Context context)
    			throws IOException, InterruptedException {
    		int sum = 0;
    
    		if(workType.equals("departure")){
    			for (IntWritable value : values) {
    				sum+=value.get();
    			}
    			outValue.set(sum);
    			context.write(key, outValue);
    		} else if (workType.equals("arrival")) {
    			for (IntWritable value : values) {
    				sum+=value.get();
    			}
    			outValue.set(sum);
    			context.write(key, outValue);
    		}
    	}
    }
    
  • Driver Class는 Configured를 상속받고,Tool interface를 구현한다.
    public class AsaDelayDriver extends Configured implements Tool {
    
    	@Override
    	public int run(String[] args) throws Exception {
    	.....
    	}
    	public static void main(String[] args) throws Exception {
    	.....
    	}
    }
    

    이때, main method에 ToolRunner.run method를 실행하며, hadoop configuration에 옵션을 추가할 수 있다.
    참고로, 4~11라인까지는 윈도우환경 Eclipse에서 local debugging시 필요한 사항이다.

    public static void main(String[] args) throws Exception {
    		Configuration conf = new Configuration();
    
    		conf.set("fs.default.name", "file:///");
    		conf.set("mapred.job.tracker", "local");
    		conf.set("fs.file.impl", "com.dbility.hadoop.util.WindowsLocalFileSystem");
    		conf.set("io.serializations",
    				 "org.apache.hadoop.io.serializer.JavaSerialization,"
    				+"org.apache.hadoop.io.serializer.WritableSerialization");
    
    		args = new String[] { "d:/hadoop_test/2008.csv", "d:/hadoop_test/output2008" };
    
    		Runtime.getRuntime().exit(ToolRunner.run(conf, new AsaDelayDriver(), args));
    	}
    
    run method를 구현하여 mapper,reducer등 실행에 필요한 작업을 기술한다.
    이때,GenericOptionParser를 통해 parsing을 해줘야 하고, 남은 args는 input과 output이 된다.
    @Override
    	public int run(String[] args) throws Exception {
    
    		String[] remainArgs = new GenericOptionsParser(getConf(), args).getRemainingArgs();
    
    		if (remainArgs.length != 2) {
    			System.out.println("Usage : hadoop jar JarFile <mainclass> <input> <output>");
    			return -1;
    		}
    
    		Job job = new Job(getConf(), "asaPerformAnlaysis");
    
    		job.setJarByClass(AsaDelayDriver.class);
    		job.setMapperClass(AsaDelayMapper.class);
    		job.setReducerClass(AsaDelayReducer.class);
    
    		job.setInputFormatClass(TextInputFormat.class);
    		job.setOutputKeyClass(Text.class);
    		job.setOutputValueClass(IntWritable.class);
    		MultipleOutputs.addNamedOutput(job, "departure", TextOutputFormat.class, Text.class, IntWritable.class);
    		MultipleOutputs.addNamedOutput(job, "arrival", TextOutputFormat.class, Text.class, IntWritable.class);
    
    		FileInputFormat.addInputPath(job, new Path(remainArgs[0]));
    		FileOutputFormat.setOutputPath(job, new Path(remainArgs[1]));
    
    		FileSystem hdfs = FileSystem.get(getConf());
    		Path path = new Path(remainArgs[1]);
    		if (hdfs.exists(path))
    			hdfs.delete(path, true);
    
    		return job.waitForCompletion(true) ? 0 : -2;
    	}

    Generic Options
    The supported generic options are:
         -conf <configuration file>     specify a configuration file
         -D <property=value>            use value for given property
         -fs <local|namenode:port>      specify a namenode
         -jt <local|jobtracker:port>    specify a job tracker
         -files <comma separated list of files>    specify comma separated
                                files to be copied to the map reduce cluster
         -libjars <comma separated list of jars>   specify comma separated
                                jar files to include in the classpath.
         -archives <comma separated list of archives>    specify comma
                 separated archives to be unarchived on the compute machines.

    The general command line syntax is:
    
     bin/hadoop command [genericOptions] [commandOptions]
     
    Generic command line arguments might modify Configuration objects, given to constructors.
    
    The functionality is implemented using Commons CLI.
    
    Examples:
    
     $ bin/hadoop dfs -fs darwin:8020 -ls /data
     list /data directory in dfs with namenode darwin:8020
     
     $ bin/hadoop dfs -D fs.default.name=darwin:8020 -ls /data
     list /data directory in dfs with namenode darwin:8020
         
     $ bin/hadoop dfs -conf hadoop-site.xml -ls /data
     list /data directory in dfs with conf specified in hadoop-site.xml
         
     $ bin/hadoop job -D mapred.job.tracker=darwin:50020 -submit job.xml
     submit a job to job tracker darwin:50020
         
     $ bin/hadoop job -jt darwin:50020 -submit job.xml
     submit a job to job tracker darwin:50020
         
     $ bin/hadoop job -jt local -submit job.xml
     submit a job to local runner
     
     $ bin/hadoop jar -libjars testlib.jar 
     -archives test.tgz -files file.txt inputjar args
     job submission with libjars, files and archives
    
  • 하둡에서 제공하는 기본 카운터에 사용자정의 카운터를 추가하여 Logger를 대신하여 Map/Reduce의 동작을 체크할 수 있다.
    enum클래스를 이용해 구현하며, 클래스명은 카운터그룹명이 되고,필드는 카운터이름이 된다.
    context객체에서 getCounter 메서드를 통해 다룰 수 있다.

    public enum AsaDelayCounters {
    	not_available_arrival,	// 도착지연시간이 NA
    	scheduled_arrival,		// 정시도착
    	early_arrival,			// 조기도착
    	not_available_departure,// 출발지연시간이 NA
    	scheduled_departure,	// 정시출발
    	early_departure,		// 조기출발
    }
    
  • 다수의 파일출력이 필요할 경우
    MultipleOutputs를 이용해 여러 개의 OutputCollectors를 생성하고,
    AddNamedOutput 메서드를 통해 출력경로,포맷,키와 값 유형을 설정한다.

    public class AsaDelayDriver extends Configured implements Tool {
    
    	@Override
    	public int run(String[] args) throws Exception {
    		....
    		MultipleOutputs.addNamedOutput(job, "departure", TextOutputFormat.class, Text.class, IntWritable.class);
    		MultipleOutputs.addNamedOutput(job, "arrival", TextOutputFormat.class, Text.class, IntWritable.class);
    		....
    	}
    	....
    }
    
    public class AsaDelayReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
    	....
    	private MultipleOutputs<Text, IntWritable> mos;
    
    	@Override
    	protected void setup(Context context) throws IOException, InterruptedException {
    		mos = new MultipleOutputs<Text, IntWritable>(context);
    	}
    
    	@Override
    	protected void reduce(Text key, Iterable<IntWritable> values, Context context)
    			throws IOException, InterruptedException {
    		....
    		mos.write("departure", outKey, outValue);
    		....
    		mos.write("arrival", outKey, outValue);
    		....
    	}
    
    	@Override
    	protected void cleanup(Context context) throws IOException,
    			InterruptedException {
    		mos.close();
    	}
    }


반응형
Comments