Notice
Recent Posts
Recent Comments
Link
일 | 월 | 화 | 수 | 목 | 금 | 토 |
---|---|---|---|---|---|---|
 | | | 1 | 2 | 3 | 4 |
5 | 6 | 7 | 8 | 9 | 10 | 11 |
12 | 13 | 14 | 15 | 16 | 17 | 18 |
19 | 20 | 21 | 22 | 23 | 24 | 25 |
26 | 27 | 28 | 29 | 30 | 31 |
Tags
- Java
- mapreduce
- MSSQL
- R
- window
- es6
- GIT
- vaadin
- Eclipse
- Kotlin
- Android
- SQL
- IntelliJ
- Python
- NPM
- hadoop
- plugin
- mybatis
- SPC
- 공정능력
- tomcat
- table
- Spring
- SSL
- react
- JavaScript
- xPlatform
- Sqoop
- 보조정렬
- Express
Archives
- Today
- Total
DBILITY
hadoop partial sort exercise include search ( 부분 정렬 실습 3 ) 본문
bigdata/hadoop
hadoop partial sort exercise include search ( 부분 정렬 실습 3 )
DBILITY 2017. 3. 12. 00:35반응형
2번째 실습의 결과 저장 파일 맨 앞줄에 보였던 이상한 문자는 PrintWriter로 write하니 사라졌다.
윈도 환경 eclipse에서 실행하였다.
리눅스 환경의 완전 분산 모드에선 Codec을 GzipCodec으로 변경했고, Configuration property 설정 부분을 주석처리 후 테스트했다.
hadoop3이 나오는 마당이니 hadoop2 환경까지는 공부해 두고, 이후에는 SQL on Hadoop인 tajo까지 사용할 수 있어야 한다.
그때쯤엔 spring+mybatis환경에서 tajo jdbc driver를 통해 RIA UI에 hdfs 데이터를 리스트업 하는 것까지 해보고 싶다.
-
<?xml version="1.0" encoding="UTF-8"?>
<!--
  Maven build for the Hadoop partial-sort exercise.
  Produces ${project.artifactId}.jar with PartialSortDriver as the manifest
  main class and, during the install phase, copies the jar to the Hadoop
  master node over SCP.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.dbility.hadoop</groupId>
    <artifactId>execise</artifactId>
    <version>1.0.0</version>

    <properties>
        <java.version>1.7</java.version>
        <encoding.charset>UTF-8</encoding.charset>
        <!-- Make the declared charset effective for the compiler/resources plugins;
             without this Maven falls back to the platform encoding. -->
        <project.build.sourceEncoding>${encoding.charset}</project.build.sourceEncoding>

        <!-- SCP deployment target used by the antrun execution below. -->
        <scp.user>hadoop</scp.user>
        <!-- NOTE(review): plain-text credential kept for the exercise; consider moving it
             to settings.xml or passing it with -Dscp.password=... -->
        <scp.password>hadoop</scp.password>
        <scp.host>big-master</scp.host>
        <scp.copy2dir>home/hadoop</scp.copy2dir>
    </properties>

    <dependencies>
        <!-- All dependencies are "provided": they are expected on the runtime classpath. -->
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-core</artifactId>
            <version>1.2.1</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-api</artifactId>
            <version>1.7.21</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>jcl-over-slf4j</artifactId>
            <version>1.7.21</version>
            <scope>provided</scope>
            <exclusions>
                <exclusion>
                    <artifactId>slf4j-api</artifactId>
                    <groupId>org.slf4j</groupId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>ch.qos.logback</groupId>
            <artifactId>logback-classic</artifactId>
            <version>1.1.7</version>
            <scope>provided</scope>
            <exclusions>
                <exclusion>
                    <artifactId>slf4j-api</artifactId>
                    <groupId>org.slf4j</groupId>
                </exclusion>
            </exclusions>
        </dependency>
    </dependencies>

    <build>
        <finalName>${project.artifactId}</finalName>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>2.3.2</version>
                <configuration>
                    <source>${java.version}</source>
                    <target>${java.version}</target>
                </configuration>
            </plugin>

            <!-- Sets the manifest Main-Class so "hadoop jar" can be run without a class name. -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-jar-plugin</artifactId>
                <version>2.3.2</version>
                <configuration>
                    <archive>
                        <manifest>
                            <mainClass>com.dbility.hadoop.execise.PartialSortDriver</mainClass>
                        </manifest>
                    </archive>
                </configuration>
                <executions>
                    <execution>
                        <id>default-jar</id>
                        <phase>package</phase>
                        <goals>
                            <goal>jar</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>

            <!-- Shades only this project's own artifact and drops META-INF/maven metadata. -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>2.4.1</version>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <artifactSet>
                                <includes>
                                    <include>${project.groupId}:${project.artifactId}</include>
                                </includes>
                            </artifactSet>
                            <filters>
                                <filter>
                                    <artifact>*:*</artifact>
                                    <excludes>
                                        <exclude>META-INF/maven/**</exclude>
                                    </excludes>
                                </filter>
                            </filters>
                        </configuration>
                    </execution>
                </executions>
            </plugin>

            <!-- Copies the built jar to the Hadoop master node during "mvn install". -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-antrun-plugin</artifactId>
                <version>1.8</version>
                <dependencies>
                    <dependency>
                        <groupId>org.apache.ant</groupId>
                        <artifactId>ant-jsch</artifactId>
                        <version>1.9.7</version>
                    </dependency>
                    <dependency>
                        <groupId>com.jcraft</groupId>
                        <artifactId>jsch</artifactId>
                        <version>0.1.54</version>
                    </dependency>
                </dependencies>
                <executions>
                    <execution>
                        <phase>install</phase>
                        <goals>
                            <goal>run</goal>
                        </goals>
                        <configuration>
                            <!-- "target" replaces the deprecated "tasks" parameter. -->
                            <target>
                                <scp file="${project.basedir}/target/${project.artifactId}.jar"
                                     todir="${scp.user}:${scp.password}@${scp.host}:/${scp.copy2dir}"
                                     trust="true">
                                </scp>
                            </target>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>
-
<?xml version="1.0" encoding="UTF-8"?>
<!--
  Logback configuration for the exercise.
  Everything is logged to the console at WARN, except the Hadoop JobClient,
  which is logged at INFO.
-->
<configuration>

    <!-- Single console appender shared by all loggers. -->
    <appender name="console" class="ch.qos.logback.core.ConsoleAppender">
        <encoder>
            <pattern>%-4relative [%thread] %-5level %logger{35} - %msg %n</pattern>
        </encoder>
    </appender>

    <!-- JobClient at INFO; additivity is off so its events are not passed on to the root logger as well. -->
    <logger name="org.apache.hadoop.mapred.JobClient" level="info" additivity="false">
        <appender-ref ref="console" />
    </logger>

    <!-- Everything else: WARN and above. -->
    <root level="warn">
        <appender-ref ref="console" />
    </root>

</configuration>
-
package com.dbility.hadoop.execise;

import java.io.IOException;
import java.io.OutputStreamWriter;
import java.io.PrintWriter;
import java.nio.charset.StandardCharsets;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocalFileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.MapFile.Reader;
import org.apache.hadoop.io.SequenceFile.CompressionType;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.compress.DefaultCodec;
import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapFileOutputFormat;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Partitioner;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.RunningJob;
import org.apache.hadoop.mapred.SequenceFileInputFormat;
import org.apache.hadoop.mapred.SequenceFileOutputFormat;
import org.apache.hadoop.mapred.TextInputFormat;
import org.apache.hadoop.mapred.lib.HashPartitioner;
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

/**
 * Partial-sort exercise driver with key lookup.
 *
 * <p>The driver runs two jobs and then performs a search:
 * <ol>
 * <li>convert the comma-separated text input into a block-compressed
 * SequenceFile keyed by the value of column 18 (the distance),</li>
 * <li>convert that SequenceFile into MapFiles,</li>
 * <li>look up every record whose key equals the requested distance and write
 * the matching lines to {@code <save_path>/search_miles.txt}.</li>
 * </ol>
 *
 * <p>Exit codes of {@link #run(String[])}: {@code 0} success, {@code -1} bad
 * arguments, {@code -2} input path missing, {@code -3} first job failed,
 * {@code -4} sequence path missing, {@code -5} second job failed,
 * {@code -6} requested key not found.
 *
 * @author hyperrookie@gmail.com
 *
 * @version 1.0.0
 * @date 2017. 3. 11.
 */
