Skip to content

Commit

Permalink
KAFKA-17455: fix stuck producer when throttling or retrying (#17527)
Browse files Browse the repository at this point in the history
A producer might get stuck after it was throttled. This PR unblocks the producer by polling again
after pollDelayMs in NetworkUtils#awaitReady().

Reviewers: Matthias J. Sax <[email protected]>, David Jacot <[email protected]>
  • Loading branch information
coltmcnealy-lh authored Jan 9, 2025
1 parent 25fdcd0 commit bb22eec
Show file tree
Hide file tree
Showing 3 changed files with 70 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,17 @@ public static boolean awaitReady(KafkaClient client, Node node, Time time, long
throw new IOException("Connection to " + node + " failed.");
}
long pollTimeout = timeoutMs - (attemptStartTime - startTime); // initialize in this order to avoid overflow

// If the network client is waiting to send data for some reason (eg. throttling or retry backoff),
// polling longer than that is potentially dangerous as the producer will not attempt to send
// any pending requests.
long waitingTime = client.pollDelayMs(node, startTime);
if (waitingTime > 0 && pollTimeout > waitingTime) {
// Block only until the next-scheduled time that it's okay to send data to the producer,
// wake up, and try again. This is the way.
pollTimeout = waitingTime;
}

client.poll(pollTimeout, attemptStartTime);
if (client.authenticationException(node) != null)
throw client.authenticationException(node);
Expand Down
20 changes: 19 additions & 1 deletion clients/src/test/java/org/apache/kafka/clients/MockClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ public FutureResponse(Node node,

private int correlation;
private Runnable wakeupHook;
private boolean advanceTimeDuringPoll;
private final Time time;
private final MockMetadataUpdater metadataUpdater;
private final Map<String, ConnectionState> connections = new HashMap<>();
Expand Down Expand Up @@ -138,7 +139,11 @@ public long connectionDelay(Node node, long now) {

@Override
public long pollDelayMs(Node node, long now) {
return connectionDelay(node, now);
return connectionState(node.idString()).pollDelayMs(now);
}

public void advanceTimeDuringPoll(boolean advanceTimeDuringPoll) {
this.advanceTimeDuringPoll = advanceTimeDuringPoll;
}

public void backoff(Node node, long durationMs) {
Expand Down Expand Up @@ -336,6 +341,12 @@ public List<ClientResponse> poll(long timeoutMs, long now) {
copy.add(response);
}

// In real life, if poll() is called and we get to the end with no responses,
// time equal to timeoutMs would have passed.
if (advanceTimeDuringPoll) {
time.sleep(timeoutMs);
}

return copy;
}

Expand Down Expand Up @@ -795,6 +806,13 @@ long connectionDelay(long now) {
return 0;
}

long pollDelayMs(long now) {
if (notThrottled(now))
return connectionDelay(now);

return throttledUntilMs - now;
}

boolean ready(long now) {
switch (state) {
case CONNECTED:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -482,6 +482,46 @@ public void testMetadataTopicExpiry() throws Exception {
assertTrue(future.isDone(), "Request should be completed");
}

@Test
public void senderThreadShouldNotGetStuckWhenThrottledAndAddingPartitionsToTxn() {
// We want MockClient#poll() to advance time so that eventually the backoff expires.
try {
client.advanceTimeDuringPoll(true);

ProducerIdAndEpoch producerIdAndEpoch = new ProducerIdAndEpoch(123456L, (short) 0);
apiVersions.update("0", NodeApiVersions.create(ApiKeys.INIT_PRODUCER_ID.id, (short) 0, (short) 3));
TransactionManager txnManager = new TransactionManager(logContext, "testUnresolvedSeq", 60000, 100, apiVersions);

setupWithTransactionState(txnManager);
doInitTransactions(txnManager, producerIdAndEpoch);

int throttleTimeMs = 1000;
long startTime = time.milliseconds();
Node nodeToThrottle = metadata.fetch().nodeById(0);
client.throttle(nodeToThrottle, throttleTimeMs);

// Verify node is throttled a little bit. In real-life Apache Kafka, we observe that this can happen
// as done above by throttling or with a disconnect / backoff.
long currentPollDelay = client.pollDelayMs(nodeToThrottle, startTime);
assertEquals(currentPollDelay, throttleTimeMs);

txnManager.beginTransaction();
txnManager.maybeAddPartition(tp0);

assertFalse(txnManager.hasInFlightRequest());
sender.runOnce();
assertTrue(txnManager.hasInFlightRequest());

long totalTimeToRunOnce = time.milliseconds() - startTime;

// It should have blocked roughly only the backoffTimeMs and some change.
assertTrue(totalTimeToRunOnce < REQUEST_TIMEOUT);

} finally {
client.advanceTimeDuringPoll(false);
}
}

@Test
public void testNodeLatencyStats() throws Exception {
try (Metrics m = new Metrics()) {
Expand Down

0 comments on commit bb22eec

Please sign in to comment.