kafka生产规划和运维-八零岁月
记录所见
分享所感

kafka生产规划和运维

一、Kafka集群运维和规划

其实任何开源的分布式系统在开始规划时,就需要考虑到业务场景,以及生产环境的周边可观测系统,比如如下几个方面:

  • 规划和部署生产级别的集群(包含官方最佳实践以及一些针对不停场景推荐的配置项变更)
  • 执行一些部署后的操作(比如滚动重启集群,集群备份,数据迁移等等)
  • 集群的可观测性(监控重要统计数据,理解kafka内部行为的具体含义以及是否需要报警通知)

二、集群规划

本节主要介绍,kafka集群在生产环境部署前的一些规划,包含硬件配置选择,网络以及文件系统和其他考虑的选型.

1.硬件和OS

注意:通常对于分布式的开源服务来将对于硬件本身没有太高的要求,但当需要承载一定量级的业务时,我们需要考虑一些硬件是否能够支撑对应的业务场景,并且通常来讲针对不同的业务场景选择不同的硬件(如果可选择),也许会适当降低资源成本。

1.0 OS

一般来说,对于运行Linux中的kafka集群不需要过多的OS以及kernel参数调整,但如下几种情况可以根据具体情况进行参考:

  • 文件描述符(fd): broker节点上fd限制可以参考(number_of_partitions)*(partition_size/segment_size)公式
  • 套接字缓冲区(socket buffer): 该参数可以增加多数据中心之间的数据传输(一般异地集群备份建议调整以增加吞吐)
  • 最大内存映射区域数(vm.max_map_count): 当kafka broker节点拥有太多分区时应该密切关注系统级别的该参数,默认为65535。每一个日志段,分配的分区,都需要一对index/timeindex文件,而每个文件都会消耗一个内存区域(一个日志段使用2个内存映射区域),因此一个分区至少需要2个内存区域,一个broker上拥有50000分区时,将会消耗100000个内存区域,此时默认的参数就会导致broker 以OutOfMemoryError方式crash掉。

注意:每个分区的日志段的数量取决于段(segment)的大小,负载,以及保留策略

kafka使用大量的文件以及socket和客户端进行通讯,我们都知道,在Linux下,一切皆文件,因此系统需要设置更多的可用文件描述符>。

在大多数的默认系统配置中,单个进程可用使用1024个文件描述符,对于kafka来说太小了,建议调整到至少100000,但是通常和操作 系统以及发行版本有关系,需要根据具体的OS进行调整。

可用通过计算Kafka数据目录中的.index文件来计算当前的mmap编号。.index文件大多数代表了内存映射文件。

# 1.统计.index文件个数
$ find . -name ‘*index’ | wc -l
# 2.为每个session设置vm.max_map_count参数,这将计算当前内存映射文件的数量,mmap限制的最小值就是打开文件的ulimit限制 # 该值要远大于index的数量
$ sysctl -w vm.max_map_count=262144
# 3.持久化mmap参数
$ echo ‘vm.max_map_count=262144’ >> /etc/sysctl.conf $ sysctl -p

1.1 内存

Kafka严重依赖文件系统来存储和缓存消息。

所有数据都会立即写入文件系统上的持久日志中,而不必刷新到磁盘。实际上,这仅意味着将其转移到内核的页面缓存pagecache中。 当回收内存时,操作系统会将可用内存转移到磁盘缓存中,而对性能的影响很小。

同时,Kafka非常小心地使用堆空间heap space,不需要将堆大小设置为超过6 GB,这样将会在32G内存的机器上缓存28-30G的数据到文件系统。

因此,生产集群需要足够的内存来缓存活动的reader和writer。在Confluent的使用建议中,提出了对内存需求的粗略估计方式,比如需要缓冲30s,那么内存需求大概为write_throughput * 30

通常来讲64G内存配置的机器是一个不错的选择.

1.2 CPU

大多数的kafka集群对于cpu的要求不是那么高,因此对于CPU的配置没有像其他资源那么重要(但是通常同等资源都是由一定比例配比的)。

注意: 如果开启了SSL,那么可能对集群的整体性能有一定影响,且对cpu有一定要求,具体需要考虑到cpu的类型以及具体的JVM实现细节(通常来讲内部服务均不会开启SSL,因为管理成本很高,且性能上略有损失,安全上可以通过整体的IT安全进行要求和管控)

通常情况下建议使用较新的多核处理器,通用集群可以设置为24核心。

如果需要在更快的CPU或更多的内核之间进行选择,请选择更多的内核,因为多核提供的额外并发性将远远超过稍快的时钟速度。

1.3 Disk

生产集群建议使用多块磁盘来最大化整体的吞吐,不要与应用程序日志或其他OS文件系统活动共享用于Kafka数据的相同驱动器,以确保良好的延迟。

在官方的最佳实践中建议,可以将多块磁盘构建成RAID,或者直接将独立的多块磁盘作为kafka的数据存储,也就是JBOD方案(Just Bunch Of Disks)。

备注:如果软RAID的话其实会在存储方面增加一层数据均衡,增加了集群的复杂度,因此一般可用选择后者,而且RAID主要用于提供冗余,对于开源分布式服务来讲,在软件层面上基本都会保证数据的冗余。

不过在实际的场景中,具体选择使用多块盘做RAID还是直接使用多块盘挂载,以下有几种场景可以考虑:

如果配置多个数据目录,则Broker将在路径中放置一个新分区,该分区中当前存储的分区数最少。每个分区将完全位于数据目录之一中,如果分区之间的数据不平衡,就会导致磁盘之间的负载不平衡

RAID在平衡磁盘之间的负载方面做得更好,它能在较低的水平上平衡负载。RAID的主要缺点是减少了可用的磁盘空间(RAID0除外),好处是可以容忍磁盘故障(RAID1,RAID5等)。

在生产中强烈不建议使用RAID 5 or RAID 6 ,会严重影响到写吞吐的性能,并且在磁盘故障时会有重建阵列的I/O成本(RAID0下也存在重建I/O的成本)

如果额外的成本可以接受,建议使用RAID10(容量减半,多一份冗余),否则,建议Kafka服务器配置多个日志目录,每个目录都安装在单独的驱动器上。

linked使用8×7200转的sata磁盘,一般来说,磁盘吞吐量是性能瓶颈,磁盘越多越好。

kafka官方文档中其实建议使用多个驱动器以获得良好的吞吐量,因为每个路径都独立挂载在不同的磁盘上,这使得多块物理磁盘磁头同时执行物理I/O写操作,可以极大地加速Kafka消息生产的速度。

注意: 通常在使用本地盘时,容量可能会比较大,当磁盘容量超过2T时,Linux下默认的MBR分区就不能满足容量的要求了,此时需要在分区时进行GPT分区,否则等线上业务真正上线后会发现超过2T的空间就被浪费了。

另外一个问题就是,磁盘容量规划的问题,虽然kafka默认为全部的日志数据设置了7天保留时间,但是往往在海量的数据消费场景中,单天的数据量也可能达到好几个T,这就导致了需要提前对业务的场景和使用方式进行提前规划,并提前计算最少的存储量。

但一般对于磁盘空间的规划可以根据消息量大概估算,比如一天7亿条消息,单条1kb,消息副本为3(可保证2节点同时挂),那么大概的存储空间为7亿*3*1KB/1000/1000=2100G,也就是这种规模下的数据,一天产生2T的数据,实际使用数据为700G,1400G数据为冗余数据,此时我们在规划磁盘容量时就需要考虑到单天数据量的大小,以及数据的保留时间。

注意: 如果客户端开启了消息压缩,整体的数据能再相对小一些,可以根据具体情况来考虑

1.4 Network

在分布式系统中,快速可靠的网络是性能的一个重要组成部分(因此通常分布式系统中建议在同机房)。

低延迟确保节点可以轻松通信,而高带宽有助于集群节点之前的副本移动和恢复(往往在kafka集群中优先瓶颈点都是带宽)。

目前大多数的数据中心基本都是千兆(1 GbE)或万兆网络(10 GbE),对于大多数集群通常都是足够的。

应该尽量避免集群跨越多个数据中心,即使数据中心在很近的距离同地区,也要避免跨越巨大地理距离的集群。

备注:实际上在分布式系统中分区是肯定会发生的,通过避免跨机房部署能够降低分区的概率

Kafka集群假设所有节点都是相等的,较大的延迟可能会加剧分布式系统中的问题,并使调试和解决变得更加困难。

注意: 如果业务上需要异地进行数据读写,推荐的方法是在每个数据中心中部署一个本地Kafka集群,每个数据中心中的应用程序实例只与它们的本地集群交互,并在集群之间进行镜像(kafka提供了mirror-maker工具)。

1.5 Filesystem

现在操作系统中,大部分的系统应该都使用了Ext4XFS系统,官方也推荐使用这两种文件系统,但是对于具体的文件系统的选择,官方提供了如下几种场景和需要注意的点。

使用各种文件系统创建和挂载选项,在具有大量消息负载的集群上执行了比较测试,XFS带来了更好的本地时间(最好的EXT4配置是160ms vs. 250ms+),以及更低的平均等待时间。XFS性能在磁盘性能方面的可变性也较小。

不论是使用哪种文件系统,推荐修改默认的挂载参数:

  • noatime: 此选项禁止在读取文件时更新文件的atime(最后访问时间)属性,这可以消除大量的文件系统写操作,特别是在引导消费者的情况下,Kafka完全不依赖于atime属性,所以禁用它是安全的
$ cat /etc/fstab UUID=”4231b126-7e67-45c4-b8bf-554006291d35″ /export1 xfs defaults,noatime 0 2

XFS文件系统挂载参数优化:

  • largeio: 这会影响stat调用报告的首选I/O大小,尽管这可以在较大的磁盘写入上实现更高的性能,但实际上对性能的影响很小或没有影响
  • nobarrier: 于具有电池后备缓存的基础设备,此选项可以通过禁用定期写刷新来提供更高的性能。 但是,如果基础设备的行为良>好,它将向文件系统报告它不需要刷新,并且此选项将无效。

EXT文件系统挂载参数优化:

