Replies: 12 comments
-
There has been some discussion on geo-replicated subscriptions; @merlimat can provide more insights.
-
One thing that occurred to me is that messages would have to be replicated along with their origin DC and origin message ID (either in metadata or as part of an Avro schema), since message IDs are currently unique per DC (from my understanding).
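As a sketch of the idea, a small envelope type carrying the origin cluster and origin message ID alongside the payload; combining the two yields a key that is unique across DCs. All names here are hypothetical illustrations, not part of Pulsar's API:

```java
public class ReplicatedEnvelope {
    // Cluster where the message was originally produced, e.g. "us-west"
    final String originCluster;
    // Message ID assigned by the origin cluster's broker (unique only per DC)
    final String originMessageId;
    // The actual payload
    final byte[] payload;

    ReplicatedEnvelope(String originCluster, String originMessageId, byte[] payload) {
        this.originCluster = originCluster;
        this.originMessageId = originMessageId;
        this.payload = payload;
    }

    // Globally unique key: origin cluster + per-DC message ID
    String globalId() {
        return originCluster + ":" + originMessageId;
    }

    public static void main(String[] args) {
        ReplicatedEnvelope e =
                new ReplicatedEnvelope("us-west", "3:1:-1", "hello".getBytes());
        System.out.println(e.globalId()); // prints "us-west:3:1:-1"
    }
}
```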
-
@volfco you might check out the replicated subscriptions feature introduced in 2.4.0; it can potentially solve your problem.
-
@sijie I'll try that tomorrow and let you know if it meets my needs. I didn't see it in my search of the docs.
-
@sijie I've gotten an MVP cluster set up between two DCs that are ~40 ms apart, with replication between them. This is my code (I'm horrible at Java):

```java
import org.apache.pulsar.client.api.*;

public class Main {
    public static void main(String[] args) throws Exception {
        if (args.length != 1) {
            System.out.println("Need serviceURL");
            System.exit(1);
        }

        PulsarClient client = PulsarClient.builder()
                .serviceUrl(args[0])
                .build();

        Consumer<byte[]> consumer = client.newConsumer()
                .topic("persistent://development/ns1/test")
                .subscriptionName("consumer")
                .replicateSubscriptionState(true)
                .subscribe();

        System.out.println("starting consumption");
        System.out.println(args[0]);

        while (true) {
            // Throttle the loop slightly
            Thread.sleep(100);
            // Wait for a message
            Message<byte[]> msg = consumer.receive();
            try {
                // Do something with the message
                System.out.printf("Message received: %s%n", new String(msg.getData()));
                // Acknowledge the message so that it can be deleted by the broker
                consumer.acknowledge(msg);
            } catch (Exception e) {
                // Message failed to process; redeliver later
                consumer.negativeAcknowledge(msg);
            }
        }
    }
}
```

Reading PIP-33, I'm operating under the assumption that if I run one consumer reading from AUS, that consumer dies, and I start one reading from IAD a few seconds later, I shouldn't encounter many duplicate messages. This is not what I'm seeing. I can start the consumer in AUS and it'll start reading. I close it at message 10, wait a few seconds, and start an IAD consumer. It starts at position 0, not around 10 like I would expect. If I restart the AUS consumer, it picks up at message 10 and ignores any progress the IAD one has made. This feature does seem to be almost what I'm looking for. From my understanding of PIP-33, can I only run one consumer at a time?
-
Keep in mind that the subscription state is asynchronously replicated at a configurable interval. Take a look at broker.conf:
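For reference, the relevant settings in broker.conf look roughly like this (defaults shown are from recent Pulsar releases; check your version's broker.conf for the exact names and values):

```properties
# Enable tracking of replicated subscriptions state across clusters
enableReplicatedSubscriptions=true

# Frequency of snapshots for replicated subscriptions tracking, in milliseconds
replicatedSubscriptionsSnapshotFrequencyMillis=1000

# Timeout for building a consistent snapshot, in seconds
replicatedSubscriptionsSnapshotTimeoutSeconds=30

# Max number of snapshots to be cached per subscription
replicatedSubscriptionsSnapshotMaxCachedPerSubscription=10
```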
-
Ok, I missed that you mentioned you're waiting a few seconds.
-
Can you check in the broker logs whether it's enabling the replicated subscription on that topic?
-
@merlimat These logs tell me that replication has been enabled:
-
Ok, that doesn't look correct then. Can you enable debug logs on … Also, there are a couple of metrics in Prometheus:
-
@merlimat Sure. Prometheus metrics:
IAD on the left, AUS on the right. I waited 30 seconds between stopping IAD and starting AUS. I assume I configured log4j2.yaml wrong, as I got only the following message:
log4j2.yaml config:
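For anyone attempting the same thing, a logger entry along these lines under `Loggers` in log4j2.yaml should turn on debug output for the replicated-subscriptions machinery. The logger name below is my assumption based on the class in the Pulsar broker source; verify it against your version:

```yaml
Logger:
  - name: org.apache.pulsar.broker.service.persistent.ReplicatedSubscriptionsController
    level: debug
    additivity: false
    AppenderRef:
      - ref: Console
```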
-
@volfco & @sijie Is the replicated subscription state working for you now? The version I tested is 2.8.0, and at present it does not work correctly for me: one site produces 10 messages and consumes two of them, but the other site does not start from the third message; it starts from the beginning. How did you solve this problem?
-
One of the big things missing, in my book, is a way to have multiple subscribers across datacenters consume messages "exactly" once.
DC A produces 5 messages. DC A has a consumer subscription named TEST. DC B has a consumer with the same subscription name. Ideally, a consumer in DC A processes messages 1, 2, 3 while DC B is able to process 4 and 5.
Another example: I'm writing emails to be sent out into a topic from us-west and us-east. I can send emails from each DC, and each DC should consume the messages produced there (just by nature of asynchronous replication). But if a DC's consumers go down, I would like the other DCs to pick up and consume the messages that should have been consumed by the now-failed DC.
From my admittedly limited research, I don't think this is currently supported. The subscriber's cursor is currently per DC, which allows consumers in each DC to consume the same message.
I could implement this on the client side by using the ZooKeeper configuration store to record a "lock" on a message ID, disallowing consumption by other consumers. But this seems like something that could be implemented inside the broker and made configurable at the per-namespace level. A consumer would write a lock into ZooKeeper, which other consumers would check before doing the actual processing. If a lock already exists, the consumer would simply ack the message and move on.
At the expense of consumption latency and ZooKeeper I/O, I think this would be an amazing feature.
I could see this implemented in two ways, both as consumer types. One is "lock on everything", where a lock is acquired for every message. The second is "lock on foreign messages", where a lock is acquired only for messages not produced in the local cluster (implying they were replicated in from another cluster). So messages produced and consumed in us-west would not require a global lock, but messages produced in us-west and consumed in us-east would need a global lock before processing.
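A minimal sketch of the proposed lock-before-process protocol, using an in-memory map as a stand-in for the global ZooKeeper lock store (in the real proposal the atomic `putIfAbsent` would be a ZooKeeper ephemeral-node create; all names here are hypothetical):

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class LockingConsumerSketch {
    // Stand-in for the global lock store (ZooKeeper in the actual proposal).
    // Key: message ID; value: ID of the consumer that claimed it.
    static final Map<String, String> lockStore = new ConcurrentHashMap<>();

    /**
     * Try to claim a message before processing it. Returns true if this
     * consumer won the lock and should process the message; false if another
     * consumer already holds it, in which case the caller just acks and skips.
     */
    static boolean tryClaim(String messageId, String consumerId) {
        // putIfAbsent is atomic, analogous to creating an ephemeral znode
        return lockStore.putIfAbsent(messageId, consumerId) == null;
    }

    public static void main(String[] args) {
        // Two consumers in different DCs race for the same replicated message
        boolean dcAWon = tryClaim("msg-0001", "consumer-us-west");
        boolean dcBWon = tryClaim("msg-0001", "consumer-us-east");

        System.out.println("us-west won: " + dcAWon); // prints "us-west won: true"
        System.out.println("us-east won: " + dcBWon); // prints "us-east won: false"
    }
}
```

Exactly one of the two claims succeeds, so at most one DC processes the message; the loser acks it without processing, which is the dedup behavior described above.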