public class PartialSortDriver extends Configured implements Tool {

    private static final String USAGE =
            "Usage : hadoop jar jarName [mainClass] <input_path> <sequence_path> <map_path> <save_path> <search_miles>";

    /**
     * Runs the text -> SequenceFile -> MapFile conversion and the key search.
     *
     * @param args generic Hadoop options followed by
     *             {@code <input_path> <sequence_path> <map_path> <save_path> <search_miles>}
     * @return one of the exit codes listed in the class documentation
     * @throws Exception if a job or a file system operation fails
     */
    @Override
    public int run(String[] args) throws Exception {

        String[] remainArgs = new GenericOptionsParser(getConf(), args).getRemainingArgs();

        if (remainArgs.length != 5) {
            System.out.println(USAGE);
            return -1;
        }

        Path inputPath = new Path(remainArgs[0]);
        Path sequencePath = new Path(remainArgs[1]);
        Path mapPath = new Path(remainArgs[2]);
        Path savePath = new Path(remainArgs[3] + "/search_miles.txt");

        IntWritable searchMiles;
        try {
            searchMiles = new IntWritable(Integer.parseInt(remainArgs[4]));
        } catch (NumberFormatException e) {
            // A non-numeric key is a usage error, not an internal failure.
            System.out.println("<search_miles> must be an integer : " + remainArgs[4]);
            System.out.println(USAGE);
            return -1;
        }

        FileSystem hdfs = FileSystem.get(getConf());

        // ---- job 1 : text -> SequenceFile ---------------------------------
        if (!hdfs.exists(inputPath)) {
            System.out.printf("%s is not found!", inputPath.getName());
            return -2;
        }
        deleteIfExists(hdfs, sequencePath);

        RunningJob job01 = JobClient.runJob(createText2SequenceJob(inputPath, sequencePath));
        if (!job01.isSuccessful()) {
            System.out.printf("%s is failed!", job01.getJobName());
            return -3;
        }

        // ---- job 2 : SequenceFile -> MapFile ------------------------------
        if (!hdfs.exists(sequencePath)) {
            System.out.printf("%s is not found!", sequencePath.getName());
            return -4;
        }
        removeJobMarkers(hdfs, sequencePath);
        deleteIfExists(hdfs, mapPath);

        RunningJob job02 = JobClient.runJob(createSequence2MapJob(sequencePath, mapPath));
        if (!job02.isSuccessful()) {
            System.out.printf("%s is failed!", job02.getJobName());
            return -5;
        }

        // Only MapFile directories may remain in mapPath before the readers are opened.
        removeJobMarkers(hdfs, mapPath);
        deleteIfExists(hdfs, savePath);

        // ---- search -------------------------------------------------------
        return searchAndSave(hdfs, mapPath, savePath, searchMiles);
    }