注意: 在ext4文件系统下,要获得最佳性能,则需要调整几个参数。这些选项在故障情况下通常是不安全的,并且将导致更多的数据丢失和损坏,对于单个broker故障,可以擦除磁盘并从群集重建副本,在多数情况下,多broker异常意味着潜在的文件系统损坏,无法轻易恢复。

  • data=writeback: Ext4默认为data = ordered,这使某些写入操作具有很强的顺序,在Kafka场景下其实不需要该参数,此设置消除了排序约束,并且似乎大大减少了延迟
  • Disabling journaling: 日志记录是一个折衷:它使服务器崩溃后重新引导更快,但它引入了大量额外的锁定,增加了写入性能的差异
  • commit=num_secs: 这调整了ext4提交到其元数据日志的频率。 将此值设置为较低的值可以减少崩溃期间未刷新数据的丢失。 将此值设置为较高的值将提高吞吐量。
  • nobh: 此设置控制在使用data=writeback模式时附加的排序保证,可以提高吞吐量和延迟
  • delalloc: 延迟分配意味着文件系统避免在物理写入发生之前分配任何块,此功能非常适合吞吐量

1.6 Application vs. OS Flush Management

Kafka始终会立即将所有数据写入文件系统,并支持配置刷新策略的功能,该策略控制何时使用刷新将数据从OS缓存中强制出到磁盘上。

可以控制此刷新策略,以在一段时间后或在写入一定数量的消息之后将数据强制到磁盘。 在此配置中有几种选择。

Kafka必须最终调用fsync才能知道数据已刷新。

当从崩溃中恢复任何未知的日志段时,Kafka将通过检查其消息的CRC来检查每条消息的完整性,并在启动时执行的恢复过程中重建附带的偏移索引文件

请注意,Kafka中的持久性不需要将数据同步到磁盘,因为发生故障的节点将始终从其副本中恢复。

我们建议使用默认刷新设置,该设置将完全禁用应用程序的fsync。

这意味着依靠操作系统和Kafka自己的后台刷新来完成后台刷新。

这为所有用途提供了最佳的解决方案:无需调节配置,提高吞吐量和延迟,并提供完全恢复保证。

通常,我们认为复制提供的保证要强于同步到本地磁盘,但是偏执狂仍然更愿意同时拥有两者,并且仍然支持应用程序级fsync策略。

使用应用程序级刷新设置的缺点是,其磁盘使用模式效率较低(它给操作系统减少了重新排序写操作的余地),并且由于大多数Linux文件系统中的fsync阻止了文件写入,因此它会引入延迟。 后台刷新进行更精细的页面级锁定。

1.7 理解Linux操作系统的Flush行为

在Linux中,写入文件系统的数据将保留在页面缓存中,直到必须将其写出到磁盘为止(由于应用程序级fsync或操作系统自身的刷新策略)。

数据刷新是通过一组称为pdflush的后台线程完成的(或在2.6.32版的内核“冲洗线程”中)。

Pdflush具有可配置的策略,该策略控制可以在缓存中维护多少脏数据以及必须将多脏数据写回到磁盘的时间.

pdflush刷新策略

当Pdflush无法跟上写入数据的速度时,它将最终导致写入过程阻塞写入中的延迟,从而减慢数据的累积。

与进程内缓存相比,使用pagecache存储将写入磁盘的数据有几个优点:

  • I/O调度将连续的小写批量写到更大的物理写中,从而提高吞吐量
  • I/O调度尝试重新排序写操作,以最小化磁盘头的移动,从而提高吞吐量
  • 它会自动使用机器上所有的空闲内存

2.节点配置

  • 1.避免使用太小的节点配置,因为这样整个集群的节点数可能会特别多,在这种机器上运行kafka将会有更多的开销
  • 2.避免使用太高配计算机,因为它们经常导致资源使用不平衡,比如内存优先不够了,但cpu还剩余很多。如果在每个高配机器上运行多个broker节点,将会增加整体的复杂度

3.JVM配置

在当前大多数Java类应用下,基本都在使用JDK8(建议使用最新的jdk8),在此环境下默认使用的是G1的垃圾回收器,因此一般情况下仅需要修改如下参数即可:

  • MaxGCPauseMillis: 指定每次垃圾回收默认的停顿时间,默认值200ms
  • InitiatingHeapOccupancyPercent: G1 启动新一轮垃圾回收之前可以使用的堆内存百分比,默认值是45

官方推荐的GC参数如下:

-Xms6g -Xmx6g -XX:MetaspaceSize=96m -XX:+UseG1GC -XX:MaxGCPauseMillis=20 -XX:InitiatingHeapOccupancyPercent=35 -XX:G1HeapRegionSize=16M -XX:MinMetaspaceFreeRatio=50 -XX:MaxMetaspaceFreeRatio=80

作为参考,LinkedIn最繁忙的集群当前是如下情况:

  • 60 brokers
  • 50k partitions (replication factor 2)
  • 800k messages/sec in
  • 300 MBps inbound, 1 GBps + outbound

上面的GC调优看起来比较激进,但集群中的所有broker都会有90%的gc中止时间,大概21ms,它们做不到每秒一个young GC

作为同样是图片社交的Pinterest来讲,他们采用如下的JVM参数:

Pinterest的大规模kafka实践

# 61G的内存
-Xms8g -Xmx8g -XX:NewSize=512m -XX:MaxNewSize=512m -XX:MetaspaceSize=128m -XX:+UseG1GC -XX:MaxGCPauseMillis=25 -XX:InitiatingHeapOccupancyPercent=35 -XX:G1HeapRegionSize=16M -XX:MinMetaspaceFreeRatio=25 -XX:MaxMetaspaceFreeRatio=75 -XX:+PrintGCDetails -XX:+PrintGCDateStamps -XX:PrintTenuringDistribution -Xloggc:/var/log/kafka/gc.log -XX:UseGCLogFileRotation -XX:NumberOfGCLogFiles=40 -XX:GCLogFileSize=50M
-Xmx10G -Xms10G -server -XX:+UseG1GC -XX:MaxGCPauseMillis=20 -XX:InitiatingHeapOccupancyPercent=35 -XX:+ExplicitGCInvokesConcurrent -XX:MaxInlineLevel=15 -Djava.awt.headless=true -Xloggc:/opt/app/kafka/bin/../logs/kafkaServer-gc.log -verbose:gc -XX:+PrintGCDetails -XX:+PrintGCDateStamps -XX:+PrintGCTimeStamps -XX:+UseGCLogFileRotation -XX:NumberOfGCLogFiles=10 -XX:GCLogFileSize=100M -Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false -Dcom.sun.management.jmxremote.port=9999

4.kafka核心配置

Kafka默认设置在大多数情况下都能工作,特别是与性能相关的设置和选项,但是考虑到集群的规划以及场景用途,有一些补充的配置参数可以对生产环境进行调优。

通常配置上来讲会分为broker端配置produser端配置consumer端配置,由于各个业务方当前均使用开源客户端,因此对于客户端的配置无法做到严格管控(如果有内部的sdk封装可能会比较好)。

4.1 重要的客户端配置

  • acks: 消息一致性保证(0:投递即成功,1:副本同步即成功,all/-1:全部ISR同步即成功)
  • compression: 压缩类型
  • batch size: 批处理大小

注意: 对于消费者来说,最重要的参数为fetch size

鉴于集群的整体可用性可靠性其实很大一部分和客户端的使用方式有关,后面会列举一些常见的生产者和消费者端的核心参数

kafka详细参数列表

4.2 broker核心配置

zookeeper.connect

zk连接串,建议写全部的zk节点地址。

brokers链接的zk集群地址,该值默认采用host:ip/path来指定一个zk中的znode节点,通常情况下用来隔离环境. kafka的zk路径中使 用了chroot环境,如果不指定使用默认的/来作为存储路径。

broker.id

broker唯一标识,该值可以任意设定(int类型)。默认reserved.broker.max开始,每次+1

在分布式集群中,可以手动指定每个broker的节点信息,同时也可以使用如下方式来自动生成每个broker的id

#broker.id broker.id.generation.enable=true

log.dirs

kafka存储的日志消息都是保存在该参数指定的日志路径下,该值可以指定多个磁盘路径,通常我们会绑定到多个磁盘上。比如log.dirs=/exoprt/kafka1,/export/kafka2,/export/kafka3

对应的另外一个默认参数为log.dir,默认值为/tmp/kafka-logs

listeners

broker监听列表,默认将为PLAINTEXT://myhost:9092

advertised.listeners

监听器发布到zk集群中的地址,供客户端使用,默认采用listeners参数值

num.recovery.threads.per.data.dir

每个数据目录用于在启动时进行日志恢复和在关闭时进行刷新的线程数,默认值为: 1

生产环境下,该值可以适当的调整大一些,用来增加正常关闭时数据flush的速度,以及启动时日志恢复的速度,可以提高broker节点的整体可用性。

对于如下几种情况,kafka会使用可配置的线程池来处理日志片段.

  • 服务器正常启动: 用于打开每个分区的日志片段
  • 服务器崩溃后重启: 用于检查和截断每个分区的日志片段
  • 服务器正常关闭: 用于关闭日志片段

默认情况下,每个日志目录采用一个线程,因为这些线程仅有在启动和关闭时才用到,所以可以适当设置大一点,并不会影响到整体服务的性能,特别是对于包含大量分区的服务器来说,一旦发生崩愤,在进行恢复时使用井行操作可能会省下数小时的时间。

需要注意的是,该值是每个日志目录的线程数,因此总线程数需要考虑到log.dirs的配置

备注: 这也是在使用RAID和JBOD两种磁盘方案的另外一个考虑点

delete.topic.enable

是否允许删除topic,默认为: true

如果为false,通过管理工具删除topic仅为标记删除,此时使用describe命令可以查看到topic的详情信息,但是无法写入,可以通过删除zk中的节点来删除

备注: 生产环境建议设置为false,由集群管理员定期统一的进行删除和管理

auto.create.topics.enable

默认情况下,kafka会使用如下三种方式创建topic:

  • 当一个生产者开始往主题写入消息时
  • 当一个消费者开始从主题读取消息时
  • 当任意一个客户端向主题发送元数据请求时

推荐是设置成false,不允许客户端直接创建topic,否则topic会无法管理。默认值为true

auto.leader.rebalance.enable

是否开启leader自动平衡,默认值为true。后台会有线程进行定期检查leader的分布情况

