diff --git a/broker/src/main/java/org/apache/rocketmq/broker/longpolling/PopLongPollingService.java b/broker/src/main/java/org/apache/rocketmq/broker/longpolling/PopLongPollingService.java index 91185fbe94c..e87a8e803fd 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/longpolling/PopLongPollingService.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/longpolling/PopLongPollingService.java @@ -52,7 +52,7 @@ public class PopLongPollingService extends ServiceThread { LoggerFactory.getLogger(LoggerName.ROCKETMQ_POP_LOGGER_NAME); private final BrokerController brokerController; private final NettyRequestProcessor processor; - private final ConcurrentHashMap> topicCidMap; + private final ConcurrentLinkedHashMap> topicCidMap; private final ConcurrentLinkedHashMap> pollingMap; private long lastCleanTime = 0; @@ -63,7 +63,8 @@ public PopLongPollingService(BrokerController brokerController, NettyRequestProc this.brokerController = brokerController; this.processor = processor; // 100000 topic default, 100000 lru topic + cid + qid - this.topicCidMap = new ConcurrentHashMap<>(brokerController.getBrokerConfig().getPopPollingMapSize()); + this.topicCidMap = new ConcurrentLinkedHashMap.Builder>() + .maximumWeightedCapacity(this.brokerController.getBrokerConfig().getPopPollingMapSize() * 2L).build(); this.pollingMap = new ConcurrentLinkedHashMap.Builder>() .maximumWeightedCapacity(this.brokerController.getBrokerConfig().getPopPollingMapSize()).build(); this.notifyLast = notifyLast; @@ -350,7 +351,7 @@ private void cleanUnusedResource() { Map.Entry> entry = topicCidMapIter.next(); String topic = entry.getKey(); if (brokerController.getTopicConfigManager().selectTopicConfig(topic) == null) { - POP_LOGGER.info("remove not exit topic {} in topicCidMap!", topic); + POP_LOGGER.info("remove nonexistent topic {} in topicCidMap!", topic); topicCidMapIter.remove(); continue; } @@ -358,8 +359,8 @@ private void cleanUnusedResource() { while (cidMapIter.hasNext()) { Map.Entry cidEntry = cidMapIter.next(); String cid = cidEntry.getKey(); - if (!brokerController.getSubscriptionGroupManager().getSubscriptionGroupTable().containsKey(cid)) { - POP_LOGGER.info("remove not exit sub {} of topic {} in topicCidMap!", cid, topic); + if (!brokerController.getSubscriptionGroupManager().containsSubscriptionGroup(cid)) { + POP_LOGGER.info("remove nonexistent subscription group {} of topic {} in topicCidMap!", cid, topic); cidMapIter.remove(); } } @@ -380,12 +381,12 @@ private void cleanUnusedResource() { String topic = keyArray[0]; String cid = keyArray[1]; if (brokerController.getTopicConfigManager().selectTopicConfig(topic) == null) { - POP_LOGGER.info("remove not exit topic {} in pollingMap!", topic); + POP_LOGGER.info("remove nonexistent topic {} in pollingMap!", topic); pollingMapIter.remove(); continue; } - if (!brokerController.getSubscriptionGroupManager().getSubscriptionGroupTable().containsKey(cid)) { - POP_LOGGER.info("remove not exit sub {} of topic {} in pollingMap!", cid, topic); + if (!brokerController.getSubscriptionGroupManager().containsSubscriptionGroup(cid)) { + POP_LOGGER.info("remove nonexistent subscription group {} of topic {} in pollingMap!", cid, topic); pollingMapIter.remove(); } } diff --git a/broker/src/main/java/org/apache/rocketmq/broker/offset/ConsumerOrderInfoManager.java b/broker/src/main/java/org/apache/rocketmq/broker/offset/ConsumerOrderInfoManager.java index 4eccc6c0374..120f5b104c7 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/offset/ConsumerOrderInfoManager.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/offset/ConsumerOrderInfoManager.java @@ -281,7 +281,7 @@ protected void autoClean() { continue; } - if (this.brokerController.getSubscriptionGroupManager().getSubscriptionGroupTable().get(group) == null) { + if (!this.brokerController.getSubscriptionGroupManager().containsSubscriptionGroup(group)) { iterator.remove(); log.info("Group not exist, Clean order info, {}:{}", topicAtGroup, qs); continue; diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java index 6bcf9aaa0f7..6fb7584aa9b 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java @@ -424,7 +424,7 @@ private RemotingCommand getSubscriptionGroup(ChannelHandlerContext ctx, GetSubscriptionGroupConfigRequestHeader requestHeader = (GetSubscriptionGroupConfigRequestHeader) request.decodeCommandCustomHeader(GetSubscriptionGroupConfigRequestHeader.class); final RemotingCommand response = RemotingCommand.createResponseCommand(null); - SubscriptionGroupConfig groupConfig = this.brokerController.getSubscriptionGroupManager().getSubscriptionGroupTable().get(requestHeader.getGroup()); + SubscriptionGroupConfig groupConfig = this.brokerController.getSubscriptionGroupManager().findSubscriptionGroupConfig(requestHeader.getGroup()); if (groupConfig == null) { LOGGER.error("No group in this broker, client: {} group: {}", ctx.channel().remoteAddress(), requestHeader.getGroup()); response.setCode(ResponseCode.SUBSCRIPTION_GROUP_NOT_EXIST); @@ -2444,7 +2444,7 @@ private RemotingCommand consumeMessageDirectly(ChannelHandlerContext ctx, } // groupSysFlag if (StringUtils.isNotEmpty(requestHeader.getConsumerGroup())) { - SubscriptionGroupConfig groupConfig = brokerController.getSubscriptionGroupManager().getSubscriptionGroupTable().get(requestHeader.getConsumerGroup()); + SubscriptionGroupConfig groupConfig = brokerController.getSubscriptionGroupManager().findSubscriptionGroupConfig(requestHeader.getConsumerGroup()); if (groupConfig != null) { request.addExtField("groupSysFlag", String.valueOf(groupConfig.getGroupSysFlag())); } @@ -2933,7 +2933,7 @@ private RemotingCommand getTopicConfig(ChannelHandlerContext ctx, GetTopicConfigRequestHeader requestHeader = (GetTopicConfigRequestHeader) request.decodeCommandCustomHeader(GetTopicConfigRequestHeader.class); final RemotingCommand response = RemotingCommand.createResponseCommand(null); - TopicConfig topicConfig = this.brokerController.getTopicConfigManager().getTopicConfigTable().get(requestHeader.getTopic()); + TopicConfig topicConfig = this.brokerController.getTopicConfigManager().selectTopicConfig(requestHeader.getTopic()); if (topicConfig == null) { LOGGER.error("No topic in this broker, client: {} topic: {}", ctx.channel().remoteAddress(), requestHeader.getTopic()); //be care of the response code, should set "not-exist" explicitly diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/PopBufferMergeService.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/PopBufferMergeService.java index 05a92c54b18..820388b18d2 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/processor/PopBufferMergeService.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/PopBufferMergeService.java @@ -197,12 +197,12 @@ private void scanGarbage() { String topic = keyArray[0]; String cid = keyArray[1]; if (brokerController.getTopicConfigManager().selectTopicConfig(topic) == null) { - POP_LOGGER.info("[PopBuffer]remove not exit topic {} in buffer!", topic); + POP_LOGGER.info("[PopBuffer]remove nonexistent topic {} in buffer!", topic); iterator.remove(); continue; } - if (!brokerController.getSubscriptionGroupManager().getSubscriptionGroupTable().containsKey(cid)) { - POP_LOGGER.info("[PopBuffer]remove not exit sub {} of topic {} in buffer!", cid, topic); + if (!brokerController.getSubscriptionGroupManager().containsSubscriptionGroup(cid)) { + POP_LOGGER.info("[PopBuffer]remove nonexistent subscription group {} of topic {} in buffer!", cid, topic); iterator.remove(); continue; } diff --git a/broker/src/test/java/org/apache/rocketmq/broker/longpolling/PopLongPollingServiceTest.java b/broker/src/test/java/org/apache/rocketmq/broker/longpolling/PopLongPollingServiceTest.java index 1f064ec05d1..003bf09842a 100644 --- a/broker/src/test/java/org/apache/rocketmq/broker/longpolling/PopLongPollingServiceTest.java +++ b/broker/src/test/java/org/apache/rocketmq/broker/longpolling/PopLongPollingServiceTest.java @@ -55,20 +55,20 @@ public class PopLongPollingServiceTest { @Mock private BrokerController brokerController; - + @Mock private NettyRequestProcessor processor; - + @Mock private ChannelHandlerContext ctx; - + @Mock private ExecutorService pullMessageExecutor; - + private PopLongPollingService popLongPollingService; - + private final String defaultTopic = "defaultTopic"; - + @Before public void init() { BrokerConfig brokerConfig = new BrokerConfig(); @@ -76,7 +76,7 @@ public void init() { when(brokerController.getBrokerConfig()).thenReturn(brokerConfig); popLongPollingService = spy(new PopLongPollingService(brokerController, processor, true)); } - + @Test public void testNotifyMessageArrivingWithRetryTopic() { int queueId = 0; @@ -84,31 +84,32 @@ public void testNotifyMessageArrivingWithRetryTopic() { popLongPollingService.notifyMessageArrivingWithRetryTopic(defaultTopic, queueId); verify(popLongPollingService, times(1)).notifyMessageArrivingWithRetryTopic(defaultTopic, queueId, -1L, null, 0L, null, null); } - + @Test public void testNotifyMessageArriving() { int queueId = 0; Long tagsCode = 123L; long offset = 123L; long msgStoreTime = System.currentTimeMillis(); - byte[] filterBitMap = new byte[]{0x01}; + byte[] filterBitMap = new byte[] {0x01}; Map properties = new ConcurrentHashMap<>(); doNothing().when(popLongPollingService).notifyMessageArriving(defaultTopic, queueId, offset, tagsCode, msgStoreTime, filterBitMap, properties); popLongPollingService.notifyMessageArrivingWithRetryTopic(defaultTopic, queueId, offset, tagsCode, msgStoreTime, filterBitMap, properties); verify(popLongPollingService).notifyMessageArriving(defaultTopic, queueId, offset, tagsCode, msgStoreTime, filterBitMap, properties); } - + @Test public void testNotifyMessageArrivingValidRequest() throws Exception { String cid = "CID_1"; int queueId = 0; - ConcurrentHashMap> topicCidMap = new ConcurrentHashMap<>(); + ConcurrentLinkedHashMap> topicCidMap = new ConcurrentLinkedHashMap.Builder>() + .maximumWeightedCapacity(10).build(); ConcurrentHashMap cids = new ConcurrentHashMap<>(); cids.put(cid, (byte) 1); topicCidMap.put(defaultTopic, cids); popLongPollingService = new PopLongPollingService(brokerController, processor, true); ConcurrentLinkedHashMap> pollingMap = - new ConcurrentLinkedHashMap.Builder>().maximumWeightedCapacity(this.brokerController.getBrokerConfig().getPopPollingMapSize()).build(); + new ConcurrentLinkedHashMap.Builder>().maximumWeightedCapacity(this.brokerController.getBrokerConfig().getPopPollingMapSize()).build(); Channel channel = mock(Channel.class); when(channel.isActive()).thenReturn(true); PopRequest popRequest = mock(PopRequest.class); @@ -126,19 +127,19 @@ public void testNotifyMessageArrivingValidRequest() throws Exception { boolean actual = popLongPollingService.notifyMessageArriving(defaultTopic, queueId, cid, null, 0, null, null); assertFalse(actual); } - + @Test public void testWakeUpNullRequest() { assertFalse(popLongPollingService.wakeUp(null)); } - + @Test public void testWakeUpIncompleteRequest() { PopRequest request = mock(PopRequest.class); when(request.complete()).thenReturn(false); assertFalse(popLongPollingService.wakeUp(request)); } - + @Test public void testWakeUpInactiveChannel() { PopRequest request = mock(PopRequest.class); @@ -150,7 +151,7 @@ public void testWakeUpInactiveChannel() { when(brokerController.getPullMessageExecutor()).thenReturn(pullMessageExecutor); assertTrue(popLongPollingService.wakeUp(request)); } - + @Test public void testWakeUpValidRequestWithException() throws Exception { PopRequest request = mock(PopRequest.class); @@ -168,7 +169,7 @@ public void testWakeUpValidRequestWithException() throws Exception { captor.getValue().run(); verify(processor).processRequest(any(), any()); } - + @Test public void testPollingNotPolling() { ChannelHandlerContext ctx = mock(ChannelHandlerContext.class); @@ -180,7 +181,7 @@ public void testPollingNotPolling() { PollingResult result = popLongPollingService.polling(ctx, remotingCommand, requestHeader, subscriptionData, messageFilter); assertEquals(PollingResult.NOT_POLLING, result); } - + @Test public void testPollingServicePollingTimeout() throws IllegalAccessException { String cid = "CID_1"; @@ -194,7 +195,8 @@ public void testPollingServicePollingTimeout() throws IllegalAccessException { when(requestHeader.getPollTime()).thenReturn(1000L); when(requestHeader.getTopic()).thenReturn(defaultTopic); when(requestHeader.getConsumerGroup()).thenReturn("defaultGroup"); - ConcurrentHashMap> topicCidMap = new ConcurrentHashMap<>(); + ConcurrentLinkedHashMap> topicCidMap = new ConcurrentLinkedHashMap.Builder>() + .maximumWeightedCapacity(10).build(); ConcurrentHashMap cids = new ConcurrentHashMap<>(); cids.put(cid, (byte) 1); topicCidMap.put(defaultTopic, cids); @@ -202,7 +204,7 @@ public void testPollingServicePollingTimeout() throws IllegalAccessException { PollingResult result = popLongPollingService.polling(ctx, remotingCommand, requestHeader, subscriptionData, messageFilter); assertEquals(PollingResult.POLLING_TIMEOUT, result); } - + @Test public void testPollingPollingSuc() { ChannelHandlerContext ctx = mock(ChannelHandlerContext.class); diff --git a/broker/src/test/java/org/apache/rocketmq/broker/offset/ConsumerOrderInfoManagerTest.java b/broker/src/test/java/org/apache/rocketmq/broker/offset/ConsumerOrderInfoManagerTest.java index 25b418c9344..4414eda54e9 100644 --- a/broker/src/test/java/org/apache/rocketmq/broker/offset/ConsumerOrderInfoManagerTest.java +++ b/broker/src/test/java/org/apache/rocketmq/broker/offset/ConsumerOrderInfoManagerTest.java @@ -21,7 +21,6 @@ import java.util.Map; import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; import java.util.concurrent.TimeUnit; import org.apache.rocketmq.broker.BrokerController; import org.apache.rocketmq.broker.subscription.SubscriptionGroupManager; @@ -29,7 +28,6 @@ import org.apache.rocketmq.common.BrokerConfig; import org.apache.rocketmq.common.TopicConfig; import org.apache.rocketmq.remoting.protocol.header.ExtraInfoUtil; -import org.apache.rocketmq.remoting.protocol.subscription.SubscriptionGroupConfig; import org.assertj.core.util.Lists; import org.junit.Before; import org.junit.Test; @@ -384,9 +382,7 @@ public void testAutoCleanAndEncode() { SubscriptionGroupManager subscriptionGroupManager = mock(SubscriptionGroupManager.class); when(brokerController.getSubscriptionGroupManager()).thenReturn(subscriptionGroupManager); - ConcurrentMap subscriptionGroupConfigConcurrentMap = new ConcurrentHashMap<>(); - subscriptionGroupConfigConcurrentMap.put(GROUP, new SubscriptionGroupConfig()); - when(subscriptionGroupManager.getSubscriptionGroupTable()).thenReturn(subscriptionGroupConfigConcurrentMap); + when(subscriptionGroupManager.containsSubscriptionGroup(GROUP)).thenReturn(true); TopicConfig topicConfig = new TopicConfig(TOPIC); when(topicConfigManager.selectTopicConfig(eq(TOPIC))).thenReturn(topicConfig);