apache-kafka有关的问题


Apache Kafka-bootstrap.servers中url的DNS解析失败 (无法构建kafka producer)

我有一个问题,用主机名构造kafka producer。
这就是我所做的。
1) 我将host.name,端口配置设置为server.properties
侦听器 = 明文: // 192.168.0.102:9092
端口 = 9092
主机.名称 =


Kafka的日志目录应该只包含Kafka主题数据

我正在尝试在kubernetes环境中运行融合的kafka图像 & 面对
致命 [KafkaServer id = 0] 在KafkaServer启动期间出现致命错误。准备关闭 (kafka.server.KafkaServer)
卡夫卡。


如何在kafka创建一个新的消费群体

我在本地运行kafka遵循快速入门指南的说明在这里,
然后我在config/consumer.properties中定义了我的消费者组配置,以便我的消费者可以选择消息...


对象FlinkKafkaConsumer010不是包org.apache.flink.streaming.connectors.kafka的成员

我正在尝试组装一个小程序来使用apache flink连接到kafka主题。我需要使用FlinkKafkaConsumer010。
包uimp
import org.apache.flink.streaming.api.scala._
import org.apa...


如何在KSQLDB中构造输出主题的嵌套JSON消息

从一个源系统我收到了下面的事件有效负载
为下面的json有效负载创建了Stream1
事件JSON 1
{
“事件”: {
“头”: {
"Name":"abc &


在分布式模式下启动Kafka连接时出错

我开始卡夫卡连接分布模式,但得到以下错误。
这在我的其他开发集群上工作正常,但是,我只在我的生产集群中得到这个错误。
ERROR Stopping由于


如果超过特定的字节数,是否有可能丢弃聚合 (Kafka流)?

我的目标是在超过一定数量的字节 (序列化的json) 时将一个聚合设置为null。有没有办法或者保留时间是最终删除聚合的唯一方法?


在kafka-connect-http中支持第二个url的预身份验证?

我想使用现成的kafka连接器从REST API获取数据。我在汇合集线器上找到了kafka-connect-http连接器,但此连接器不支持预身份验证...


Quarkus背压配置

我得到了以下stacktrace使用quarkus smallrye反应消息与卡夫卡:
2020-07-24 01:38:31,662 ERROR [io.sma.rea.mes.kafka] (executor-thread-870) SRMSG18207: Unable to dispatch mesa.


REST代理中的EOS

如果数据正在使用REST代理注入Kafka集群,如何精确实现一次。
我使用的是confluent平台5.3版社区版。


Quarkus反向压缩管理

我得到了以下stacktrace使用quarkus反应消息与卡夫卡:
At org.jboss.threads.ContextClassLoaderSavingRunnable.ru n(ContextClassLoaderSavingRunnable.java:35)
在组织。


Org.apache.kafka.streams.errors.StreamsException错误

添加retention.ms = "604,800,000" 后,我在Kafka上得到以下错误。
Org.apache.kafka.streams.errors.StreamsException: 任务 [0_7] 中止发送,因为使用上一个捕获了错误


Kafka consumer fetch-min-size (fetch.min.bytes) 是否等待提到的大小被填充?

假设有107条记录,每条记录为kb。如果获取大小为15kb,在7次迭代中将消耗105kb。现在,只剩下2kb了,我会在下一个iterati中得到剩下的2条记录吗.


Kafka: 如何显示偏移

我对卡夫卡非常陌生。我已经在我的mac上使用自制软件安装了kafka和zookeeper,我正在玩快速入门指南。
我已经能够使用以下方式将消息推送到Kafka上


Kafka MirrorMaker2-不镜像消费者组偏移

我已经设置了MirrorMaker2 用于复制 2 DC之间的数据。
我的mm2.properties,
# Mm2.properties
Name = source->dest
Clusters = source,dest
Source.bootstrap.servers = localhost:9091
Dest.bootst.


如何使用命令行创建Kafka consumer group?

我使用的是社区版的Confluent 5.2。我用name employee-info创建了一个新主题。
我使用flink作为上述主题的消费者。在flink中,为了添加kafka源,我们需要在grou中传递.


Kafka Connect-alnot ALTER添加缺失字段SinkRecordField{schema = Schema{BYTES},name = 'createuid',isPrimaryKey = true},

我正在使用JDBC源连接器从Teradata表中读取数据并推送到Kafka主题。但是当我试图使用JDBC接收器连接器来读取Kafka主题并推送到Oracle表时,它会抛出.


Kafka-connect topic.前缀不带表名

我正在使用jdbc源连接器,我的表名称有特殊字符 (即 $) 这是DB引擎可以接受的,但是当我运行kafka-connect时,它会尝试创建kafka.


融合Kafka Connect: 同步运行多个sink连接器

我们使用的Kafka connect S3 接收器连接器连接到Kafka并将数据加载到S3 存储桶。现在,我想使用Copy命令将数据从S3 存储桶加载到AWS Redshift,为此我正在创建自己的


Kafka Connect HDFS Sink中的多个嵌套字段分区

我们正在运行kafka hdfs sink connector (版本 5.2.1),需要HDFS数据被多个嵌套字段分区。topics中的数据存储为Avro,并具有嵌套元素。How ever connect


JDBC Sink Connector-使用kafka从多个主题升级到多个表-connect-跟进

这与下面的线程中提到的主题有关。
JDBC Sink连接器-使用kafka-connect从多个主题升级到多个表
我知道它有点老的帖子。但是我的问题.


Spring-cloud-stream & kafka-客户端向后兼容

我们有一个微服务,目前使用spring-cloud-stream Ditmars.RELEASE,反过来使用kafka-clients 0.10.1.1。
我们有兴趣升级到spring-cloud-stream 2.0.0.RC3,在t.


使用Kafka进行数据建模?主题和分区

使用新服务 (例如非RDBMS数据存储或消息队列) 时,我首先想到的一件事是: “我应该如何构建我的数据?”。
我已经阅读并观看了一些介绍


如何在application.yml文件中配置多个kafka消费者

实际上我有一个基于springboot的微服务,我已经使用kafka从不同的系统产生/消费数据。
现在我的问题是我有两个不同的主题,基于主题我有两个。


春季-卡夫卡: 使用消费者暂停/恢复时发生的再平衡,这不应该按照文档

Spring-Kafka: 当根据文档使用暂停/恢复方法暂停/恢复消费者时,当使用自动分配但不起作用时,不应该发生重新平衡,重新平衡发生.


Kafka消费者再平衡算法

有人能告诉我什么是重新平衡算法为卡夫卡消费者?我想了解分区计数和消费者线程如何影响这一点。
谢谢,


我在哪里定义topic.metadata.refresh.int erval.ms?

我正在测试卡夫卡一点,希望能很快把它放入我的生产堆栈中。
我正在使用kafka-console-producer.sh和kafka-console-consumer.sh的文件来测试kafka的功能.


KAFKA: 到节点的连接失败身份验证由于: 身份验证失败由于无效的凭证与SASL机制SCRAM-SHA-256

我一直在尝试使用SASL_PLAINTEXT SCRAM-SHA-256 向我的Kafka经纪人添加SASL身份验证一段时间,但没有任何成功。我不断在Kafka的日志文件上得到以下错误。
……


Kafka生产者/消费者over WAN?

我在一个数据中心有一个Kafka集群。一群可能通过wan (甚至是internet) 进行通信的客户端将向/从集群发送/接收实时消息。
我从卡夫卡的


Kafka-消费者/生产者与所有Zookeper实例一起工作

我已经配置了一个Kafka代理集群和一个使用kafka_2.11-1.1.0 分配存档的Zk实例集群。
对于Kafka代理,我已经配置了config/server.properties
Broker.id = 1,2,3


用KafkaTemplate发布request.timeout.ms和Spring Kafka同步事件

我对通过Spring Kafka同步发布的事件的超时配置的最佳实践有点困惑。Spring Kafka文档提供了一个使用ListenableFu的示例.


Kafka Streams: 使用at_least_once时对状态存储的排序有任何保证吗?

我们有一个用处理器API构建的Kafka Streams Java拓扑。
在拓扑中,我们有一个单处理器,它保存到多个状态存储。
当我们使用at_least_once时,我们希望看到


有没有办法将Kafka消息从一个服务器上的主题转发到另一个服务器上的主题?

我有一个场景,我们使用fluentD代理将应用程序日志转发到Kafka主题,
由于Kafka团队引入了Kerberos认证和fluentD版本不支持这个authenti.


Kafka流异常: GroupAuthorizationException

我正在开发一个Kafka-Stream应用程序,它将从输入Kafka主题读取消息并过滤不需要的数据并推送到输出Kafka主题。
Kafka流配置:
@ Bean (名称 =


是否有可能将不完整的事件合并到KTable中?

我想知道KTable是否能满足我们的需求。
假设我在Kafka中有一个包含事件的主题myTopic,我在这个主题上插入了一个KafkaStreams应用程序。
让我们假设在时间t0,myTopic cont.