
KAFKA-18181: Refactor ShareConsumerTest #18105

Merged

8 commits merged into apache:trunk from the KAFKA-18181 branch on Dec 14, 2024

Conversation

brandboat
Member

Related to https://issues.apache.org/jira/browse/KAFKA-18181

  1. consumeMessages should return messagesConsumed directly instead of wrapping it in a CompletableFuture
  2. replace fail with assertDoesNotThrow, or just throw the exception
  3. produceMessages should likewise return its message count directly instead of wrapping it in a CompletableFuture
  4. use CompletableFuture.runAsync() instead of an ExecutorService; we don't use interruption, so the default executor is fine (see the sketch below)
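
A minimal sketch of how items 1, 3, and 4 fit together (the helper names come from this PR, but the exact signatures and call sites in ShareConsumerTest may differ):

    // Illustrative only: the helpers return plain ints (items 1 and 3) and the caller wraps
    // them with supplyAsync (the value-returning sibling of runAsync) on the default
    // common pool instead of an explicit ExecutorService (item 4).
    CompletableFuture<Integer> produced =
        CompletableFuture.supplyAsync(() -> produceMessages(messageCount));
    CompletableFuture<Integer> consumed =
        CompletableFuture.supplyAsync(() -> consumeMessages(shareConsumer, totalMessagesConsumed,
            totalMessages, consumerNumber, maxPolls, commit));
    assertEquals(produced.join(), consumed.join());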

Committer Checklist (excluded from commit message)

  • Verify design and implementation
  • Verify test coverage and CI build status
  • Verify documentation (including upgrade notes)

@github-actions github-actions bot added core Kafka Broker tests Test fixes (including flaky tests) labels Dec 9, 2024
@AndrewJSchofield AndrewJSchofield added the KIP-932 Queues for Kafka label Dec 9, 2024
Collaborator

@apoorvmittal10 apoorvmittal10 left a comment

Thanks for the PR, it's a good refactor. I have one doubt about losing the exception details with the assertions. Can we somehow capture the exception as well?

} catch (Exception e) {
fail("Failed to send records: " + e);
for (int i = 0; i < 10; i++) {
assertDoesNotThrow(() -> producer.send(record).get(), "Failed to send records");
Collaborator

Would it be possible to capture the thrown exception as well, so it's easy to find what went wrong? It's hard to tell that from the current custom error message alone.

Member Author

Thanks for the comment @apoorvmittal10, I did a quick test to check whether assertDoesNotThrow captures the exception details.

Here’s the test I wrote:

@Test
public void testT() {
    assertDoesNotThrow(() -> {
        throw new RuntimeException("foobar");
    }, "Failed to send records");
}

When I run this, the output includes the custom error message, the thrown exception, and its stack trace:

Failed to send records ==> Unexpected exception thrown: java.lang.RuntimeException: foobar
org.opentest4j.AssertionFailedError: Failed to send records ==> Unexpected exception thrown: java.lang.RuntimeException: foobar
	at org.junit.jupiter.api.AssertionFailureBuilder.build(AssertionFailureBuilder.java:152)
	at org.junit.jupiter.api.AssertDoesNotThrow.createAssertionFailedError(AssertDoesNotThrow.java:84)
	at org.junit.jupiter.api.AssertDoesNotThrow.assertDoesNotThrow(AssertDoesNotThrow.java:75)
	at org.junit.jupiter.api.AssertDoesNotThrow.assertDoesNotThrow(AssertDoesNotThrow.java:62)
	at org.junit.jupiter.api.Assertions.assertDoesNotThrow(Assertions.java:3249)
	at kafka.server.logger.RuntimeLoggerManagerTest.testT(RuntimeLoggerManagerTest.java:46)
	at java.base/java.lang.reflect.Method.invoke(Method.java:580)
	at java.base/java.util.ArrayList.forEach(ArrayList.java:1596)
	at java.base/java.util.ArrayList.forEach(ArrayList.java:1596)
Caused by: java.lang.RuntimeException: foobar
	at kafka.server.logger.RuntimeLoggerManagerTest.lambda$testT$0(RuntimeLoggerManagerTest.java:47)
	at org.junit.jupiter.api.AssertDoesNotThrow.assertDoesNotThrow(AssertDoesNotThrow.java:71)
	... 6 more

Is this the level of detail you were hoping to see, or is there something additional you'd like? Let me know!

Contributor

Yeah, this looks fine to me; once we include the custom error message, we get both the exception's stack trace and the custom message we defined.

Collaborator

Thanks for looking into it, this is good enough.

assertDoesNotThrow(() -> {
try (Admin admin = Admin.create(props)) {
admin.createTopics(Collections.singleton(new NewTopic(topicName, 1, (short) 1))).all().get();
}
Contributor

@ShivsundarR ShivsundarR Dec 10, 2024

Can we have the custom error message added here as well ("Failed to create topic")? Like in other parts of the code, we could maybe pass it as the second argument to assertDoesNotThrow().
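
For illustration only, reusing the props and topicName from the quoted snippet, the two-argument form suggested here would look something like:

    // Sketch of the suggestion: pass the custom message as the second argument.
    assertDoesNotThrow(() -> {
        try (Admin admin = Admin.create(props)) {
            admin.createTopics(Collections.singleton(new NewTopic(topicName, 1, (short) 1))).all().get();
        }
    }, "Failed to create topic");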

fail("Failed to send record: " + e);
}
final int index = i;
assertDoesNotThrow(() -> recordFutures[index].get());
Contributor

Here, can we add the custom error message ("Failed to send record") to the assert as well?

try (KafkaShareConsumer<byte[], byte[]> shareConsumer = createShareConsumer(
new ByteArrayDeserializer(), new ByteArrayDeserializer(), groupId)) {
shareConsumer.subscribe(Collections.singleton(tp.topic()));
return consumeMessages(shareConsumer, totalMessagesConsumed, totalMessages, consumerNumber, maxPolls, commit);
Contributor

@ShivsundarR ShivsundarR Dec 10, 2024

Here as well, can we add a custom error message that includes the consumer number? There are a couple more places where we do this; maybe we can change those as well to include the custom error message.
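
As a hypothetical sketch only (the actual wrapping call site in the test may differ), including the consumer number could look like:

    // assertDoesNotThrow with a ThrowingSupplier returns the supplier's value,
    // so the consumed count can still be returned to the caller.
    return assertDoesNotThrow(
        () -> consumeMessages(shareConsumer, totalMessagesConsumed, totalMessages, consumerNumber, maxPolls, commit),
        "Consumer " + consumerNumber + " failed to consume messages");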

Member Author

Sorry for the late reply; I added error messages to all the assertDoesNotThrow calls in this test. Thanks!

Contributor

@ShivsundarR ShivsundarR left a comment

Hi @brandboat, thanks for the PR! Just a few minor comments regarding the assertDoesNotThrow() changes.

Member

@chia7712 chia7712 left a comment

@brandboat thanks for this patch

fail("Failed to send record: " + e);
}
final int index = i;
assertDoesNotThrow(() -> recordFutures[index].get(), "Failed to send record");
Member

it seems we expect all messages to succeed, and hence we can rewrite this function:

    private int produceMessages(int messageCount) {
        try (KafkaProducer<byte[], byte[]> producer = createProducer(new ByteArraySerializer(), new ByteArraySerializer())) {
            ProducerRecord<byte[], byte[]> record = new ProducerRecord<>(tp.topic(), tp.partition(), null, "key".getBytes(), "value".getBytes());
            IntStream.range(0, messageCount).forEach(__ -> producer.send(record));
            producer.flush();
        }
        return messageCount;
    }

CompletableFuture.allOf(produceMessageFutures.toArray(CompletableFuture[]::new)).get(60, TimeUnit.SECONDS);
CompletableFuture.allOf(consumeMessagesFutures.toArray(CompletableFuture[]::new)).get(60, TimeUnit.SECONDS);

int totalSuccessResult = consumeMessagesFutures.stream().map(CompletableFuture::join).reduce(Integer::sum).orElse(0);
Member

int totalSuccessResult = consumeMessagesFutures.stream().mapToInt(CompletableFuture::join).sum();

int actualMessagesSent = 0;
try {
producerExecutorService.awaitTermination(60, TimeUnit.SECONDS); // Wait for all producer threads to complete
int totalResult1 = consumeMessagesFutures1.stream().map(CompletableFuture::join).reduce(Integer::sum).orElse(0);
Member

        int totalResult1 = consumeMessagesFutures1.stream().mapToInt(CompletableFuture::join).sum();
        int totalResult2 = consumeMessagesFutures2.stream().mapToInt(CompletableFuture::join).sum();
        int totalResult3 = consumeMessagesFutures3.stream().mapToInt(CompletableFuture::join).sum();

} catch (Exception e) {
fail("Exception occurred : " + e.getMessage());
}
assertEquals(totalMessagesSent, totalMessagesConsumedGroup1.get());
Member

Do we actually need totalMessagesConsumedGroup1, or can we simply count the returned values from all consumeMessages instead?

Member Author

@brandboat brandboat Dec 12, 2024

Perhaps we can remove all the AtomicIntegers here; like you said, we can now rely on the returned values to verify how many records are consumed. WDYT?

Update: I mean the AtomicInteger in consumeMessages.
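
As a rough sketch of that idea, reusing the names from the snippets above (so treat it as illustrative rather than the final diff):

    // Verify consumption by summing the values returned from consumeMessages
    // instead of asserting on a shared AtomicInteger.
    int totalConsumedGroup1 = consumeMessagesFutures1.stream().mapToInt(CompletableFuture::join).sum();
    assertEquals(totalMessagesSent, totalConsumedGroup1);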

Member

yes, please remove it to simplify the code :)

Member Author

I just realized that we cannot remove the AtomicInteger in consumeMessages, because it is essential for tracking the total number of messages consumed across multiple calls to consumeMessages.

Without it, many tests would hang: the exit condition would never be reached, since totalMessagesConsumed.get() < totalMessages would stay true and the while loop would run indefinitely [0].

[0] https://github.com/apache/kafka/blob/trunk/core/src/test/java/kafka/test/api/ShareConsumerTest.java#L1888

But you are right, the AtomicInteger assertions seem a bit redundant now that we can rely on the return value from consumeMessages. I'll remove those assertions, thanks.
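
A simplified sketch (not the actual ShareConsumerTest code) of the shared-counter pattern described above, where each call keeps polling until the group-wide total reaches the target:

    // Illustrative only: the shared AtomicInteger is what lets several consumeMessages
    // calls in the same group observe each other's progress and exit the loop.
    int messagesConsumed = 0;
    int retries = 0;
    while (totalMessagesConsumed.get() < totalMessages && retries++ < maxPolls) {
        ConsumerRecords<byte[], byte[]> records = shareConsumer.poll(Duration.ofMillis(2000));
        messagesConsumed += records.count();
        totalMessagesConsumed.addAndGet(records.count());
    }
    return messagesConsumed;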

Collaborator

@apoorvmittal10 apoorvmittal10 left a comment

Thanks for the refactor, LGTM!


int maxBytes = 100000;
for (int i = 0; i < consumerCount; i++) {
Collaborator

Should the variable consumerCount be renamed to consumerGroupCount?

Member Author

Thanks for the comment! I think consumerGroupCount might still be confusing since there are three consumer groups in this test. Instead of renaming, perhaps we could remove the consumerCount variable and inline its value directly into the for loop.

@chia7712 chia7712 merged commit 9e60fcc into apache:trunk Dec 14, 2024
10 checks passed
@brandboat brandboat deleted the KAFKA-18181 branch December 14, 2024 03:37
tedyu pushed a commit to tedyu/kafka that referenced this pull request Jan 6, 2025