python操作kafka(confluent_kafka 生产)

时间:2023-03-09 08:09:01
python操作kafka(confluent_kafka 生产)
 #!/usr/bin/python
# -*- coding:utf-8 -*-
# Send generated test events to a Kafka topic using confluent_kafka.
#
# Usage: python <script> <ip> <rate_value> <count> <sleep_time>
import json
import sys
import time

# Connection settings and destination topic used when run as a script.
DEFAULT_CONF = {
    'bootstrap.servers': 'authtest.jdq.jd.local:9888',
    'client.id': '23a49894',
}
DEFAULT_TOPIC = 'WDMEtlTest'


def delivery_report(err, msg):
    """Print the delivery result of one produced message.

    Passed to ``Producer.produce`` as ``callback``; triggered by ``poll()``
    or ``flush()``.

    Args:
        err: Delivery error, or ``None`` when the message was delivered.
        msg: The produced message; only read when ``err`` is ``None``.
    """
    if err is not None:
        print('Message delivery failed: {}'.format(err))
    else:
        print('Message delivered to {} [{}]'.format(msg.topic(), msg.partition()))


def c_kafka(topic, payloads, sleep_time=0, **conf):
    """Produce every payload to ``topic`` as UTF-8 encoded JSON.

    Args:
        topic: Name of the destination Kafka topic.
        payloads: Iterable of JSON-serialisable objects; one message each.
        sleep_time: Seconds to pause after each message. Accepts a number or
            a numeric string (e.g. straight from ``sys.argv``). When
            positive, each message is flushed before pausing.
        **conf: Configuration forwarded to ``confluent_kafka.Producer``.

    Returns:
        A list of ``"ping<epoch-milliseconds>"`` markers, one per message,
        newest first.

    Raises:
        ValueError: If ``sleep_time`` is not numeric.
    """
    # Imported lazily so this module (e.g. build_payloads) stays importable
    # on hosts that do not have the Kafka client installed.
    from confluent_kafka import Producer

    delay = float(sleep_time)
    producer = Producer(**conf)
    pings = []
    for payload in payloads:
        pings.append("ping%s" % int(time.time() * 1000))
        # Serve delivery callbacks for previously produced messages.
        producer.poll(0)
        producer.produce(topic, json.dumps(payload).encode('utf-8'),
                         callback=delivery_report)
        if delay > 0:
            producer.flush()
            time.sleep(delay)
    # Always flush before returning so queued messages are not dropped when
    # no per-message flush happened (zero or negative delay).
    producer.flush()
    # Appended oldest-first above; callers receive newest-first.
    pings.reverse()
    return pings


def build_payloads(ip, count, now=None):
    """Return ``count`` sample event dicts for ``ip``.

    Args:
        ip: Value stored under the ``"ip"`` key of every event.
        count: Number of events to generate.
        now: Base time in epoch seconds; defaults to the current time.
            Event timestamps start one minute after it and are spaced one
            minute apart.

    Returns:
        A list of ``count`` dicts ready to be JSON-encoded.
    """
    if now is None:
        now = int(time.time())
    return [
        {
            "timestamp": now + (index + 1) * 60,
            "timestampISO": "2018-09-12T06:22:40.000Z",
            "topic": "devEtl03",
            "ip": ip,
            "if_name": "Ethernet6/4",
            "service_status": "在线",
            "data_level": "debug",
            "globalunique": '%s%s' % ('dci', now),
        }
        for index in range(count)
    ]


def main(argv=None):
    """Parse ``<ip> <rate_value> <count> <sleep_time>`` and send the events.

    Args:
        argv: Full argument vector including the program name; defaults to
            ``sys.argv``.
    """
    args = sys.argv if argv is None else argv
    if len(args) < 5:
        sys.exit('usage: {} <ip> <rate_value> <count> <sleep_time>'.format(
            args[0] if args else 'script'))
    ip = args[1]
    # args[2] (rate_value) is accepted to keep the command line unchanged,
    # but nothing in this script uses it.
    count = int(args[3])
    sleep_time = float(args[4])
    payloads = build_payloads(ip, count)
    print(c_kafka(DEFAULT_TOPIC, payloads, sleep_time, **DEFAULT_CONF))


if __name__ == "__main__":
    main()