kafka中有一个被称为优先副本(preferred replicas)的概念(通常分区会有主分区和副本分区的概念,主分区先写入,然后push到其他副本分区)。

如果一个分区有3个副本,且这3个副本的优先级别分别为0,1,2,根据优先副本的概念,0会作为leader 。

当0节点的broker挂掉时,会启动1这个节点broker当做leader。

当0节点的broker再次启动后,会自动恢复为此partition的leader。不会导致负载不均衡和资源浪费,这就是leader的均衡机制(前提是第一次partition在分配的时候,它本身就是一个相对平均的分配)

auto.leader.rebalance.enable=true
# 对应影响的其他两个参数
# leader.imbalance.per.broker.percentage : 每个broker允许leader不平衡比例(如果每个broker上超过了这个值,controller将会>执行分区再平衡),默认值10.
# leader.imbalance.check.interval.seconds: 主分区再平衡的频率,默认值为300s

其实,在重要业务的场景中,需要将leader.imbalance.per.broker.percentage参数适当调整小一些,以避免broker中leader不平衡率导致节点的吞吐不均匀

num.partitions

自动创建topic的默认分区数,默认为1,通常生产集群不建议打开topic自动创建,一方面是不便于管理和追溯,另外一方面因为自动创建默认分区时1,且无法动态变更,造成的风险可能会比较大。

多分区的topic有这更好的数据平衡能力,并且可以帮助消费者进行并行化消费。

注意: 对于有key的数据,避免改变分区的数量

default.replication.factor

适用于自动创建的主题的默认复制因子,推荐至少设置为2,默认为1

min.insync.replicas

当使用required.acks=-1(all)提交到生产请求所需的ISR中的最小副本数,默认为1,建议在数据一致性要求较高的topic中设置至少为2

指定ISR的最小数量。当producer设置ack=all(-1)时,该值指定的副本必须全部写成功,才认为消息写入成功,否则生产者将抛异常(either NotEnoughReplicas or NotEnoughReplicasAfterAppend)

注意min.insync.replicas参数和生产者ack参数一起使用,可以加强整个消息的持久性

示例:(3副本的topic,可以设置该值为2,同时生产者ack设置为all,这将确保大多数副本没有收到写操作时,生产者直接异常)

默认值为:1

unclean.leader.election.enable

是否启用不在ISR集中的副本以选作领导者,即使这样做可能会导致数据丢失。该参数可以提高整体Topic的可用性,但是可能会造成数据的整体不一致性(部分数据的丢失)。

kafka可用性和可靠性保证

默认值为:false

为false,就只从ISR中获取leader保证了数据的可靠性,但是partition就失效了,true则从replica中获取,则可用性增强,但是数>据可能存在丢失情况

注意: 该参数实际上在设置的时候也有一定的争议性,比如,我们知道副本是有ISR的,即正在同步的副本,如果当前的broker宕>机导致需要选举leader partition,此时如果ISR内除了leader之外还有其他副本(但谁又能保证一定有呢),那直接从ISR中选举leader>即可,如果没有的话,当auto.leader.rebalance.enable=true时,就会去其他存活的副本中选举leader,此时可以增强整体的可用性,但是如果存活的副本不在ISR中,即意味着数据可能有一定的丢失了。但是如果该参数为false的话,ISR中没有,就直接异常了,为了保证数据的一致性。

该参数的存在其实是在可用性和可靠性之间做了一个权衡,为true时保证了可用性AP,为false时保证了一致性CP

数据一致性保证: ISR就保存了kafka认为可靠的副本,它们具备这样的条件:

  • 落后leader的消息条数在一定阈值内
  • 或者落后在一定时间内;

num.replica.fetchers

该参数指定了fetch线程的数量(从源broker中复制消息的fetch线程),默认值: 1

其实可以适当的调整大一些,可以增强副本之前的同步效率,设置为2可以满足大多数的场景

num.io.threads

broker处理请求的 IO 线程数,需要考虑到磁盘的IO状况。默认值为:8

该值通常情况下在生产环境也可以适当的调整大一些,但是通常情况下需要考虑到所使用的的磁盘的整体情况,可以尝试设置为num.network.threads的两倍,比如d1ne.4xlarge的规格其实可以设置为20

num.network.threads

指定broker用来接收来自网络的请求和发送网络的响应的线程数,默认值为: 3

在生产环境中,可以根据网络的情况以及cpu和内存的使用情况将该值默认调大一些,基本上可以为逻辑cpu核心的2/3都是可以的,比如我们生产环境采用d1ne.4xlarge规格的实例,可以设置该值为10

background.threads

后台任务处理线程数(例如过期消息删除等)。默认值为:10

socket相关

  • socket.send.buffer.bytes: (socket发送缓冲区:SO_SNDBUFF) 默认值:102400
  • socket.receive.buffer.bytes: (socket接收缓冲区:SO_RCVBUFF) 默认值:102400
  • socket.request.max.bytes: (请求最大值,message.max.bytes要小于该值较好) 默认值:104857600

在生产环境中如果涉及到多可用区,或多机房环境时,socket缓冲区的默认值也是有点小的,因此尝试可以尝试将该值设置为1M或者2M,即socket.send.buffer.bytes=2097152/1048576,生产环境推荐优化该参数

message.max.bytes

该值表示kafka允许的最大的batch大小(不是单个message的大小),默认值为1000012,即1MB.

在最新的消息格式版本中,为了提高效率,一般消息的提交都是采用batch的方式。

注意: 在以前的消息格式版本中,未压缩的记录不会分组成批,在这种情况下,此限制仅适用于单个记录。

在每个topic级别可以使用max.message.bytes设置

通常情况下,如果是一个集群承担多种业务场景,通常需要将该值设置为可以满足大多数场景的配置,比如41943044MB

log相关(具体到topic级别)

  • log.segment.bytes: 单个日志段(segment)的大小,默认为1073741824,即1GB
  • log.segment.delete.delay.ms: 日志段从文件系统中删除前等待的时间,默认为60000,即1min
  • log.cleanup.policy: 保留窗口之外的日志清理策略可同时指定多个策略如: [delete,compact]
  • log.cleaner.enable: 启用日志清除器进程,和cleanup.policy = compact参数结合使用,默认为true
  • log.cleaner.threads: 日志清理的后台线程数量,默认为1
  • log.cleaner.delete.retention.ms: 删除的日志保留的时间,默认为86400000
  • log.retention.bytes: 删除日志前,日志最大的大小,超过该值即删除,默认-1,作用在每个partition,会影响整个topic的容量
  • log.retention.minutes(hours|ms): 日志保留时间,如果没指定,默认使用hours参数
  • log.retention.check.interval.ms: 日志清理器检查日志是否符合删除条件的频率,默认为300000
  • log.flush.interval.messages: 将消息刷新到磁盘之前在日志分区上累积的消息数,默认为9223372036854775807
  • log.flush.interval.ms: 主题中的消息在刷新到磁盘之前保存在内存中的最大时间,默认为null(log.flush.scheduler.interval.ms参数的值)
  • log.flush.scheduler.interval.ms: 日志刷新器检查是否需要将日志刷新到磁盘的频率,默认9223372036854775807
  • log.flush.offset.checkpoint.interval.ms: 更新最后一次刷新的持久记录(被作为恢复点)的频率,默认为60000
  • log.flush.start.offset.checkpoint.interval.ms: 更新日志起始偏移量的持久记录的频率,默认60000
  • log.roll.hours: 新日志段(segment)被创建前的最大时间,默认168,如果没设置优先使用log.roll.ms

offsets相关

  • offsets.commit.required.acks: offset提交之前是否需要ack确认,默认值:-1
  • offsets.commit.timeout.ms: 当偏移量注意_offset的所有副本接收到提交或超时达到该时间时,offset提交将延迟. 默认值:5000
  • offsets.load.buffer.size: 偏移量加载到缓存中时从偏移量段读取的批处理大小. 默认值:5242880
  • offsets.retention.check.interval.ms: 历史offset检查的频率,默认值:600000
  • offsets.retention.minutes: 在消费者组的消费者全部异常之后,offset保留的时间,默认值:10080
  • offsets.topic.compression.codec: 偏移量主题的压缩解码器,默认:0
  • offsets.topic.num.partitions: offset提交主题的分区数量,默认:50(注意:集群部署后不要改变)
  • offsets.topic.replication.factor: offset提交主题的副本数,默认:3 (在集群大小满足此复制因子要求之前,内部主题创建将>失败,该主题非常重要,需要要求强一致性)
  • offsets.topic.segment.bytes: offset提交主题的段大小,设置相对较小,以便更快地实现日志压缩和缓存负载,默认值:104857600,即1Mb

queue相关

  • queued.max.requests: 在网络阻塞线程前,数据平面允许的排队请求数,默认值:500

replica相关

  • replica.fetch.min.bytes:每个fetch响应所需的最小字节数,默认值:1
  • replica.fetch.wait.max.ms: 由follow副本发起的每个fetch请求的最大等待时间,该值应小于replica.lag.time.max.ms,以避免 为低吞吐量的主题频繁地收缩ISR,默认值:500
  • replica.lag.time.max.ms: follow副本在该时间内没有和leader副本同步,或没有发送任何同步请求,将会被leader副本从ISR中删>除. 默认值:10000,即10s
  • replica.socket.receive.buffer.bytes: 副本接收请求的网络缓冲区,默认值:65535
  • replica.socket.timeout.ms: 网络请求的超时时间,默认值:30000
  • replica.fetch.backoff.ms: 发生获取分区错误时要休眠的时间,参数不是很重要,默认值:1000
  • replica.fetch.max.bytes: 尝试为每个分区获取的消息字节数,参数不是很重要,默认值:1048576,即1M

broker.rack

broker所在的机架,用来感知机架的变化,通常多个分区不会放在同一个机架上

示例: RACK1us-east-1d

controller控制器相关

  • controlled.shutdown.enable: 启用控制器关闭,默认:true
  • controlled.shutdown.max.retries: 控制器会因为各种原因而宕机,该值表示控制器的重试次数,默认:3
  • controlled.shutdown.retry.backoff.ms: 在每次重试之前,系统从前一次故障(控制器fail over或副本延迟)的状态中恢复过来的时 间,默认:5000
  • controller.socket.timeout.ms: 控制器到broker角色转换的socket超时时间,默认:30000

