-
Notifications
You must be signed in to change notification settings - Fork 3.6k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[Bug] Delayed message isn't marked as acked when delayed time > messageTTL #23739
Comments
BTW, according the doc, seems the delayed message should be auto acked if TTL expires. https://pulsar.apache.org/docs/3.0.x/concepts-messaging/#delayed-message-delivery |
Additionally, I didn't set TTL on namespace level. visxu➜~/pulsar-cluster-3.0.6» ./pulsar-1/bin/pulsar-admin namespaces get-message-ttl public/default [21:27:52]
null
visxu➜~/pulsar-cluster-3.0.6» |
@crossoverJie Could you help take a look if you're available ? |
I also found this problem in pulsar v4.0.0. Here's how I did it : bin/pulsar-admin --admin-url http://localhost:8080 namespaces set-message-ttl --messageTTL 30s public/default
bin/pulsar-admin --admin-url http://localhost:8080 namespaces get-message-ttl public/default public class DelayMessageProducer {
private static final String serviceUrl = "pulsar://localhost:6650";
private static final String topicName = "persistent://public/default/test_delay_message";
public static void main(String[] args) throws PulsarClientException {
PulsarClient client = PulsarClient.builder().serviceUrl(serviceUrl).build();
try (Producer<byte[]> producer = client.newProducer().topic(topicName).create()) {
for (int i = 0; i < 100; i++) {
MessageId messageId = producer.newMessage().deliverAfter(i, TimeUnit.SECONDS).value(("Hello Pulsar! " + i).getBytes(StandardCharsets.UTF_8)).send();
System.out.println(messageId);
}
}
client.close();
}
}
public class DelayMessageConsumer {
private static final String serviceUrl = "pulsar://localhost:6650";
private static final String topicName = "persistent://public/default/test_delay_message";
private static final String subscriptionName = "s0";
public static void main(String[] args) throws PulsarClientException {
PulsarClient client = PulsarClient.builder().serviceUrl(serviceUrl).build();
Consumer<byte[]> consumer = client.newConsumer().topic(topicName).subscriptionName(subscriptionName).subscriptionType(SubscriptionType.Shared).subscriptionInitialPosition(SubscriptionInitialPosition.Earliest).subscribe();
while (true) {
Message<byte[]> message = consumer.receive();
consumer.acknowledge(message);
System.out.printf("%s,%s,%s\n", message.getPublishTime(), message.getMessageId(), new String(message.getData()));
}
}
} |
TTL check interval's default value is 5 minutes. see |
Search before asking
Read release policy
Version
OS: MAC 13.6.7
Java: 17
Pulsar: 3.0.6
Minimal reproduce step
What did you expect to see?
The consumer will not receive the delayed message.
What did you see instead?
Consumer consumed the delayed message.
Anything else?
I sent a delayed 120s message with timestamp, but got it 2 mins later.
Are you willing to submit a PR?
The text was updated successfully, but these errors were encountered: