WUT_Computer_Science/Programming/PSD/kafka/kafka_consumer.py

22 lines
576 B
Python
Raw Permalink Normal View History

2024-06-16 21:47:34 +02:00
from confluent_kafka import Consumer, KafkaError
# Simple Kafka consumer: subscribes to the 'transactions' topic on a local
# broker and prints every message payload to stdout until a Kafka error
# occurs or the user interrupts the process.
conf = {
    'bootstrap.servers': "localhost:9092",
    'group.id': "test_group",
    # Start from the oldest available message when the group has no
    # committed offset yet.
    'auto.offset.reset': 'earliest',
}
consumer = Consumer(conf)
consumer.subscribe(['transactions'])

try:
    while True:
        # Block for at most one second so the loop stays responsive to
        # Ctrl-C even when the topic is idle.
        msg = consumer.poll(timeout=1.0)
        if msg is None:
            # Poll timed out with nothing to deliver.
            continue

        err = msg.error()
        if err:
            if err.code() == KafkaError._PARTITION_EOF:
                # Reaching the end of a partition is informational, not a
                # failure; keep waiting for new messages.
                continue
            print(err)
            break

        payload = msg.value()
        if payload is None:
            # A message may legitimately carry no value (null payload);
            # calling .decode() on it would raise AttributeError.
            print('Received message with empty payload')
            continue

        # errors='replace' keeps one malformed payload from killing the
        # whole consumer with UnicodeDecodeError.
        text = payload.decode('utf-8', errors='replace')
        print('Received message: {}'.format(text))
except KeyboardInterrupt:
    # Ctrl-C is the normal way to stop this script; exit quietly.
    pass
finally:
    # Always close the consumer, whatever ended the loop (error, Ctrl-C or
    # an unexpected exception), so it leaves the group cleanly.
    consumer.close()