-
15 votesanswersviews
如何获取kafka主题的分区的最新偏移量?
我正在为Kafka使用Python高级消费者,并希望了解主题的每个分区的最新偏移量 . 但是我无法让它发挥作用 . from kafka import TopicPartition from kafka.consumer import KafkaConsumer con = KafkaConsumer(bootstrap_servers = brokers) ps = [TopicPartiti... -
1 votesanswersviews
使用python库检索kafka中的使用者组偏移量
我有python脚本,我需要使用kafka1代理群集检索从kafka主题读取的一组使用者的当前使用者组偏移量 . 这些是本地kafka使用者,它们将偏移量存储在kafka集群中,而不是存储在zookeeper中 . 脚本本身不需要消费任何消息,只需读取其他消费者的当前偏移量 . 我意识到可以用 kafka-consumer-groups.sh 做到这一点,但理想情况下我想避免依赖shell命令 .... -
0 votesanswersviews
连接从我的本地机器在EC2机器上运行的Kafka
我是Kafka的新手并在论坛中搜索了不同的帖子,但找不到解决方案 . 我已经在EC2实例上安装了kafka,并尝试从我的ubuntu本地机器上连接它 . 我的目标是让python kafka客户端(包括Producer和Consumer)在我的本地机器上运行,并通过EC2 kafka实例发送/接收数据 . 那可能吗? Properties set in server.properties conf... -
0 votesanswersviews
Kafka 生产环境 商能否以复制品的配额率提供数据?
我有一个属于客户端的kafka 生产环境 者,clientid - “p1”,配额为50 MBps . 现在我使用“bin / kafka-producer-perf-test.sh”测试了我的制作人的性能,并且在写入没有副本的分区时,我能够获得接近50 MBps的吞吐量 . 我在一个有三个副本的分区上尝试了相同的实验 . 但这次吞吐量降低到30 MBps . 我的问题是,即使存在副本,kafk... -
0 votesanswersviews
分区未分配给Kafka使用者实例
当我使用单个实例启动使用者时,它会显示在使用者组中,但它不会消耗主题中的数据 . 之后,如果我启动另一个消费者并且我的第一个消费者开始消费数据,但最新的消费者实例没有分配任何分配 . 下面是第一个消费者实例启动时的信息日志 . INFO:kafka.client:从[(u'kafka-broker1.ap-south-1.staging.internal',9092,0)引导群集元数据] INF... -
3 votesanswersviews
NoBrokersAvailable:NoBrokersAvailable-Kafka错误
我已经开始学习 Kafka 了 . 尝试基本操作 . 关于“经纪人”,我一直坚持这一点 . 我的kafka正在运行,但是当我想创建一个分区时 . from kafka import TopicPartition (ERROR THERE) consumer = KafkaConsumer(bootstrap_servers='localhost:1234') consumer.assign([T... -
1 votesanswersviews
kafka-python消费者给出错误
我是kafka和kafka-python的新手 . 安装kafka-python之后,我尝试从这里开始实现消费者代码 - http://kafka-python.readthedocs.io/en/master/usage.html 我一直在kafka的bin目录中编写使用者代码,并尝试从那里运行python代码 . 但是我收到以下错误: 回溯(最近一次调用最后一次):文件“KafkaCons... -
1 votesanswersviews
异常AttributeError:“'KafkaProducer'对象没有属性'_closed'”
我是使用python生成消息,但我得到这个例外,在我的脚本下面是.. `来自kafka import KafkaProducer producer = KafkaProducer(bootstrap_servers ='192.168.1.6:9092')producer.send('test',b'Welcome Nagarajan') . get(timeout = 60) 收到错误消息...... -
0 votesanswersviews
尝试通过Cloudera Data Science Workbench连接Kafka时没有经纪人可用
我正在尝试通过Cloudera Data Science Workbench在我们的内部Hadoop集群上实现GitHub项目(https://github.com/tomatoTomahto/CDH-Sensor-Analytics) . 在Cloudera Data Science Workbench上运行项目时,我在尝试通过Python api KafkaProducer(bootstrap... -
0 votesanswersviews
如何使用ssl配置创建Kafka-python生成器
我正在尝试用ssl创建kafka 生产环境 者 . 我需要有关如何在构造函数中设置SSL参数的信息,kafka-python客户端中提供的信息不够具有描述性 . 什么是 ssl_certfile , ssl_cafile , ssl_keyfile 参数 . 我不确定在哪里查找这些文件 . producer = KafkaProducer(bootstrap_servers=kafka_broke... -
0 votesanswersviews
Kafka python消费者列为不活跃
我有一个作为消费者群体运行的python kafka消费者 . 我可以看到它不断消耗事件 . 但是当我跑步的时候 bin/kafka-consumer-groups --bootstrap-server localhost:9092 --list 它未在输出中列出 . 虽然我跑的时候 bin/kafka-consumer-groups --bootstrap-server localhost:9... -
1 votesanswersviews
在Kafka-python中重置消费者组内的kafka LAG(更改偏移量)
我发现这是我使用kafka-consumer-groups.sh工具重置LAG的地方How to change start offset for topic?但是我需要在应用程序中重置它 . 我找到了这个例子,但它似乎没有重置它 . kafka-python read from last produced message after a consumer restart例子 consumer =... -
1 votesanswersviews
创建主题,但在Kubernetes上使用Python获取Kafka FailedPayloadsError
我在python kafka-library中使用SimpleProducer . 这个脚本以前完美无缺地使用了我尝试过的其他更加硬配置的kafka-setup . kafka = KafkaClient(u'[masterNodeIp]:[servicePort]') producer = SimpleProducer(kafka) #make a simple message, while t... -
9 votesanswersviews
kafka-python: 生产环境 者无法连接
kafka-python(1.0.0)在连接到代理时抛出错误 . 同时/ usr / bin / kafka-console-producer和/ usr / bin / kafka-console-consumer工作正常 . Python应用程序过去也运行良好,但在zookeeper重新启动后,它不再能够连接 . 我正在使用文档中的裸骨示例: from kafka import KafkaPr...