
本文共 2494 字,大约阅读时间需要 8 分钟。
Kafka Broker默认的消息保留策略是:要么保留一定时间,要么保留到消息达到一定大小的字节数。
当消息达到设置的条件上限时,旧消息就会过期并被删除,所以,在任何时刻,可用消息的总量都不会超过配置参数所指定的大小。
topic可以配置自己的保留策略,可以将消息保留到不再使用他们为止。
因为在一个大文件里查找和删除消息是很费时的事,也容易出错,所以,分区被划分为若干个片段。默认情况下,每个片段包含1G或者一周的数据,以较小的那个为准。在broker往leader分区写入消息时,如果达到片段上限,就关闭当前文件,并打开一个新文件。当前正在写入数据的片段叫活跃片段。当所有片段都被写满时,会清除下一个分区片段的数据,如果配置的是7个片段,每天打开一个新片段,就会删除一个最老的片段,循环使用所有片段。
日志清理:
Kafka提供两种日志清理策略,删除和压缩。通过参数cleanup.policy来指定清理策略。日志清理可以控制到主题级别,可以为不同主题创建不同的清理策略。
日志删除,它是一个定时任务在日志管理器启动后会启动它。默认为5分钟执行一次。kafka提供了基于日志保留时长的删除策略和日志大小的策略,默认是168小时,也就是7天,日志被保留7天之后将会删除。默认不设置日志大小。日志保留时长不是通过日志文件的最后修改时间来确定的,它是基于时间戳索引进行的。
日志压缩
分段策略属性
属性名 | 含义 | 默认值 |
---|---|---|
log.roll.{hours,ms} | 日志滚动的周期时间,到达指定周期时间时,强制生成一个新的segment | 168(7day) |
log.segment.bytes | 每个segment的最大容量。到达指定容量时,将强制生成一个新的segment | 1G(-1为不限制) |
log.retention.check.interval.ms | 日志片段文件检查的周期时间 | 60000 |
日志刷新策略
Kafka的日志实际上是开始是在缓存中的,然后根据策略定期一批一批写入到日志文件中去,以提高吞吐率。
属性名 | 含义 | 默认值 |
---|---|---|
log.flush.interval.messages | 消息达到多少条时将数据写入到日志文件 | 10000 |
log.flush.interval.ms | 当达到该时间时,强制执行一次flush | null |
log.flush.scheduler.interval.ms | 周期性检查,是否需要将信息flush | 很大的值 |
日志保存清理策略
属性名 | 含义 | 默认值 |
---|---|---|
log.cleanup.polict | 日志清理保存的策略只有delete和compact两种 | delete |
log.retention.hours | 日志保存的时间,可以选择hours,minutes和ms | 168(7day) |
log.retention.bytes | 删除前日志文件允许保存的最大值 | -1 |
log.segment.delete.delay.ms | 日志文件被真正删除前的保留时间 | 60000 |
log.cleanup.interval.mins | 每隔一段时间多久调用一次清理的步骤 | 10 |
log.retention.check.interval.ms | 周期性检查是否有日志符合删除的条件(新版本使用) | 300000 |
这里特别说明一下,日志的真正清楚时间。当删除的条件满足以后,日志将被“删除”,但是这里的删除其实只是将该日志进行了“delete”标注,文件只是无法被索引到了而已。但是文件本身,仍然是存在的,只有当过了log.segment.delete.delay.ms 这个时间以后,文件才会被真正的从文件系统中删除。
参考:https://www.cnblogs.com/angellst/p/9368493.html
========================================================================================
Kafka将数据持久化到了硬盘上,允许你配置一定的策略对数据清理,清理的策略有两个,删除和压缩。
数据清理的方式
删除
log.cleanup.policy=delete启用删除策略
直接删除,删除后的消息不可恢复。可配置以下两个策略:
清理超过指定时间清理:
log.retention.hours=16
超过指定大小后,删除旧的消息:
log.retention.bytes=1073741824
为了避免在删除时阻塞读操作,采用了copy-on-write形式的实现,删除操作进行时,读取操作的二分查找功能实际是在一个静态的快照副本上进行的,这类似于Java的CopyOnWriteArrayList。
压缩
将数据压缩,只保留每个key最后一个版本的数据。
首先在broker的配置中设置log.cleaner.enable=true启用cleaner,这个默认是关闭的。
在topic的配置中设置log.cleanup.policy=compact启用压缩策略。
如上图,在整个数据流中,每个Key都有可能出现多次,压缩时将根据Key将消息聚合,只保留最后一次出现时的数据。这样,无论什么时候消费消息,都能拿到每个Key的最新版本的数据。
压缩后的offset可能是不连续的,比如上图中没有5和7,因为这些offset的消息被merge了,当从这些offset消费消息时,将会拿到比这个offset大的offset对应的消息,比如,当试图获取offset为5的消息时,实际上会拿到offset为6的消息,并从这个位置开始消费。
这种策略只适合特殊场景,比如消息的key是用户ID,消息体是用户的资料,通过这种压缩策略,整个消息集里就保存了所有用户最新的资料。
压缩策略支持删除,当某个Key的最新版本的消息没有内容时,这个Key将被删除,这也符合以上逻辑。
原文参考:https://blog.csdn.net/honglei915/article/details/49683065
发表评论
最新留言
关于作者
