(十三)处理暂存状态的响应消息
发布日期:2021-11-18 17:47:35 浏览次数:9 分类:技术文章

本文共 16261 字,大约阅读时间需要 54 分钟。

文章目录


追踪Poll

从poll里面进入slector的poll调用addToStagedReceives()进行消息处理(把接收的数据加入到待处理队列);

private void addToStagedReceives(KafkaChannel channel, NetworkReceive receive) {
//channel代表的就是一个网络的连接,一台kafka的主机就对应了一个channel连接。 if (!stagedReceives.containsKey(channel)) stagedReceives.put(channel, new ArrayDeque
()); Deque
deque = stagedReceives.get(channel); //往队列里面存放接受到响应 deque.add(receive); }

相应数据结构

public class Selector implements Selectable {
public static final long NO_IDLE_TIMEOUT_MS = -1; private static final Logger log = LoggerFactory.getLogger(Selector.class); //这个对象就是javaNIO里面的Selector //Selector是负责网络的建立,发送网络请求,处理实际的网络IO。 //所以他是最最核心的这么样的一个组件。 private final java.nio.channels.Selector nioSelector; //broker 和 KafkaChannel的映射 //这儿的kafkaChannel大家暂时可以理解为就是SocketChannel //代表的就是一个网络连接。 private final Map
channels; //已经完成发送的请求 private final List
completedSends; //已经接收到的,并且处理完了的响应。 private final List
completedReceives; //已经接收到了,但是还没来得及处理的响应。 //一个连接,对应一个响应队列 private final Map
> stagedReceives; private final Set
immediatelyConnectedKeys; //没有建立连接的主机 private final List
disconnected; //完成建立连接的主机 private final List
connected; //建立连接失败的主机。 private final List
failedSends;。。。。。。。}

在这里插入图片描述

selector的poll()

