(五)生产者源码之分区选择
发布日期:2021-11-18 17:47:29
浏览次数:6
分类:技术文章
本文共 2285 字,大约阅读时间需要 7 分钟。
文章目录
前言
private int partition(ProducerRecordrecord, byte[] serializedKey, byte[] serializedValue, Cluster cluster) { //如果你的这个消息已经分配了分区号,那直接就用这个分区号就可以了 //但是正常情况下,消息是没有分区号的。 Integer partition = record.partition(); return partition != null ? partition : //使用分区器进行选择合适的分区 partitioner.partition( record.topic(), record.key(), serializedKey, record.value(), serializedValue, cluster); }
public class DefaultPartitioner implements Partitioner { //原子类,初始化的时候 给了一个随机数 private final AtomicInteger counter = new AtomicInteger(new Random().nextInt()); public void configure(Mapconfigs) { } public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) { //首先获取到我们要发送消息的对应的topic的分区的信息 List partitions = cluster.partitionsForTopic(topic); //计算出来分区的总的个数 int numPartitions = partitions.size(); //策略一: 如果发送消息的时候,没有指定key if (keyBytes == null) { //这儿有一个计数器 //每次执行都会加一 int nextValue = counter.getAndIncrement(); //获取可用的分区数 List availablePartitions = cluster.availablePartitionsForTopic(topic); if (availablePartitions.size() > 0) { //计算我们要发送到哪个分区上。 //一个数如果对10进行取模, 0-9之间 //11 % 9 //12 % 9 //13 % 9 //实现一个轮训的效果,能达到负载均衡。 int part = Utils.toPositive(nextValue) % availablePartitions.size(); //根据这个值分配分区好。 return availablePartitions.get(part).partition(); } else { // no partitions are available, give a non-available partition return Utils.toPositive(nextValue) % numPartitions; } } else { //策略二:这个地方就是指定了key // hash the keyBytes to choose a partition //直接对key取一个hash值 % 分区的总数取模 //如果是同一个key,计算出来的分区肯定是同一个分区。 //如果我们想要让消息能发送到同一个分区上面,那么我们就 //必须指定key. 这一点非常重要 //希望大家一定一定要知道。 return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions; } } public void close() { }}
转载地址:https://blog.csdn.net/weixin_37850264/article/details/112318289 如侵犯您的版权,请留言回复原文章的地址,我们会给您删除此文章,给您带来不便请您谅解!
发表评论
最新留言
哈哈,博客排版真的漂亮呢~
[***.90.31.176]2024年04月19日 16时59分39秒
关于作者
喝酒易醉,品茶养心,人生如梦,品茶悟道,何以解忧?唯有杜康!
-- 愿君每日到此一游!
推荐文章
SAP Data Intelligence上的Python Operator
2019-04-27
SAP API开发方法大全
2019-04-27
SAP Analytics Cloud里基于dimension的calculation
2019-04-27
SAP Data Intelligence Repository里的模型路径
2019-04-27
SAP Data Intelligence Graph使用浏览器访问的url规范
2019-04-27
SAP Data Intelligence API执行出错的排错之道
2019-04-27
SAP Data Intelligence Graph json源代码的结构分析
2019-04-27
使用类似搭积木的低代码开发方式进行SAP API开发
2019-04-27
SAP Commerce(SAP Hybris)学习资料汇总
2019-04-27
SAP云平台 Document Information Extraction服务测试
2019-04-27
SAP Analytics Cloud里显示在图表里的描述信息更改
2019-04-27
SAP Analytics Cloud里避免类型为个数的measure出现小数点
2019-04-27
SAP Commerce(原Hybris)的一些架构图,持续更新
2019-04-27
如何使用R语言在SAP Analytics Cloud里绘制各种统计图表
2019-04-27