long begin = time.milliseconds(); long remainingWaitMs = maxWaitMs; long elapsed;
// 发送 metadata 请求,直到获取了这个 topic 的 metadata 或者请求超时 do { log.trace("Requesting metadata update for topic {}.", topic); int version = metadata.requestUpdate();// 返回当前版本号,初始值为0,每次更新时会自增,并将 needUpdate 设置为 true sender.wakeup();// 唤起 sender,发送 metadata 请求 try { metadata.awaitUpdate(version, remainingWaitMs);// 等待 metadata 的更新 } catch (TimeoutException ex) { // Rethrow with original maxWaitMs to prevent logging exceptionwith remainingWaitMs throw new TimeoutException("Failed to update metadata after " + maxWaitMs + " ms."); } cluster = metadata.fetch(); elapsed = time.milliseconds() - begin; if (elapsed >= maxWaitMs) throw new TimeoutException("Failed to update metadata after " + maxWaitMs + " ms.");// 超时 if (cluster.unauthorizedTopics().contains(topic))// 认证失败,对当前 topic 没有 Write 权限 throw new TopicAuthorizationException(topic); remainingWaitMs = maxWaitMs - elapsed; partitionsCount = cluster.partitionCountForTopic(topic); } while (partitionsCount == null);// 不停循环,直到 partitionsCount 不为 null(即直到 metadata 中已经包含了这个 topic 的相关信息)
if (partition != null && partition >= partitionsCount) { throw new KafkaException( String.format("Invalid partition given with record: %d is not in the range [0...%d).", partition, partitionsCount)); }
returnnew ClusterAndWaitTime(cluster, elapsed); }
如果 metadata 中不存在这个 topic 的 metadata,那么就请求更新 metadata;如果 metadata 一直没有更新,方法就会一直处在 do ... while 循环之中。在循环中,主要做以下操作:
// 更新 metadata 信息(根据当前 version 值来判断) publicsynchronizedvoidawaitUpdate(finalint lastVersion, finallong maxWaitMs) throws InterruptedException { if (maxWaitMs < 0) { thrownewIllegalArgumentException("Max time to wait for metadata updates should not be < 0 milli seconds"); } long begin = System.currentTimeMillis(); long remainingWaitMs = maxWaitMs; while (this.version <= lastVersion) {// 不断循环,直到 metadata 更新成功,version 自增 if (remainingWaitMs != 0) wait(remainingWaitMs);// 阻塞线程,等待 metadata 的更新 long elapsed = System.currentTimeMillis() - begin; if (elapsed >= maxWaitMs)// timeout thrownewTimeoutException("Failed to update metadata after " + maxWaitMs + " ms."); remainingWaitMs = maxWaitMs - elapsed; } }
在 Metadata.awaitUpdate() 方法中,线程会阻塞在 while 循环中,直到 metadata 更新成功或者 timeout。
从前面可以看出,此时 Producer 线程会阻塞在两个 while 循环中,直到 metadata 信息更新。那么 metadata 是如何更新的呢?如果有印象的话,前面应该已经介绍过了:主要是通过 sender.wakeup() 来唤醒 sender 线程,间接唤醒 NetworkClient 线程,由 NetworkClient 线程负责发送 Metadata 请求,并处理 Server 端的响应。
Nodenode= leastLoadedNode(now);// 选择一个连接数最小的节点 if (node == null) { log.debug("Give up sending metadata request since no node is available"); return reconnectBackoffMs; }
/** * Add a metadata request to the list of sends if we can make one */ // 判断是否可以发送请求,可以的话将 metadata 请求加入到发送列表中 privatelongmaybeUpdate(long now, Node node) { StringnodeConnectionId= node.idString();
// If there's any connection establishment underway, wait until it completes. This prevents // the client from unnecessarily connecting to additional nodes while a previous connection // attempt has not been completed. if (isAnyNodeConnecting()) {// 如果 client 正在与任何一个 node 的连接状态是 connecting,那么就进行等待 // Strictly the timeout we should return here is "connect timeout", but as we don't // have such application level configuration, using reconnect backoff instead. return reconnectBackoffMs; }
if (connectionStates.canConnect(nodeConnectionId, now)) {// 如果没有连接这个 node,那就初始化连接 // we don't have a connection to this node right now, make one log.debug("Initialize connection to node {} for sending metadata request", node.id()); initiateConnect(node, now);// 初始化连接 return reconnectBackoffMs; } return Long.MAX_VALUE; }
// 处理任何已经完成的接收响应 private void handleCompletedReceives(List<ClientResponse> responses, longnow) { for (NetworkReceive receive : this.selector.completedReceives()) { String source = receive.source(); InFlightRequest req = inFlightRequests.completeNext(source); AbstractResponse body = parseResponse(receive.payload(), req.header); log.trace("Completed receive from node {}, for key {}, received {}", req.destination, req.header.apiKey(), body); if (req.isInternalRequest && body instanceof MetadataResponse)// 如果是 meta 响应 metadataUpdater.handleCompletedMetadataResponse(req.header, now, (MetadataResponse) body); elseif (req.isInternalRequest && body instanceof ApiVersionsResponse) handleApiVersionsResponse(responses, req, now, (ApiVersionsResponse) body); // 如果是其他响应 else responses.add(req.completed(body, now)); } }
// 处理 Server 端对 Metadata 请求处理后的 response public void handleCompletedMetadataResponse(RequestHeader requestHeader, longnow, MetadataResponse response) { this.metadataFetchInProgress = false; Cluster cluster = response.cluster(); // check if any topics metadata failed to get updated Map<String, Errors> errors = response.errors(); if (!errors.isEmpty()) log.warn("Error while fetching metadata with correlation id {} : {}", requestHeader.correlationId(), errors);
// don't update the cluster if there are no valid nodes...the topic we want may still be in the process of being // created which means we will get errors and no nodes until it exists if (cluster.nodes().size() > 0) { this.metadata.update(cluster, now);// 更新 meta 信息 } else {// 如果 metadata 中 node 信息无效,则不更新 metadata 信息 log.trace("Ignoring empty metadata response with correlation id {}.", requestHeader.correlationId()); this.metadata.failedUpdate(now); } }