public void poll(long timeout) throws IOException {
if (timeout < 0) throw new IllegalArgumentException("timeout should be >= 0"); clear(); if (hasStagedReceives() || !immediatelyConnectedKeys.isEmpty()) timeout = 0; /* check ready keys */ long startSelect = time.nanoseconds(); //从Selector上找到有多少个key注册了 int readyKeys = select(timeout); long endSelect = time.nanoseconds(); this.sensors.selectTime.record(endSelect - startSelect, time.milliseconds()); //因为我们用场景驱动的方式 //我们刚刚确实是注册了一个key if (readyKeys > 0 || !immediatelyConnectedKeys.isEmpty()) {
//立马就要对这个Selector上面的key要进行处理。 pollSelectionKeys(this.nioSelector.selectedKeys(), false, endSelect); pollSelectionKeys(immediatelyConnectedKeys, true, endSelect); } //TODO 对stagedReceives里面的数据要进行处理 addToCompletedReceives(); long endIo = time.nanoseconds(); this.sensors.ioTime.record(endIo - endSelect, time.milliseconds()); // we use the time at the end of select to ensure that we don't close any connections that // have just been processed in pollSelectionKeys maybeCloseOldestConnection(endSelect); }

addToCompletedReceives(),把响应存入到completedReceives

private void addToCompletedReceives() {
if (!this.stagedReceives.isEmpty()) {
Iterator
>> iter = this.stagedReceives.entrySet().iterator(); while (iter.hasNext()) {
Map.Entry
> entry = iter.next(); KafkaChannel channel = entry.getKey(); if (!channel.isMute()) {
Deque
deque = entry.getValue(); //获取到响应 NetworkReceive networkReceive = deque.poll(); //把响应存入到completedReceives 数据结构里面 this.completedReceives.add(networkReceive); this.sensors.recordBytesReceived(channel.id(), networkReceive.payload().limit()); if (deque.isEmpty()) iter.remove(); } } } }

其实真正处理响应是在 Networkclient的poll,步骤三

public List
poll(long timeout, long now) {
/** * 在这个方法里面有涉及到kafka的网络的方法,但是 * 目前我们还没有给大家讲kafka的网络,所以我们分析的时候 * 暂时不用分析得特别的详细,我们大概知道是如何获取到元数据 * 即可。等我们分析完了kafka的网络以后,我们在回头看这儿的代码 * 的时候,其实代码就比较简单了。 */ //步骤一:封装了一个要拉取元数据请求 long metadataTimeout = metadataUpdater.maybeUpdate(now); try {
//步骤二: 发送请求,进行复杂的网络操作 //但是我们目前还没有学习到kafka的网络 //所以这儿大家就只需要知道这儿会发送网络请求。 //TODO 执行网络IO的操作。 this.selector.poll(Utils.min(timeout, metadataTimeout, requestTimeoutMs)); } catch (IOException e) {
log.error("Unexpected error during I/O", e); } // process completed actions long updatedNow = this.time.milliseconds(); List
responses = new ArrayList<>(); handleCompletedSends(responses, updatedNow); //步骤三:处理响应,响应里面就会有我们需要的元数据。 /** * 这个地方是我们在看生产者是如何获取元数据的时候,看的。 * 其实Kafak获取元数据的流程跟我们发送消息的流程是一模一样。 * 获取元数据 -》 判断网络连接是否建立好 -》 建立网络连接 * -》 发送请求(获取元数据的请求) -》 服务端发送回来响应(带了集群的元数据信息) * */ handleCompletedReceives(responses, updatedNow); handleDisconnections(responses, updatedNow); handleConnections(); //处理超时的请求 handleTimedOutRequests(responses, updatedNow); // invoke callbacks for (ClientResponse response : responses) {
if (response.request().hasCallback()) {
try {
//调用的响应的里面的我们之前发送出去的请求的回调函数 //看到了这儿,我们回头再去看一下 //我们当时发送请求的时候,是如何封装这个请求。 //不过虽然目前我们还没看到,但是我们可以大胆猜一下。 //当时封装网络请求的时候,肯定是给他绑定了一个回调函数。 response.request().callback().onComplete(response); } catch (Exception e) {
log.error("Uncaught error in request completion:", e); } } } return responses; }

调用 handleCompletedReceives()

private void handleCompletedReceives(List
responses, long now) {
for (NetworkReceive receive : this.selector.completedReceives()) {
//获取broker id String source = receive.source(); /** * kafka 有这样的一个机制:每个连接可以容忍5个发送出去了,但是还没接收到响应的请求。 */ //从数据结构里面移除已经接收到响应的请求。 //把之前存入进去的请求也获取到了 ClientRequest req = inFlightRequests.completeNext(source); //解析服务端发送回来的请求(里面有响应的结果数据) Struct body = parseResponse(receive.payload(), req.request().header()); //TODO 如果是关于元数据信息的响应 if (!metadataUpdater.maybeHandleCompletedReceive(req, now, body)) //解析完了以后就把封装成一个一个的cilentResponse //body 存储的是响应的内容 //req 发送出去的那个请求信息 responses.add(new ClientResponse(req, now, false, body)); } }

又把请求响应封装存入到response

在poll()里面的后面代码进行处理,其实是用请求的回调函数进行处理

// invoke callbacks        for (ClientResponse response : responses) {
if (response.request().hasCallback()) {
try {
//调用的响应的里面的我们之前发送出去的请求的回调函数 //看到了这儿,我们回头再去看一下 //我们当时发送请求的时候,是如何封装这个请求。 //不过虽然目前我们还没看到,但是我们可以大胆猜一下。 //当时封装网络请求的时候,肯定是给他绑定了一个回调函数。 response.request().callback().onComplete(response); } catch (Exception e) {
log.error("Uncaught error in request completion:", e); } } }

那我们看看回调函数在哪里封装

sender类里的run方法

List
requests = createProduceRequests(batches, now);
private List
createProduceRequests(Map
> collated, long now) {
List
requests = new ArrayList
(collated.size()); for (Map.Entry
> entry : collated.entrySet()) requests.add(produceRequest(now, entry.getKey(), acks, requestTimeout, entry.getValue())); return requests; }
private ClientRequest produceRequest(long now, int destination, short acks, int timeout, List
batches) {
Map
produceRecordsByPartition = new HashMap
(batches.size()); final Map
recordsByPartition = new HashMap
(batches.size()); for (RecordBatch batch : batches) {
TopicPartition tp = batch.topicPartition; produceRecordsByPartition.put(tp, batch.records.buffer()); recordsByPartition.put(tp, batch); } ProduceRequest request = new ProduceRequest(acks, timeout, produceRecordsByPartition); RequestSend send = new RequestSend(Integer.toString(destination), this.client.nextRequestHeader(ApiKeys.PRODUCE), request.toStruct()); RequestCompletionHandler callback = new RequestCompletionHandler() {
public void onComplete(ClientResponse response) {
//回调函数要是被调用 //其实就是这个方法被执行了。 handleProduceResponse(response, recordsByPartition, time.milliseconds()); } }; //这个就是我们封装出来的请求,里面果然带了一个回调函数 return new ClientRequest(now, acks != 0, send, callback); }

本质callback就是这个函数

private void handleProduceResponse(ClientResponse response, Map
batches, long now) {
int correlationId = response.request().request().header().correlationId(); //这个地方就是就是一个特殊情况 //我们要发送请求了,但是发现 broker失去连接。 //不过这个是一个小概率事件。 if (response.wasDisconnected()) {
log.trace("Cancelled request {} due to node {} being disconnected", response, response.request() .request() .destination()); for (RecordBatch batch : batches.values()) completeBatch(batch, Errors.NETWORK_EXCEPTION, -1L, Record.NO_TIMESTAMP, correlationId, now); } else {
//正常情况下,都是走的这儿。 log.trace("Received produce response from node {} with correlation id {}", response.request().request().destination(), correlationId); // if we have a response, parse it //所以我们正常情况下,走的都是这个分支,代表ack非0 if (response.hasResponse()) {
ProduceResponse produceResponse = new ProduceResponse(response.responseBody()); /** * 遍历每个分区的响应 */ for (Map.Entry
entry : produceResponse.responses().entrySet()) {
TopicPartition tp = entry.getKey(); ProduceResponse.PartitionResponse partResp = entry.getValue(); //如果处理成功那就是成功了,但是如果服务端那儿处理失败了 //是不是也要给我们发送回来异常的信息。 //error 这个里面存储的就是服务端发送回来的异常码 Errors error = Errors.forCode(partResp.errorCode); //获取到当前分区的响应。 RecordBatch batch = batches.get(tp); //对响应进行处理 completeBatch(batch, error, partResp.baseOffset, partResp.timestamp, correlationId, now); } this.sensors.recordLatency(response.request().request().destination(), response.requestLatencyMs()); this.sensors.recordThrottleTime(response.request().request().destination(), produceResponse.getThrottleTime()); } else {
// this is the acks = 0 case, just complete all requests //acks=0意思就是不需要返回响应。 //1 p -> b leader partion //-1 p -> broker leader partition -> follower partition //在生产环境里面,我们一般是不会把acks 参数设置为0 for (RecordBatch batch : batches.values()) completeBatch(batch, Errors.NONE, -1L, Record.NO_TIMESTAMP, correlationId, now); } } }

对响应进行处理

private void completeBatch(RecordBatch batch, Errors error, long baseOffset, long timestamp, long correlationId, long now) {
//如果响应里面带有异常 并且 这个请求是可以重试的 if (error != Errors.NONE && canRetry(batch, error)) {
// retry log.warn("Got error produce response with correlation id {} on topic-partition {}, retrying ({} attempts left). Error: {}", correlationId, batch.topicPartition, this.retries - batch.attempts - 1, error); //重新把发送失败等着批次 加入到队列里面。 this.accumulator.reenqueue(batch, now); this.sensors.recordRetries(batch.topicPartition.topic(), batch.recordCount); } else {
//这儿过来的数据:带有异常,但是不可以重试(1:压根就不让重试2:重试次数超了) //其余的都走这个分支。 RuntimeException exception; //如果响应里面带有 没有权限的异常 if (error == Errors.TOPIC_AUTHORIZATION_FAILED) //自己封装一个异常信息(自定义了异常) exception = new TopicAuthorizationException(batch.topicPartition.topic()); else // exception = error.exception(); // tell the user the result of their request //TODO 核心代码 把异常的信息也给带过去了 //我们刚刚看的就是这儿的代码 //里面调用了用户传进来的回调函数 //回调函数调用了以后 //说明我们的一个完整的消息的发送流程就结束了。 batch.done(baseOffset, timestamp, exception); //看起来这个代码就是要回收资源的。 this.accumulator.deallocate(batch); if (error != Errors.NONE) this.sensors.recordErrors(batch.topicPartition.topic(), batch.recordCount); } if (error.exception() instanceof InvalidMetadataException) {
if (error.exception() instanceof UnknownTopicOrPartitionException) log.warn("Received unknown topic or partition error in produce request on partition {}. The " + "topic/partition may not exist or the user may not have Describe access to it", batch.topicPartition); metadata.requestUpdate(); } // Unmute the completed partition. if (guaranteeMessageOrder) this.accumulator.unmutePartition(batch.topicPartition); }

调用处理异常的核心代码

public void done(long baseOffset, long timestamp, RuntimeException exception) {
log.trace("Produced messages to topic-partition {} with base offset offset {} and error: {}.", topicPartition, baseOffset, exception); // execute callbacks /** * * 我们发送数据的时候,一条消息就代表一个thunk * 遍历所以我们当时发送出去消息。 */ for (int i = 0; i < this.thunks.size(); i++) {
try {
Thunk thunk = this.thunks.get(i); //如果没有异常 if (exception == null) {
// If the timestamp returned by server is NoTimestamp, that means CreateTime is used. Otherwise LogAppendTime is used. RecordMetadata metadata = new RecordMetadata(this.topicPartition, baseOffset, thunk.future.relativeOffset(), timestamp == Record.NO_TIMESTAMP ? thunk.future.timestamp() : timestamp, thunk.future.checksum(), thunk.future.serializedKeySize(), thunk.future.serializedValueSize()); //调用我们发送的消息的回调函数 //大家还记不记得我们在发送数据的时候 //还不是绑定了一个回调函数。 //这儿说的调用的回调函数 //就是我们开发,生产者代码的时候,我们用户传进去的那个 //回调函数。 thunk.callback.onCompletion(metadata, null);//带过去的就是没有异常 //也就是说我们生产者那儿的代码,捕获异常的时候就是发现没有异常。 } else {
//如果有异常就会把异常传给回调函数。 //由我们用户自己去捕获这个异常。 //然后对这个异常进行处理 //大家根据自己公司的业务规则进行处理就可以了。 //如果走这个分支的话,我们的用户的代码是可以捕获到timeoutexception //这个异常,如果用户捕获到了,做对应的处理就可以了。 thunk.callback.onCompletion(null, exception); } } catch (Exception e) {
log.error("Error executing user-provided callback on message for topic-partition {}:", topicPartition, e); } } this.produceFuture.done(topicPartition, baseOffset, exception); }

转载地址:https://blog.csdn.net/weixin_37850264/article/details/112380961 如侵犯您的版权,请留言回复原文章的地址,我们会给您删除此文章,给您带来不便请您谅解!

上一篇:(十四)消息发送完以后内存处理
下一篇:(十五)top10热门品类之需求回顾以及实现思路分析

发表评论

最新留言

逛到本站,mark一下
[***.202.152.39]2024年04月21日 13时55分13秒

关于作者

    喝酒易醉,品茶养心,人生如梦,品茶悟道,何以解忧?唯有杜康!
-- 愿君每日到此一游!

推荐文章