Kafka-Python 使用教程
Kafka 简介
- Kafka 是一个分布式流处理平台,最初由 LinkedIn 开发。它是一个高吞吐量,低延迟的平台,用于处理实时数据。Kafka 的主要特点是其能够处理大量数据并保证数据的可靠性。它使用分布式系统来处理数据,这意味着它可以在多个服务器上运行,从而提高了可靠性和可扩展性。
- 在 Kafka 中,数据被组织成 topics,每个 topic 可以有多个 partitions。每个 partition 都是一个有序的、不可变的消息序列。Kafka 使用 zookeeper 来管理集群中的 broker,以及 topic 和 partition 的元数据。生产者将消息发送到 topic 中,消费者从 topic 中读取消息。Kafka 还支持流处理,可以将多个 topic 的数据进行处理并输出到新的 topic 中。
简介
- Apache Kafka 分布式流处理系统的 Python 客户端。kafka-python 的设计功能与官方 java 客户端非常相似,带有一些 pythonic 接口(例如,消费者迭代器)。
- kafka-python 最适用于较新的代理(0.9+),但向后兼容旧版本(至 0.8.0)。某些功能只会在较新的经纪人上启用。例如,完全协调的消费者组 —— 即动态分区分配给同一组中的多个消费者 —— 需要使用 0.9 个 kafka broker。支持早期代理版本的此功能需要编写和维护自定义领导选举和成员 / 健康检查代码(可能使用 zookeeper 或 consul)。对于较旧的代理,您可以通过使用配置管理工具(如 chef、ansible 等)手动为每个消费者实例分配不同的分区来实现类似的效果。这种方法可以很好地工作,但它不支持失败时的重新平衡。有关详细信息,请参阅兼容性。
安装
1 | pip install kafka-python |
KafkaProducer
KafkaProducer
是一个高级的异步消息生产者。该类旨在尽可能与官方 Java 客户端类似地运行。有关详细信息,请参阅 KafkaProducer 。1
2
3
4
5from kafka import KafkaProducer
producer = KafkaProducer(bootstrap_servers=['localhost:9092'])
producer.send('my_topic', b'my_message')这将创建一个生产者实例,它将连接到本地 Kafka 代理。我们可以使用
send()
方法将消息发送到 Kafka 主题
KafkaConsumer
KafkaConsumer
是一个高级消息消费者,旨在尽可能类似于官方 java 客户端的操作。完全支持协调的消费者组需要使用支持组 API 的 kafka 代理:kafka v0.9+。- 有关 API 和配置详细信息,请参阅 KafkaConsumer 。
- 消费者迭代器返回 ConsumerRecords,这是简单的命名元组,公开基本的消息属性:topic、partition、offset、key、and value
1
2
3
4
5
6from kafka import KafkaConsumer
consumer = KafkaConsumer('my_topic', bootstrap_servers=['localhost:9092'])
for message in consumer:
print(message.value)这将创建一个消费者实例,它将连接到本地 Kafka 代理,并开始消费名为 “my_topic” 的主题的消息。我们可以使用 for 循环迭代消费者对象,以便在接收到新消息时处理它们。在这个例子中,我们只是简单地打印出消息。