DBILITY

독거 가능성 100% 노후에 라면값이라도 하게 센스를 발휘합시다!😅
Please click on the ad so that I can pay for ramen in my old age!
点击一下广告,让老后吃个泡面钱吧!
老後にラーメン代だけでもするように広告を一回クリックしてください。

window에서 사용할 Flume Sources 테스트 본문

bigdata/flume

window에서 사용할 Flume Sources 테스트

DBILITY 2018. 4. 25. 20:54
반응형

최소 XP부터 지원이 되어야 할것 같다.

일단 kafka가 없다는 가정하에 시작해 보자.

실은 아직 kafka를 테스트 하지 못했다.

Syslog는 Syslog damon에서 보내줘야하니 패스하자.

몇시간 apache http server access log를 source로 logger sink를 돌려보니 properties변경도 체크를 한다.

logger sink는 max 16byte만 출력을 하니, custom sink가 필요할 듯 (매뉴얼참고)

 

standalone port monitoring tool이나 logger view도 custom source,channel,sink를 만들면 쓸만하것다.

목수에게 새망치가 생기면, 튀어나온 모든 것이 못으로 보인다더니만ㅎㅎ

이거 이전에 만든 batch프로그램은 서비스로 실행하고, Tray 적용하면 딱인데...이건 그냥 cmd /c tail......만 해도 되겠네..

 

  • Exec Source
    Power Shell
    Get-Content <filename> -wait
    윈도우용 tail 사용 다운로드

  • Spooling Directory Source
    spoolDir로 지정된 경로에 파일이 생성되면 pollDelay간격으로 체크한다.
    완료된 파일은 .COMPLETED저장되거나 deletePolicy가 immediate일 경우 즉시삭제

  • Netcat TCP/UDP
    agent가 설치된 client의 특정 app에서 socket접속하여 protocol에 맞춰 데이터전송 (TCP는 newline으로 이벤트발생)
    필요하다면 UDP는 local서버 수집용으로나 쓰면 되겠다.근데,local서버용은 Syslogd가 더 낫지 않을까?--
    테스트용 전송툴 사용 다운로드

loggerSink테스트용 CustomLoggerSink.java

org.apache.flume.event.EventHelper.java를 잘 보면 된다.

개발자 매뉴얼의 sink부분을 참고했다.

package com.dbility.bigdata.flume.sink;

import org.apache.flume.Channel;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.EventDeliveryException;
import org.apache.flume.Transaction;
import org.apache.flume.conf.Configurable;
import org.apache.flume.event.EventHelper;
import org.apache.flume.sink.AbstractSink;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;


/**
*
* Description
*
*
* @author hyperrookie@gmail.com
*
* @version 1.0.0
* @date 2018. 4. 26.
*=======================================================================
* Date            Name                     Revision History
*=======================================================================
* 2018. 4. 26.    hyperrookie@gmail.com    Creation
*=======================================================================
*/
public class CustomLoggerSink extends AbstractSink implements Configurable {

	private static final Logger logger = LoggerFactory.getLogger(CustomLoggerSink.class);

	private static final int defaultMaxBytes = 16;
	private int maxBytes;

	public Status process() throws EventDeliveryException {

		Status status = null;

	    Channel ch = getChannel();
	    Transaction txn = ch.getTransaction();

	    txn.begin();

	    try {

	      Event event = ch.take();

	      if ( event != null) {
	    	  if(logger.isInfoEnabled()) {
	    		  logger.info("Event : {}", EventHelper.dumpEvent(event, this.maxBytes == 0 ? event.getBody().length : this.maxBytes));
	    	  }
	      }
	      txn.commit();
	      status = Status.READY;

	    } catch (Throwable t) {

	    	txn.rollback();

	      status = Status.BACKOFF;

	      if (t instanceof Error) {
	        throw (Error)t;
	      }

	    } finally {
	    	txn.close();
	    }

	    return status;
	}

