rd_kafka 的常见发布和订阅配置选项的整理

2021-05-29 16:25:25

rd_kafka 的常见发布和订阅配置选项的整理。这些选项可以用于设置 Kafka 生产者(发布者)和消费者(订阅者)的行为和属性。

发布者配置选项:

  1. bootstrap.servers:Kafka 集群的地址列表,用于引导连接。
    示例代码:

    rd_kafka_conf_set(conf, "bootstrap.servers", "localhost:9092", errstr, sizeof(errstr));
    
  2. message.send.max.retries:消息发送的最大重试次数。
    示例代码:

    rd_kafka_conf_set(conf, "message.send.max.retries", "3", errstr, sizeof(errstr));
    
  3. request.required.acks:生产者需要接收到的分区写入确认数。
    示例代码:

    rd_kafka_conf_set(conf, "request.required.acks", "-1", errstr, sizeof(errstr));
    
  4. compression.codec:消息压缩编解码算法。
    示例代码:

    rd_kafka_conf_set(conf, "compression.codec", "gzip", errstr, sizeof(errstr));
    
  5. message.timeout.ms:消息发送超时时间(毫秒)。
    示例代码:

    rd_kafka_conf_set(conf, "message.timeout.ms", "5000", errstr, sizeof(errstr));
    
  6. partitioner:生产者使用的分区器。
    示例代码:

    rd_kafka_conf_set(conf, "partitioner", "random", errstr, sizeof(errstr));
  7. max.in.flight.requests.per.connection:单个连接上允许的未完成请求的最大数量。
    示例代码:

    rd_kafka_conf_set(conf, "max.in.flight.requests.per.connection", "5", errstr, sizeof(errstr));
  8. max.poll.interval.ms:消费者在轮询期间可空闲的最大时间(毫秒)。
    示例代码:

    rd_kafka_conf_set(conf, "max.poll.interval.ms", "300000", errstr, sizeof(errstr));
  9. message.max.bytes:单个消息的最大字节数。
    示例代码:

    rd_kafka_conf_set(conf, "message.max.bytes", "1000000", errstr, sizeof(errstr));
  10. socket.send.buffer.bytes:发送套接字的缓冲区大小(字节)。
    示例代码:

    rd_kafka_conf_set(conf, "socket.send.buffer.bytes", "65536", errstr, sizeof(errstr));
    
  11. socket.receive.buffer.bytes:接收套接字的缓冲区大小(字节)。
    示例代码:

    rd_kafka_conf_set(conf, "socket.receive.buffer.bytes", "65536", errstr, sizeof(errstr));
    
  12. enable.idempotence:启用幂等性发送,确保消息的顺序性和唯一性。
    示例代码:

    rd_kafka_conf_set(conf, "enable.idempotence", "true", errstr, sizeof(errstr));
    
  13. retries:消息发送的总重试次数,包括网络错误和可重试的应用程序错误。
    示例代码:

    rd_kafka_conf_set(conf, "retries", "5", errstr, sizeof(errstr));


    14 在 librdkafka 中,设置自定义分区器回调函数的方式是通过 rd_kafka_conf_set_partitioner_cb 函数来实现。以下是一个正确的示例代码:

    int32_t my_partitioner_cb(const rd_kafka_topic_t *rkt, const void *keydata, size_t keylen, int32_t partition_cnt, void *rkt_opaque, void *msg_opaque) {

        // 自定义分区逻辑
        // 返回一个分区号(0 到 partition_cnt-1)
    }
    
    // 创建 Kafka 配置对象
    rd_kafka_conf_t *conf = rd_kafka_conf_new();
    
    // 设置自定义分区器回调函数
    rd_kafka_conf_set_partitioner_cb(conf, my_partitioner_cb);

订阅者配置选项:

  1. bootstrap.servers:Kafka 集群的地址列表,用于引导连接。
    示例代码:

    rd_kafka_conf_set(conf, "bootstrap.servers", "localhost:9092", errstr, sizeof(errstr));
  2. group.id:消费者所属的消费者组的唯一标识符。
    示例代码:

    rd_kafka_conf_set(conf, "group.id", "my-consumer-group", errstr, sizeof(errstr));
  3. auto.offset.reset:当消费者在启动时没有有效的偏移量时,对应的消费位置。
    示例代码:

    rd_kafka_conf_set(conf, "auto.offset.reset","earliest", errstr, sizeof(errstr));
  4. enable.auto.commit:启用自动提交消费位移。
    示例代码:

    rd_kafka_conf_set(conf, "enable.auto.commit", "true", errstr, sizeof(errstr));
  5. auto.commit.interval.ms:自动提交消费位移的间隔时间(毫秒)。
    示例代码:

    rd_kafka_conf_set(conf, "auto.commit.interval.ms", "5000", errstr, sizeof(errstr));
  6. max.poll.records:每次轮询从单个分区中返回的最大记录数。
    示例代码:

    rd_kafka_conf_set(conf, "max.poll.records", "100", errstr, sizeof(errstr));
  7. fetch.wait.max.ms:在没有可用消息时,消费者等待获取新消息的最大时间(毫秒)。
    示例代码:

    rd_kafka_conf_set(conf, "fetch.wait.max.ms", "100", errstr, sizeof(errstr));
  8. session.timeout.ms:消费者组中消费者被认为失效之前的超时时间(毫秒)。
    示例代码:

    rd_kafka_conf_set(conf, "session.timeout.ms", "6000", errstr, sizeof(errstr));
  9. heartbeat.interval.ms:心跳间隔时间(毫秒),用于检测消费者组中的消费者是否存活。
    示例代码:

    rd_kafka_conf_set(conf, "heartbeat.interval.ms", "2000", errstr, sizeof(errstr));
  10. fetch.max.bytes:单次拉取请求从代理返回的最大数据量(字节)。
    示例代码:

    rd_kafka_conf_set(conf, "fetch.max.bytes", "1048576", errstr, sizeof(errstr));
    
  11. queued.min.messages:当消息排队的消息数低于此阈值时,轮询将阻塞(等待更多消息)。
    示例代码:

    rd_kafka_conf_set(conf, "queued.min.messages", "1000", errstr, sizeof(errstr));
    
  12. queued.max.messages.kbytes:待处理消息队列的最大总大小(千字节)。
    示例代码:

    rd_kafka_conf_set(conf, "queued.max.messages.kbytes", "10240", errstr, sizeof(errstr));
    

这些是一些常见的 rd_kafka 发布和订阅配置选项。

https://blog.csdn.net/dwjlyl/article/details/127432628