    /** Builds the map-only job that writes the text input as a block-compressed SequenceFile. */
    private JobConf createText2SequenceJob(Path inputPath, Path sequencePath) {
        // Built from getConf() so that generic options and the Tool configuration
        // apply to this job in the same way as to the second one.
        JobConf jobConf = new JobConf(getConf(), PartialSortDriver.class);
        jobConf.setJobName("Text2SequenceFileConverter");

        jobConf.setInputFormat(TextInputFormat.class);
        jobConf.setMapperClass(Text2SequenceFileMapper.class);
        jobConf.setNumReduceTasks(0);

        jobConf.setOutputFormat(SequenceFileOutputFormat.class);
        jobConf.setOutputKeyClass(IntWritable.class);
        jobConf.setOutputValueClass(Text.class);

        SequenceFileOutputFormat.setCompressOutput(jobConf, true);
        SequenceFileOutputFormat.setOutputCompressorClass(jobConf, DefaultCodec.class);
        SequenceFileOutputFormat.setOutputCompressionType(jobConf, CompressionType.BLOCK);

        FileInputFormat.addInputPath(jobConf, inputPath);
        FileOutputFormat.setOutputPath(jobConf, sequencePath);
        return jobConf;
    }

    /** Builds the job that rewrites the SequenceFile as block-compressed MapFiles. */
    private JobConf createSequence2MapJob(Path sequencePath, Path mapPath) {
        JobConf jobConf = new JobConf(getConf(), PartialSortDriver.class);
        jobConf.setJobName("SequenceFile2MapFileConverter");

        jobConf.setInputFormat(SequenceFileInputFormat.class);
        jobConf.setOutputFormat(MapFileOutputFormat.class);
        jobConf.setOutputKeyClass(IntWritable.class);

        SequenceFileOutputFormat.setCompressOutput(jobConf, true);
        SequenceFileOutputFormat.setOutputCompressorClass(jobConf, DefaultCodec.class);
        SequenceFileOutputFormat.setOutputCompressionType(jobConf, CompressionType.BLOCK);

        FileInputFormat.addInputPath(jobConf, sequencePath);
        FileOutputFormat.setOutputPath(jobConf, mapPath);
        return jobConf;
    }

