(十六)处理超时批次
发布日期:2021-11-18 17:47:36 浏览次数:7 分类:技术文章

本文共 6825 字,大约阅读时间需要 22 分钟。

文章目录


前言

sender的run

/**         * 步骤六:         *  对超时的批次是如何处理的?         *         */        List
expiredBatches = this.accumulator.abortExpiredBatches(this.requestTimeout, now); // update sensors for (RecordBatch expiredBatch : expiredBatches) this.sensors.recordErrors(expiredBatch.topicPartition.topic(), expiredBatch.recordCount); sensors.updateProduceRequestMetrics(batches);
public List
abortExpiredBatches(int requestTimeout, long now) {
List
expiredBatches = new ArrayList<>(); int count = 0; // for (Map.Entry
> entry : this.batches.entrySet()) {
//获取到每个分区的队列 -》 队列里面对应的批次 Deque
dq = entry.getValue(); TopicPartition tp = entry.getKey(); // We only check if the batch should be expired if the partition does not have a batch in flight. // This is to prevent later batches from being expired while an earlier batch is still in progress. // Note that `muted` is only ever populated if `max.in.flight.request.per.connection=1` so this protection // is only active in this case. Otherwise the expiration order is not guaranteed. if (!muted.contains(tp)) {
synchronized (dq) {
// iterate over the batches and expire them if they have been in the accumulator for more than requestTimeOut RecordBatch lastBatch = dq.peekLast(); //迭代的看每个分区里面的每个批次 Iterator
batchIterator = dq.iterator(); while (batchIterator.hasNext()) {
RecordBatch batch = batchIterator.next(); boolean isFull = batch != lastBatch || batch.records.isFull(); // check if the batch is expired //判断一下是否超时 if (batch.maybeExpire(requestTimeout, retryBackoffMs, now, this.lingerMs, isFull)) {
//增加到超时的数据结构里面 expiredBatches.add(batch); count++; //从数据结构里面移除 batchIterator.remove(); //释放资源 deallocate(batch); } else {
// Stop at the first batch that has not expired. break; } } } } } if (!expiredBatches.isEmpty()) log.trace("Expired {} batches in accumulator", count); return expiredBatches; }
public boolean maybeExpire(int requestTimeoutMs, long retryBackoffMs, long now, long lingerMs, boolean isFull) {
boolean expire = false; String errorMessage = null; /** * requestTimeoutMs:代表的是请求发送的超时的时间。默认值是30s. * now:当前时间 * lastAppendTime:批次的创建的时间(上一次重试的时间) * now - this.lastAppendTime 大于30秒,说明批次超时了 还没发送出去。 */ if (!this.inRetry() && isFull && requestTimeoutMs < (now - this.lastAppendTime)) {
expire = true; //记录异常信息 errorMessage = (now - this.lastAppendTime) + " ms has passed since last append"; /** * lingerMs: 100ms,无论如何都要把消息发送出去的时间 * * createdMs:批次创建的时间 * * 已经大于30秒了。 说明也是超时了。 * */ } else if (!this.inRetry() && requestTimeoutMs < (now - (this.createdMs + lingerMs))) {
expire = true; errorMessage = (now - (this.createdMs + lingerMs)) + " ms has passed since batch creation plus linger time"; /** * 针对重试 * lastAttemptMs: 上一次重试的时间(批次创建的时间) * retryBackoffMs: 重试的时间间隔 * 说明也是超时了。 */ } else if (this.inRetry() && requestTimeoutMs < (now - (this.lastAttemptMs + retryBackoffMs))) {
expire = true; errorMessage = (now - (this.lastAttemptMs + retryBackoffMs)) + " ms has passed since last attempt plus backoff time"; } if (expire) {
this.records.close(); //调用done方法 //方法里面传过去了一个TimeoutException的异常。(超时了) this.done(-1L, Record.NO_TIMESTAMP, new TimeoutException("Expiring " + recordCount + " record(s) for " + topicPartition + " due to " + errorMessage)); } return expire; }

调用done方法,并传入TimeoutException

public void done(long baseOffset, long timestamp, RuntimeException exception) {
log.trace("Produced messages to topic-partition {} with base offset offset {} and error: {}.", topicPartition, baseOffset, exception); // execute callbacks /** * * 我们发送数据的时候,一条消息就代表一个thunk * 遍历所以我们当时发送出去消息。 */ for (int i = 0; i < this.thunks.size(); i++) {
try {
Thunk thunk = this.thunks.get(i); //如果没有异常 if (exception == null) {
// If the timestamp returned by server is NoTimestamp, that means CreateTime is used. Otherwise LogAppendTime is used. RecordMetadata metadata = new RecordMetadata(this.topicPartition, baseOffset, thunk.future.relativeOffset(), timestamp == Record.NO_TIMESTAMP ? thunk.future.timestamp() : timestamp, thunk.future.checksum(), thunk.future.serializedKeySize(), thunk.future.serializedValueSize()); //调用我们发送的消息的回调函数 //大家还记不记得我们在发送数据的时候 //还不是绑定了一个回调函数。 //这儿说的调用的回调函数 //就是我们开发,生产者代码的时候,我们用户传进去的那个 //回调函数。 thunk.callback.onCompletion(metadata, null);//带过去的就是没有异常 //也就是说我们生产者那儿的代码,捕获异常的时候就是发现没有异常。 } else {
//如果有异常就会把异常传给回调函数。 //由我们用户自己去捕获这个异常。 //然后对这个异常进行处理 //大家根据自己公司的业务规则进行处理就可以了。 //如果走这个分支的话,我们的用户的代码是可以捕获到timeoutexception //这个异常,如果用户捕获到了,做对应的处理就可以了。 thunk.callback.onCompletion(null, exception); } } catch (Exception e) {
log.error("Error executing user-provided callback on message for topic-partition {}:", topicPartition, e); } } this.produceFuture.done(topicPartition, baseOffset, exception); }

转载地址:https://blog.csdn.net/weixin_37850264/article/details/112385570 如侵犯您的版权,请留言回复原文章的地址,我们会给您删除此文章,给您带来不便请您谅解!

上一篇:(十七)处理长时间没响应的请求
下一篇:(十五)处理有异常消息

发表评论

最新留言

表示我来过!
[***.240.166.169]2024年03月24日 13时07分41秒

关于作者

    喝酒易醉,品茶养心,人生如梦,品茶悟道,何以解忧?唯有杜康!
-- 愿君每日到此一游!

推荐文章

android线程通信方式,Android 主线程和子线程通信问题 2019-04-21
cps1 cps2 android,图文教程:CPS1和CPS2模拟器使用 2019-04-21
在线设计 html5 表单,html5注册表单制作-表单制作-小程序表单制作 2019-04-21
android小闹钟课程设计,《小闹钟》教学设计 2019-04-21
mysql文件系统_MySQL文件系统先睹为快(1) 2019-04-21
nums在python_程序找到一对(i,j),其中nums [i] + nums [j] +(i -j)在Python中最大化?... 2019-04-21
jquery后台内容管理_教育平台项目后台管理系统:课程内容模块 2019-04-21
grouping函数 mysql_sql聚合函数有哪些 2019-04-21
python os.walk如何不遍历隐藏文件_python 获取文件下所有文件或目录os.walk()的实例... 2019-04-21
python 股票估值_【中金固收·固收+】隐藏价值的角落:限售股AAP估值及Python实现方法(上)... 2019-04-21
java文档生成_Java文档自动生成 2019-04-21
java 共享目录_java 操作windows 共享目录方法介绍 2019-04-21
java 监控 宕机_JAVA监测tomcat是否宕机,控制重启 2019-04-21
catch that cow java_POJ3278——Catch That Cow 2019-04-21
java integer 不变模式_Java代码的变与不变 2019-04-21
java guava 使用_Java8-Guava实战示例 2019-04-21
python barrier option pricing_《Python金融数据分析》书内代码实战与讲解(二)金融衍生物定价... 2019-04-21
java自带工具_深入了解Java JDK自带工具,包括javac、jar、jstack等,实用~ 2019-04-21
gnome mysql client_解决MySQLWorkbenchgnome-keyring-daemon错误的方法分享 2019-04-21
java线程占用CPU_在windows下揪出java程序占用cpu很高的线程并完美解决 2019-04-21