From bcc863fcd05cd332dc495991a995f74bf1284728 Mon Sep 17 00:00:00 2001
From: Jaime Fullaondo
Date: Thu, 29 Oct 2020 18:00:38 -0400
Subject: [PATCH] [test] fixing test to account for sharding

---
 .../statsd/NonBlockingStatsDClient.java       |  4 +-
 .../statsd/StatsDBlockingProcessor.java       | 13 +++-
 .../statsd/StatsDNonBlockingProcessor.java    | 21 ++++--
 .../com/timgroup/statsd/StatsDProcessor.java  |  1 +
 .../timgroup/statsd/StatsDAggregatorTest.java | 65 +++++++++++++++----
 5 files changed, 81 insertions(+), 23 deletions(-)

diff --git a/src/main/java/com/timgroup/statsd/NonBlockingStatsDClient.java b/src/main/java/com/timgroup/statsd/NonBlockingStatsDClient.java
index f60e464d..c1f07c7b 100644
--- a/src/main/java/com/timgroup/statsd/NonBlockingStatsDClient.java
+++ b/src/main/java/com/timgroup/statsd/NonBlockingStatsDClient.java
@@ -324,7 +324,7 @@ public NonBlockingStatsDClient(final String prefix, final int queueSize, String[
 
             // similar settings, but a single worker and non-blocking.
             telemetryStatsDProcessor = createProcessor(queueSize, handler, maxPacketSizeBytes,
-                    poolSize, 1, false, 0, aggregationShards);
+                    poolSize, 1, 1, false, 0, aggregationShards);
         }
 
         this.telemetry = new Telemetry(telemetrytags, telemetryStatsDProcessor);
@@ -1001,7 +1001,7 @@ public NonBlockingStatsDClient(final String prefix, final int queueSize, String[
         this(prefix, queueSize, constantTags, errorHandler, addressLookup, addressLookup, timeout,
                 bufferSize, maxPacketSizeBytes, entityID, poolSize, processorWorkers, senderWorkers,
-                blocking, enableTelemetry, telemetryFlushInterval, 0, 0);
+                DEFAULT_LOCK_SHARD_GRAIN, blocking, enableTelemetry, telemetryFlushInterval, 0, 0);
     }
 
     protected StatsDProcessor createProcessor(final int queueSize, final StatsDClientErrorHandler handler,
diff --git a/src/main/java/com/timgroup/statsd/StatsDBlockingProcessor.java b/src/main/java/com/timgroup/statsd/StatsDBlockingProcessor.java
index 39a5836a..95efb816 100644
--- a/src/main/java/com/timgroup/statsd/StatsDBlockingProcessor.java
+++ b/src/main/java/com/timgroup/statsd/StatsDBlockingProcessor.java
@@ -107,10 +107,12 @@ public void run() {
             final int aggregatorFlushInterval, final int aggregatorShards) throws Exception {
 
         super(queueSize, handler, maxPacketSizeBytes, poolSize, workers, lockShardGrain, aggregatorFlushInterval, aggregatorShards);
+
         this.messages = new ArrayBlockingQueue[lockShardGrain];
         for (int i = 0 ; i < lockShardGrain ; i++) {
             this.messages[i] = new ArrayBlockingQueue(queueSize);
         }
+
         this.processorWorkQueue = new ArrayBlockingQueue[workers];
         for (int i = 0 ; i < workers ; i++) {
             this.processorWorkQueue[i] = new ArrayBlockingQueue(queueSize);
@@ -126,7 +128,16 @@ protected ProcessingTask createProcessingTask(int id) {
             throws Exception {
 
         super(processor);
-        this.messages = new ArrayBlockingQueue<>(processor.getQcapacity());
+
+        this.messages = new ArrayBlockingQueue[lockShardGrain];
+        for (int i = 0 ; i < lockShardGrain ; i++) {
+            this.messages[i] = new ArrayBlockingQueue(getQcapacity());
+        }
+
+        this.processorWorkQueue = new ArrayBlockingQueue[workers];
+        for (int i = 0 ; i < workers ; i++) {
+            this.processorWorkQueue[i] = new ArrayBlockingQueue(getQcapacity());
+        }
     }
 
     @Override
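The hunks above finish threading the lock-shard parameter through the constructors: the telemetry processor is created with an explicit grain of 1, the legacy convenience constructor forwards DEFAULT_LOCK_SHARD_GRAIN, and StatsDBlockingProcessor keeps one bounded message queue per shard plus one work queue per worker carrying shard indices. The standalone sketch below illustrates that enqueue/dequeue handoff; the class name, the SHARDS/WORKERS constants, and the 1024 capacities are illustrative stand-ins, not the library's API.

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

public class ShardedHandoffSketch {
    static final int SHARDS = 4;   // stand-in for lockShardGrain
    static final int WORKERS = 2;  // stand-in for the processor worker count

    final BlockingQueue<String>[] messages;    // one message queue per lock shard
    final BlockingQueue<Integer>[] workQueues; // per worker: shard indices ready to drain

    @SuppressWarnings("unchecked")
    ShardedHandoffSketch() {
        messages = new ArrayBlockingQueue[SHARDS];
        for (int i = 0; i < SHARDS; i++) {
            messages[i] = new ArrayBlockingQueue<>(1024);
        }
        workQueues = new ArrayBlockingQueue[WORKERS];
        for (int i = 0; i < WORKERS; i++) {
            workQueues[i] = new ArrayBlockingQueue<>(1024);
        }
    }

    // Producer: derive both the shard and the worker from the calling thread,
    // mirroring the patch's threadId % lockShardGrain / threadId % workers scheme.
    boolean send(String msg) {
        int threadId = (int) Thread.currentThread().getId();
        int shard = threadId % SHARDS;
        int worker = threadId % WORKERS;
        return messages[shard].offer(msg) && workQueues[worker].offer(shard);
    }

    // Consumer: worker `id` learns which shard to read from its own work queue,
    // so producers sharing a shard contend on that queue rather than one global lock.
    String takeOne(int id) throws InterruptedException {
        int shard = workQueues[id].take();
        return messages[shard].poll();
    }
}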
diff --git a/src/main/java/com/timgroup/statsd/StatsDNonBlockingProcessor.java b/src/main/java/com/timgroup/statsd/StatsDNonBlockingProcessor.java
index f90036aa..7d4282de 100644
--- a/src/main/java/com/timgroup/statsd/StatsDNonBlockingProcessor.java
+++ b/src/main/java/com/timgroup/statsd/StatsDNonBlockingProcessor.java
@@ -17,8 +17,6 @@ public class StatsDNonBlockingProcessor extends StatsDProcessor {
 
     private class ProcessingTask extends StatsDProcessor.ProcessingTask {
 
-        private final int processorQueueId;
-
         public ProcessingTask(int id) {
             super(id);
         }
@@ -28,6 +26,7 @@ public void run() {
             ByteBuffer sendBuffer;
             boolean empty = true;
             boolean emptyHighPrio = true;
+            int messageQueueIdx = 0;
 
             try {
                 sendBuffer = bufferPool.borrow();
@@ -57,7 +56,7 @@ public void run() {
                         message = highPrioMessages.poll();
                     } else {
-                        final int messageQueueIdx = processorWorkQueue[this.processorQueueId].poll();
+                        messageQueueIdx = processorWorkQueue[this.processorQueueId].poll();
                         message = messages[messageQueueIdx].poll();
                     }
 
@@ -126,7 +125,6 @@ public void run() {
             final int aggregatorShards) throws Exception {
         super(queueSize, handler, maxPacketSizeBytes, poolSize, workers,
                 lockShardGrain, aggregatorFlushInterval, aggregatorShards);
-        this.qsize = new AtomicInteger(0);
 
         this.qsize = new AtomicInteger[lockShardGrain];
         this.messages = new ConcurrentLinkedQueue[lockShardGrain];
@@ -149,8 +147,19 @@ protected ProcessingTask createProcessingTask(int id) {
     StatsDNonBlockingProcessor(final StatsDNonBlockingProcessor processor)
             throws Exception {
         super(processor);
-        this.qsize = new AtomicInteger(0);
-        this.messages = new ConcurrentLinkedQueue<>();
+
+        this.qsize = new AtomicInteger[lockShardGrain];
+        this.messages = new ConcurrentLinkedQueue[lockShardGrain];
+        for (int i = 0 ; i < lockShardGrain ; i++) {
+            this.qsize[i] = new AtomicInteger();
+            this.messages[i] = new ConcurrentLinkedQueue();
+            this.qsize[i].set(0);
+        }
+
+        this.processorWorkQueue = new ConcurrentLinkedQueue[workers];
+        for (int i = 0 ; i < workers ; i++) {
+            this.processorWorkQueue[i] = new ConcurrentLinkedQueue();
+        }
     }
 
     @Override
diff --git a/src/main/java/com/timgroup/statsd/StatsDProcessor.java b/src/main/java/com/timgroup/statsd/StatsDProcessor.java
index 94043c97..0ced985c 100644
--- a/src/main/java/com/timgroup/statsd/StatsDProcessor.java
+++ b/src/main/java/com/timgroup/statsd/StatsDProcessor.java
@@ -116,6 +116,7 @@ public Thread newThread(final Runnable runnable) {
 
         this.handler = processor.handler;
         this.workers = processor.workers;
+        this.lockShardGrain = processor.lockShardGrain;
         this.qcapacity = processor.getQcapacity();
 
         this.executor = Executors.newFixedThreadPool(workers, new ThreadFactory() {
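The one-line StatsDProcessor hunk matters because both sharded processors route their copy constructors through super(processor) and then size their queue arrays from lockShardGrain and workers. A reduced, hypothetical illustration (not the library's classes) of why the field must be carried over, given the threadId % lockShardGrain routing used elsewhere in this patch:

// If a copy constructor forgets to carry lockShardGrain over, the clone keeps
// the int default (0), and the first modulo by it throws ArithmeticException.
class ProcessorState {
    int workers;
    int lockShardGrain;

    ProcessorState(int workers, int lockShardGrain) {
        this.workers = workers;
        this.lockShardGrain = lockShardGrain;
    }

    ProcessorState(ProcessorState other) {
        this.workers = other.workers;
        this.lockShardGrain = other.lockShardGrain; // the added line, in spirit
    }

    int shardFor(long threadId) {
        return (int) (threadId % lockShardGrain); // grain 0 => division by zero
    }
}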
diff --git a/src/test/java/com/timgroup/statsd/StatsDAggregatorTest.java b/src/test/java/com/timgroup/statsd/StatsDAggregatorTest.java
index 185ce740..34bf0cdb 100644
--- a/src/test/java/com/timgroup/statsd/StatsDAggregatorTest.java
+++ b/src/test/java/com/timgroup/statsd/StatsDAggregatorTest.java
@@ -19,6 +19,7 @@ import java.util.Queue;
 import java.util.Random;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.Callable;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ConcurrentLinkedQueue;
@@ -81,22 +82,42 @@ protected void writeTo(StringBuilder builder){}
 
     // fakeProcessor store messages from the telemetry only
     public static class FakeProcessor extends StatsDProcessor {
-        private final Queue<Message> messages;
+        private final Queue<Message>[] messages;
+        private final Queue<Integer>[] processorWorkQueue;
+        private final AtomicInteger[] qsize;  // qSize will not reflect actual size, but a close estimate.
 
         private final AtomicInteger messageSent = new AtomicInteger(0);
         private final AtomicInteger messageAggregated = new AtomicInteger(0);
 
         FakeProcessor(final StatsDClientErrorHandler handler) throws Exception {
-            super(0, handler, 0, 1, 1, 0, 0);
-            this.messages = new ConcurrentLinkedQueue<>();
-        }
+            super(50, handler, 0, 1, 1, 1, 0, StatsDAggregator.DEFAULT_SHARDS);
+
+            // 1 queue (lockShardGrain = 1)
+            this.qsize = new AtomicInteger[lockShardGrain];
+            this.messages = new ArrayBlockingQueue[lockShardGrain];
+            for (int i = 0 ; i < lockShardGrain ; i++) {
+                this.qsize[i] = new AtomicInteger();
+                this.messages[i] = new ArrayBlockingQueue(getQcapacity());
+                this.qsize[i].set(0);
+            }
+
+            // 1 worker
+            this.processorWorkQueue = new ArrayBlockingQueue[workers];
+            for (int i = 0 ; i < workers ; i++) {
+                this.processorWorkQueue[i] = new ArrayBlockingQueue(getQcapacity());
+            }
+        }
 
         private class FakeProcessingTask extends StatsDProcessor.ProcessingTask {
+
+            public FakeProcessingTask(int id) {
+                super(id);
+            }
+
             @Override
             public void run() {
                 while (!shutdown) {
-                    final Message message = messages.poll();
+                    final Message message = messages[0].poll();
 
                     if (message == null) {
                         try{
@@ -106,6 +127,7 @@ public void run() {
                         continue;
                     }
 
+                    qsize[0].decrementAndGet();
                     if (aggregator.aggregateMessage(message)) {
                         messageAggregated.incrementAndGet();
                         continue;
@@ -118,23 +140,38 @@
         }
 
         @Override
-        protected StatsDProcessor.ProcessingTask createProcessingTask() {
-            return new FakeProcessingTask();
+        protected StatsDProcessor.ProcessingTask createProcessingTask(int id) {
+            return new FakeProcessingTask(id);
         }
 
         @Override
         public boolean send(final Message msg) {
-            messages.offer(msg);
-            return true;
+            if (!shutdown) {
+                int threadId = getThreadId();
+                int shard = threadId % lockShardGrain;
+                int processQueue = threadId % workers;
+
+                if (qsize[shard].get() < qcapacity) {
+                    messages[shard].offer(msg);
+                    qsize[shard].incrementAndGet();
+                    processorWorkQueue[processQueue].offer(shard);
+                    return true;
+                }
+            }
+
+            return false;
         }
 
-        public Queue<Message> getMessages() {
-            return messages;
+        public Queue<Message> getMessages(int id) {
+            return messages[id];
         }
 
         public void clear() {
             try {
-                messages.clear();
+
+                for (int i = 0 ; i < lockShardGrain ; i++) {
+                    messages[i].clear();
+                }
                 highPrioMessages.clear();
             } catch (Exception e) {}
         }
@@ -181,7 +218,7 @@ public void aggregate_messages() throws Exception {
             fakeProcessor.send(new FakeAlphaMessage("some.set", Message.Type.SET, "value"));
         }
 
-        waitForQueueSize(fakeProcessor.messages, 0);
+        waitForQueueSize(fakeProcessor.messages[0], 0);
 
         // 10 gauges, 10 counts, 10 sets
         assertEquals(30, fakeProcessor.messageAggregated.get());
@@ -206,7 +243,7 @@ public void aggregation_sharding() throws Exception {
             fakeProcessor.send(gauge);
         }
 
-        waitForQueueSize(fakeProcessor.messages, 0);
+        waitForQueueSize(fakeProcessor.messages[0], 0);
 
         for (int i=0 ; i<StatsDAggregator.DEFAULT_SHARDS ; i++) {
             Map<Message, Message> map = fakeProcessor.aggregator.aggregateMetrics.get(i);
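With the grain fixed at 1, the reworked FakeProcessor mirrors the production send() path: it picks a shard and a worker queue from the calling thread's id, uses the qsize counters as a capacity estimate, and publishes the shard index to the worker's queue, while FakeProcessingTask drains messages[0] into the aggregator. The tests therefore wait on messages[0] rather than the old single queue, and aggregation_sharding then walks every shard map in aggregator.aggregateMetrics, which is why the loop bound is StatsDAggregator.DEFAULT_SHARDS. The waitForQueueSize helper's body is not part of this patch; one plausible shape, stated as an assumption rather than the repo's code (class and timeout are hypothetical):

import java.util.Queue;

final class QueueTestSupport {
    // Poll until the processor drains the queue to the target size or a
    // timeout elapses, so the assertions that follow see a settled aggregator.
    static void waitForQueueSize(Queue<?> queue, int size) throws InterruptedException {
        long deadline = System.currentTimeMillis() + 5_000;  // assumed timeout
        while (queue.size() > size && System.currentTimeMillis() < deadline) {
            Thread.sleep(10);
        }
    }
}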