Generating source data for a flume/hadoop integration test

DBILITY 2018. 5. 3. 19:30

I wanted to simulate storing data in the hadoop cluster through an avro sink, but I'm not up to writing a source from scratch, so I lightly modified org.apache.flume.source.SequenceGeneratorSource.java from the flume-ng-core sources.

Looking at PollableSourceRunner, when the source backs off it puts the thread to sleep for up to getMaxBackOffSleepInterval (default 5000 ms).
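Since doProcess() below always returns BACKOFF, every poll waits out that backoff. If 5 seconds is too slow for a test, the interval can presumably be shortened in the source configuration; a sketch, assuming the backoffSleepIncrement / maxBackoffSleep properties that Flume 1.8's AbstractPollableSource reads:

# assumed pollable-source backoff properties (Flume 1.8); shortens the sleep between polls
agent.sources.randomGen.backoffSleepIncrement = 500
agent.sources.randomGen.maxBackoffSleep = 1000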

I'm not in any shape to keep tweaking things, though; my neck, shoulders, and hands hurt.

After building, drop the jar into flume's lib directory or into plugin.d/<plugin name>/lib. If it doesn't work, oh well~
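A rough sketch of the build-and-deploy steps under that layout ($FLUME_HOME and the plugin directory name are placeholders; the jar name follows from the pom.xml below):

# build the jar (target/flume-source-generator-0.0.1.jar per the pom.xml below)
mvn clean package

# plugin.d layout; adjust $FLUME_HOME and the plugin directory name as needed
mkdir -p $FLUME_HOME/plugin.d/flume-source-generator/lib
cp target/flume-source-generator-0.0.1.jar $FLUME_HOME/plugin.d/flume-source-generator/lib/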

pom.xml

<project xmlns="http://maven.apache.org/POM/4.0.0"
	xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
	<modelVersion>4.0.0</modelVersion>
	<groupId>com.dbility.bigdata.flume.source</groupId>
	<artifactId>flume-source-generator</artifactId>
	<version>0.0.1</version>
	<properties>
		<java.version>1.8</java.version>
		<flume.version>1.8.0</flume.version>
		<slf4j.version>1.6.1</slf4j.version>
		<encoding-char>utf-8</encoding-char>
	</properties>
	<repositories>
		<repository>
			<snapshots>
				<enabled>false</enabled>
			</snapshots>
			<id>dbility</id>
			<name>dbility Repository</name>
			<url>http://112.216.48.138/nexus/content/groups/public/</url>
		</repository>
	</repositories>
	<pluginRepositories>
		<pluginRepository>
			<releases>
				<updatePolicy>never</updatePolicy>
			</releases>
			<snapshots>
				<enabled>false</enabled>
			</snapshots>
			<id>dbility</id>
			<name>dbility Repository</name>
			<url>http://112.216.48.138/nexus/content/groups/public/</url>
		</pluginRepository>
	</pluginRepositories>

	<dependencies>
		<dependency>
			<groupId>org.apache.flume</groupId>
			<artifactId>flume-ng-core</artifactId>
			<version>${flume.version}</version>
		</dependency>
		<dependency>
			<groupId>org.slf4j</groupId>
			<artifactId>slf4j-api</artifactId>
			<version>${slf4j.version}</version>
		</dependency>
	</dependencies>

	<build>

		<finalName>${project.artifactId}-${project.version}</finalName>
		<plugins>
			<plugin>
				<groupId>org.apache.maven.plugins</groupId>
				<artifactId>maven-compiler-plugin</artifactId>
				<version>3.1</version>
				<configuration>
					<source>${java.version}</source>
					<target>${java.version}</target>
					<encoding>${encoding-char}</encoding>
				</configuration>
			</plugin>
		</plugins>

	</build>
</project>

RandomDataGenerator.java

package com.dbility.bigdata.flume.source;

import java.net.InetAddress;
import java.net.UnknownHostException;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.List;
import java.util.Locale;

import org.apache.flume.ChannelException;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.EventDeliveryException;
import org.apache.flume.FlumeException;
import org.apache.flume.conf.Configurable;
import org.apache.flume.event.EventBuilder;
import org.apache.flume.instrumentation.SourceCounter;
import org.apache.flume.source.AbstractPollableSource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.google.common.base.Preconditions;

/**
*
* Description
*
*
* @author hyperrookie@gmail.com
*
* @version 0.0.1
* @date 2018. 5. 2.
*=======================================================================
* Date            Name                     Revision History
*=======================================================================
* 2018. 5. 2.     hyperrookie@gmail.com    Creation
*=======================================================================
*/
public class RandomDataGenerator extends AbstractPollableSource implements Configurable {

  private static final Logger logger = LoggerFactory.getLogger(RandomDataGenerator.class);

  private int batchSize;
  private SourceCounter sourceCounter;
  private long totalEvents;
  private long eventsSent = 0;
  private String[] column;
  private String[] columnLength;
  private String virtualDataStr;

  @Override
  protected void doConfigure(Context context) throws FlumeException {
    batchSize = context.getInteger("batchSize", 1);
    totalEvents = context.getLong("totalEvents", Long.MAX_VALUE);

    column = context.getString("column", "randomKey").split(",");
    columnLength = context.getString("columnLength", "").split(",");

    Preconditions.checkArgument(batchSize > 0, "batchSize was %s but expected positive", batchSize);
    if (sourceCounter == null) {
      sourceCounter = new SourceCounter(getName());
    }
  }

  @Override
  protected Status doProcess() throws EventDeliveryException {
    Status status = Status.READY;
    long eventsSentTX = eventsSent;
    virtualDataStr = "";

    try {
      if (batchSize == 1) {
        if (eventsSentTX < totalEvents) {
          // one CSV record: sequence, sourceName-hostAddress, random columns, timestamp
          virtualDataStr += "," + getName() + "-" + InetAddress.getLocalHost().getHostAddress();

          for (int j = 0; j < column.length; j++) {
            if (column[j].equals("randomKey")) {
              virtualDataStr += "," + RandomUtil.randomKey();
            } else {
              virtualDataStr += "," + RandomUtil.randomKey(Integer.parseInt(columnLength[j]));
            }
          }

          virtualDataStr += "," + LocalDateTime.now()
              .format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss", Locale.getDefault()));

          getChannelProcessor().processEvent(
              EventBuilder.withBody(String.valueOf((eventsSentTX++) + virtualDataStr).getBytes()));
          sourceCounter.incrementEventAcceptedCount();
          logger.info("processEvent batchSize : {}", batchSize);
        } else {
          //status = Status.BACKOFF;
        }
      } else {
        List<Event> batchArrayList = new ArrayList<>(batchSize);
        for (int i = 0; i < batchSize; i++) {

          virtualDataStr = "";

          if (eventsSentTX < totalEvents) {
            virtualDataStr += "," + getName() + "-" + InetAddress.getLocalHost().getHostAddress();
            for (int j = 0; j < column.length; j++) {
              if (column[j].equals("randomKey")) {
                virtualDataStr += "," + RandomUtil.randomKey();
              } else {
                virtualDataStr += "," + RandomUtil.randomKey(Integer.parseInt(columnLength[j]));
              }
            }
            virtualDataStr += "," + LocalDateTime.now()
                .format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss", Locale.getDefault()));
            batchArrayList.add(i, EventBuilder.withBody(String
                .valueOf((eventsSentTX++) + virtualDataStr).getBytes()));
          } else {
            //status = Status.BACKOFF;
            break;
          }
        }
        if (!batchArrayList.isEmpty()) {
          getChannelProcessor().processEventBatch(batchArrayList);
          sourceCounter.incrementAppendBatchAcceptedCount();
          sourceCounter.addToEventAcceptedCount(batchArrayList.size());
          logger.info("processEventBatch batchArrayList.size() : {}", batchArrayList.size());
          logger.info("processEventBatch batchArrayList : {}", batchArrayList);
        }
      }
      eventsSent = eventsSentTX;

    } catch (ChannelException ex) {
      logger.error(getName() + " source could not write to channel.", ex);
    } catch (UnknownHostException ex) {
      logger.error(getName() + " source could not read the host address.", ex);
    }
    // always back off so PollableSourceRunner sleeps between polls
    status = Status.BACKOFF;
    return status;
  }

  @Override
  protected void doStart() throws FlumeException {
    logger.info("RandomDataGenerator source do starting");
    sourceCounter.start();
    logger.debug("RandomDataGenerator source do started");
  }

  @Override
  protected void doStop() throws FlumeException {
    logger.info("RandomDataGenerator source do stopping");
    sourceCounter.stop();
    logger.info("{} source do stopped. Metrics: {}", getName(), sourceCounter);
  }
}
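RandomUtil.java is not included in the post. A minimal, hypothetical stand-in that matches the calls above and the shape of the sample output below (4-8-4-4 grouped keys, fixed-length lowercase alphanumeric strings) could look like this:

RandomUtil.java (hypothetical sketch)

package com.dbility.bigdata.flume.source;

import java.util.concurrent.ThreadLocalRandom;

/**
 * Hypothetical stand-in for the RandomUtil referenced by RandomDataGenerator;
 * not part of the original post.
 */
public class RandomUtil {

  private static final char[] CHARS = "abcdefghijklmnopqrstuvwxyz0123456789".toCharArray();

  /** random lowercase alphanumeric string of the given length */
  public static String randomKey(int length) {
    ThreadLocalRandom rnd = ThreadLocalRandom.current();
    StringBuilder sb = new StringBuilder(length);
    for (int i = 0; i < length; i++) {
      sb.append(CHARS[rnd.nextInt(CHARS.length)]);
    }
    return sb.toString();
  }

  /** grouped key similar to the sample output, e.g. 60qq-72dml14c-2asz-37uo */
  public static String randomKey() {
    return randomKey(4) + "-" + randomKey(8) + "-" + randomKey(4) + "-" + randomKey(4);
  }
}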

 

collector - rndGen.properties

#Source
agent.sources = randomGen
agent.sources.randomGen.type = com.dbility.bigdata.flume.source.RandomDataGenerator
agent.sources.randomGen.batchSize = 5
agent.sources.randomGen.column = randomKey,machcd,profile,itdsc
agent.sources.randomGen.columnLength = 20,3,14,20
agent.sources.randomGen.channels = memoryChannel

#Channel
agent.channels = memoryChannel
agent.channels.memoryChannel.type = memory
agent.channels.memoryChannel.capacity = 100000
agent.channels.memoryChannel.transactionCapacity = 10000
agent.channels.memoryChannel.keep-alive = 60

#Sink
agent.sinks = avroSink
agent.sinks.avroSink.channel = memoryChannel
agent.sinks.avroSink.type = avro
agent.sinks.avroSink.hostname=big-master
agent.sinks.avroSink.port=4141
agent.sinks.avroSink.batch-size=5
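The source also reads a totalEvents parameter in doConfigure() (default Long.MAX_VALUE); to make the generator stop after a fixed number of records, it can be capped in the same file, for example:

# optional: stop generating after 1,000 records (totalEvents is read in doConfigure above)
agent.sources.randomGen.totalEvents = 1000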

 

server - flume.properties

agent.sources = avroSrc
agent.sources.avroSrc.type = avro
agent.sources.avroSrc.bind = 0.0.0.0
agent.sources.avroSrc.port = 4141
agent.sources.avroSrc.channels = memoryChannel

agent.channels = memoryChannel
agent.channels.memoryChannel.type = memory
agent.channels.memoryChannel.capacity = 1000000

agent.sinks = hdfsSink
agent.sinks.hdfsSink.channel = memoryChannel
agent.sinks.hdfsSink.type = hdfs
agent.sinks.hdfsSink.hdfs.fileType = DataStream
agent.sinks.hdfsSink.hdfs.path = hdfs://hadoop-cluster/flume/events/%y%m%d/%H%M/%S
agent.sinks.hdfsSink.hdfs.writeFormat = Text
agent.sinks.hdfsSink.hdfs.useLocalTimeStamp = true
agent.sinks.hdfsSink.hdfs.filePrefix = rndGen_%[FQDN]_%y%m%d%H%M%S
agent.sinks.hdfsSink.hdfs.fileSuffix = .log

#Sink
#agent.sinks = loggerSink
#agent.sinks.loggerSink.channel = memoryChannel
#agent.sinks.loggerSink.type = logger
#agent.sinks.loggerSink.maxBytesToLog = 10000
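For reference, both property files name the agent "agent", so each side can be launched with the stock flume-ng script; a sketch, assuming the files sit under $FLUME_HOME/conf:

# server side first (avro source -> hdfs sink)
bin/flume-ng agent --conf conf --conf-file conf/flume.properties --name agent -Dflume.root.logger=INFO,console

# collector side (RandomDataGenerator source -> avro sink to big-master:4141)
bin/flume-ng agent --conf conf --conf-file conf/rndGen.properties --name agent -Dflume.root.logger=INFO,console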

 

 

[root@big-master apache-flume-1.8.0-bin]# hdfs dfs -ls /flume/events/180503
Found 2 items
drwxr-xr-x   - hadoop supergroup          0 2018-05-03 15:03 /flume/events/180503/1503
drwxr-xr-x   - hadoop supergroup          0 2018-05-03 15:04 /flume/events/180503/1504
[root@big-master apache-flume-1.8.0-bin]# hdfs dfs -ls /flume/events/180503/1503/
Found 15 items
drwxr-xr-x   - hadoop supergroup          0 2018-05-03 15:03 /flume/events/180503/1503/03
drwxr-xr-x   - hadoop supergroup          0 2018-05-03 15:03 /flume/events/180503/1503/06
drwxr-xr-x   - hadoop supergroup          0 2018-05-03 15:03 /flume/events/180503/1503/11
drwxr-xr-x   - hadoop supergroup          0 2018-05-03 15:03 /flume/events/180503/1503/16
drwxr-xr-x   - hadoop supergroup          0 2018-05-03 15:03 /flume/events/180503/1503/21
drwxr-xr-x   - hadoop supergroup          0 2018-05-03 15:03 /flume/events/180503/1503/26
drwxr-xr-x   - hadoop supergroup          0 2018-05-03 15:04 /flume/events/180503/1503/31
drwxr-xr-x   - hadoop supergroup          0 2018-05-03 15:04 /flume/events/180503/1503/36
drwxr-xr-x   - hadoop supergroup          0 2018-05-03 15:04 /flume/events/180503/1503/37
drwxr-xr-x   - hadoop supergroup          0 2018-05-03 15:04 /flume/events/180503/1503/41
drwxr-xr-x   - hadoop supergroup          0 2018-05-03 15:04 /flume/events/180503/1503/42
drwxr-xr-x   - hadoop supergroup          0 2018-05-03 15:04 /flume/events/180503/1503/46
drwxr-xr-x   - hadoop supergroup          0 2018-05-03 15:04 /flume/events/180503/1503/47
drwxr-xr-x   - hadoop supergroup          0 2018-05-03 15:04 /flume/events/180503/1503/52
drwxr-xr-x   - hadoop supergroup          0 2018-05-03 15:04 /flume/events/180503/1503/57
[root@big-master apache-flume-1.8.0-bin]# hdfs dfs -ls /flume/events/180503/1503/57
Found 1 items
-rw-r--r--   3 hadoop supergroup        435 2018-05-03 15:04 /flume/events/180503/1503/57/rndGen_big-master_180503150357.1525327437028.log
[root@big-master apache-flume-1.8.0-bin]# hdfs dfs -cat /flume/events/180503/1503/57/rndGen_big-master_180503150357.1525327437028.log
70,randomGen-192.168.100.18,60qq-72dml14c-2asz-37uo,i57,3qmjhjt0ydrubz,cum5z057zlshbta760ki,2018-05-03 15:03:57
71,randomGen-192.168.100.18,0wem-ob37iybb-g5oq-22nc,lyl,d68u3fhh9dxm84,6tpmfhzvppnvb2011y2z,2018-05-03 15:03:57
72,randomGen-192.168.100.18,ll9l-hrmihmt2-q971-xu81,ax6,lh38qurtytmxzl,yvqu99au0hhxs5ltsdfp,2018-05-03 15:03:57
73,randomGen-192.168.100.18,pde2-f6vcar2o-uaob-z547,021,xiaga6lvv5sihg,5vj0wqjm07j94fcij8id,2018-05-03 15:03:57
74,randomGen-192.168.100.18,rioy-i8caolah-20ks-7bpw,k4w,6ezym4lt4tejqf,76cukh5ckdj5kbbkg97c,2018-05-03 15:03:57
[root@big-master apache-flume-1.8.0-bin]#
[root@big-master apache-flume-1.8.0-bin]# hdfs dfs -ls /flume/events
Found 2 items
drwxr-xr-x   - hadoop supergroup          0 2018-05-03 23:59 /flume/events/180503
drwxr-xr-x   - hadoop supergroup          0 2018-05-04 08:26 /flume/events/180504
[root@big-master apache-flume-1.8.0-bin]# hdfs dfs -ls -R /flume/events | wc -l
27353
[root@big-master apache-flume-1.8.0-bin]# hdfs dfs -du -h /flume/events
5.5 M  /flume/events/180503
5.2 M  /flume/events/180504

 
