Skip to content

Commit

Permalink
[ISSUE #9105] Fix the issue of duplicate consumption in LMQ (#9101)
Browse files Browse the repository at this point in the history
* Fix the issue of duplicate consumption in LMQ

* Pass the checkstyle

* Pass the UTs

* Pass the check style
  • Loading branch information
RongtongJin authored Jan 6, 2025
1 parent 0548593 commit 9ce8345
Show file tree
Hide file tree
Showing 6 changed files with 39 additions and 40 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ public class PopLongPollingService extends ServiceThread {
LoggerFactory.getLogger(LoggerName.ROCKETMQ_POP_LOGGER_NAME);
private final BrokerController brokerController;
private final NettyRequestProcessor processor;
private final ConcurrentHashMap<String, ConcurrentHashMap<String, Byte>> topicCidMap;
private final ConcurrentLinkedHashMap<String, ConcurrentHashMap<String, Byte>> topicCidMap;
private final ConcurrentLinkedHashMap<String, ConcurrentSkipListSet<PopRequest>> pollingMap;
private long lastCleanTime = 0;

Expand All @@ -63,7 +63,8 @@ public PopLongPollingService(BrokerController brokerController, NettyRequestProc
this.brokerController = brokerController;
this.processor = processor;
// 100000 topic default, 100000 lru topic + cid + qid
this.topicCidMap = new ConcurrentHashMap<>(brokerController.getBrokerConfig().getPopPollingMapSize());
this.topicCidMap = new ConcurrentLinkedHashMap.Builder<String, ConcurrentHashMap<String, Byte>>()
.maximumWeightedCapacity(this.brokerController.getBrokerConfig().getPopPollingMapSize() * 2L).build();
this.pollingMap = new ConcurrentLinkedHashMap.Builder<String, ConcurrentSkipListSet<PopRequest>>()
.maximumWeightedCapacity(this.brokerController.getBrokerConfig().getPopPollingMapSize()).build();
this.notifyLast = notifyLast;
Expand Down Expand Up @@ -350,16 +351,16 @@ private void cleanUnusedResource() {
Map.Entry<String, ConcurrentHashMap<String, Byte>> entry = topicCidMapIter.next();
String topic = entry.getKey();
if (brokerController.getTopicConfigManager().selectTopicConfig(topic) == null) {
POP_LOGGER.info("remove not exit topic {} in topicCidMap!", topic);
POP_LOGGER.info("remove nonexistent topic {} in topicCidMap!", topic);
topicCidMapIter.remove();
continue;
}
Iterator<Map.Entry<String, Byte>> cidMapIter = entry.getValue().entrySet().iterator();
while (cidMapIter.hasNext()) {
Map.Entry<String, Byte> cidEntry = cidMapIter.next();
String cid = cidEntry.getKey();
if (!brokerController.getSubscriptionGroupManager().getSubscriptionGroupTable().containsKey(cid)) {
POP_LOGGER.info("remove not exit sub {} of topic {} in topicCidMap!", cid, topic);
if (!brokerController.getSubscriptionGroupManager().containsSubscriptionGroup(cid)) {
POP_LOGGER.info("remove nonexistent subscription group {} of topic {} in topicCidMap!", cid, topic);
cidMapIter.remove();
}
}
Expand All @@ -380,12 +381,12 @@ private void cleanUnusedResource() {
String topic = keyArray[0];
String cid = keyArray[1];
if (brokerController.getTopicConfigManager().selectTopicConfig(topic) == null) {
POP_LOGGER.info("remove not exit topic {} in pollingMap!", topic);
POP_LOGGER.info("remove nonexistent topic {} in pollingMap!", topic);
pollingMapIter.remove();
continue;
}
if (!brokerController.getSubscriptionGroupManager().getSubscriptionGroupTable().containsKey(cid)) {
POP_LOGGER.info("remove not exit sub {} of topic {} in pollingMap!", cid, topic);
if (!brokerController.getSubscriptionGroupManager().containsSubscriptionGroup(cid)) {
POP_LOGGER.info("remove nonexistent subscription group {} of topic {} in pollingMap!", cid, topic);
pollingMapIter.remove();
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -281,7 +281,7 @@ protected void autoClean() {
continue;
}

if (this.brokerController.getSubscriptionGroupManager().getSubscriptionGroupTable().get(group) == null) {
if (!this.brokerController.getSubscriptionGroupManager().containsSubscriptionGroup(group)) {
iterator.remove();
log.info("Group not exist, Clean order info, {}:{}", topicAtGroup, qs);
continue;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -424,7 +424,7 @@ private RemotingCommand getSubscriptionGroup(ChannelHandlerContext ctx,
GetSubscriptionGroupConfigRequestHeader requestHeader = (GetSubscriptionGroupConfigRequestHeader) request.decodeCommandCustomHeader(GetSubscriptionGroupConfigRequestHeader.class);
final RemotingCommand response = RemotingCommand.createResponseCommand(null);

SubscriptionGroupConfig groupConfig = this.brokerController.getSubscriptionGroupManager().getSubscriptionGroupTable().get(requestHeader.getGroup());
SubscriptionGroupConfig groupConfig = this.brokerController.getSubscriptionGroupManager().findSubscriptionGroupConfig(requestHeader.getGroup());
if (groupConfig == null) {
LOGGER.error("No group in this broker, client: {} group: {}", ctx.channel().remoteAddress(), requestHeader.getGroup());
response.setCode(ResponseCode.SUBSCRIPTION_GROUP_NOT_EXIST);
Expand Down Expand Up @@ -2444,7 +2444,7 @@ private RemotingCommand consumeMessageDirectly(ChannelHandlerContext ctx,
}
// groupSysFlag
if (StringUtils.isNotEmpty(requestHeader.getConsumerGroup())) {
SubscriptionGroupConfig groupConfig = brokerController.getSubscriptionGroupManager().getSubscriptionGroupTable().get(requestHeader.getConsumerGroup());
SubscriptionGroupConfig groupConfig = brokerController.getSubscriptionGroupManager().findSubscriptionGroupConfig(requestHeader.getConsumerGroup());
if (groupConfig != null) {
request.addExtField("groupSysFlag", String.valueOf(groupConfig.getGroupSysFlag()));
}
Expand Down Expand Up @@ -2933,7 +2933,7 @@ private RemotingCommand getTopicConfig(ChannelHandlerContext ctx,
GetTopicConfigRequestHeader requestHeader = (GetTopicConfigRequestHeader) request.decodeCommandCustomHeader(GetTopicConfigRequestHeader.class);
final RemotingCommand response = RemotingCommand.createResponseCommand(null);

TopicConfig topicConfig = this.brokerController.getTopicConfigManager().getTopicConfigTable().get(requestHeader.getTopic());
TopicConfig topicConfig = this.brokerController.getTopicConfigManager().selectTopicConfig(requestHeader.getTopic());
if (topicConfig == null) {
LOGGER.error("No topic in this broker, client: {} topic: {}", ctx.channel().remoteAddress(), requestHeader.getTopic());
//be care of the response code, should set "not-exist" explicitly
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -197,12 +197,12 @@ private void scanGarbage() {
String topic = keyArray[0];
String cid = keyArray[1];
if (brokerController.getTopicConfigManager().selectTopicConfig(topic) == null) {
POP_LOGGER.info("[PopBuffer]remove not exit topic {} in buffer!", topic);
POP_LOGGER.info("[PopBuffer]remove nonexistent topic {} in buffer!", topic);
iterator.remove();
continue;
}
if (!brokerController.getSubscriptionGroupManager().getSubscriptionGroupTable().containsKey(cid)) {
POP_LOGGER.info("[PopBuffer]remove not exit sub {} of topic {} in buffer!", cid, topic);
if (!brokerController.getSubscriptionGroupManager().containsSubscriptionGroup(cid)) {
POP_LOGGER.info("[PopBuffer]remove nonexistent subscription group {} of topic {} in buffer!", cid, topic);
iterator.remove();
continue;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,60 +55,61 @@ public class PopLongPollingServiceTest {

@Mock
private BrokerController brokerController;

@Mock
private NettyRequestProcessor processor;

@Mock
private ChannelHandlerContext ctx;

@Mock
private ExecutorService pullMessageExecutor;

private PopLongPollingService popLongPollingService;

private final String defaultTopic = "defaultTopic";

@Before
public void init() {
BrokerConfig brokerConfig = new BrokerConfig();
brokerConfig.setPopPollingMapSize(100);
when(brokerController.getBrokerConfig()).thenReturn(brokerConfig);
popLongPollingService = spy(new PopLongPollingService(brokerController, processor, true));
}

@Test
public void testNotifyMessageArrivingWithRetryTopic() {
int queueId = 0;
doNothing().when(popLongPollingService).notifyMessageArrivingWithRetryTopic(defaultTopic, queueId, -1L, null, 0L, null, null);
popLongPollingService.notifyMessageArrivingWithRetryTopic(defaultTopic, queueId);
verify(popLongPollingService, times(1)).notifyMessageArrivingWithRetryTopic(defaultTopic, queueId, -1L, null, 0L, null, null);
}

@Test
public void testNotifyMessageArriving() {
int queueId = 0;
Long tagsCode = 123L;
long offset = 123L;
long msgStoreTime = System.currentTimeMillis();
byte[] filterBitMap = new byte[]{0x01};
byte[] filterBitMap = new byte[] {0x01};
Map<String, String> properties = new ConcurrentHashMap<>();
doNothing().when(popLongPollingService).notifyMessageArriving(defaultTopic, queueId, offset, tagsCode, msgStoreTime, filterBitMap, properties);
popLongPollingService.notifyMessageArrivingWithRetryTopic(defaultTopic, queueId, offset, tagsCode, msgStoreTime, filterBitMap, properties);
verify(popLongPollingService).notifyMessageArriving(defaultTopic, queueId, offset, tagsCode, msgStoreTime, filterBitMap, properties);
}

@Test
public void testNotifyMessageArrivingValidRequest() throws Exception {
String cid = "CID_1";
int queueId = 0;
ConcurrentHashMap<String, ConcurrentHashMap<String, Byte>> topicCidMap = new ConcurrentHashMap<>();
ConcurrentLinkedHashMap<String, ConcurrentHashMap<String, Byte>> topicCidMap = new ConcurrentLinkedHashMap.Builder<String, ConcurrentHashMap<String, Byte>>()
.maximumWeightedCapacity(10).build();
ConcurrentHashMap<String, Byte> cids = new ConcurrentHashMap<>();
cids.put(cid, (byte) 1);
topicCidMap.put(defaultTopic, cids);
popLongPollingService = new PopLongPollingService(brokerController, processor, true);
ConcurrentLinkedHashMap<String, ConcurrentSkipListSet<PopRequest>> pollingMap =
new ConcurrentLinkedHashMap.Builder<String, ConcurrentSkipListSet<PopRequest>>().maximumWeightedCapacity(this.brokerController.getBrokerConfig().getPopPollingMapSize()).build();
new ConcurrentLinkedHashMap.Builder<String, ConcurrentSkipListSet<PopRequest>>().maximumWeightedCapacity(this.brokerController.getBrokerConfig().getPopPollingMapSize()).build();
Channel channel = mock(Channel.class);
when(channel.isActive()).thenReturn(true);
PopRequest popRequest = mock(PopRequest.class);
Expand All @@ -126,19 +127,19 @@ public void testNotifyMessageArrivingValidRequest() throws Exception {
boolean actual = popLongPollingService.notifyMessageArriving(defaultTopic, queueId, cid, null, 0, null, null);
assertFalse(actual);
}

@Test
public void testWakeUpNullRequest() {
assertFalse(popLongPollingService.wakeUp(null));
}

@Test
public void testWakeUpIncompleteRequest() {
PopRequest request = mock(PopRequest.class);
when(request.complete()).thenReturn(false);
assertFalse(popLongPollingService.wakeUp(request));
}

@Test
public void testWakeUpInactiveChannel() {
PopRequest request = mock(PopRequest.class);
Expand All @@ -150,7 +151,7 @@ public void testWakeUpInactiveChannel() {
when(brokerController.getPullMessageExecutor()).thenReturn(pullMessageExecutor);
assertTrue(popLongPollingService.wakeUp(request));
}

@Test
public void testWakeUpValidRequestWithException() throws Exception {
PopRequest request = mock(PopRequest.class);
Expand All @@ -168,7 +169,7 @@ public void testWakeUpValidRequestWithException() throws Exception {
captor.getValue().run();
verify(processor).processRequest(any(), any());
}

@Test
public void testPollingNotPolling() {
ChannelHandlerContext ctx = mock(ChannelHandlerContext.class);
Expand All @@ -180,7 +181,7 @@ public void testPollingNotPolling() {
PollingResult result = popLongPollingService.polling(ctx, remotingCommand, requestHeader, subscriptionData, messageFilter);
assertEquals(PollingResult.NOT_POLLING, result);
}

@Test
public void testPollingServicePollingTimeout() throws IllegalAccessException {
String cid = "CID_1";
Expand All @@ -194,15 +195,16 @@ public void testPollingServicePollingTimeout() throws IllegalAccessException {
when(requestHeader.getPollTime()).thenReturn(1000L);
when(requestHeader.getTopic()).thenReturn(defaultTopic);
when(requestHeader.getConsumerGroup()).thenReturn("defaultGroup");
ConcurrentHashMap<String, ConcurrentHashMap<String, Byte>> topicCidMap = new ConcurrentHashMap<>();
ConcurrentLinkedHashMap<String, ConcurrentHashMap<String, Byte>> topicCidMap = new ConcurrentLinkedHashMap.Builder<String, ConcurrentHashMap<String, Byte>>()
.maximumWeightedCapacity(10).build();
ConcurrentHashMap<String, Byte> cids = new ConcurrentHashMap<>();
cids.put(cid, (byte) 1);
topicCidMap.put(defaultTopic, cids);
FieldUtils.writeDeclaredField(popLongPollingService, "topicCidMap", topicCidMap, true);
PollingResult result = popLongPollingService.polling(ctx, remotingCommand, requestHeader, subscriptionData, messageFilter);
assertEquals(PollingResult.POLLING_TIMEOUT, result);
}

@Test
public void testPollingPollingSuc() {
ChannelHandlerContext ctx = mock(ChannelHandlerContext.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,15 +21,13 @@
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;
import org.apache.rocketmq.broker.BrokerController;
import org.apache.rocketmq.broker.subscription.SubscriptionGroupManager;
import org.apache.rocketmq.broker.topic.TopicConfigManager;
import org.apache.rocketmq.common.BrokerConfig;
import org.apache.rocketmq.common.TopicConfig;
import org.apache.rocketmq.remoting.protocol.header.ExtraInfoUtil;
import org.apache.rocketmq.remoting.protocol.subscription.SubscriptionGroupConfig;
import org.assertj.core.util.Lists;
import org.junit.Before;
import org.junit.Test;
Expand Down Expand Up @@ -384,9 +382,7 @@ public void testAutoCleanAndEncode() {

SubscriptionGroupManager subscriptionGroupManager = mock(SubscriptionGroupManager.class);
when(brokerController.getSubscriptionGroupManager()).thenReturn(subscriptionGroupManager);
ConcurrentMap<String, SubscriptionGroupConfig> subscriptionGroupConfigConcurrentMap = new ConcurrentHashMap<>();
subscriptionGroupConfigConcurrentMap.put(GROUP, new SubscriptionGroupConfig());
when(subscriptionGroupManager.getSubscriptionGroupTable()).thenReturn(subscriptionGroupConfigConcurrentMap);
when(subscriptionGroupManager.containsSubscriptionGroup(GROUP)).thenReturn(true);

TopicConfig topicConfig = new TopicConfig(TOPIC);
when(topicConfigManager.selectTopicConfig(eq(TOPIC))).thenReturn(topicConfig);
Expand Down

0 comments on commit 9ce8345

Please sign in to comment.