首页 文章
  • 4 votes
     answers
     views

    Kafka 作为Akka持久性期刊[暂停]

    在实施事件采购的写作期刊的不同选项中,Kafka似乎是“外部”的一个非常合理的选择: 它有一个伟大的生态系统 这是有据可查的 它自然支持流媒体和听众 然而,考虑到Akka持久性,似乎Kafka期刊仅通过社区贡献的包来支持,该包在过去的两年中没有被修改过 . Kafka 不是一个好选择,是否有更好的选择,如果它是最好的选择,人们如何使用它与akka-persistance?
  • 151 votes
     answers
     views

    使用Kafka作为(CQRS)Eventstore . 好主意?

    虽然我之前遇到过Kafka,但我最近才意识到Kafka可能会被用作(CQRS,eventstore)的基础 . Kafka支持的要点之一: 事件捕获/存储,当然都是HA . 发布/订阅架构 能够重放事件日志,允许新订户在事后注册系统 . 诚然,我不是100%精通CQRS /事件采购,但这看起来非常接近eventstore应该是什么 . 有趣的是:我真的找不到关于Kafka被用作事件...
  • 0 votes
     answers
     views

    kafka火花流数据没有写入cassandra . 插入零行

    在从spark向cassandra写入数据时,数据不会被写入 .闪回是:我正在做一个kafka-sparkStreaming-cassandra集成 .我正在阅读kafka消息并尝试将其放入cassandra表 CREATE TABLE TEST_TABLE(key INT PRIMARY KEY, value TEXT) .kafka对spark-streaming的运行很酷,但对cassan...
  • 0 votes
     answers
     views

    Kafka Stream和Table以不同的时间间隔连接

    我有一个非常典型的场景,我需要在时间t1从一个主题到另一个主题的KTable执行KStream之间的交叉连接 . 为了解释这一点,我想出了一个图表 . 请参考图片 . 我想了解Kafka Streams是否可行 . 我非常清楚交叉连接是可能的 . 但是,我想知道在不同的时间范围内是否可以进行连接 .
  • 1 votes
     answers
     views

    使用UDP将传感器数据发送到apache Kafka

    在我的系统中,有许多传感器生成数据 . 我希望他们直接将生成的数据发送到Kafka,而不会干扰网络编程 . 据我所知,Kafka(Confluent版本)支持HTTP用于此类情况,但我的传感器不使用Http协议发送数据,并且它们有自己的协议 . 现在我的问题是: Kafka收到(了解)哪个网络布局数据抽象?我的意思是它可以接收二进制(物理网络布局数据)或更高数据抽象布局的数据,如Datalin...
  • 0 votes
     answers
     views

    Apache Kafka - 消费者不接收来自制作人的消息

    我很感激你的帮助 . 我正在构建一个Apache Kafka消费者来订阅另一个已经运行的Kafka . 现在,我的问题是,当我的制作人将消息推送到服务器时...我的消费者没有收到它们...我在我的日志打印中得到以下信息:: 13/08/30 18:00:58 INFO producer.SyncProducer: Connected to xx.xx.xx.xx:6667:false for pr...
  • 1 votes
     answers
     views

    Apache Kafka:接收主题,但没有来自Fiware Orion Context Broker的数据

    我在新的 Cloud 实例中安装了kafka,我想从Fiware Orion Context Broker获取数据 . 我已经在我的本地机器上完成了这个程序,并且运行良好 . 现在,当我在我的 Cloud 实例上运行zookeeper和Kafka时,我会自动收到来自Orion的主题,但主题是空的 . 我能够在本地编写一个主题从 生产环境 者到消费者的测试消息,但没有Orion数据被推送到我的主题 ...
  • 0 votes
     answers
     views

    与arduino客户端发布apache kafka连接

    当我尝试将我的Arduino uno与 PubSubClient 库连接起来时,我遇到了一个问题,就像我的Apache Kafka服务器的客户端一样 . 当我尝试 Build 连接时,在服务器上我有这个: [2016-09-07 16:30:59,093]警告来自/192.168.1.104的意外错误;关闭连接(org.apache.kafka.common.network .Selector)...
  • 3 votes
     answers
     views

    消费者没有在Apache Kafka中收到消息

    我正在构建一个Apache Kafka消费者来订阅另一个已经运行的Kafka . 现在,我的问题是当我的制作人将消息推送到服务器时...我的消费者没有收到它们 . Here I give Producer code, Properties properties = new Properties(); properties.put("metadata.broker.li...
  • 0 votes
     answers
     views

    Apache Kafka - 消费者基础

    我必须使用Apache Kafka连接到我公司的经纪人 . 问题是我之前从未使用过这种技术,这是我需要澄清的一点 . 实际上,我创建了一个带有Zookeeper / Server / Consumer的“本地”Kafka,它使用命令〜/ bin / kafka-console-consumer.sh --zookeeper localhost:2181 --topic testGaultier ...
  • 3 votes
     answers
     views

    Kafka Streams窗口加入了保留

    我们正在使用kafka streams的windows join来加入2个流,我们想知道: 为什么KS会在内部主题中添加24小时?例如,我们有一个1小时的窗口,但内部主题保留了25小时 . 我们可以将其配置为不添加24h吗? KS似乎在窗口中保留两个流的数据 - 内部主题和状态存储(rocksdb) - 有没有办法只保留连接左侧的流? [UPDATE] 例如,我们像这样创建JoinWi...
  • 1 votes
     answers
     views

    Kafka KStream - 使用带窗口的AbstractProcessor

    我希望将来自KStream的窗口批输出组合在一起并将它们写入辅助存储 . 我期待看到 .punctuate() 大约每30秒调用一次 . 我得到的是保存here . (原始文件长达数千行) 总结 - .punctuate() 似乎随机地被调用,然后反复调用 . 它似乎不符合通过ProcessorContext.schedule()设置的值 . 编辑: 另一次运行相同的代码大约每四分钟调用 ....
  • 5 votes
     answers
     views

    阅读并处理来自Kafka的一批消息

    我想从kafka主题中定期读取一批消息,或者当读取的消息数达到一定数量时,将它们作为批处理发送到下游系统 . 目前,我的kafka拓扑结构由处理器终止,处理器保存消息,然后使用punctuate方法逐步处理批处理 . 但是,我不确定这是完美的,因为如果应用程序在调用punctuate方法之前崩溃,我认为一些消息会丢失(即消费者认为它已经完成了它们但它们不会出现在下游系统中) . batchQue...
  • 3 votes
     answers
     views

    kafka流会话窗口保留期限

    我们正在使用Kafka stream的SessionWindows来聚合相关事件的到来 . 与聚合一起,我们使用 until() API指定窗口的保留时间 . Stream info :会话窗口(不活动时间)为1分钟,传递到 until() 的保留时间为2分钟 . 我们使用自定义 TimestampExtractor 来映射事件的时间 . 例:事件:e1; eventTime:上午10:00...
  • 3 votes
     answers
     views

    Kafka Streams JDBC Source长期不兼容

    问题:在设置Kafka管道以使用带有Avro序列化器和反序列化器的Kafka Connect JDBC源之后,一旦我尝试使用Kafka Streams Java应用程序将该数据读入KStream,我就会收到以下错误 . org.apache.kafka.common.errors.SerializationException:LongDeserializer接收的数据大小不是8 我试图尽可能地...
  • 7 votes
     answers
     views

    无法检索Kafka Streams中的状态存储键的元数据

    我正在尝试使用Kafka Streams和分布在两个实例中的状态存储 . 以下是商店和相关KTable的定义方式: KTable<String, Double> userBalancesTable = kStreamBuilder.table( "balances-table", Consumed.with(String(), Dou...
  • 1 votes
     answers
     views

    Kafka Stream:KTable实现

    如何确定某个主题的KTable实现何时完成? 对于例如假设KTable有几百万行 . 下面的伪代码: KTable<String, String> kt = kgroupedStream.groupByKey(..).reduce(..); //Assume this produces few million rows 在某个时间点,我想安排一个线程调用以下内容,写入主题:kt.to...
  • 0 votes
     answers
     views

    如何在加入Kstreams后选择String格式数据中的空值

    我已经在两个kstream上执行了连接操作,这些kstream由avro格式数据组成,然后我的键是Integer类型,值是string类型 . 输出是这样的: [KSTREAM-MERGE-0000000016]: 1, {"id": 1, "name": "john", "age": 26}/{"id&qu...
  • 1 votes
     answers
     views

    使用Kafka Streams在依赖对象之间进行排序

    我正在从表示依赖实体的RestfulAPI读取数据 . 例如来自/学生我得到学生对象和/老师我得到教师对象 . 学生与教师对象相关联(学生有教师ID) . 问题是我从Kafka /学生到学生话题,从/从教师到教师话题,但是当我尝试与Kafka Streams一起加入他们时,有时学生的事件在他们的老师活动到来之前到来,因此我没有收到学生和老师的联合记录(由于早到的学生) . 使用窗口不是最佳选择,因...
  • 1 votes
     answers
     views

    直接写到 Kafka 国营商店

    我们已经开始尝试使用Kafka来查看它是否可以用于聚合我们的应用程序数据 . 我认为我们的用例与Kafka流匹配,但我们不确定我们是否正确使用该工具 . 我们构建的概念证明似乎按设计工作,我不确定我们是否正在使用API . 我们的概念证明是使用kafka流来保持输出主题中关于程序的信息的运行记录,例如, { "numberActive": 0, "numb...
  • 2 votes
     answers
     views

    我可以在Kafka Broker的同一台机器上运行Kafka Streams应用程序吗?

    我有一个Kafka Streams应用程序,它从几个主题中获取数据并加入数据并将其放入另一个主题中 . Kafka 配置: 5 kafka brokers Kafka Topics - 15 partitions and 3 replication factor. Note: I am running Kafka Streams Applications on the same machines...
  • 0 votes
     answers
     views

    kafka流不提取所有记录

    我有一个java Kafka流应用程序,它从一个主题中读取一些过滤和转换,并将数据写回Kafka到另一个主题 . 我在每一步都打印流对象 . 我注意到如果我向输入主题发送了超过几十条记录,我的Kafka流应用程序会消耗一些记录 not . 当使用kafka-console-consumer.sh从输入主题中消费时,我确实收到了所有记录 . 我正在使用一个代理和一个分区主题运行Kafka 1.0....
  • 1 votes
     answers
     views

    我可以在Apache Kafka中将多个转换器/处理器连接到单个流

    在所有示例中,我都看到了Kafka的简单单变换器/处理器拓扑 . 我怀疑的是,我们是否可以通过分解应用于单个输入流的多个变换器/处理器来模块化应用程序逻辑 . 请在下面找到用例: 当前的应用程序配置是一个单一的处理器,包含所有处理逻辑任务,如过滤,验证,应用程序逻辑,延迟(Kafka对于dbs来说太快)和调用SP / push到下游 . 但我们现在计划通过将每个任务分解为Kstream的独立处理器...
  • 0 votes
     answers
     views

    Kafka Streams中的自我加入可能吗?

    我们将Kafka Streams视为解决飞行比较的一种方法 . 具体来说,我们有数据以每秒约15,000个事务的顺序到达Kafka主题,我们希望在记录滚动时对记录进行比较操作 . 记录非常宽(1900列或其左右),但比较操作发生在很少的列(~10-20) . 我们的比较窗口大概是一分钟 . 场景将是这样的: 消息1在时间00s到达foo,bar,foobar,barfoo,12,34的值 ...
  • 1 votes
     answers
     views

    KStreams确定哪些输入记录时间戳元数据在连接上保持不变

    希望有人知道这一点,或者可以指出我正确的方向...... 我有一个通过API REST请求创建的数据主题 . REST请求中收到的一个字段是记录EventTime的时间戳 . 这些记录生成到Kafka,EventTime设置为Record的元数据时间戳 . 我有另一个规则主题,通过向接收的值添加新字段来提供增强数据主题记录的信息 . 这两个主题都具有匹配的加入键 . 我的目标是使用处理器API在...
  • 1 votes
     answers
     views

    Kafka:添加批量旧数据

    使用Kafka进行基于时间的事件,使用窗口对Kafka Streams中的事件(会话)进行分组 . 我们应该如何处理来自不同来源的一组数据的到来,这些数据由旧数据组成? 比如说,您正在为客户进行网站分析 . 您从事件主题中的客户端接收事件数据,您可以在其中接收所有事件类型 . 由于某种原因,您没有从客户端收到订单(购买)数据,您只收到了构建会话的综合浏览量数据 . 一段时间之后,您会收到一批基于时...
  • 5 votes
     answers
     views

    Kafka - 此服务器不是该主题分区的领导者

    我有两个经纪人kafka 0.10.2.0 cluster.Replication factor是2.我正在对这个Kafka运行1.0.0 kafka流应用程序 . 在我的kafka流应用程序中,生成器配置有 retries = 10 and retry.backoff.ms = 100 运行几分钟后,我在Kakfa server.log中观察了以下日志 . 由于此Kafka流应用程序正在抛出'N...
  • 1 votes
     answers
     views

    Spark可以在同一节点上与Kafka一起运行,通过使Spark Streaming ETL进程更接近实时数据来进行优化吗?

    我已经得到了建议,并且在一些地方读到,在数据节点上运行Spark大大提高了批处理的性能 . 我还得到了保持Kafka服务在专用节点上隔离的建议 . 如果Kafka数据的大多数消费者都是Spark Streaming ETL进程,这些进程将转换后的数据版本放回到Kafka或其他一些存储机制上,那么它是否会在相同的节点上运行这些进程,即运行Spark服务与Kafka专用集群上的Kafka服务一起? 谢...
  • 2 votes
     answers
     views

    Bluemix Kafka流

    最新版本的IBM Message Bus是否支持Kafka Streams(此处描述:http://www.confluent.io/blog/introducing-kafka-streams-stream-processing-made-simple/) . 有关如何执行此操作的任何示例/示例?如果是,可以用node.js完成吗? 我想做的是 从源数据库(Cloudant)到Kafka流的...
  • -1 votes
     answers
     views

    Flafka(Http - > Flume-> Kafka - > Spark Streaming)

    我有一个用于实时流的用例,我们将使用Kafka(0.9)用于消息缓冲和火花流(1.6)用于流处理(HDP 2.4) . 我们将在Http上收到~80-90K /秒的活动 . 您能否建议一个推荐的架构,以便将数据提取到Kafka主题中,这些主题将由Spark流消耗 . 我们正在考虑flafka架构 . Flume是否正在收听Http并向Kafka(Flafka)发送实时流媒体的好选择? 请分享其他可...

热门问题