Kafka消息保留-清理策略
发布日期:2021-05-06 17:45:26 浏览次数:23 分类:原创文章

本文共 2494 字,大约阅读时间需要 8 分钟。

Kafka Broker默认的消息保留策略是:要么保留一定时间,要么保留到消息达到一定大小的字节数。

当消息达到设置的条件上限时,旧消息就会过期并被删除,所以,在任何时刻,可用消息的总量都不会超过配置参数所指定的大小。

topic可以配置自己的保留策略,可以将消息保留到不再使用他们为止。

在这里插入图片描述

因为在一个大文件里查找和删除消息是很费时的事,也容易出错,所以,分区被划分为若干个片段。默认情况下,每个片段包含1G或者一周的数据,以较小的那个为准。在broker往leader分区写入消息时,如果达到片段上限,就关闭当前文件,并打开一个新文件。当前正在写入数据的片段叫活跃片段。当所有片段都被写满时,会清除下一个分区片段的数据,如果配置的是7个片段,每天打开一个新片段,就会删除一个最老的片段,循环使用所有片段。

日志清理:

Kafka提供两种日志清理策略,删除和压缩。通过参数cleanup.policy来指定清理策略。日志清理可以控制到主题级别,可以为不同主题创建不同的清理策略。

日志删除,它是一个定时任务在日志管理器启动后会启动它。默认为5分钟执行一次。kafka提供了基于日志保留时长的删除策略和日志大小的策略,默认是168小时,也就是7天,日志被保留7天之后将会删除。默认不设置日志大小。日志保留时长不是通过日志文件的最后修改时间来确定的,它是基于时间戳索引进行的。

日志压缩

分段策略属性

属性名 含义 默认值
log.roll.{hours,ms} 日志滚动的周期时间,到达指定周期时间时,强制生成一个新的segment 168(7day)
log.segment.bytes 每个segment的最大容量。到达指定容量时,将强制生成一个新的segment 1G(-1为不限制)
log.retention.check.interval.ms 日志片段文件检查的周期时间 60000

日志刷新策略

Kafka的日志实际上是开始是在缓存中的,然后根据策略定期一批一批写入到日志文件中去,以提高吞吐率。

属性名 含义 默认值
log.flush.interval.messages 消息达到多少条时将数据写入到日志文件 10000
log.flush.interval.ms 当达到该时间时,强制执行一次flush null
log.flush.scheduler.interval.ms 周期性检查,是否需要将信息flush 很大的值

日志保存清理策略

属性名 含义 默认值
log.cleanup.polict 日志清理保存的策略只有delete和compact两种 delete
log.retention.hours 日志保存的时间,可以选择hours,minutes和ms 168(7day)
log.retention.bytes 删除前日志文件允许保存的最大值 -1
log.segment.delete.delay.ms 日志文件被真正删除前的保留时间 60000
log.cleanup.interval.mins 每隔一段时间多久调用一次清理的步骤 10
log.retention.check.interval.ms 周期性检查是否有日志符合删除的条件(新版本使用) 300000

这里特别说明一下,日志的真正清楚时间。当删除的条件满足以后,日志将被“删除”,但是这里的删除其实只是将该日志进行了“delete”标注,文件只是无法被索引到了而已。但是文件本身,仍然是存在的,只有当过了log.segment.delete.delay.ms 这个时间以后,文件才会被真正的从文件系统中删除。

参考:https://www.cnblogs.com/angellst/p/9368493.html

========================================================================================

Kafka将数据持久化到了硬盘上,允许你配置一定的策略对数据清理,清理的策略有两个,删除和压缩。

数据清理的方式

删除

log.cleanup.policy=delete启用删除策略
直接删除,删除后的消息不可恢复。可配置以下两个策略:
清理超过指定时间清理:
log.retention.hours=16
超过指定大小后,删除旧的消息:
log.retention.bytes=1073741824
为了避免在删除时阻塞读操作,采用了copy-on-write形式的实现,删除操作进行时,读取操作的二分查找功能实际是在一个静态的快照副本上进行的,这类似于Java的CopyOnWriteArrayList。

压缩

将数据压缩,只保留每个key最后一个版本的数据。
首先在broker的配置中设置log.cleaner.enable=true启用cleaner,这个默认是关闭的。
在topic的配置中设置log.cleanup.policy=compact启用压缩策略。

img

如上图,在整个数据流中,每个Key都有可能出现多次,压缩时将根据Key将消息聚合,只保留最后一次出现时的数据。这样,无论什么时候消费消息,都能拿到每个Key的最新版本的数据。
压缩后的offset可能是不连续的,比如上图中没有5和7,因为这些offset的消息被merge了,当从这些offset消费消息时,将会拿到比这个offset大的offset对应的消息,比如,当试图获取offset为5的消息时,实际上会拿到offset为6的消息,并从这个位置开始消费。
这种策略只适合特殊场景,比如消息的key是用户ID,消息体是用户的资料,通过这种压缩策略,整个消息集里就保存了所有用户最新的资料。
压缩策略支持删除,当某个Key的最新版本的消息没有内容时,这个Key将被删除,这也符合以上逻辑。

原文参考:https://blog.csdn.net/honglei915/article/details/49683065

上一篇:线程池体系(二)-- ForkJoinPool
下一篇:StampedLock实现原理

发表评论

最新留言

能坚持,总会有不一样的收获!
[***.219.124.196]2025年04月12日 07时20分41秒