dgadiraju
4/5/2017 - 3:54 PM

flume-to-kafka-and-hdfs.conf

# fmp.conf: a multiplex agent to save one copy of data in HDFS and 
# other copy streamed to Kafka so that data can be processed by 
# streaming technologies such as Spark Streaming

# Name the components on this agent
fmp.sources = logsource
fmp.sinks = kafkasink hdfssink
fmp.channels = kafkachannel hdfschannel

# Describe/configure the source
fmp.sources.logsource.type = exec
fmp.sources.logsource.command = tail -F /opt/gen_logs/logs/access.log

# Describe the kafka sink
fmp.sinks.kafkasink.type = org.apache.flume.sink.kafka.KafkaSink
fmp.sinks.kafkasink.brokerList = nn01.itversity.com:6667,nn02.itversity.com:6667,rm01.itversity.com:6667
fmp.sinks.kafkasink.topic = kafkadg

# Describe the HDFS sink
fmp.sinks.hdfssink.type = hdfs
fmp.sinks.hdfssink.hdfs.path = hdfs://nn01.itversity.com:8020/user/dgadiraju/wllabssa/flume_example_%Y-%m-%d
fmp.sinks.hdfssink.hdfs.fileType = DataStream
fmp.sinks.hdfssink.hdfs.rollInterval = 120
fmp.sinks.hdfssink.hdfs.rollSize = 10485760
fmp.sinks.hdfssink.hdfs.rollCount = 100
fmp.sinks.hdfssink.hdfs.filePrefix = retail
fmp.sinks.hdfssink.hdfs.fileSuffix = .txt
fmp.sinks.hdfssink.hdfs.inUseSuffix = .tmp
fmp.sinks.hdfssink.hdfs.useLocalTimeStamp = true

# Use a channel which buffers events in memory for kafkasink
fmp.channels.kafkachannel.type = memory
fmp.channels.kafkachannel.capacity = 1000
fmp.channels.kafkachannel.transactionCapacity = 100

# Use a channel which buffers events in file for hdfssink
fmp.channels.hdfschannel.type = file
fmp.channels.hdfschannel.capacity = 1000
fmp.channels.hdfschannel.transactionCapacity = 100

# Bind the source and sink to the channel
fmp.sources.logsource.channels = hdfschannel kafkachannel
fmp.sinks.kafkasink.channel = kafkachannel
fmp.sinks.hdfssink.channel = hdfschannel