group相关(消费组)

  • group.max.size: 消费组中最大消费者数量
  • group.initial.rebalance.delay.ms:注册消费者允许的最小会话超时,默认:6000

注意: 很多参数是有不同级别的生效范围的,比如:

  • read-only: 仅在broker重启后才能生效
  • per-broker: 可以为每个broker动态更新
  • cluster-wide: 可作为集群范围内的值动态更新,也可以在每个broker上更新进行测试

broker配置作用范围

示例配置

# ZooKeeper地址
zookeeper.connect=[list of ZooKeeper servers]
# kafka log相关配置
num.partitions=8 default.replication.factor=3 log.dirs=[List of directories. Kafka should have its own dedicated disk(s) or SSD(s).]
# 其他配置核心配置
broker.id=[An integer. Start with 0 and increment by 1 for each new broker.] listeners=[list of listeners] auto.create.topics.enable=false
# 最小的isr数量,可以在topic级别设置
min.insync.replicas=2 queued.max.requests=[number of concurrent requests]

注意: 在kafka集群中,当broker集群参数确定后,还有一些针对topic的参数是可以进行动态调整的,以提高kafka服务的灵活性。

4.3 Topic级别的动态参数调整

Topic级别的配置

注意: 如下topic的参数可以在使用过程中进行动态调整,使用kafka-topic.sh工具中的alter参数来直接修改topic相关的参数。

cleanup.policy

一个字符串是“删除”或“压缩”或两者兼而有之. 默认值: [compact, delete]

compression.type

日志压缩类型,默认值为producer

delete.retention.ms

用于日志压缩主题的删除保留时间。默认值:86400000

max.message.bytes

指定每个topic可发送的最大消息(batch size)字节数.(区别于全局的message.max.bytes参数)

num.partitions

指定创建topic的默认分区数量,该值默认为1,建议根据具体的情况进行设定,越多的分区对于海量数据来说可以提高吞吐,但是对于少量数据来说,也可能增加网络消耗.

一般情况下,我们会对默认的topic的分区进行过度分配,以防止后期带key的message扩容分区导致的问题,一般建议初始值设置为5-10

注意:分区数一旦指定,只能增加,不能减少

default.replication.factor

指定kafka副本数,默认每个主题分区均有一个副本,当该副本所在主机异常,可能造成数据的丢失,建议在适当场景将副本至少设置成 2个,以尽快保证数据的一致性。默认值:1

注意:自动创建主题的副本因子

retention.ms

kafka的log保留时间,也可以使用log.retention.hours参数来配置保留时间,默认168小时,即一周。

retention.bytes

指定log保留的大小,作用在每一个partition上,加入一个topic有3个partition,设置了log.retention.bytes为1GB,则表示整个topic仅可以存储3GB的数据,超过该容量的数据会被进行自动删除。

此时,临时增加该topic的容量的方法就是调整该参数,或调整topic的partition个数。

-1表示不限制

segment.bytes

指定每个日志段的大小,通常在消息到达broker时,会被追加到分区的当前日志段上(segment),当日志段大小超过该参数指定的值(默认1GB),当前日志段就会被关闭,一个新的日志段被打开。

如果一个日志片段被关闭,就开始等待过期,该值不建议设置太小。

segment.ms

上面会指定日志段的分割,该参数会指定历史的日志段的过期时间。该参数会和log.retention.bytes一起校验,谁先满足就生效。

message.max.bytes

该值用来限制单个消息的大小,默认值为1000 0001MB,如果超过该大小,broker不会接受,而且会抛出相关异常

注意:该参数指的是消息被压缩后的大小,通常生产中的消息生产会使用gzip或snappy来进行压缩

注意:消息的大小对于性能有比较显著的影响,越大负责处理网络连接和请求的线程就需要花越多的时间来处理这些请求,还会增加磁 盘写入块的大小,从而影响 IO 吞吐量。

file.delete.delay.ms

在从文件系统中删除一个文件前的等待时间

flush.messages

该参数允许我们指定一个间隔来强制同步数据到本地磁盘,比如设置为1,表示每条消息后都会执行同步磁盘,如果设置为5表示,每5个消息同步一次。

一般情况下,管法定不建议修改该参数,可以使用副本机制来保证持久性和开启操作系统的后台flush功能,会更加有效率。

flush.ms

同上,但是指定的是时间间隔。比如设置1000,表示1000ms后执行一次同步操作.

follower.replication.throttled.replicas

对日志复制的副本列表应该被在follow侧进行限流。

副本列表应该为[PartitionId]:[BrokerId],[PartitionId]:[BrokerId]格式,或者使用通配符*来表示给topic的所有副本进行限流。

leader.replication.throttled.replicas

同上,在leader侧进行限流

index.interval.bytes

该参数用来控制多久kafka会增加一个index实体数据到它的offset的index上。默认设置确保我们大约每4096字节索引一条消息。索引越多,读取越接近日志中的确切位置,但索引越大。通常不需要修改该参数。

max.message.bytes

被kafka允许的最大记录的batch size。

如果增加了该值,并且有消费者版本老于0.10.2,消费者的fetch size也必须增加,这样就能获取到该批次大小的数据。

在高版本的消息格式(message format)中,为了效率,记录总是会被分组成batch;而在之前的消息格式版本中,未被压缩的消息不会被分组成batch,在这种情况下,该参数仅在单条记录上生效。

message.downconversion.enable

此配置用于控制是否启用消息格式的向下兼容以满足消费者请求。

当设置成false,对于希望使用较旧消息格式的消费者来说,broker将不会执行向下转换。来自旧客户端的消费者请求,broker将会返回UNSUPPORTED_VERSION错误码。

此配置不适用于可能需要复制到follower的消息格式转换。

message.format.version

指定消息格式版本,broker将使用append方式追加message到log里。该值必须是一个可用的ApiVersion,比如0.8.2, 0.9.0.0, 0.10.0

通过设置特定的消息格式版本,用户可以验证已存在磁盘上的消息是否小于或者等于指定的版本。

错误地设置此值将导致使用较旧版本的使用者中断,因为他们将收到他们不理解的格式的消息。

message.timestamp.difference.max.ms

broker接收到消息的时间戳和在消息体内部的时间戳的最大时间差距。如果message.timestamp.type=CreateTime,当时间戳的差值大于该值,一个message将被拒绝。如果message.timestamp.type=LogAppendTime,该参数将被忽略。

message.timestamp.type

指定message中时间戳的类型。CreateTimeLogAppendTime

min.cleanable.dirty.ratio

该参数用于控制,日志压缩器(log compactor)将尝试清除日志的频率。默认情况下,我们将避免清除已压缩超过50%的日志的日志。这个比率限制了最大空间浪费(日志中的重复,50%或50%以上的日志重复)。

比率越高,意味着越少,更有效的清理,但是也意味着更多的日志空间浪费。

min.compaction.lag.ms

一条消息保持未压缩的最小时间。仅适用于正在压缩的日志。


4.3 Producer核心参数

bootstrap.servers

指定broker地址

key.serializer

broker 需要接收到序列化之后的k/v值,所以生产者需要将序列化后的值发送过来。

org.apache.kafka.common.serialization.Serializer该类表示把键对象序列化为字节数组

  • ByteArraySerializer: 默认的序列化方式
  • StringSerializer:
  • IntegerSerializer:

value.serializer

指定序列化后的value,需要实现org.apache.kafka.common.serialization.Serializer接口

org.apache.kafka.common.serialization.StringSerializer

compression.type 指定消息压缩类型:gzip,snappy等,

broker端也有该参数,默认值为:producer,表示遵循生产者的压缩方式

注意:生产者使用何种压缩方式,消费者将必须使用该方式进行解压缩

acks

该参数用来声明要有多少个分区副本接收消息后,生产者才认为消息写入成功,也就是数据一致性衡量,该参数对消息的丢失的影响较>大. 默认值为:1

  • acks=0: 表示生产者不知道消息是否被broker成功接收被处理,反正自己发出去了就认为是成功了,该种清理增加了吞吐,但是也>增加的数据丢失的风险,因为程序的稳定性,网络的稳定性都可能会影响到消息的生产
  • acks=1: 只要集群中leader接收到消息并成功处理,就返回给生产者写入成功的消息。该种情况,如果发送过程中网络出现问题或>者kafka集群异常导致leader没工作而导致消息写入失败,生产者会受到写入失败相关的异常,此时生产者可进行重试
  • acks=all/-1: 表示所有参与复制的节点都收到消息时,生产者才会接收到来自服务器端写入成功的消息,该种情况下,整体的消息 确认延迟会更高一些,但是数据的一致性也更强一些

注意: 消息的发送其实也分syncasync,即同步和异步,kafka为了保证消息高效传输会决定是同步发送还是异步发送。如果让>客户端等待服务器的响应(通过调用get()方法)也会增加延迟,如果采用客户端回调方式,延迟问题可能会有好转。

buffer.memory

该参数用来设置生产者内存缓冲区的大小,生产者会用它来缓冲要发送到服务器的消息,以此来提供消息传递的效率。默认值:33554432

注意:如果应用程序发送消息的速度超过发送到服务器的速度,会导致生产者空间不足,此时send()方法就会阻塞或者直接异常,取 决于block.on.buffer.null参数

retries

生产者从服务器收到的错误有可能是临时性的错误,比如暂时找不到leader或者当前partition正在迁移无法找到相关的partition,>这种情况下,该参数可以决定生产者的行为,如果重试次数超过之后,生产者就会放弃重试,并返回错误。

默认情况下,生产者在每次重试之间等待100ms,这个等待参数可以通过retry.backoff.ms来修改

batch.size

指定每次提交的batch大小,默认值:16384

当有多个消息需要被发送同一个分区(如何决定是发送到同一个分区?)时,生产者会把他们发送到同一个批次里.

该参数用来指定一个批次提交的大小,当达到该batch的大小,所有的消息会被统一发送至broker

client.id

该参数用来指定客户端的id,不过可以不用指定,注册后为每个客户端生成64为的id

max.in.flight.requests.per.connection

