安装
python中使用kafka,首先要安装kafka-python
这个库:1
pip install kafka-python
使用
kafka的使用需要2部分配合,KafkaConsumer
和KafkaProducer
。
KafkaProducer负责将数据写到kafka中,类似一个数据的生产者
KafkaConsumer负责读kafka中的数据,类似一个数据的消费者
KafkaConsumer
示例1:获取topic下的所有分区的数据1
2
3
4
5
6
7
8
9
10
11
12from kafka import KafkaConsumer
# To consume latest messages and auto-commit offsets
consumer = KafkaConsumer('my-topic',
group_id='my-group',
bootstrap_servers=['localhost:9092'])
for message in consumer:
# message value and key are raw bytes -- decode if necessary!
# e.g., for unicode: `message.value.decode('utf-8')`
print("%s:%d:%d: key=%s value=%s" % (message.topic, message.partition,
message.offset, message.key,
message.value))
KafkaConsumer可以设置很多不同的配置,除了上述的group_id
、bootstrap_servers
外,还有enable_auto_commit
、sasl_plain_username
、sasl_plain_password
、max_poll_records
、max_poll_interval_ms
等等,具体的可以进到源码中查看(默认是允许自动提交)。
上面这种实现是最简单的一种实现,类似一个循环,不停的去读consumer获取到的数据,并且不指定partition,当存在多个partitin,而当前group下只有一个消费者时,会随机获取各个partition的数据
同组的不同消费者会消费不同的分区。当消费者数量大于分区的数量,会有消费者收不到消息;当消费者数量小于分区的数量,会有消费者消费多个分区
即当当前group下存在2个消费者,这两个消费者不会消费相同topic、partiton的数据,可以起3个线程运行以下代码进行验证:1
2
3
4def job():
consumer = KafkaConsumer('my-topic', group_id='my-group', bootstrap_servers=['localhost:9092'])
for msg in consumer:
print("%s:%d:%d: key=%s value=%s" % (msg.topic, msg.partition, msg.offset, msg.key, msg.value))
示例2:获取指定分区的数据1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16topic = 'my_topic'
consumer = KafkaConsumer(group_id='my-group', bootstrap_servers=['localhost:9092'])
# 获取topic的分区
partitions = consumer.partitions_for_topic(topic)
print(partitions)
# consumer 指定主题和分区
# 方法1: assign
consumer.assign([TopicPartition(topic, partition=0), TopicPartition(topic, partition=1)])
# 方法2:subscribe
# consumer.subscribe(topic)
for msg in consumer:
print("%s:%d:%d: key=%s value=%s" % (msg.topic, msg.partition, msg.offset, msg.key, msg.value))
auto.offset.reset
值含义解释:
earliest
当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,从头开始消费
latest
当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,消费新产生的该分区下的数据
none
topic各分区都存在已提交的offset时,从offset后开始消费;只要有一个分区不存在已提交的offset,则抛出异常
示例3:主动拉去消息,并手动提交offset
上述2个示例中,其实都使用了默认的enable_auto_commit=True、auto_commit_interval_ms=5000配置,即:
隔5s,Consumer 将会提交Offset。但是在实际使用场景中,这个时间间隔并不好控制,它取决于我们消费数据的速度。所以一般实现的方式是:处理完消息后,手动提交offset1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20consumer = KafkaConsumer(group_id='my-group', bootstrap_servers=['localhost:9092'], enable_auto_commit=False, max_poll_records=500, max_poll_interval_ms=60000)
tp = TopicPartition(topic, partition)
consumer.assign([tp])
# 获取上次提交的offset
committed_offset = consumer.committed(tp)
consumer.seek(tp, committed_offset)
# 定位到offset
consumer.seek(tp, committed_offset)
while True:
msg = consumer.poll(timeout_ms=60000)
# 获取最新的offset
end_offset = consumer.end_offsets([tp])[tp]
if len(msg) > 0:
for data in msg.values():
for consumer_record in data:
# todo: do your work
# 提交offset。Defaults to currently consumed offsets for all subscribed partitions
consumer.commit()
<未完待续……>