Notice
Recent Posts
Recent Comments
Link
일 | 월 | 화 | 수 | 목 | 금 | 토 |
---|---|---|---|---|---|---|
1 | 2 | |||||
3 | 4 | 5 | 6 | 7 | 8 | 9 |
10 | 11 | 12 | 13 | 14 | 15 | 16 |
17 | 18 | 19 | 20 | 21 | 22 | 23 |
24 | 25 | 26 | 27 | 28 | 29 | 30 |
Tags
- 보조정렬
- Sqoop
- SPC
- NPM
- R
- es6
- hadoop
- mybatis
- Eclipse
- plugin
- GIT
- Express
- window
- MSSQL
- tomcat
- SSL
- vaadin
- Spring
- react
- 공정능력
- Kotlin
- table
- SQL
- IntelliJ
- JavaScript
- mapreduce
- Java
- Python
- Android
- xPlatform
Archives
- Today
- Total
DBILITY
flume hadoop 연동 테스트 소스 데이터 생성 본문
반응형
hadoop cluster에 avro sink를 통해 저장하는 걸 가상으로 테스트해보고 싶은데, 그렇다고 능력은 안되고,
flume-ng-core 소스중 org.apache.flume.source.SequenceGeneratorSource.java를 살짝 바꿨다.
PollableSourceRunner를 보니 getMaxBackOffSleepInterval만큼 Tread를 sleep상태로 둔다.기본 5000ms
이것 저거 바꿔볼 몸 상태가 아니다.목,어깨,손이 아프다.
build후엔 flume directory에 lib 또는 plugin.d/plugin명/lib 넣으면 된다.안되면 말고~
pom.xml
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.dbility.bigdata.flume.source</groupId>
<artifactId>flume-source-generator</artifactId>
<version>0.0.1</version>
<properties>
<java.version>1.8</java.version>
<flume.version>1.8.0</flume.version>
<slf4j.version>1.6.1</slf4j.version>
<encoding-char>utf-8</encoding-char>
</properties>
<repositories>
<repository>
<snapshots>
<enabled>false</enabled>
</snapshots>
<id>dbility</id>
<name>dbility Repository</name>
<url>http://112.216.48.138/nexus/content/groups/public/</url>
</repository>
</repositories>
<pluginRepositories>
<pluginRepository>
<releases>
<updatePolicy>never</updatePolicy>
</releases>
<snapshots>
<enabled>false</enabled>
</snapshots>
<id>dbility</id>
<name>dbility Repository</name>
<url>http://112.216.48.138/nexus/content/groups/public/</url>
</pluginRepository>
</pluginRepositories>
<dependencies>
<dependency>
<groupId>org.apache.flume</groupId>
<artifactId>flume-ng-core</artifactId>
<version>${flume.version}</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>${slf4j.version}</version>
</dependency>
</dependencies>
<build>
<finalName>${project.artifactId}-${project.version}</finalName>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.1</version>
<configuration>
<source>${java.version}</source>
<target>${java.version}</target>
<encoding>${encoding-char}</encoding>
</configuration>
</plugin>
</plugins>
</build>
</project>
RandomDataGenerator.java
package com.dbility.bigdata.flume.source;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.List;
import java.util.Locale;
import org.apache.flume.ChannelException;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.EventDeliveryException;
import org.apache.flume.FlumeException;
import org.apache.flume.conf.Configurable;
import org.apache.flume.event.EventBuilder;
import org.apache.flume.instrumentation.SourceCounter;
import org.apache.flume.source.AbstractPollableSource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.base.Preconditions;
/**
*
* Description
*
*
* @author hyperrookie@gmail.com
*
* @version 0.0.1
* @date 2018. 5. 2.
*=======================================================================
* Date Name Revision History
*=======================================================================
* 2018. 5. 2. hyperrookie@gmail.com Creation
*=======================================================================
*/
public class RandomDataGenerator extends AbstractPollableSource implements Configurable {
private static final Logger logger = LoggerFactory.getLogger(RandomDataGenerator.class);
private int batchSize;
private SourceCounter sourceCounter;
private long totalEvents;
private long eventsSent = 0;
private String[] column;
private String[] columnLength;
private String virtualDataStr;
@Override
protected void doConfigure(Context context) throws FlumeException {
batchSize = context.getInteger("batchSize", 1);
totalEvents = context.getLong("totalEvents", Long.MAX_VALUE);
column = context.getString("column", "randomKey").split(",");
columnLength = context.getString("columnLength", "").split(",");
Preconditions.checkArgument(batchSize > 0, "batchSize was %s but expected positive", batchSize);
if (sourceCounter == null) {
sourceCounter = new SourceCounter(getName());
}
}
@Override
protected Status doProcess() throws EventDeliveryException {
Status status = Status.READY;
long eventsSentTX = eventsSent;
virtualDataStr = "";
try {
if (batchSize == 1) {
if (eventsSentTX < totalEvents) {
virtualDataStr+=","+getName()+"-"+InetAddress.getLocalHost().getHostAddress();
for (int j = 0 ; j < column.length; j++) {
if(column[j].toString().equals("randomKey")) {
virtualDataStr+=","+RandomUtil.randomKey();
} else {
virtualDataStr+=","+RandomUtil.randomKey(Integer.parseInt(columnLength[j]));
}
}
virtualDataStr+=","+LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss", Locale.getDefault()));
getChannelProcessor().processEvent(
EventBuilder.withBody(String.valueOf((eventsSentTX++)+virtualDataStr).getBytes()));
sourceCounter.incrementEventAcceptedCount();
logger.info("processEvent batchSize : {}", batchSize);
} else {
//status = Status.BACKOFF;
}
} else {
List<Event> batchArrayList = new ArrayList<>(batchSize);
for (int i = 0; i < batchSize; i++) {
virtualDataStr = "";
if (eventsSentTX < totalEvents) {
virtualDataStr+=","+getName()+"-"+InetAddress.getLocalHost().getHostAddress();
for (int j = 0 ; j < column.length; j++) {
if(column[j].toString().equals("randomKey")) {
virtualDataStr+=","+RandomUtil.randomKey();
} else {
virtualDataStr+=","+RandomUtil.randomKey(Integer.parseInt(columnLength[j]));
}
}
virtualDataStr+=","+LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss", Locale.getDefault()));
batchArrayList.add(i, EventBuilder.withBody(String
.valueOf((eventsSentTX++)+virtualDataStr).getBytes()));
} else {
//status = Status.BACKOFF;
break;
}
}
if (!batchArrayList.isEmpty()) {
getChannelProcessor().processEventBatch(batchArrayList);
sourceCounter.incrementAppendBatchAcceptedCount();
sourceCounter.addToEventAcceptedCount(batchArrayList.size());
logger.info("processEventBatch batchArrayList.size() : {}", batchArrayList.size());
logger.info("processEventBatch batchArrayList : {}", batchArrayList);
}
}
eventsSent = eventsSentTX;
} catch (ChannelException ex) {
logger.error( getName() + " source could not write to channel.", ex);
} catch (UnknownHostException ex) {
logger.error( getName() + " source could not read host address.", ex);
}
status = Status.BACKOFF;
return status;
}
@Override
protected void doStart() throws FlumeException {
logger.info("RandomDataGenerator source do starting");
sourceCounter.start();
logger.debug("RandomDataGenerator source do started");
}
@Override
protected void doStop() throws FlumeException {
logger.info("RandomDataGenerator source do stopping");
sourceCounter.stop();
logger.info("RandomDataGenerator source do stopped. Metrics:{}",getName(), sourceCounter);
}
}
collector - rndGen.properties
#Source
agent.sources = randomGen
agent.sources.randomGen.type = com.dbility.bigdata.flume.source.RandomDataGenerator
agent.sources.randomGen.batchSize = 5
agent.sources.randomGen.column = randomKey,machcd,profile,itdsc
agent.sources.randomGen.columnLength = 20,3,14,20
agent.sources.randomGen.channels = memoryChannel
#Channel
agent.channels = memoryChannel
agent.channels.memoryChannel.type = memory
agent.channels.memoryChannel.capacity = 100000
agent.channels.memoryChannel.transactionCapacity = 10000
agent.channels.memoryChannel.keep-alive = 60
#Sink
agent.sinks = avroSink
agent.sinks.avroSink.channel = memoryChannel
agent.sinks.avroSink.type = avro
agent.sinks.avroSink.hostname=big-master
agent.sinks.avroSink.port=4141
agent.sinks.avroSink.batch-size=5
server - flume.properties
agent.sources = avroSrc
agent.sources.avroSrc.type = avro
agent.sources.avroSrc.bind = 0.0.0.0
agent.sources.avroSrc.port = 4141
agent.sources.avroSrc.channels = memoryChannel
agent.channels = memoryChannel
agent.channels.memoryChannel.type = memory
agent.channels.memoryChannel.capacity = 1000000
agent.sinks = hdfsSink
agent.sinks.hdfsSink.channel = memoryChannel
agent.sinks.hdfsSink.type = hdfs
agent.sinks.hdfsSink.hdfs.fileType = DataStream
agent.sinks.hdfsSink.hdfs.path = hdfs://hadoop-cluster/flume/events/%y%m%d/%H%M/%S
agent.sinks.hdfsSink.hdfs.writFormat = text
agent.sinks.hdfsSink.hdfs.useLocalTimeStamp = true
agent.sinks.hdfsSink.hdfs.filePrefix = rndGen_%[FQDN]_%y%m%d%H%M%S
agent.sinks.hdfsSink.hdfs.fileSuffix = .log
#Sink
#agent.sinks = loggerSink
#agent.sinks.loggerSink.channel = memoryChannel
#agent.sinks.loggerSink.type = logger
#agent.sinks.loggerSink.maxBytesToLog = 10000
[root@big-master apache-flume-1.8.0-bin]# hdfs dfs -ls /flume/events/180503
Found 2 items
drwxr-xr-x - hadoop supergroup 0 2018-05-03 15:03 /flume/events/180503/1503
drwxr-xr-x - hadoop supergroup 0 2018-05-03 15:04 /flume/events/180503/1504
[root@big-master apache-flume-1.8.0-bin]# hdfs dfs -ls /flume/events/180503/1503/
Found 15 items
drwxr-xr-x - hadoop supergroup 0 2018-05-03 15:03 /flume/events/180503/1503/03
drwxr-xr-x - hadoop supergroup 0 2018-05-03 15:03 /flume/events/180503/1503/06
drwxr-xr-x - hadoop supergroup 0 2018-05-03 15:03 /flume/events/180503/1503/11
drwxr-xr-x - hadoop supergroup 0 2018-05-03 15:03 /flume/events/180503/1503/16
drwxr-xr-x - hadoop supergroup 0 2018-05-03 15:03 /flume/events/180503/1503/21
drwxr-xr-x - hadoop supergroup 0 2018-05-03 15:03 /flume/events/180503/1503/26
drwxr-xr-x - hadoop supergroup 0 2018-05-03 15:04 /flume/events/180503/1503/31
drwxr-xr-x - hadoop supergroup 0 2018-05-03 15:04 /flume/events/180503/1503/36
drwxr-xr-x - hadoop supergroup 0 2018-05-03 15:04 /flume/events/180503/1503/37
drwxr-xr-x - hadoop supergroup 0 2018-05-03 15:04 /flume/events/180503/1503/41
drwxr-xr-x - hadoop supergroup 0 2018-05-03 15:04 /flume/events/180503/1503/42
drwxr-xr-x - hadoop supergroup 0 2018-05-03 15:04 /flume/events/180503/1503/46
drwxr-xr-x - hadoop supergroup 0 2018-05-03 15:04 /flume/events/180503/1503/47
drwxr-xr-x - hadoop supergroup 0 2018-05-03 15:04 /flume/events/180503/1503/52
drwxr-xr-x - hadoop supergroup 0 2018-05-03 15:04 /flume/events/180503/1503/57
[root@big-master apache-flume-1.8.0-bin]# hdfs dfs -ls /flume/events/180503/1503/57
Found 1 items
-rw-r--r-- 3 hadoop supergroup 435 2018-05-03 15:04 /flume/events/180503/1503/57/rndGen_big-master_180503150357.1525327437028.log
[root@big-master apache-flume-1.8.0-bin]# hdfs dfs -cat /flume/events/180503/1503/57/rndGen_big-master_180503150357.1525327437028.log
70,randomGen-192.168.100.18,60qq-72dml14c-2asz-37uo,i57,3qmjhjt0ydrubz,cum5z057zlshbta760ki,2018-05-03 15:03:57
71,randomGen-192.168.100.18,0wem-ob37iybb-g5oq-22nc,lyl,d68u3fhh9dxm84,6tpmfhzvppnvb2011y2z,2018-05-03 15:03:57
72,randomGen-192.168.100.18,ll9l-hrmihmt2-q971-xu81,ax6,lh38qurtytmxzl,yvqu99au0hhxs5ltsdfp,2018-05-03 15:03:57
73,randomGen-192.168.100.18,pde2-f6vcar2o-uaob-z547,021,xiaga6lvv5sihg,5vj0wqjm07j94fcij8id,2018-05-03 15:03:57
74,randomGen-192.168.100.18,rioy-i8caolah-20ks-7bpw,k4w,6ezym4lt4tejqf,76cukh5ckdj5kbbkg97c,2018-05-03 15:03:57
[root@big-master apache-flume-1.8.0-bin]#
[root@big-master apache-flume-1.8.0-bin]# hdfs dfs -ls /flume/events
Found 2 items
drwxr-xr-x - hadoop supergroup 0 2018-05-03 23:59 /flume/events/180503
drwxr-xr-x - hadoop supergroup 0 2018-05-04 08:26 /flume/events/180504
[root@big-master apache-flume-1.8.0-bin]# hdfs dfs -ls -R /flume/events | wc -l
27353
[root@big-master apache-flume-1.8.0-bin]# hdfs dfs -du -h /flume/events
5.5 M /flume/events/180503
5.2 M /flume/events/180504
반응형
'bigdata > flume' 카테고리의 다른 글
flume kafka channel 테스트 (0) | 2018.05.11 |
---|---|
window에서 사용할 Flume Sources 테스트 (0) | 2018.04.25 |
flume-ng window batch (0) | 2018.04.24 |
flume-ng windows agent test (0) | 2018.04.23 |
Comments