flume整合kafka

时间:2022-09-24 02:54:03
# Please paste flume.conf here. Example:
# Sources, channels, and sinks are defined per
# agent name, in this case 'tier1'.
tier1.sources = source1
tier1.channels = channel1
tier1.sinks = sink1 # For each source, channel, and sink, set
# standard properties.
tier1.sources.source1.type = syslogtcp
tier1.sources.source1.bind = 127.0.0.1
tier1.sources.source1.port = 9999
tier1.sources.source1.channels = channel1
tier1.channels.channel1.type = memory tier1.sinks.sink1.channel = channel1
tier1.sinks.sink1.type = org.apache.flume.sink.kafka.KafkaSink
tier1.sinks.sink1.topic = ggz
tier1.sinks.sink1.brokerList = ha1:9092
tier1.sinks.sink1.requiredAcks = 1
tier1.sinks.sink1.batchSize = 20 # Other properties are specific to each type of
# source, channel, or sink. In this case, we
# specify the capacity of the memory channel. tier1.channels.channel1.capacity = 100

测试:

生产者:nc ha1 9999

消费者:kafka-console-consumer –zookeeper ha0 –topic ggz –from-beginning