此参数指定了生产者在收到服务器响应之前可以发送多少消息,它的值越高,就会占用越多的内存,不过也会提高吞吐量

把它设为1 可以保证消息是按照发送的顺序写入服务器。

timeout相关参数

  • request.timeout.ms: 生产者在发送数据时等待服务器返回的响应时间,默认值:30000
  • metadata.fetch.timeout.ms: 指定了生产者在获取元数据(比如目标分区的首领是谁)时等待服务器返回响应的时间
  • timeout.ms: 指定了 broker 等待同步副本返回消息确认的时间,与 asks 的配置相匹配

max.block.ms

此参数指定了在调用 send() 方法或使用 partitionFor() 方法获取元数据时生产者的阻塞时间.

当生产者的发送缓冲区已捕,或者没有可用的元数据时,这些方法就会阻塞,阻塞时间超过该参数值时,生产者抛出异常

max.request.size

该参数用于控制生产者发送的请求大小.

它可以指能发送的单个消息的最大值,也可以指单个请求里所有消息的总大小

receive.buffer.bytes和send.buffer.bytes

为了保证可靠的消息传输,这两个参数分别指定了 TCP Socket 接收和发送数据包的缓冲区的大小,默认为-1,表示使用操作系统的>默认值。

注意: 如果生产者或消费者与broker所处的数据中心不同,该值可以适当调大

4.4 Consumer核心参数

在消费者组中的消费者重平衡期间,消费者无法读取消息,造成整个消费者组在重平衡的期间都不可用

消费者通过向组织协调者(Kafka Broker)发送心跳来维护自己是消费者组的一员并确认其拥有的分区。

对于不同步的消费群体来说,其组织协调者可以是不同的。

只要消费者定期发送心跳,就会认为消费者是存活的并处理其分区中的消息。当消费者检索记录或者提交它所消费的记录时就会发送心>跳。

如果一段时间,消费者不发送心跳了,会话(Session)就会过期,组织协调者就会认为这个 Consumer 已经死亡,就会触发一次重平衡 。

如果消费者宕机并且停止发送消息,组织协调者会等待几秒钟,确认它死亡了才会触发重平衡.

注意: 在这段时间里,组里的消费者将不处理消息(STW)

_consumer_offset主题就主要是用来记录相关消费者的偏移量以及消费者分区分配的

fetch.min.bytes

指定了消费者从服务器获取记录的最小字节数,默认:1

broker 在收到消费者的数据请求时,如果可用的数据量小于 fetch.min.bytes 指定的大小,那么它会等到有足够的可用数据时才把它>返回给消费者。

这样可以降低消费者和 broker 的工作负载,因为它们在主题使用频率不是很高的时候就不用来回处理消息。

如果没有很多可用数据,但消费者的 CPU 使用率很高,那么就需要把该属性的值设得比默认值大。

如果消费者的数量比较多,把该属性的值调大可以降低 broker 的工作负载。

fetch.max.wait.ms

上面参数用来控制每次fetch时的最小数据量,但也不能一直等待数据的容量满足要求,因此还有另外一个参数,即fetch.max.wait.ms,指定多长时间还没满足数据容量就进行fetch数据,默认是500ms

max.partition.fetch.bytes

指定了服务器从每个分区里返回给消费者的最大字节数,默认值为1MB.

KafkaConsumer.poll()方法从每个分区返回的记录最多不超过该值指定的大小。

加入一个20分区的主题,拥有5个消费者,那么每个消费者必须至少4MB的内存来接收消息(每个消费者消费4个分区,每个分区返回消>费者的最大字节数1MB)。

注意: 该参数的设置要适当的设置大一些,防止单个消费者异常后,整体内存受限。

至少,该参数的值要大于max.message.size(broker接收消息的最大字节数),否则消费者无法读取这些消息,导致消费者一直重试并>挂起。

session.timeout.ms

指定了消费者在被认为死亡之前可以与服务器断开连接的时间,默认是 3s,在这个时间内没有发送心跳就会直接认为消费者死亡,此时 协调器就会进行触发consumer rebalance.

此参数与heartbeat.interval.ms(poll() 方法向群组协调器发送心跳的频率)强相关。

auto.offset.reset

指定了消费者在读取一个没有偏移量的分区或者偏移量无效的情况下的该如何处理,默认值为latest,意思是在偏移量无效的情况下>,默认从最新的记录下开始读取数据。可选值为earliest,表示从最开始位置进行读取.

enable.auto.commit

指定了消费者是否自动提交偏移量,默认值是 true,对应auto.commit.interval.ms参数来保证每次提交偏移量的频率

为了避免数据重复和丢失,消费者可以设置为false,由自己决定自己的消费位置(客户端保证数据消费的一致性)

partition.assignment.strategy

PartitionAssignor(分区分配器)会根据给定的消费者和主题,决定哪些分区应该被分配到哪个消费者,默认有两个策略:RangeRoundRobin

max.poll.records

用于控制单次调用call() 方法能够返回的记录数量,可以帮你控制在轮询中需要处理的数据量.

heartbeat.interval.ms

在消费组中,消费者心跳到消费者协调器的频率,默认值:3000ms

三、集群管理

任何一款优秀的开源软件,都会提供比较丰富的集群管理工具来帮助使用者(管理员和实际使用者)来对集群进行操作,记下来从三个角度来大概讲解集群管理相关的操作。

  • 官方提供的操作脚本
  • kafka-manager
  • kafkacat

1. 官方工具

在kafka的发行包中,默认包含了如下管理工具脚本:

$ ls
connect-distributed.sh kafka-delete-records.sh kafka-server-stop.sh connect-mirror-maker.sh kafka-dump-log.sh kafka-streams-application-reset.sh connect-standalone.sh kafka-leader-election.sh kafka-topics.sh kafka-acls.sh kafka-log-dirs.sh kafka-verifiable-consumer.sh kafka-broker-api-versions.sh kafka-mirror-maker.sh kafka-verifiable-producer.sh kafka-configs.sh kafka-preferred-replica-election.sh trogdor.sh kafka-console-consumer.sh kafka-producer-perf-test.sh windows kafka-console-producer.sh kafka-reassign-partitions.sh zookeeper-security-migration.sh kafka-consumer-groups.sh kafka-replica-verification.sh zookeeper-server-start.sh kafka-consumer-perf-test.sh kafka-run-class.sh zookeeper-server-stop.sh kafka-delegation-tokens.sh kafka-server-start.sh zookeeper-shell.sh

这里主要介绍几个常用的工具脚本:

运维管理类

  • kafka-topics.sh: 用来创建,删除,查看,改变一个topic参数的工具
  • kafka-reassign-partitions.sh: 用来对partition进行重新分配(管理员会较多使用)
  • kafka-log-dirs.sh: 用来查看指定broker下日志目录的使用空间
  • kafka-leader-election.sh: 用于一组Topic分区的leader重新分配,可以支持优先副本和非同步副本(不在ISR中),老版本中的kafka-preferred-replica-election.sh脚本
  • kafka-replica-verification.sh: 该工具可以用来检查topic的一组副本的数据是否一致
  • kafka-broker-api-versions.sh: 用来查看指定broker当前支持的各个接口的版本(kafka高版本已经保证了向下兼容)
  • kafka-configs.sh: 用来操作和查看topic, client, user or broker的实体配置

kafka操作类

  • kafka-console-consumer.sh: 通过终端来启动消费者
  • kafka-console-producer.sh: 通过终端来启动生产者
  • kafka-consumer-groups.sh: 用来查看,删除或者重置消费者组offset
  • kafka-consumer-perf-test.sh: 用来进行消费者压力测试
  • kafka-producer-perf-test.sh: 用来进行生产者压力测试
  • kafka-delete-records.sh: 删除指定分区的记录,直到指定的offset
  • kafka-mirror-maker.sh: 用于多集群之间同步topic数据
  • kafka-server-start.sh: broker启动脚本
  • kafka-server-stop.sh: broker关闭脚本
  • kafka-streams-application-reset.sh: 流式应用工具
  • zookeeper-shell.sh: kafka工具中也默认提供了zookeeper管理工具(不太好用)

1.1 kafka-topics.sh

topic创建

  • --create: 创建topic
    • --topic: 指定topic名称
    • --partitions: 指定分区数量
    • --replication-factor: 指定副本数量(仅在创建时可用)
    • --config: 指定topic级别的参数(动态参数,可修改)
    • --replica-assignment: 手动指定partition到broker的分配<part1_replica1:part1_replica2,part2_replica1:part2_replica2>
# 创建topic
$ sh /opt/app/kafka/bin/kafka-topics.sh –bootstrap-server 172.29.203.62:9092 –create –topic bgbiao.top Created topic bgbiao.top.
# 查看默认创建topic的参数详情(由broker配置决定) # 默认3个分区,1个副本
$ sh /opt/app/kafka/bin/kafka-topics.sh –bootstrap-server 172.29.203.62:9092 –describe –topic bgbiao.top
Topic: bgbiao.top PartitionCount: 3 ReplicationFactor: 1 Configs: min.insync.replicas=1,segment.bytes=1073741824 Topic: bgbiao.top Partition: 0 Leader: 1 Replicas: 1 Isr: 1 Topic: bgbiao.top Partition: 1 Leader: 2 Replicas: 2 Isr: 2 Topic: bgbiao.top Partition: 2 Leader: 3 Replicas: 3 Isr:
3 # 指定参数创建topic # 指定分区为5,副本为3,topic数据保留2分钟
$ sh /opt/app/kafka/bin/kafka-topics.sh –bootstrap-server 172.29.203.62:9092 –create –topic bgbiao.top-1 –partitions 5 –replication-factor 3 –config retention.ms=120000
# 分区,副本和指定参数都改变了
$ sh /opt/app/kafka/bin/kafka-topics.sh –bootstrap-server 172.29.203.62:9092 –describe –topic bgbiao.top-1 Topic: bgbiao.top-1 PartitionCount: 5 ReplicationFactor: 3 Configs: min.insync.replicas=1,segment.bytes=1073741824,retention.ms=120000 Topic: bgbiao.top-1 Partition: 0 Leader: 2 Replicas: 2,3,1 Isr: 2,3,1 Topic: bgbiao.top-1 Partition: 1 Leader: 3 Replicas: 3,1,2 Isr: 3,1,2 Topic: bgbiao.top-1 Partition: 2 Leader: 1 Replicas: 1,2,3 Isr: 1,2,3 Topic: bgbiao.top-1 Partition: 3 Leader: 2 Replicas: 2,1,3 Isr: 2,1,3 Topic: bgbiao.top-1 Partition: 4 Leader: 3 Replicas: 3,2,1 Isr: 3,2,1