    /**
     * Looks up {@code searchMiles} in the MapFiles under {@code mapPath} and writes every
     * record with that key to {@code savePath} (one record per line, UTF-8).
     *
     * @return {@code 0} when at least one record was written, {@code -6} when the key is absent
     */
    private int searchAndSave(FileSystem hdfs, Path mapPath, Path savePath, IntWritable searchMiles)
            throws IOException {

        Reader[] readers = MapFileOutputFormat.getReaders(hdfs, mapPath, getConf());
        try {
            Partitioner<IntWritable, Text> partitioner = new HashPartitioner<IntWritable, Text>();
            Text value = new Text();

            // Select the reader by the same partition computation the original lookup used.
            Reader reader = readers[partitioner.getPartition(searchMiles, value, readers.length)];

            Writable entry = reader.get(searchMiles, value);
            if (entry == null) {
                System.out.printf("requested key [%s] is not found! \n", searchMiles);
                return -6;
            }

            // Explicit UTF-8 so the output does not depend on the platform default charset.
            try (PrintWriter writer = new PrintWriter(
                    new OutputStreamWriter(hdfs.create(savePath, true), StandardCharsets.UTF_8))) {
                IntWritable nextKey = new IntWritable();
                do {
                    System.out.println(value);
                    writer.write(value.toString() + "\n");
                } while (reader.next(nextKey, value) && searchMiles.equals(nextKey));

                // PrintWriter never throws on write; surface a failed write explicitly.
                if (writer.checkError()) {
                    throw new IOException("failed to write " + savePath);
                }
            }
            return 0;
        } finally {
            closeAll(readers);
        }
    }

    /** Recursively deletes {@code path} when it exists. */
    private static void deleteIfExists(FileSystem fs, Path path) throws IOException {
        if (fs.exists(path)) {
            fs.delete(path, true);
        }
    }

    /** Removes the {@code _SUCCESS} and {@code _logs} entries from a job output directory. */
    private static void removeJobMarkers(FileSystem fs, Path dir) throws IOException {
        fs.delete(new Path(dir, "_SUCCESS"), true);
        fs.delete(new Path(dir, "_logs"), true);
    }

    /** Closes every reader; a failure to close one reader does not prevent closing the rest. */
    private static void closeAll(Reader[] readers) {
        for (Reader reader : readers) {
            try {
                reader.close();
            } catch (IOException e) {
                System.err.println(e.getMessage());
            }
        }
    }

    /**
     * Entry point. Configures a local (Windows) test run; when no command-line
     * arguments are given, the sample paths under {@code d:\hadoop_test} and the
     * key {@code 100} are used.
     *
     * @param args optional program arguments, see {@link #run(String[])}
     * @throws Exception if the tool fails
     */
    public static void main(String[] args) throws Exception {

        Configuration conf = new Configuration();

        // Local test settings; comment these out when running on a cluster.
        conf.set("fs.default.name", "file:///");
        conf.set("mapred.job.tracker", "local");
        conf.set(
                "io.serializations",
                "org.apache.hadoop.io.serializer.JavaSerialization,"
                        + "org.apache.hadoop.io.serializer.WritableSerialization");
        conf.set("fs.file.impl",
                "com.dbility.hadoop.execise.PartialSortDriver$WindowsLocalFileSystem");
        conf.set("io.sort.mb", "512");

        // Fall back to the sample arguments only when the caller supplied none.
        if (args.length == 0) {
            args = new String[] {
                    "d:\\hadoop_test\\input_data",
                    "d:\\hadoop_test\\sequence_data",
                    "d:\\hadoop_test\\map_data",
                    "d:\\hadoop_test\\",
                    "100" };
        }

        System.exit(ToolRunner.run(conf, new PartialSortDriver(), args));
    }

