Exception handling in the Kafka 0.8 producer
Published: 2021-09-01 18:44:28 | Category: Technical articles

This article briefly walks through exception handling in the Java producer of Kafka 0.8.2.2.

Overview

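Sending with Kafka's Java producer is asynchronous and happens in three main steps:

  • The record is appended to the RecordAccumulator
  • The sender takes RecordBatch instances out of the RecordAccumulator and hands them to the client for sending
  • NetworkClient talks to the broker and sends the RecordBatch out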

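Each of these steps can fail in its own way. During append, an exception may be thrown: an ApiException is passed to the callback, while any other exception is thrown straight to the caller (at this point the callback only covers the layer that hands the record to the RecordAccumulator).

The run method of the sender simply catches exceptions and logs them.

When the producer actually talks to the network and a request fails (the connection is broken or the broker returns an error), the batch is re-enqueued, subject to the configured number of retries.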

append

kafka-clients-0.8.2.2-sources.jar!/org/apache/kafka/clients/producer/KafkaProducer.java

    public Future<RecordMetadata> send(ProducerRecord<K,V> record, Callback callback) {
        try {
            // first make sure the metadata for the topic is available
            waitOnMetadata(record.topic(), this.metadataFetchTimeoutMs);
            byte[] serializedKey;
            try {
                serializedKey = keySerializer.serialize(record.topic(), record.key());
            } catch (ClassCastException cce) {
                throw new SerializationException("Can't convert key of class " + record.key().getClass().getName() +
                        " to class " + producerConfig.getClass(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG).getName() +
                        " specified in key.serializer");
            }
            byte[] serializedValue;
            try {
                serializedValue = valueSerializer.serialize(record.topic(), record.value());
            } catch (ClassCastException cce) {
                throw new SerializationException("Can't convert value of class " + record.value().getClass().getName() +
                        " to class " + producerConfig.getClass(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG).getName() +
                        " specified in value.serializer");
            }
            ProducerRecord<byte[], byte[]> serializedRecord =
                    new ProducerRecord<byte[], byte[]>(record.topic(), record.partition(), serializedKey, serializedValue);
            int partition = partitioner.partition(serializedRecord, metadata.fetch());
            int serializedSize = Records.LOG_OVERHEAD + Record.recordSize(serializedKey, serializedValue);
            ensureValidRecordSize(serializedSize);
            TopicPartition tp = new TopicPartition(record.topic(), partition);
            log.trace("Sending record {} with callback {} to topic {} partition {}", record, callback, record.topic(), partition);
            RecordAccumulator.RecordAppendResult result = accumulator.append(tp, serializedKey, serializedValue, compressionType, callback);
            if (result.batchIsFull || result.newBatchCreated) {
                log.trace("Waking up the sender since topic {} partition {} is either full or getting a new batch", record.topic(), partition);
                this.sender.wakeup();
            }
            return result.future;
            // Handling exceptions and record the errors;
            // For API exceptions return them in the future,
            // for other exceptions throw directly
        } catch (ApiException e) {
            log.debug("Exception occurred during message send:", e);
            if (callback != null)
                callback.onCompletion(null, e);
            this.errors.record();
            return new FutureFailure(e);
        } catch (InterruptedException e) {
            this.errors.record();
            throw new KafkaException(e);
        } catch (KafkaException e) {
            this.errors.record();
            throw e;
        }
    }

As the comment in the code puts it: handle the exceptions and record the errors. API exceptions are returned through the future (and passed to the callback if one was given); all other exceptions are thrown directly.
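To make the two failure paths of send() concrete, here is a minimal, self-contained sketch. It does not use kafka-clients: `SendModel`, its nested `KafkaException`, `ApiException` and `Callback` types, and the `MAX_SIZE` limit are hypothetical stand-ins that only mirror the catch structure shown above.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;

// Hypothetical model of the catch structure in send(); NOT the kafka-clients classes.
class SendModel {
    static class KafkaException extends RuntimeException {
        KafkaException(String message) { super(message); }
    }

    static class ApiException extends KafkaException {
        ApiException(String message) { super(message); }
    }

    interface Callback {
        void onCompletion(Long offset, Exception exception);
    }

    static final int MAX_SIZE = 8; // illustrative limit, not a Kafka default
    static int errors = 0;         // stands in for the error sensor

    static Future<Long> send(String value, Callback callback) {
        try {
            if (value == null)
                throw new KafkaException("non-API failure");
            if (value.length() > MAX_SIZE)
                throw new ApiException("record too large");
            return CompletableFuture.completedFuture(0L);
        } catch (ApiException e) {
            // API exceptions: notify the callback and return a failed future
            if (callback != null)
                callback.onCompletion(null, e);
            errors++;
            CompletableFuture<Long> failed = new CompletableFuture<>();
            failed.completeExceptionally(e);
            return failed;
        } catch (KafkaException e) {
            // everything else: count it and rethrow to the caller
            errors++;
            throw e;
        }
    }

    public static void main(String[] args) throws Exception {
        Future<Long> f = send("way-too-long-value",
                (offset, e) -> System.out.println("callback got: " + e.getMessage()));
        try {
            f.get();
        } catch (ExecutionException e) {
            System.out.println("future failed with: " + e.getCause().getMessage());
        }
        try {
            send(null, null);
        } catch (KafkaException e) {
            System.out.println("thrown to caller: " + e.getMessage());
        }
    }
}
```

The practical consequence for calling code is that it needs both a try/catch around send() and error handling in the callback or on the returned future; either one alone misses a class of failures.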

sender

kafka-clients-0.8.2.2-sources.jar!/org/apache/kafka/clients/producer/internals/Sender.java

  • The thread's run method

    /**
     * The main run loop for the sender thread
     */
    public void run() {
        log.debug("Starting Kafka producer I/O thread.");

        // main loop, runs until close is called
        while (running) {
            try {
                run(time.milliseconds());
            } catch (Exception e) {
                log.error("Uncaught error in kafka producer I/O thread: ", e);
            }
        }

        log.debug("Beginning shutdown of Kafka producer I/O thread, sending remaining records.");

        // okay we stopped accepting requests but there may still be
        // requests in the accumulator or waiting for acknowledgment,
        // wait until these are completed.
        while (this.accumulator.hasUnsent() || this.client.inFlightRequestCount() > 0) {
            try {
                run(time.milliseconds());
            } catch (Exception e) {
                log.error("Uncaught error in kafka producer I/O thread: ", e);
            }
        }

        this.client.close();

        log.debug("Shutdown of Kafka producer I/O thread has completed.");
    }

  • The run(long) method

    /**
     * Run a single iteration of sending
     *
     * @param now The current POSIX time in milliseconds
     */
    public void run(long now) {
        Cluster cluster = metadata.fetch();
        // get the list of partitions with data ready to send
        RecordAccumulator.ReadyCheckResult result = this.accumulator.ready(cluster, now);

        // if there are any partitions whose leaders are not known yet, force metadata update
        if (result.unknownLeadersExist)
            this.metadata.requestUpdate();

        // remove any nodes we aren't ready to send to
        Iterator<Node> iter = result.readyNodes.iterator();
        long notReadyTimeout = Long.MAX_VALUE;
        while (iter.hasNext()) {
            Node node = iter.next();
            if (!this.client.ready(node, now)) {
                iter.remove();
                notReadyTimeout = Math.min(notReadyTimeout, this.client.connectionDelay(node, now));
            }
        }

        // create produce requests
        Map<Integer, List<RecordBatch>> batches = this.accumulator.drain(cluster, result.readyNodes, this.maxRequestSize, now);
        List<ClientRequest> requests = createProduceRequests(batches, now);
        sensors.updateProduceRequestMetrics(requests);

        // If we have any nodes that are ready to send + have sendable data, poll with 0 timeout so this can immediately
        // loop and try sending more data. Otherwise, the timeout is determined by nodes that have partitions with data
        // that isn't yet sendable (e.g. lingering, backing off). Note that this specifically does not include nodes
        // with sendable data that aren't ready to send since they would cause busy looping.
        long pollTimeout = Math.min(result.nextReadyCheckDelayMs, notReadyTimeout);
        if (result.readyNodes.size() > 0) {
            log.trace("Nodes with data ready to send: {}", result.readyNodes);
            log.trace("Created {} produce requests: {}", requests.size(), requests);
            pollTimeout = 0;
        }

        // if some partitions are already ready to be sent, the select time would be 0;
        // otherwise if some partition already has some data accumulated but not ready yet,
        // the select time will be the time difference between now and its linger expiry time;
        // otherwise the select time will be the time difference between now and the metadata expiry time;
        List<ClientResponse> responses = this.client.poll(requests, pollTimeout, now);
        for (ClientResponse response : responses) {
            if (response.wasDisconnected())
                handleDisconnect(response, now);
            else
                handleResponse(response, now);
        }
    }
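The catch-and-log pattern in the sender's run loop can be illustrated with a small standalone sketch. `RunLoopModel` and its members are hypothetical names made up for this example, and the failure in iteration 1 is simulated; the point is only that one failing iteration does not stop the loop, and the error surfaces nowhere except in the log.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of a run loop that catches and logs per-iteration failures.
class RunLoopModel {
    final List<String> log = new ArrayList<>(); // stands in for log.error(...)
    int completed = 0;

    // One iteration of work; iteration 1 fails on purpose.
    void runOnce(int iteration) {
        if (iteration == 1)
            throw new IllegalStateException("simulated failure in iteration 1");
        completed++;
    }

    // The loop keeps going after a failure, as in the sender's while loop.
    void run(int iterations) {
        for (int i = 0; i < iterations; i++) {
            try {
                runOnce(i);
            } catch (Exception e) {
                log.add("Uncaught error in I/O thread: " + e.getMessage());
            }
        }
    }

    public static void main(String[] args) {
        RunLoopModel model = new RunLoopModel();
        model.run(3);
        System.out.println("completed iterations: " + model.completed);
        System.out.println("logged errors: " + model.log);
    }
}
```

Because nothing is rethrown, application code never sees these exceptions directly; watching the producer's error log is the only way to notice them.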

retries

    /**
     * Complete or retry the given batch of records.
     * @param batch The record batch
     * @param error The error (or null if none)
     * @param baseOffset The base offset assigned to the records if successful
     * @param correlationId The correlation id for the request
     * @param now The current POSIX time stamp in milliseconds
     */
    private void completeBatch(RecordBatch batch, Errors error, long baseOffset, long correlationId, long now) {
        if (error != Errors.NONE && canRetry(batch, error)) {
            // retry
            log.warn("Got error produce response with correlation id {} on topic-partition {}, retrying ({} attempts left). Error: {}",
                     correlationId,
                     batch.topicPartition,
                     this.retries - batch.attempts - 1,
                     error);
            this.accumulator.reenqueue(batch, now);
            this.sensors.recordRetries(batch.topicPartition.topic(), batch.recordCount);
        } else {
            // tell the user the result of their request
            batch.done(baseOffset, error.exception());
            this.accumulator.deallocate(batch);
            if (error != Errors.NONE)
                this.sensors.recordErrors(batch.topicPartition.topic(), batch.recordCount);
        }
        if (error.exception() instanceof InvalidMetadataException)
            metadata.requestUpdate();
    }

When there is an error and the batch can still be retried, the whole batch is re-enqueued; otherwise the result, including any error, is handed back to the user through batch.done. Both handleDisconnect and handleResponse call this method.
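The retry decision can be sketched in isolation. The article does not show canRetry, so this sketch assumes it checks that attempts remain and that the error is retriable; it also assumes that re-enqueueing increments the attempt counter. `RetryModel`, its `Errors` enum, and `Batch` are hypothetical stand-ins, not the kafka-clients classes.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical model of the complete-or-retry decision.
class RetryModel {
    enum Errors {
        NONE(false), NETWORK_EXCEPTION(true), RECORD_TOO_LARGE(false);

        final boolean retriable; // assumed classification, for illustration only
        Errors(boolean retriable) { this.retriable = retriable; }
    }

    static class Batch {
        int attempts = 0;
        boolean done = false;
        Errors finalError;
    }

    final int retries;
    final Deque<Batch> queue = new ArrayDeque<>();

    RetryModel(int retries) { this.retries = retries; }

    // Assumption: retry only while attempts remain and the error is retriable.
    boolean canRetry(Batch batch, Errors error) {
        return batch.attempts < retries && error.retriable;
    }

    void completeBatch(Batch batch, Errors error) {
        if (error != Errors.NONE && canRetry(batch, error)) {
            batch.attempts++;      // assumed to happen on re-enqueue
            queue.addFirst(batch); // back to the front so it is sent again first
        } else {
            batch.done = true;     // the user is told the final result
            batch.finalError = error;
        }
    }

    public static void main(String[] args) {
        RetryModel model = new RetryModel(2);
        Batch batch = new Batch();
        while (!batch.done) {
            model.completeBatch(batch, Errors.NETWORK_EXCEPTION);
            model.queue.poll();
        }
        System.out.println("attempts used: " + batch.attempts + ", final error: " + batch.finalError);
    }
}
```

Under these assumptions, a batch that keeps hitting a retriable error is re-enqueued until the retry budget is used up and only then reported as failed, while a non-retriable error is reported on the first response.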

  • handleDisconnect

    private void handleDisconnect(ClientResponse response, long now) {
        log.trace("Cancelled request {} due to node {} being disconnected", response, response.request().request().destination());
        int correlation = response.request().request().header().correlationId();
        @SuppressWarnings("unchecked")
        Map<TopicPartition, RecordBatch> responseBatches = (Map<TopicPartition, RecordBatch>) response.request().attachment();
        for (RecordBatch batch : responseBatches.values())
            completeBatch(batch, Errors.NETWORK_EXCEPTION, -1L, correlation, now);
    }

  • handleResponse

    /**
     * Handle a produce response
     */
    private void handleResponse(ClientResponse response, long now) {
        int correlationId = response.request().request().header().correlationId();
        log.trace("Received produce response from node {} with correlation id {}",
                  response.request().request().destination(),
                  correlationId);
        @SuppressWarnings("unchecked")
        Map<TopicPartition, RecordBatch> batches = (Map<TopicPartition, RecordBatch>) response.request().attachment();
        // if we have a response, parse it
        if (response.hasResponse()) {
            ProduceResponse produceResponse = new ProduceResponse(response.responseBody());
            for (Map.Entry<TopicPartition, ProduceResponse.PartitionResponse> entry : produceResponse.responses().entrySet()) {
                TopicPartition tp = entry.getKey();
                ProduceResponse.PartitionResponse partResp = entry.getValue();
                Errors error = Errors.forCode(partResp.errorCode);
                RecordBatch batch = batches.get(tp);
                completeBatch(batch, error, partResp.baseOffset, correlationId, now);
            }
            this.sensors.recordLatency(response.request().request().destination(), response.requestLatencyMs());
        } else {
            // this is the acks = 0 case, just complete all requests
            for (RecordBatch batch : batches.values())
                completeBatch(batch, Errors.NONE, -1L, correlationId, now);
        }
    }

NetworkClient.poll

kafka-clients-0.8.2.2-sources.jar!/org/apache/kafka/clients/NetworkClient.java

    public List<ClientResponse> poll(List<ClientRequest> requests, long timeout, long now) {
        List<NetworkSend> sends = new ArrayList<NetworkSend>();

        for (int i = 0; i < requests.size(); i++) {
            ClientRequest request = requests.get(i);
            int nodeId = request.request().destination();
            if (!isSendable(nodeId))
                throw new IllegalStateException("Attempt to send a request to node " + nodeId + " which is not ready.");

            this.inFlightRequests.add(request);
            sends.add(request.request());
        }

        // should we update our metadata?
        long timeToNextMetadataUpdate = metadata.timeToNextUpdate(now);
        long timeToNextReconnectAttempt = Math.max(this.lastNoNodeAvailableMs + metadata.refreshBackoff() - now, 0);
        long waitForMetadataFetch = (this.metadataFetchInProgress ? Integer.MAX_VALUE : 0);
        // if there is no node available to connect, back off refreshing metadata
        long metadataTimeout = Math.max(Math.max(timeToNextMetadataUpdate, timeToNextReconnectAttempt), waitForMetadataFetch);
        if (!this.metadataFetchInProgress && metadataTimeout == 0)
            maybeUpdateMetadata(sends, now);

        // do the I/O
        try {
            this.selector.poll(Math.min(timeout, metadataTimeout), sends);
        } catch (IOException e) {
            log.error("Unexpected error during I/O in producer network thread", e);
        }

        List<ClientResponse> responses = new ArrayList<ClientResponse>();
        handleCompletedSends(responses, now);
        handleCompletedReceives(responses, now);
        handleDisconnections(responses, now);
        handleConnections();

        return responses;
    }

Reposted from: https://my.oschina.net/go4it/blog/1546394