topic更改

注意: topic的分区(partitions)可以根据需要进行调整(只能调整大,不能调整小),而且在调整分区的过程中,对于一个有key的主题来说,一条消息的分区逻辑和顺序性可能会受到影响。

注意: 副本数一旦topic创建之后,就不能再修改了,除非进行重分配(副本数不能超过broker数量哦)

  • --alter: 修改分区数量,replica分配,或者topic的动态配置项(结合–topic参数)
    • --partitions: 修改指定Topic的分区数量
    • --config: 修改topic的指定参数(动态参数调整:key=value)
    • --delete-config: 删除topipc的指定参数()

注意: 通常情况下--replica-assignment参数需要和--partitions一同使用才可以指定分区下的副本到broker节点上的分配,相当于手动扩容迁移

# 上面我们刚开始创建的bgbiao.top的topic是1个分区1个副本,这里采用alter参数进行修改基本配置
$ sh /opt/app/kafka/bin/kafka-topics.sh –bootstrap-server 172.29.203.62:9092 –alter –topic bgbiao.top –replica-assignment 1,2,3,1,2 –partitions 5
# 因为bgbiao.top 这个topic再创建时只有一个replication,因此–replica-assignment参数只能指定副本分配在那个broker上,无法指定多个副本的关系
$ sh /opt/app/kafka/bin/kafka-topics.sh –bootstrap-server 172.29.203.62:9092 –describe –topic bgbiao.top Topic: bgbiao.top PartitionCount: 5 ReplicationFactor: 1 Configs: min.insync.replicas=1,segment.bytes=1073741824 Topic: bgbiao.top Partition: 0 Leader: 1 Replicas: 1 Isr: 1 Topic: bgbiao.top Partition: 1 Leader: 2 Replicas: 2 Isr: 2 Topic: bgbiao.top Partition: 2 Leader: 3 Replicas: 3 Isr: 3 Topic: bgbiao.top Partition: 3 Leader: 1 Replicas: 1 Isr: 1 Topic: bgbiao.top Partition: 4 Leader: 2 Replicas: 2 Isr: 2

topic相关信息查看

  • --list: 列出topic
  • --describe: 查看topic详情信息
    • --topic: 指定topic查看详情信息
    • --exclude-internal: 排除内部topic(consumer_offset_topic)
    • --unavailable-partitions: 仅显示leader不可用的分区(在集群异常时快速排查受影响的分区)
    • --under-min-isr-partitions: 仅显示isr小于配置的min-isr-partitions的分区
    • --under-replicated-partitions: 仅显示不同步的分区

注意: 在低版本的kafka中使用–zookeeper来链接集群,高版本中基本都通过–bootstrap-server指定broker来连接集群

# 列出集群topic
# –list参数列出可用的topic
$ sh /opt/app/kafka/bin/kafka-topics.sh –bootstrap-server 172.29.203.62:9092 –list __consumer_offsets …. ….
# 也可以使用–topic指定topic
# 如果指定topic不存在,将返回空
$ sh /opt/app/kafka/bin/kafka-topics.sh –bootstrap-server 172.29.203.62:9092 –list –topic __consumer_offsets __consumer_offsets
# 列出topic详情信息
# –describe参数(同时可以使用–topic指定topic查看)
# –exclude-internal参数可以排除内部的topic(__consumer_offsets)
# –unavailable-partitions参数可以列出leader不可用的topic,在集群故障时快速查看受影响的topic
#
# 可以查看某个topic的分区和副本分布,以及topic级别的相关配置
$ sh /opt/app/kafka/bin/kafka-topics.sh –bootstrap-server 172.29.203.62:9092 –describe –topic __consumer_offsets Topic: __consumer_offsets PartitionCount: 50 ReplicationFactor: 1 Configs: compression.type=producer,min.insync.replicas=1,cleanup.policy=compact,segment.bytes=104857600 Topic: __consumer_offsets Partition: 0 Leader: 3 Replicas: 3 Isr: 3 Topic: __consumer_offsets Partition: 1 Leader: 1 Replicas: 1 Isr: 1 …. ….
# 查看集群leader不可用的分区
# 可以发现leader都为-1,是因为副本和isr的id是4,而broker4其实已经光荣阵亡了
$ /opt/app/kafka_2.11-1.0.1/bin/kafka-topics.sh –zookeeper 172.16.217.38:2181/log-kafka –describe –unavailable-partitions … Topic: realtime_kafka.post_alg_real Partition: 2 Leader: -1 Replicas: 4 Isr: 4 Topic: realtime_kafka.post_alg_real Partition: 11 Leader: -1 Replicas: 4 Isr: 4 Topic: realtime_kafka.post_alg_real Partition: 20 Leader: -1 Replicas: 4 Isr: 4 Topic: realtime_kafka.post_alg_real Partition: 29 Leader: -1 Replicas: 4 Isr: 4 Topic: rm-bp1d3u2p9v3l4da7c632 Partition: 1 Leader: -1 Replicas: 4 Isr: 4 Topic: rm-bp1udb05091x11q5x Partition: 1 Leader: -1 Replicas: 4 Isr: 4
# 查看副本不同步的分区详情
# 可以发现副本中有不同步的情况,是因为有副本所在的节点已经挂了,通常有一部分是因为资源或者网络原因未同步,还有就是如上述broker阵亡的情况
# 如果说–unavailable-partitions可以直接查看到受影响的topic,那么–under-replicated-partitions就可以查看可用性受影响的topic,因为当副本为2时,此时broker4阵亡的前提下,topic下的分区是无法保证高可用的
$ /opt/app/kafka_2.11-1.0.1/bin/kafka-topics.sh –zookeeper 172.16.217.38:2181/log-kafka –describe –under-replicated-partitions … … Topic: androidregister Partition: 1 Leader: 2 Replicas: 4,2 Isr: 2 Topic: eventjsonlog Partition: 11 Leader: 2 Replicas: 4,2 Isr: 2 Topic: eventjsonlog Partition: 27 Leader: 2 Replicas: 2,4 Isr: 2 Topic: eventjsonlog Partition: 38 Leader: 13 Replicas: 13,4 Isr: 13 Topic: eventjsonlog Partition: 44 Leader: 10 Replicas: 4,10 Isr: 10 Topic: eventjsonlog Partition: 52 Leader: 12 Replicas: 12,4 Isr: 12 Topic: eventjsonlog Partition: 59 Leader: 11 Replicas: 4,11 Isr: 11 Topic: eventlog Partition: 8 Leader: 16 Replicas: 16,4 Isr: 16 Topic: eventlog Partition: 23 Leader: 3 Replicas: 4,3 Isr: 3 Topic: eventlog Partition: 38 Leader: 13 Replicas: 13,4 Isr: 13 Topic: eventlog Partition: 44 Leader: 10 Replicas: 4,10 Isr: 10 Topic: eventlog Partition: 52 Leader: 12 Replicas: 12,4 Isr: 12 Topic: eventlog Partition: 59 Leader: 11 Replicas: 4,11 Isr: 11 Topic: pusheventlog Partition: 9 Leader: 1 Replicas: 1,4 Isr: 1 Topic: pusheventlog Partition: 12 Leader: 7 Replicas: 4,7 Isr: 7

删除

  • --delete: 指定topic删除

注意: 在删除Topic时会受delete.topic.enable参数的影响,如果为true,则topic直接删除,如果为false,删除仅是标记删除,即在topic的config中增加一个删除标记MarkedForDeletion:true,待broker重启后完全删除(也可通过zk中的数据删除)

# 如果delete.topic.enable=true,则直接删除掉
$ sh /opt/app/kafka/bin/kafka-topics.sh –bootstrap-server 172.29.203.62:9092 –delete –topic bgbiao.top

注意:

  • topic真正的元数据结构: /brokers/topics/topic-name,删除这个即删除
  • 标记删除的topic元数据: /admin/delete_topics/topic-name,删除这个才算数据清理完成

1.2 kafka-reassign-partitions.sh

该脚本用于在副本之间移动topic的分区,也就是对副本进行重新分配,也是SRE在日常操作中会比较常用的脚本,需要注意的是,在做迁移时需要注意到当前集群的整体情况,毕竟在移动副本时,需要设计到新副本的数据同步,也会占用一定资源。

通常,将服务器添加到Kafka集群很容易,只需为它们分配一个惟一的brokerid,并在新服务器上启动Kafka。然而,这些新服务器不会自动分配任何数据分区,因此,除非将分区移动到它们,否则在创建新主题之前,它们不会做任何工作。因此,向集群中添加机器时,您会希望将一些现有数据迁移到这些机器上。

  • --broker-list: 指定分区需要重新分配到的broker节点,如果--topics-to-move-json-file参数被指定用来生成重分配配置时,必须制定该参数
  • --topics-to-move-json-file: 生成一个移动指定topic的分区到指定broker(--broker-list)的配置
  • --generate: 生成候选分区分配的配置,该参数仅会生成候选的分配方案,不会进行执行
  • --reassignment-json-file: 分区手动分配的配置参数,该参数可由generate参数生成,通常一般会进行微调
  • --replica-alter-log-dirs-throttle: 在相同的broker上日志目录之间的副本移动将被限流为该值bytes/sec,限流应该至少设置为1 KB/s,默认是-1表示不限制
  • --throttle: broker之间的分区移动可以使用该值进行限流(bytes/sec),同上
  • --execute: 通过指定--reassignment-json-file参数来执行重新分配
  • --verify: 如果一个重分配完成了,可以指定--reassignment-json-file参数来查看重分配的进度

topics-to-move-json-file文件示例:

{“topics”: [{“topic”: “foo”},{“topic”: “foo1”}], “version”:1 }

reassignment-json-file文件示例:

# 注意:log_dirs是可选参数,需要指定绝对路径,也可以指定为any,当指定后需要和replicas的长度相等 {“partitions”: [{“topic”: “foo”, “partition”: 1, “replicas”: [1,2,3], “log_dirs”: [“dir1″,”dir2″,”dir3”] }], “version”:1 }

分区重分配示例:

# 查看当前topic详情
$ sh /opt/app/kafka/bin/kafka-topics.sh –bootstrap-server 172.29.203.62:9092 –describe –topic bgbiao.top
Topic: bgbiao.top PartitionCount: 3 ReplicationFactor: 1 Configs: min.insync.replicas=1,segment.bytes=1073741824 Topic: bgbiao.top Partition: 0 Leader: 3 Replicas: 3 Isr: 3 Topic: bgbiao.top Partition: 1 Leader: 1 Replicas: 1 Isr: 1 Topic: bgbiao.top Partition: 2 Leader: 2 Replicas: 2 Isr: 2
# 1.编辑move.json配置
$ cat move.json {“topics”: [{“topic”: “bgbiao.top”}], “version”:1 }
# 2.生成迁移配置 # 指定正确的zookeeper地址来获取分区的分布状态以及待迁移的状态
$ sh /opt/app/kafka/bin/kafka-reassign-partitions.sh –zookeeper 172.29.203.62:2181 –topics-to-move-json-file move.json –broker-list “1,2,3” –generate
Current partition replica assignment {“version”:1,”partitions”:[{“topic”:”bgbiao.top”,”partition”:2,”replicas”:[2],”log_dirs”:[“any”]},{“topic”:”bgbiao.top”,”partition”:1,”replicas”:[1],”log_dirs”:[“any”]},{“topic”:”bgbiao.top”,”partition”:0,”replicas”:[3],”log_dirs”:[“any”]}]} Proposed partition reassignment configuration {“version”:1,”partitions”:[{“topic”:”bgbiao.top”,”partition”:0,”replicas”:[1],”log_dirs”:[“any”]},{“topic”:”bgbiao.top”,”partition”:2,”replicas”:[3],”log_dirs”:[“any”]},{“topic”:”bgbiao.top”,”partition”:1,”replicas”:[2],”log_dirs”:[“any”]}]}
# 编辑迁移配置
# 当前分区状态为:bgbiao.top-2的副本在broker2,bgbiao.top-1的副本在broker1,bgbiao.top-0的副本在broker3
# 迁移后的分区状态:bgbiao.top-2的副本在broker3,bgbiao.top-1的副本在broker2,bgbiao.top-0的副本在broker1
# 然后将期望的迁移配置保存下来做相关修改即可.
# 我们是想给某个分区增加副本,因此可以修改成如下配置
$ cat assignment.json { “partitions”: [ { “partition”: 0, “replicas”: [ 3,1,2 ], “topic”: “bgbiao.top” } ], “version”: 1 }
# 3.根据上述的迁移配置执行迁移
$ sh /opt/app/kafka/bin/kafka-reassign-partitions.sh –zookeeper 172.29.203.62:2181 –reassignment-json-file ./assignment.json –execute
Current partition replica assignment {“version”:1,”partitions”:[{“topic”:”bgbiao.top”,”partition”:2,”replicas”:[2],”log_dirs”:[“any”]},{“topic”:”bgbiao.top”,”partition”:1,”replicas”:[1],”log_dirs”:[“any”]},{“topic”:”bgbiao.top”,”partition”:0,”replicas”:[3],”log_dirs”:[“any”]}]} Save this to use as the –reassignment-json-file option during rollback Successfully started reassignment of partitions.
# 4.查看上述迁移的进度(将–execute参数改为–verify参数)
$ sh /opt/app/kafka/bin/kafka-reassign-partitions.sh –zookeeper 172.29.203.62:2181 –reassignment-json-file ./assignment.json –verify Status of partition reassignment: Reassignment of partition bgbiao.top-0 completed successfully
# 再次查看topix详情
# 如期望,partition-1增加了两个副本,该副本的整体可用性提高了3倍
$ sh /opt/app/kafka/bin/kafka-topics.sh –bootstrap-server 172.29.203.62:9092 –describe –topic bgbiao.top
Topic: bgbiao.top PartitionCount: 3 ReplicationFactor: 3 Configs: min.insync.replicas=1,segment.bytes=1073741824 Topic: bgbiao.top Partition: 0 Leader: 3 Replicas: 3,1,2 Isr: 3,1,2 Topic: bgbiao.top Partition: 1 Leader: 1 Replicas: 1 Isr: 1 Topic: bgbiao.top Partition: 2 Leader: 2 Replicas: 2 Isr: 2

注意: 该工具可以让集群部分节点故障后,及时将新部分的partition进行调整,以恢复partition的高可用性。同时能够让集群在扩展后,快速将已有topic的数据均衡的分布在新节点上,以实现整体负载的均衡。

1.3 kafka-log-dirs.sh

该脚本参数用于查看kafka各个broker节点以及topic的磁盘使用率情况

  • --describe: 查看topic和broker的磁盘使用情况
    • --broker-list: 指定查看的broker列表
    • --topic-list: 指定需要查看的topic列表
# 当指定topic后会从全部的broker中进行查找
$ sh /opt/app/kafka/bin/kafka-log-dirs.sh –bootstrap-server 172.29.203.62:9092 –describe –topic-list myapp-yum-log,myapp Querying brokers for log directories information Received log directory information from brokers 1,2,3 {“version”:1,”brokers”:[{“broker”:1,”logDirs”:[{“logDir”:”/opt/data/kafka/kafka-logs”,”error”:null,”partitions”:[{“partition”:”myapp-yum-log-2″,”size”:13291235,”offsetLag”:0,”isFuture”:false},{“partition”:”myapp-0″,”size”:0,”offsetLag”:0,”isFuture”:false}]}]},{“broker”:2,”logDirs”:[{“logDir”:”/opt/data/kafka/kafka-logs”,”error”:null,”partitions”:[{“partition”:”myapp-1″,”size”:0,”offsetLag”:0,”isFuture”:false},{“partition”:”myapp-yum-log-0″,”size”:13258726,”offsetLag”:0,”isFuture”:false}]}]},{“broker”:3,”logDirs”:[{“logDir”:”/opt/data/kafka/kafka-logs”,”error”:null,”partitions”:[{“partition”:”myapp-2″,”size”:0,”offsetLag”:0,”isFuture”:false},{“partition”:”myapp-yum-log-1″,”size”:13264869,”offsetLag”:0,”isFuture”:false}]}]}]}

结果是json串,可以看到myapp-yum-log-2在broker-1上占用了13291235字节,也就是12M.

1.4 kafka-leader-election.sh

用于一组 Topic 分区的 leader 重新分配,可以支持优先副本和非同步副本。一般用于指定topic的分区存在非预选副本或非同步副本的情况,对整个leader进行适当调整

  • --admin.config: 传给admin客户端的配置文件
  • --all-topic-partitions: 基于选举类型--election-type来对符合条件的topic进行选举
  • --bootstrap-server: 指定broker地址
  • --election-type: 指定选举类型[preferred,unclean],对优先副本进行选举或非同步副本进行选举
  • --partition: 指定需要选举的指定topic分区的id(和–topic一起使用)
  • --path-to-json-file: 使用json配置文件来保存重分配信息
# 比如我们发现topic的partition-1当前的leader不是优先副本(broker相对倾斜),可以使用如下配置进行leader重新选举
$ sh kafka-leader-election.sh –bootstrap-server 172.29.203.62:9092 –topic test-bgbiao-1 –partition 0 –election-type preferred Valid replica already elected for partitions {“partitions”: [{“topic”: “foo”, “partition”: 1}, {“topic”: “foobar”, “partition”: 2}] }

1.5 kafka-configs.sh

用来查看和修改kafka相关的配置信息,包含集群的动态配置,topic级别的动态配置等等

  • --all: 列出给定实体的全部配置文件(默认已经生效的全部参数,如果没有all仅对动态参数生效)
  • --entity-type: 实体类型[topics/clients/users/brokers/broker-loggers]
  • --entity-name: 实体名称[topic名称/client-id/user-name/broker-id]
  • --describe: 列出给定实体的配置文件
  • --force: 强制生效
  • --topic: 指定topic名称
  • --alter: 修改指定实体的配置文件 注意:当使用delete-config和add-config时必须使用--alter
  • --delete-config: 删除指定的配置”k1,k2″
  • --add-config: 给指定的实体增加配置(k=v,k2=[v1,v2,v3],k3=v3)

topic级别的动态参数

  • cleanup.policy: 清理策略
  • compression.type: 压缩类型(通常建议在produce端控制)
  • delete.retention.ms: 压缩日志的保留时间
  • flush.messages: 持久化message限制
  • flush.ms: 持久化频率
  • follower.replication.throttled.replicas: follower副本限流
  • leader.replication.throttled.replicas: leader副本限流
  • max.message.bytes: 最大的batch的message大小
  • message.downconversion.enable: message向下兼容
  • message.format.version: message格式版本
  • min.insync.replicas: 最小的ISR
  • retention.ms: 日志保留时间
  • retention.bytes: 日志保留大小(通常按照时间限制)
  • segment.bytes: segment的大小限制
  • segment.ms: segment的切割时间
  • unclean.leader.election.enable: 是否允许非同步副本选主(针对可用性设置的一个参数)

broker级别的动态参数

broker级别的动态参数比较多,这里只列举常用的几个

  • log.retention.ms: 日志保留时间
  • max.connections: 最大连接数
  • max.connections.per.ip: 每个ip的最大连接数
  • message.max.bytes: batch的message的最大限制
  • min.insync.replicas: 最小的ISR
  • num.io.threads: IO线程数(网络线程数的两倍)
  • num.network.threads: 网络线程数(cpu的2/3较好)
  • num.recovery.threads.per.data.dir: 每个数据目录的恢复线程
  • num.replica.fetchers: 副本的fetchers数量(默认为1,可适当调大)

user级别的参数

  • SCRAM-SHA-256:
  • SCRAM-SHA-512:
  • consumer_byte_rate: 针对消费者user进行限流
  • producer_byte_rate: 针对生产者进行限流
  • request_percentage: 请求百分比

clients级别参数

  • consumer_byte_rate: 针对消费者user进行限流
  • producer_byte_rate: 针对生产者进行限流
  • request_percentage: 请求百分比