	public void configure(Context context) {

		int maxBytesProperty = context.getInteger("maxBytesToLog", defaultMaxBytes);
		logger.info("maxBytesProperty : {}",maxBytesProperty);
		if (maxBytesProperty < 0)
			maxBytesProperty = defaultMaxBytes;

		this.maxBytes = maxBytesProperty;
	}
}

 

내부망에 존재하는 hdfs에 어떻게 flume agent에서 data를 보내지?

client agent(exec,channel,avro) -> server agent(avro,channel,hdfs)로 구성하면 되겠네?

일당 포트포워딩이 필요하겠군. big-master쪽에 4141번으로 하고.avro source port 4141...서버쪽도 맞춰주고...기본 replicating flow구성

Client측 agent

#Source
agent.sources = execSource
agent.sources.execSource.type = exec
agent.sources.execSource.command = tail -f D://tools//Apache24//logs//flume-access.log
agent.sources.execSource.batchSize=10
agent.sources.execSource.batchTimeout=2000
agent.sources.execSource.channels = memoryChannel memoryChannel2

#agent.sources = spoolDirSource
#agent.sources.spoolDirSource.type = spooldir
#agent.sources.spoolDirSource.spoolDir = E:/apache-flume-1.8.0-bin/spool
#agent.sources.spoolDirSource.fileHeader=false
#agent.sources.spoolDirSource.deletePolicy=immediate
#agent.sources.spoolDirSource.channels = memoryChannel

#Channel
agent.channels = memoryChannel memoryChannel2

agent.channels.memoryChannel.type = memory
agent.channels.memoryChannel.capacity = 10000
agent.channels.memoryChannel.transactionCapacity = 10000
agent.channels.memoryChannel.byteCapacityBufferPercentage = 20
agent.channels.memoryChannel.byteCapacity = 800000
 
agent.channels.memoryChannel2.type = memory
agent.channels.memoryChannel2.capacity = 10000
agent.channels.memoryChannel2.transactionCapacity = 10000
agent.channels.memoryChannel2.byteCapacityBufferPercentage = 20
agent.channels.memoryChannel2.byteCapacity = 800000

#Sink
agent.sinks = avroSink loggerSink
agent.sinks.avroSink.channel = memoryChannel
agent.sinks.avroSink.type = avro
agent.sinks.avroSink.hostname=big-master #ipaddress
agent.sinks.avroSink.port=4141

agent.sinks.loggerSink.channel = memoryChannel2
agent.sinks.loggerSink.type = com.dbility.bigdata.flume.sink.CustomLoggerSink
agent.sinks.loggerSink.maxBytesToLog = 10000

Server측 agent

[hadoop@big-master apache-flume-1.8.0-bin]$ vi conf/flume-conf.properties
#Source
agent.sources = avroSrc
agent.sources.avroSrc.type = avro
agent.sources.avroSrc.bind = 0.0.0.0
agent.sources.avroSrc.port = 4141
agent.sources.avroSrc.channels = memoryChannel

#Channel
agent.channels = memoryChannel
agent.channels.memoryChannel.type = memory
agent.channels.memoryChannel.capacity = 1000000

#Sink
agent.sinks = hdfsSink
agent.sinks.hdfsSink.channel = memoryChannel
agent.sinks.hdfsSink.type = hdfs
agent.sinks.hdfsSink.hdfs.fileType = DataStream
agent.sinks.hdfsSink.hdfs.path = hdfs://hadoop-cluster/flume/events/%y%m%d/%H%M/%S
agent.sinks.hdfsSink.hdfs.writFormat = text
agent.sinks.hdfsSink.hdfs.useLocalTimeStamp = true
agent.sinks.hdfsSink.hdfs.filePrefix = windowTail
agent.sinks.hdfsSink.hdfs.fileSuffix = .log

 

하둡저장결과

 

 

반응형

'bigdata > flume' 카테고리의 다른 글

flume kafka channel 테스트  (0) 2018.05.11
flume hadoop 연동 테스트 소스 데이터 생성  (0) 2018.05.03
flume-ng window batch  (0) 2018.04.24
flume-ng windows agent test  (0) 2018.04.23
Comments