    /**
     * {@link LocalFileSystem} variant registered through {@code fs.file.impl} in
     * {@link #main(String[])}: directories are created without applying the requested
     * permission, and a failing {@code setPermission} is reported instead of thrown.
     */
    public static class WindowsLocalFileSystem extends LocalFileSystem {

        public WindowsLocalFileSystem() {
            super();
        }

        /** Creates the directory; the {@code permission} argument is intentionally ignored. */
        @Override
        public boolean mkdirs(final Path f, final FsPermission permission)
                throws IOException {
            return super.mkdirs(f);
        }

        /** Best-effort: an {@link IOException} is printed to stderr and not propagated. */
        @Override
        public void setPermission(final Path p, final FsPermission permission)
                throws IOException {
            try {
                super.setPermission(p, permission);
            } catch (final IOException e) {
                System.err.println(e.getMessage());
            }
        }
    }

    /**
     * Splits one comma-separated record and exposes column 0 (year), column 1 (month)
     * and column 18 (distance).
     *
     * <p>The constructor throws an unchecked exception when the record has fewer than
     * 19 columns or when the month or distance column is not a valid integer.
     */
    public static class Parser {

        private String year;
        private Integer month;
        private Integer distance;
        private boolean distanceAvaliable = false;

        /**
         * @param value one input line
         */
        public Parser(Text value) {
            String[] columns = value.toString().split(",");

            this.year = columns[0];
            this.month = Integer.parseInt(columns[1]);

            // "NA" marks a missing distance; the field then stays null.
            if (!"NA".equals(columns[18])) {
                this.distance = Integer.parseInt(columns[18]);
                if (this.distance > 0) {
                    this.distanceAvaliable = true;
                }
            }
        }

        public String getYear() {
            return year;
        }

        public Integer getMonth() {
            return month;
        }

        /** @return the distance, or {@code null} when the column was {@code "NA"} */
        public Integer getDistance() {
            return distance;
        }

        /** @return {@code true} only when a distance greater than zero was parsed */
        public boolean isDistanceAvaliable() {
            return distanceAvaliable;
        }
    }

    /**
     * Emits each input line keyed by its distance. Lines without a positive distance
     * are dropped; lines that cannot be parsed are emitted under key {@code 0}.
     */
    public static class Text2SequenceFileMapper extends MapReduceBase implements
            Mapper<LongWritable, Text, IntWritable, Text> {

        private IntWritable outputKey = new IntWritable();

        @Override
        public void map(LongWritable key, Text value,
                OutputCollector<IntWritable, Text> collector, Reporter reporter)
                throws IOException {

            Parser parser;
            try {
                parser = new Parser(value);
            } catch (RuntimeException e) {
                // Unparseable record: keep it under key 0. Only parse errors are
                // handled here, so an IOException from collect() is never
                // swallowed and retried.
                outputKey.set(0);
                collector.collect(outputKey, value);
                return;
            }

            if (parser.isDistanceAvaliable()) {
                outputKey.set(parser.getDistance());
                collector.collect(outputKey, value);
            }
        }
    }
}
반응형
'bigdata > hadoop' 카테고리의 다른 글
hadoop secondary sort 4 ( 보조정렬 실습 ) (0) | 2017.03.12 |
---|---|
hadoop total sort exercise ( 전체 정렬 실습 ) (0) | 2017.03.12 |
hadoop partial sort exercise 2 ( 부분 정렬 실습 2 ) (0) | 2017.03.08 |
hadoop Text -> SequenceFile -> MapFile로 변환 (0) | 2017.03.07 |
hadoop Mapper이용 text파일을 SequenceFileFormat으로 저장 (0) | 2017.03.06 |
Comments