# 修改topic的数据保留时间
$ sh kafka-configs.sh –bootstrap-server 172.29.203.62:9092 –topic push-test –add-config retention.ms=10000000 –alter
# 查看topic的动态参数配置
$ sh kafka-configs.sh –bootstrap-server 172.29.203.62:9092 –topic push-test –describe Dynamic configs for topic push-test are: retention.ms=10000000 sensitive=false synonyms={DYNAMIC_TOPIC_CONFIG:retention.ms=10000000}
# 删除topic动态参数
$ sh kafka-configs.sh –bootstrap-server 172.29.203.62:9092 –topic push-test –alter –delete-config retention.ms Completed updating config for topic push-test.
# 查看broker全部的参数(–all会获取全部的参数)
$ sh kafka-configs.sh –bootstrap-server 172.29.203.62:9092 –all –broker-defaults –describe Default configs for brokers in the cluster are:
# 也可以使用如下参数查看broker的全部参数(动态的和默认的参数)
$ sh kafka-configs.sh –bootstrap-server 172.29.203.62:9092 –all –entity-type brokers –entity-name 1 –describe
$ sh kafka-configs.sh –bootstrap-server 172.29.203.62:9092 –all –broker 1 –describe
# broker的动态参数(去除了–all之后,会发现列出的是动态的配置,默认broker是没有动态参数调整的)
$ sh kafka-configs.sh –bootstrap-server 172.29.203.62:9092 –broker 1 –describe Dynamic configs for broker 1 are:
# user和client也是类似的

1.6 kafka-broker-api-versions.sh

查看kafka对外的各个api版本.

# 查看当前kafka版本
$ sh kafka-broker-api-versions.sh –bootstrap-server 172.29.203.62:9092 –version 2.5.0 (Commit:66563e712b0b9f84)
# 查看集群所有节点的api版本
$ sh kafka-broker-api-versions.sh –bootstrap-server 172.29.203.62:9092 172.29.203.106:9092 (id: 2 rack: null) -> ( Produce(0): 0 to 8 [usable: 8], Fetch(1): 0 to 11 [usable: 11], ListOffsets(2): 0 to 5 [usable: 5], Metadata(3): 0 to 9 [usable: 9], LeaderAndIsr(4): 0 to 4 [usable: 4], StopReplica(5): 0 to 2 [usable: 2], UpdateMetadata(6): 0 to 6 [usable: 6], ControlledShutdown(7): 0 to 3 [usable: 3], OffsetCommit(8): 0 to 8 [usable: 8], OffsetFetch(9): 0 to 7 [usable: 7], FindCoordinator(10): 0 to 3 [usable: 3], JoinGroup(11): 0 to 7 [usable: 7], Heartbeat(12): 0 to 4 [usable: 4], LeaveGroup(13): 0 to 4 [usable: 4], SyncGroup(14): 0 to 5 [usable: 5], DescribeGroups(15): 0 to 5 [usable: 5], ListGroups(16): 0 to 3 [usable: 3], SaslHandshake(17): 0 to 1 [usable: 1], ApiVersions(18): 0 to 3 [usable: 3], CreateTopics(19): 0 to 5 [usable: 5], DeleteTopics(20): 0 to 4 [usable: 4], DeleteRecords(21): 0 to 1 [usable: 1], InitProducerId(22): 0 to 3 [usable: 3], OffsetForLeaderEpoch(23): 0 to 3 [usable: 3], AddPartitionsToTxn(24): 0 to 1 [usable: 1], AddOffsetsToTxn(25): 0 to 1 [usable: 1], EndTxn(26): 0 to 1 [usable: 1], WriteTxnMarkers(27): 0 [usable: 0], TxnOffsetCommit(28): 0 to 3 [usable: 3], DescribeAcls(29): 0 to 2 [usable: 2], CreateAcls(30): 0 to 2 [usable: 2], DeleteAcls(31): 0 to 2 [usable: 2], DescribeConfigs(32): 0 to 2 [usable: 2], AlterConfigs(33): 0 to 1 [usable: 1], AlterReplicaLogDirs(34): 0 to 1 [usable: 1], DescribeLogDirs(35): 0 to 1 [usable: 1], SaslAuthenticate(36): 0 to 2 [usable: 2], CreatePartitions(37): 0 to 2 [usable: 2], CreateDelegationToken(38): 0 to 2 [usable: 2], RenewDelegationToken(39): 0 to 2 [usable: 2], ExpireDelegationToken(40): 0 to 2 [usable: 2], DescribeDelegationToken(41): 0 to 2 [usable: 2], DeleteGroups(42): 0 to 2 [usable: 2], ElectLeaders(43): 0 to 2 [usable: 2], IncrementalAlterConfigs(44): 0 to 1 [usable: 1], AlterPartitionReassignments(45): 0 [usable: 0], ListPartitionReassignments(46): 0 [usable: 0], OffsetDelete(47): 0 [usable: 0] )

1.7 生产者和消费者工具

  • kafka-console-consumer.sh: 终端消费者工具
  • kafka-console-producer.sh: 终端生产者工具
# 指定topic从终端写入数据
$ sh /opt/app/kafka/bin/kafka-console-producer.sh –bootstrap-server 172.29.203.62:9092 –topic test-push >hello xxb >BGBiao >My website is https://bgbiao.top. >And >公众号: BGBiao
# 指定消费者组对topic进行消费
# –from-beginning表示从头开始消费
# 因为开始生产者刚开始生产并生产topic,导致topic无leader会有警告信息
# 可以看到client-id就是consumer-test-bgbiao-1,即consumer的前缀加消费组加数字后缀
$ sh kafka-console-consumer.sh –bootstrap-server 172.29.203.62:9092 –topic push-test –group test-bgbiao [2020-05-31 19:52:13,472] WARN [Consumer clientId=consumer-test-bgbiao-1, groupId=test-bgbiao] Error while fetching metadata with correlation id 2 : {test-push=LEADER_NOT_AVAILABLE} (org.apache.kafka.clients.NetworkClient) hello xxb BGBiao My website is https://bgbiao.top. 公众号: BGBiao

1.8 kafka-consumer-groups.sh

消费组管理工具,可以列出所有的消费组,查看消费组详情,删除消费组信息以及重置消费组的offset

  • --all-groups: 应用到所有的消费组
  • --all-topics:
  • --delete: 删除topic分区的offset,以及拥有者和消费组信息(–group g1 –group g2)
  • --delete-offsets: 删除消费组的offset
  • --describe: 查看消费组信息以及消费者的offset lag
  • --execute: 指定操作,支持reset-offsets操作
  • --export: 导出操作执行到csv,支持reset-offsets
  • --from-file: 指定文件中指定的值重置offset (csv文件)
  • --group: 指定消费组
  • --list: 列出所有的消费组
  • --members: 查看消费组中的成员
  • --state: 查看消费组的状态
  • --offsets: 查看消费组并且列出每个消费组所有topic的分区以及消息的offset lag
  • --reset-offsets: 重置消费组的offset (需要指定如下一个参数)
    • --to-datetime:
    • --by-period:
    • --to-earliest:
    • --to-latest:
    • --shift-by:
    • --from-file:
    • --to-current:
# 查看全部的消费者组
$ sh /opt/app/kafka/bin/kafka-consumer-groups.sh –bootstrap-server 172.29.203.62:9092 –all-grou –list KMOffsetCache-daf27df49ede test-bgbiao
# 查看消费者详情
$ sh /opt/app/kafka/bin/kafka-consumer-groups.sh –bootstrap-server 172.29.203.62:9092 –group test-bgbiao –describe GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID test-bgbiao test-push 0 1 1 0 consumer-test-bgbiao-1-a915f28c-3ee0-46ee-a8d8-6c93bddb7686 /172.29.203.62 consumer-test-bgbiao-1 test-bgbiao test-push 1 2 2 0 consumer-test-bgbiao-1-a915f28c-3ee0-46ee-a8d8-6c93bddb7686 /172.29.203.62 consumer-test-bgbiao-1 test-bgbiao test-push 2 2 2 0 consumer-test-bgbiao-1-a915f28c-3ee0-46ee-a8d8-6c93bddb7686 /172.29.203.62 consumer-test-bgbiao-1
# 查看消费组的成员
$ sh /opt/app/kafka/bin/kafka-consumer-groups.sh –bootstrap-server 172.29.203.62:9092 –group test-bgbiao –members –describe GROUP CONSUMER-ID HOST CLIENT-ID
#PARTITIONS test-bgbiao consumer-test-bgbiao-1-a915f28c-3ee0-46ee-a8d8-6c93bddb7686 /172.29.203.62 consumer-test-bgbiao-1 3
# 查看消费组状态
$ sh /opt/app/kafka/bin/kafka-consumer-groups.sh –bootstrap-server 172.29.203.62:9092 –group test-bgbiao –state –describe GROUP COORDINATOR (ID) ASSIGNMENT-STRATEGY STATE
#MEMBERS test-bgbiao 172.29.203.62:9092 (1) range Stable 1
# 查看消费组的offset信息
$ sh /opt/app/kafka/bin/kafka-consumer-groups.sh –bootstrap-server 172.29.203.62:9092 –group test-bgbiao –offsets –describe GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID test-bgbiao test-push 0 1 1 0 consumer-test-bgbiao-1-a915f28c-3ee0-46ee-a8d8-6c93bddb7686 /172.29.203.62 consumer-test-bgbiao-1 test-bgbiao test-push 1 2 2 0 consumer-test-bgbiao-1-a915f28c-3ee0-46ee-a8d8-6c93bddb7686 /172.29.203.62 consumer-test-bgbiao-1 test-bgbiao test-push 2 2 2 0 consumer-test-bgbiao-1-a915f28c-3ee0-46ee-a8d8-6c93bddb7686 /172.29.203.62 consumer-test-bgbiao-1 test-bgbiao test-full-push 2 61392 61430 38 – – – test-bgbiao test-full-push 1 61056 61088 32 – – – test-bgbiao test-full-push 0 61299 61337 38 –

2. kafka-manager

四、集群监控

文章转载请说明出处:八零岁月 » kafka生产规划和运维

分享到:更多 ()

吐槽集中营 抢沙发

评论前必须登录!