(十六)处理超时批次
发布日期:2021-11-18 17:47:36
浏览次数:7
分类:技术文章
本文共 6825 字,大约阅读时间需要 22 分钟。
文章目录
前言
sender的run
/** * 步骤六: * 对超时的批次是如何处理的? * */ ListexpiredBatches = this.accumulator.abortExpiredBatches(this.requestTimeout, now); // update sensors for (RecordBatch expiredBatch : expiredBatches) this.sensors.recordErrors(expiredBatch.topicPartition.topic(), expiredBatch.recordCount); sensors.updateProduceRequestMetrics(batches);
public ListabortExpiredBatches(int requestTimeout, long now) { List expiredBatches = new ArrayList<>(); int count = 0; // for (Map.Entry > entry : this.batches.entrySet()) { //获取到每个分区的队列 -》 队列里面对应的批次 Deque dq = entry.getValue(); TopicPartition tp = entry.getKey(); // We only check if the batch should be expired if the partition does not have a batch in flight. // This is to prevent later batches from being expired while an earlier batch is still in progress. // Note that `muted` is only ever populated if `max.in.flight.request.per.connection=1` so this protection // is only active in this case. Otherwise the expiration order is not guaranteed. if (!muted.contains(tp)) { synchronized (dq) { // iterate over the batches and expire them if they have been in the accumulator for more than requestTimeOut RecordBatch lastBatch = dq.peekLast(); //迭代的看每个分区里面的每个批次 Iterator batchIterator = dq.iterator(); while (batchIterator.hasNext()) { RecordBatch batch = batchIterator.next(); boolean isFull = batch != lastBatch || batch.records.isFull(); // check if the batch is expired //判断一下是否超时 if (batch.maybeExpire(requestTimeout, retryBackoffMs, now, this.lingerMs, isFull)) { //增加到超时的数据结构里面 expiredBatches.add(batch); count++; //从数据结构里面移除 batchIterator.remove(); //释放资源 deallocate(batch); } else { // Stop at the first batch that has not expired. break; } } } } } if (!expiredBatches.isEmpty()) log.trace("Expired {} batches in accumulator", count); return expiredBatches; }
public boolean maybeExpire(int requestTimeoutMs, long retryBackoffMs, long now, long lingerMs, boolean isFull) { boolean expire = false; String errorMessage = null; /** * requestTimeoutMs:代表的是请求发送的超时的时间。默认值是30s. * now:当前时间 * lastAppendTime:批次的创建的时间(上一次重试的时间) * now - this.lastAppendTime 大于30秒,说明批次超时了 还没发送出去。 */ if (!this.inRetry() && isFull && requestTimeoutMs < (now - this.lastAppendTime)) { expire = true; //记录异常信息 errorMessage = (now - this.lastAppendTime) + " ms has passed since last append"; /** * lingerMs: 100ms,无论如何都要把消息发送出去的时间 * * createdMs:批次创建的时间 * * 已经大于30秒了。 说明也是超时了。 * */ } else if (!this.inRetry() && requestTimeoutMs < (now - (this.createdMs + lingerMs))) { expire = true; errorMessage = (now - (this.createdMs + lingerMs)) + " ms has passed since batch creation plus linger time"; /** * 针对重试 * lastAttemptMs: 上一次重试的时间(批次创建的时间) * retryBackoffMs: 重试的时间间隔 * 说明也是超时了。 */ } else if (this.inRetry() && requestTimeoutMs < (now - (this.lastAttemptMs + retryBackoffMs))) { expire = true; errorMessage = (now - (this.lastAttemptMs + retryBackoffMs)) + " ms has passed since last attempt plus backoff time"; } if (expire) { this.records.close(); //调用done方法 //方法里面传过去了一个TimeoutException的异常。(超时了) this.done(-1L, Record.NO_TIMESTAMP, new TimeoutException("Expiring " + recordCount + " record(s) for " + topicPartition + " due to " + errorMessage)); } return expire; }
调用done方法,并传入TimeoutException
public void done(long baseOffset, long timestamp, RuntimeException exception) { log.trace("Produced messages to topic-partition {} with base offset offset {} and error: {}.", topicPartition, baseOffset, exception); // execute callbacks /** * * 我们发送数据的时候,一条消息就代表一个thunk * 遍历所以我们当时发送出去消息。 */ for (int i = 0; i < this.thunks.size(); i++) { try { Thunk thunk = this.thunks.get(i); //如果没有异常 if (exception == null) { // If the timestamp returned by server is NoTimestamp, that means CreateTime is used. Otherwise LogAppendTime is used. RecordMetadata metadata = new RecordMetadata(this.topicPartition, baseOffset, thunk.future.relativeOffset(), timestamp == Record.NO_TIMESTAMP ? thunk.future.timestamp() : timestamp, thunk.future.checksum(), thunk.future.serializedKeySize(), thunk.future.serializedValueSize()); //调用我们发送的消息的回调函数 //大家还记不记得我们在发送数据的时候 //还不是绑定了一个回调函数。 //这儿说的调用的回调函数 //就是我们开发,生产者代码的时候,我们用户传进去的那个 //回调函数。 thunk.callback.onCompletion(metadata, null);//带过去的就是没有异常 //也就是说我们生产者那儿的代码,捕获异常的时候就是发现没有异常。 } else { //如果有异常就会把异常传给回调函数。 //由我们用户自己去捕获这个异常。 //然后对这个异常进行处理 //大家根据自己公司的业务规则进行处理就可以了。 //如果走这个分支的话,我们的用户的代码是可以捕获到timeoutexception //这个异常,如果用户捕获到了,做对应的处理就可以了。 thunk.callback.onCompletion(null, exception); } } catch (Exception e) { log.error("Error executing user-provided callback on message for topic-partition {}:", topicPartition, e); } } this.produceFuture.done(topicPartition, baseOffset, exception); }
转载地址:https://blog.csdn.net/weixin_37850264/article/details/112385570 如侵犯您的版权,请留言回复原文章的地址,我们会给您删除此文章,给您带来不便请您谅解!
发表评论
最新留言
表示我来过!
[***.240.166.169]2024年03月24日 13时07分41秒
关于作者
喝酒易醉,品茶养心,人生如梦,品茶悟道,何以解忧?唯有杜康!
-- 愿君每日到此一游!
推荐文章
android线程通信方式,Android 主线程和子线程通信问题
2019-04-21
cps1 cps2 android,图文教程:CPS1和CPS2模拟器使用
2019-04-21
在线设计 html5 表单,html5注册表单制作-表单制作-小程序表单制作
2019-04-21
android小闹钟课程设计,《小闹钟》教学设计
2019-04-21
mysql文件系统_MySQL文件系统先睹为快(1)
2019-04-21
jquery后台内容管理_教育平台项目后台管理系统:课程内容模块
2019-04-21
grouping函数 mysql_sql聚合函数有哪些
2019-04-21
java文档生成_Java文档自动生成
2019-04-21
java 共享目录_java 操作windows 共享目录方法介绍
2019-04-21
java 监控 宕机_JAVA监测tomcat是否宕机,控制重启
2019-04-21
catch that cow java_POJ3278——Catch That Cow
2019-04-21
java integer 不变模式_Java代码的变与不变
2019-04-21
java guava 使用_Java8-Guava实战示例
2019-04-21
java线程占用CPU_在windows下揪出java程序占用cpu很高的线程并完美解决
2019-04-21