diff --git a/src/main/java/com/timgroup/statsd/BufferPool.java b/src/main/java/com/timgroup/statsd/BufferPool.java
index 07e41654..298236de 100644
--- a/src/main/java/com/timgroup/statsd/BufferPool.java
+++ b/src/main/java/com/timgroup/statsd/BufferPool.java
@@ -54,6 +54,10 @@ int getSize() {
         return size;
     }
 
+    int getBufferSize() {
+        return bufferSize;
+    }
+
     int available() {
         return pool.size();
     }
diff --git a/src/main/java/com/timgroup/statsd/NonBlockingStatsDClient.java b/src/main/java/com/timgroup/statsd/NonBlockingStatsDClient.java
index dbe9c17b..5a8c4282 100644
--- a/src/main/java/com/timgroup/statsd/NonBlockingStatsDClient.java
+++ b/src/main/java/com/timgroup/statsd/NonBlockingStatsDClient.java
@@ -83,12 +83,14 @@ String tag() {
         }
     }
 
-    public static final int DEFAULT_MAX_PACKET_SIZE_BYTES = 1400;
+    public static final int DEFAULT_UDP_MAX_PACKET_SIZE_BYTES = 1432;
+    public static final int DEFAULT_UDS_MAX_PACKET_SIZE_BYTES = 8192;
     public static final int DEFAULT_QUEUE_SIZE = 4096;
     public static final int DEFAULT_POOL_SIZE = 512;
     public static final int DEFAULT_PROCESSOR_WORKERS = 1;
     public static final int DEFAULT_SENDER_WORKERS = 1;
     public static final int DEFAULT_DOGSTATSD_PORT = 8125;
+    public static final int DEFAULT_LOCK_SHARD_GRAIN = 4;
     public static final int SOCKET_TIMEOUT_MS = 100;
     public static final int SOCKET_BUFFER_BYTES = -1;
     public static final boolean DEFAULT_BLOCKING = false;
@@ -213,6 +215,9 @@ private static String format(ThreadLocal formatter, Number value)
      *     The number of processor worker threads assembling buffers for submission.
      * @param senderWorkers
      *     The number of sender worker threads submitting buffers to the socket.
+     * @param lockShardGrain
+     *     The granularity for the lock sharding - sharding is based on thread id,
+     *     so the value should not be greater than the application thread count.
      * @param blocking
      *     Blocking or non-blocking implementation for statsd message queue.
      * @param enableTelemetry
@@ -230,7 +235,7 @@ public NonBlockingStatsDClient(final String prefix, final int queueSize, String[
             final StatsDClientErrorHandler errorHandler, Callable addressLookup,
             Callable telemetryAddressLookup, final int timeout, final int bufferSize,
             final int maxPacketSizeBytes, String entityID, final int poolSize, final int processorWorkers,
-            final int senderWorkers, boolean blocking, final boolean enableTelemetry,
+            final int senderWorkers, final int lockShardGrain, boolean blocking, final boolean enableTelemetry,
             final int telemetryFlushInterval, final int aggregationFlushInterval, final int aggregationShards)
             throws StatsDClientException {
         if ((prefix != null) && (!prefix.isEmpty())) {
@@ -288,7 +293,7 @@ public NonBlockingStatsDClient(final String prefix, final int queueSize, String[
         }
 
         statsDProcessor = createProcessor(queueSize, handler, maxPacketSizeBytes, poolSize,
-                processorWorkers, blocking, aggregationFlushInterval, aggregationShards);
+                processorWorkers, lockShardGrain, blocking, aggregationFlushInterval, aggregationShards);
         telemetryStatsDProcessor = statsDProcessor;
 
         Properties properties = new Properties();
@@ -320,7 +325,7 @@ public NonBlockingStatsDClient(final String prefix, final int queueSize, String[
             // similar settings, but a single worker and non-blocking.
             telemetryStatsDProcessor = createProcessor(queueSize, handler, maxPacketSizeBytes,
-                    poolSize, 1, false, 0, aggregationShards);
+                    poolSize, 1, 1, false, 0, aggregationShards);
         }
 
         this.telemetry = new Telemetry(telemetrytags, telemetryStatsDProcessor);
@@ -997,18 +1002,18 @@ public NonBlockingStatsDClient(final String prefix, final int queueSize, String[
         this(prefix, queueSize, constantTags, errorHandler, addressLookup, addressLookup, timeout,
                 bufferSize, maxPacketSizeBytes, entityID, poolSize, processorWorkers, senderWorkers,
-                blocking, enableTelemetry, telemetryFlushInterval, 0, 0);
+                DEFAULT_LOCK_SHARD_GRAIN, blocking, enableTelemetry, telemetryFlushInterval, 0, 0);
     }
 
     protected StatsDProcessor createProcessor(final int queueSize, final StatsDClientErrorHandler handler,
-            final int maxPacketSizeBytes, final int bufferPoolSize, final int workers, final boolean blocking,
-            final int aggregationFlushInterval, final int aggregationShards) throws Exception {
+            final int maxPacketSizeBytes, final int bufferPoolSize, final int workers, final int lockShardGrain,
+            final boolean blocking, final int aggregationFlushInterval, final int aggregationShards) throws Exception {
 
         if (blocking) {
             return new StatsDBlockingProcessor(queueSize, handler, maxPacketSizeBytes, bufferPoolSize,
-                    workers, aggregationFlushInterval, aggregationShards);
+                    workers, lockShardGrain, aggregationFlushInterval, aggregationShards);
         } else {
             return new StatsDNonBlockingProcessor(queueSize, handler, maxPacketSizeBytes, bufferPoolSize,
-                    workers, aggregationFlushInterval, aggregationShards);
+                    workers, lockShardGrain, aggregationFlushInterval, aggregationShards);
         }
     }
diff --git a/src/main/java/com/timgroup/statsd/NonBlockingStatsDClientBuilder.java b/src/main/java/com/timgroup/statsd/NonBlockingStatsDClientBuilder.java
index 87c5cfdc..8e45a3c6 100644
--- a/src/main/java/com/timgroup/statsd/NonBlockingStatsDClientBuilder.java
+++ b/src/main/java/com/timgroup/statsd/NonBlockingStatsDClientBuilder.java
@@ -16,16 +16,17 @@ public class NonBlockingStatsDClientBuilder {
      * See https://github.com/DataDog/java-dogstatsd-client/pull/17 for discussion.
      */
+    public int maxPacketSizeBytes = 0;
 
     public int port = NonBlockingStatsDClient.DEFAULT_DOGSTATSD_PORT;
     public int telemetryPort = NonBlockingStatsDClient.DEFAULT_DOGSTATSD_PORT;
     public int queueSize = NonBlockingStatsDClient.DEFAULT_QUEUE_SIZE;
     public int timeout = NonBlockingStatsDClient.SOCKET_TIMEOUT_MS;
     public int bufferPoolSize = NonBlockingStatsDClient.DEFAULT_POOL_SIZE;
     public int socketBufferSize = NonBlockingStatsDClient.SOCKET_BUFFER_BYTES;
-    public int maxPacketSizeBytes = NonBlockingStatsDClient.DEFAULT_MAX_PACKET_SIZE_BYTES;
     public int processorWorkers = NonBlockingStatsDClient.DEFAULT_PROCESSOR_WORKERS;
     public int senderWorkers = NonBlockingStatsDClient.DEFAULT_SENDER_WORKERS;
     public boolean blocking = NonBlockingStatsDClient.DEFAULT_BLOCKING;
+    public int lockShardGrain = NonBlockingStatsDClient.DEFAULT_LOCK_SHARD_GRAIN;
     public boolean enableTelemetry = NonBlockingStatsDClient.DEFAULT_ENABLE_TELEMETRY;
     public boolean enableAggregation = NonBlockingStatsDClient.DEFAULT_ENABLE_AGGREGATION;
     public int telemetryFlushInterval = Telemetry.DEFAULT_FLUSH_INTERVAL;
@@ -90,6 +91,11 @@ public NonBlockingStatsDClientBuilder senderWorkers(int val) {
         return this;
     }
 
+    public NonBlockingStatsDClientBuilder lockShardGrain(int val) {
+        lockShardGrain = val;
+        return this;
+    }
+
     public NonBlockingStatsDClientBuilder blocking(boolean val) {
         blocking = val;
         return this;
     }
@@ -165,6 +171,8 @@ public NonBlockingStatsDClientBuilder aggregationShards(int val) {
      * @return the built NonBlockingStatsDClient.
      */
     public NonBlockingStatsDClient build() throws StatsDClientException {
+
+        int packetSize = maxPacketSizeBytes;
         Callable lookup = addressLookup;
         Callable telemetryLookup = telemetryAddressLookup;
 
@@ -172,6 +180,12 @@ public NonBlockingStatsDClient build() throws StatsDClientException {
             lookup = staticStatsDAddressResolution(hostname, port);
         }
 
+        if (packetSize == 0) {
+            packetSize = (port == 0) ? NonBlockingStatsDClient.DEFAULT_UDS_MAX_PACKET_SIZE_BYTES :
+                NonBlockingStatsDClient.DEFAULT_UDP_MAX_PACKET_SIZE_BYTES;
+        }
+
+
         if (telemetryLookup == null) {
             if (telemetryHostname == null) {
                 telemetryLookup = lookup;
@@ -181,9 +195,9 @@ public NonBlockingStatsDClient build() throws StatsDClientException {
         }
 
         return new NonBlockingStatsDClient(prefix, queueSize, constantTags, errorHandler,
-                lookup, telemetryLookup, timeout, socketBufferSize, maxPacketSizeBytes,
-                entityID, bufferPoolSize, processorWorkers, senderWorkers, blocking,
-                enableTelemetry, telemetryFlushInterval,
+                lookup, telemetryLookup, timeout, socketBufferSize, packetSize,
+                entityID, bufferPoolSize, processorWorkers, senderWorkers, lockShardGrain,
+                blocking, enableTelemetry, telemetryFlushInterval,
                 (enableAggregation ? aggregationFlushInterval : 0), aggregationShards);
     }
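For reference, a minimal usage sketch of the new builder knobs (illustrative only; it assumes the builder's existing prefix/hostname/port setters, and the metric name and values are made up). Leaving maxPacketSizeBytes at its new default of 0 lets build() resolve the packet size per transport, DEFAULT_UDP_MAX_PACKET_SIZE_BYTES (1432) for UDP or DEFAULT_UDS_MAX_PACKET_SIZE_BYTES (8192) when port == 0 (UDS), while lockShardGrain() controls how many sharded message queues the processor uses.

    import com.timgroup.statsd.NonBlockingStatsDClient;
    import com.timgroup.statsd.NonBlockingStatsDClientBuilder;

    public class ShardedClientExample {
        public static void main(String[] args) {
            // maxPacketSizeBytes is left at 0, so build() picks the transport default:
            // UDP (port != 0) -> 1432 bytes, UDS (port == 0) -> 8192 bytes.
            NonBlockingStatsDClient client = new NonBlockingStatsDClientBuilder()
                    .prefix("my.app")
                    .hostname("localhost")
                    .port(8125)
                    .lockShardGrain(4)  // per the javadoc, keep this <= the application's thread count
                    .build();

            client.incrementCounter("example.requests");
            client.stop();
        }
    }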
diff --git a/src/main/java/com/timgroup/statsd/StatsDBlockingProcessor.java b/src/main/java/com/timgroup/statsd/StatsDBlockingProcessor.java
index a1b05fed..7fb76bc1 100644
--- a/src/main/java/com/timgroup/statsd/StatsDBlockingProcessor.java
+++ b/src/main/java/com/timgroup/statsd/StatsDBlockingProcessor.java
@@ -11,10 +11,15 @@ public class StatsDBlockingProcessor extends StatsDProcessor {
 
-    private final BlockingQueue messages;
+    private final BlockingQueue[] messages;
+    private final BlockingQueue[] processorWorkQueue;
 
     private class ProcessingTask extends StatsDProcessor.ProcessingTask {
 
+        public ProcessingTask(int id) {
+            super(id);
+        }
+
         @Override
         public void run() {
             ByteBuffer sendBuffer;
@@ -30,7 +35,8 @@ public void run() {
 
             aggregator.start();
 
-            while (!((emptyHighPrio = highPrioMessages.isEmpty()) && (empty = messages.isEmpty()) && shutdown)) {
+            while (!((emptyHighPrio = highPrioMessages.isEmpty())
+                        && processorWorkQueue[this.processorQueueId].isEmpty() && shutdown)) {
 
                 try {
 
@@ -38,7 +44,8 @@ public void run() {
                     if (!emptyHighPrio) {
                         message = highPrioMessages.poll();
                     } else {
-                        message = messages.poll(WAIT_SLEEP_MS, TimeUnit.MILLISECONDS);
+                        final int messageQueueIdx = processorWorkQueue[this.processorQueueId].poll();
+                        message = messages[messageQueueIdx].poll(WAIT_SLEEP_MS, TimeUnit.MILLISECONDS);
                     }
 
                     if (message != null) {
@@ -74,7 +81,7 @@ public void run() {
                             writeBuilderToSendBuffer(sendBuffer);
                         }
 
-                        if (null == messages.peek()) {
+                        if (null == processorWorkQueue[this.processorQueueId].peek()) {
                             outboundQueue.put(sendBuffer);
                             sendBuffer = bufferPool.borrow();
                         }
@@ -94,34 +101,57 @@ public void run() {
             aggregator.stop();
             endSignal.countDown();
         }
-
     }
 
     StatsDBlockingProcessor(final int queueSize, final StatsDClientErrorHandler handler,
-            final int maxPacketSizeBytes, final int poolSize, final int workers,
+            final int maxPacketSizeBytes, final int poolSize, final int workers, final int lockShardGrain,
             final int aggregatorFlushInterval, final int aggregatorShards) throws Exception {
 
-        super(queueSize, handler, maxPacketSizeBytes, poolSize, workers, aggregatorFlushInterval, aggregatorShards);
-        this.messages = new ArrayBlockingQueue<>(queueSize);
+        super(queueSize, handler, maxPacketSizeBytes, poolSize, workers,
+                lockShardGrain, aggregatorFlushInterval, aggregatorShards);
+
+        this.messages = new ArrayBlockingQueue[lockShardGrain];
+        for (int i = 0 ; i < lockShardGrain ; i++) {
+            this.messages[i] = new ArrayBlockingQueue(queueSize);
+        }
+
+        this.processorWorkQueue = new ArrayBlockingQueue[workers];
+        for (int i = 0 ; i < workers ; i++) {
+            this.processorWorkQueue[i] = new ArrayBlockingQueue(queueSize);
+        }
     }
 
     @Override
-    protected ProcessingTask createProcessingTask() {
-        return new ProcessingTask();
+    protected ProcessingTask createProcessingTask(int id) {
+        return new ProcessingTask(id);
     }
 
     StatsDBlockingProcessor(final StatsDBlockingProcessor processor) throws Exception {
         super(processor);
-        this.messages = new ArrayBlockingQueue<>(processor.getQcapacity());
+
+        this.messages = new ArrayBlockingQueue[lockShardGrain];
+        for (int i = 0 ; i < lockShardGrain ; i++) {
+            this.messages[i] = new ArrayBlockingQueue(getQcapacity());
+        }
+
+        this.processorWorkQueue = new ArrayBlockingQueue[workers];
+        for (int i = 0 ; i < workers ; i++) {
+            this.processorWorkQueue[i] = new ArrayBlockingQueue(getQcapacity());
+        }
     }
 
     @Override
     protected boolean send(final Message message) {
         try {
+            int threadId = getThreadId();
+            int shard = threadId % lockShardGrain;
+            int processQueue = threadId % workers;
+
             if (!shutdown) {
-                messages.put(message);
+                messages[shard].put(message);
+                processorWorkQueue[processQueue].put(shard);
                 return true;
             }
         } catch (InterruptedException e) {
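To make the double routing in send() above easier to follow: the message itself is put on a shard queue chosen by thread id, and the shard's index is pushed onto the selected worker's own work queue, so a worker only polls shards that have work queued for it. A standalone sketch of that indirection under the same modulo scheme (hypothetical class and field names, not the library's code):

    import java.util.concurrent.ArrayBlockingQueue;
    import java.util.concurrent.BlockingQueue;

    // Illustrative model of the messages[] / processorWorkQueue[] indirection.
    class ShardRoutingSketch {
        final BlockingQueue<String>[] messages;             // one message queue per lock shard
        final BlockingQueue<Integer>[] processorWorkQueue;  // one queue of shard indexes per worker
        final int lockShardGrain;
        final int workers;

        @SuppressWarnings("unchecked")
        ShardRoutingSketch(int lockShardGrain, int workers, int capacity) {
            this.lockShardGrain = lockShardGrain;
            this.workers = workers;
            this.messages = new ArrayBlockingQueue[lockShardGrain];
            for (int i = 0; i < lockShardGrain; i++) {
                messages[i] = new ArrayBlockingQueue<>(capacity);
            }
            this.processorWorkQueue = new ArrayBlockingQueue[workers];
            for (int i = 0; i < workers; i++) {
                processorWorkQueue[i] = new ArrayBlockingQueue<>(capacity);
            }
        }

        // Producer side: same arithmetic as send() in the processors.
        void send(int threadId, String message) throws InterruptedException {
            int shard = threadId % lockShardGrain;
            int worker = threadId % workers;
            messages[shard].put(message);
            processorWorkQueue[worker].put(shard);
        }

        // Consumer side: a worker learns which shard to drain from its own work queue.
        String take(int workerId) throws InterruptedException {
            int shard = processorWorkQueue[workerId].take();
            return messages[shard].take();
        }
    }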
lockShardGrain; + int processQueue = threadId % workers; + if (!shutdown) { - messages.put(message); + messages[shard].put(message); + processorWorkQueue[processQueue].put(shard); return true; } } catch (InterruptedException e) { diff --git a/src/main/java/com/timgroup/statsd/StatsDNonBlockingProcessor.java b/src/main/java/com/timgroup/statsd/StatsDNonBlockingProcessor.java index 9c6bd074..2aee5bb9 100644 --- a/src/main/java/com/timgroup/statsd/StatsDNonBlockingProcessor.java +++ b/src/main/java/com/timgroup/statsd/StatsDNonBlockingProcessor.java @@ -11,16 +11,22 @@ public class StatsDNonBlockingProcessor extends StatsDProcessor { - private final Queue messages; - private final AtomicInteger qsize; // qSize will not reflect actual size, but a close estimate. + private final Queue[] messages; + private final Queue[] processorWorkQueue; + private final AtomicInteger[] qsize; // qSize will not reflect actual size, but a close estimate. private class ProcessingTask extends StatsDProcessor.ProcessingTask { + public ProcessingTask(int id) { + super(id); + } + @Override public void run() { ByteBuffer sendBuffer; boolean empty = true; boolean emptyHighPrio = true; + int messageQueueIdx = 0; try { sendBuffer = bufferPool.borrow(); @@ -31,7 +37,8 @@ public void run() { aggregator.start(); - while (!((emptyHighPrio = highPrioMessages.isEmpty()) && (empty = messages.isEmpty()) && shutdown)) { + while (!((emptyHighPrio = highPrioMessages.isEmpty()) + && (empty = processorWorkQueue[this.processorQueueId].isEmpty()) && shutdown)) { try { @@ -48,12 +55,14 @@ public void run() { if (!emptyHighPrio) { message = highPrioMessages.poll(); } else { - message = messages.poll(); + + messageQueueIdx = processorWorkQueue[this.processorQueueId].poll(); + message = messages[messageQueueIdx].poll(); } if (message != null) { - qsize.decrementAndGet(); + qsize[messageQueueIdx].decrementAndGet(); // TODO: Aggregate and fix, there's some duplicate logic if (aggregator.aggregateMessage(message)) { @@ -88,7 +97,7 @@ public void run() { writeBuilderToSendBuffer(sendBuffer); } - if (null == messages.peek()) { + if (null == processorWorkQueue[this.processorQueueId].peek()) { outboundQueue.put(sendBuffer); sendBuffer = bufferPool.borrow(); } @@ -112,31 +121,59 @@ public void run() { StatsDNonBlockingProcessor(final int queueSize, final StatsDClientErrorHandler handler, final int maxPacketSizeBytes, final int poolSize, final int workers, - final int aggregatorFlushInterval, final int aggregatorShards) - throws Exception { + final int lockShardGrain, final int aggregatorFlushInterval, + final int aggregatorShards) throws Exception { + + super(queueSize, handler, maxPacketSizeBytes, poolSize, workers, + lockShardGrain, aggregatorFlushInterval, aggregatorShards); + + this.qsize = new AtomicInteger[lockShardGrain]; + this.messages = new ConcurrentLinkedQueue[lockShardGrain]; + for (int i = 0 ; i < lockShardGrain ; i++) { + this.qsize[i] = new AtomicInteger(); + this.messages[i] = new ConcurrentLinkedQueue(); + this.qsize[i].set(0); + } - super(queueSize, handler, maxPacketSizeBytes, poolSize, workers, aggregatorFlushInterval, aggregatorShards); - this.qsize = new AtomicInteger(0); - this.messages = new ConcurrentLinkedQueue<>(); + this.processorWorkQueue = new ConcurrentLinkedQueue[workers]; + for (int i = 0 ; i < workers ; i++) { + this.processorWorkQueue[i] = new ConcurrentLinkedQueue(); + } } @Override - protected ProcessingTask createProcessingTask() { - return new ProcessingTask(); + protected ProcessingTask 
+        return new ProcessingTask(id);
     }
 
     StatsDNonBlockingProcessor(final StatsDNonBlockingProcessor processor) throws Exception {
         super(processor);
-        this.qsize = new AtomicInteger(0);
-        this.messages = new ConcurrentLinkedQueue<>();
+
+        this.qsize = new AtomicInteger[lockShardGrain];
+        this.messages = new ConcurrentLinkedQueue[lockShardGrain];
+        for (int i = 0 ; i < lockShardGrain ; i++) {
+            this.qsize[i] = new AtomicInteger();
+            this.messages[i] = new ConcurrentLinkedQueue();
+            this.qsize[i].set(0);
+        }
+
+        this.processorWorkQueue = new ConcurrentLinkedQueue[workers];
+        for (int i = 0 ; i < workers ; i++) {
+            this.processorWorkQueue[i] = new ConcurrentLinkedQueue();
+        }
     }
 
     @Override
     protected boolean send(final Message message) {
         if (!shutdown) {
-            if (qsize.get() < qcapacity) {
-                messages.offer(message);
-                qsize.incrementAndGet();
+            int threadId = getThreadId();
+            int shard = threadId % lockShardGrain;
+            int processQueue = threadId % workers;
+
+            if (qsize[shard].get() < qcapacity) {
+                messages[shard].offer(message);
+                qsize[shard].incrementAndGet();
+                processorWorkQueue[processQueue].offer(shard);
                 return true;
             }
         }
diff --git a/src/main/java/com/timgroup/statsd/StatsDProcessor.java b/src/main/java/com/timgroup/statsd/StatsDProcessor.java
index 67f40d0e..0ced985c 100644
--- a/src/main/java/com/timgroup/statsd/StatsDProcessor.java
+++ b/src/main/java/com/timgroup/statsd/StatsDProcessor.java
@@ -26,6 +26,9 @@ public abstract class StatsDProcessor implements Runnable {
     protected static final String MESSAGE_TOO_LONG = "Message longer than size of sendBuffer";
     protected static final int WAIT_SLEEP_MS = 10;  // 10 ms would be a 100HZ slice
 
+    // Atomic integer containing the next thread ID to be assigned
+    private static final AtomicInteger nextId = new AtomicInteger(0);
+
     protected final StatsDClientErrorHandler handler;
 
     protected final BufferPool bufferPool;
@@ -33,9 +36,16 @@ public abstract class StatsDProcessor implements Runnable {
     protected final BlockingQueue outboundQueue; // FIFO queue with outbound buffers
     protected final ExecutorService executor;
     protected final CountDownLatch endSignal;
+    protected static final ThreadLocal threadId = new ThreadLocal() {
+        @Override
+        protected Integer initialValue() {
+            return nextId.getAndIncrement();
+        }
+    };
 
     protected final int workers;
     protected final int qcapacity;
+    protected final int lockShardGrain;
 
     protected StatsDAggregator aggregator;
     protected volatile Telemetry telemetry;
@@ -48,6 +58,11 @@ protected abstract class ProcessingTask implements Runnable {
         protected final CharsetEncoder utf8Encoder = MESSAGE_CHARSET.newEncoder()
                 .onMalformedInput(CodingErrorAction.REPLACE)
                 .onUnmappableCharacter(CodingErrorAction.REPLACE);
+        protected final int processorQueueId;
+
+        public ProcessingTask(int id) {
+            this.processorQueueId = id;
+        }
 
         public abstract void run();
 
@@ -69,12 +84,14 @@ protected void writeBuilderToSendBuffer(ByteBuffer sendBuffer) {
 
     StatsDProcessor(final int queueSize, final StatsDClientErrorHandler handler,
             final int maxPacketSizeBytes, final int poolSize, final int workers,
-            final int aggregatorFlushInterval, final int aggregatorShards)
+            final int lockShardGrain, final int aggregatorFlushInterval,
+            final int aggregatorShards)
             throws Exception {
 
         this.handler = handler;
         this.workers = workers;
         this.qcapacity = queueSize;
+        this.lockShardGrain = lockShardGrain;
 
         this.executor = Executors.newFixedThreadPool(workers, new ThreadFactory() {
             final ThreadFactory delegate = Executors.defaultThreadFactory();
@@ -99,6 +116,7 @@ public Thread newThread(final Runnable runnable) {
 
         this.handler = processor.handler;
         this.workers = processor.workers;
+        this.lockShardGrain = processor.lockShardGrain;
         this.qcapacity = processor.getQcapacity();
 
         this.executor = Executors.newFixedThreadPool(workers, new ThreadFactory() {
@@ -120,7 +138,7 @@ public Thread newThread(final Runnable runnable) {
                 processor.getAggregator().getFlushInterval());
     }
 
-    protected abstract ProcessingTask createProcessingTask();
+    protected abstract ProcessingTask createProcessingTask(int id);
 
     protected abstract boolean send(final Message message);
 
@@ -146,11 +164,16 @@ public int getQcapacity() {
         return this.qcapacity;
     }
 
+    // Returns the current thread's unique ID, assigning it if necessary
+    public static int getThreadId() {
+        return threadId.get().intValue();
+    }
+
     @Override
     public void run() {
 
         for (int i = 0 ; i < workers ; i++) {
-            executor.submit(createProcessingTask());
+            executor.submit(createProcessingTask(i));
         }
 
         boolean done = false;
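The thread id introduced here is a dense counter (0, 1, 2, ...) handed out lazily the first time each thread calls getThreadId(), which is what lets the modulo routing in the processors spread application threads evenly over the shard and worker queues. A minimal standalone illustration of the same ThreadLocal/AtomicInteger pattern (a toy class for this write-up, not the library's):

    import java.util.concurrent.atomic.AtomicInteger;

    class ThreadIdSketch {
        private static final AtomicInteger nextId = new AtomicInteger(0);

        // Each thread is assigned the next id the first time it asks for one.
        private static final ThreadLocal<Integer> threadId = new ThreadLocal<Integer>() {
            @Override
            protected Integer initialValue() {
                return nextId.getAndIncrement();
            }
        };

        static int getThreadId() {
            return threadId.get();
        }

        public static void main(String[] args) {
            final int lockShardGrain = 4;
            Runnable task = () -> System.out.println(Thread.currentThread().getName()
                    + " -> id " + getThreadId()
                    + " -> shard " + (getThreadId() % lockShardGrain));
            for (int i = 0; i < 8; i++) {
                new Thread(task, "app-thread-" + i).start();
            }
        }
    }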
diff --git a/src/test/java/com/timgroup/statsd/NonBlockingStatsDClientTest.java b/src/test/java/com/timgroup/statsd/NonBlockingStatsDClientTest.java
index a11ab52f..b41df345 100644
--- a/src/test/java/com/timgroup/statsd/NonBlockingStatsDClientTest.java
+++ b/src/test/java/com/timgroup/statsd/NonBlockingStatsDClientTest.java
@@ -61,6 +61,11 @@ public void clear() {
         server.clear();
     }
 
+    @Test
+    public void assert_default_udp_size() throws Exception {
+        assertEquals(client.statsDProcessor.bufferPool.getBufferSize(), NonBlockingStatsDClient.DEFAULT_UDP_MAX_PACKET_SIZE_BYTES);
+    }
+
     @Test(timeout = 5000L)
     public void sends_counter_value_to_statsd() throws Exception {
 
@@ -1240,10 +1245,10 @@ private static class SlowStatsDNonBlockingStatsDClient extends NonBlockingStatsD
                 String[] constantTags, final StatsDClientErrorHandler errorHandler,
                 Callable addressLookup, final int timeout, final int bufferSize,
                 final int maxPacketSizeBytes, String entityID, final int poolSize, final int processorWorkers,
-                final int senderWorkers, boolean blocking) throws StatsDClientException {
+                final int senderWorkers, final int lockShardGrain, boolean blocking) throws StatsDClientException {
 
             super(prefix, queueSize, constantTags, errorHandler, addressLookup, addressLookup, timeout,bufferSize,
-                    maxPacketSizeBytes, entityID, poolSize, processorWorkers, senderWorkers, blocking, false, 0, 0, 0);
+                    maxPacketSizeBytes, entityID, poolSize, processorWorkers, senderWorkers, lockShardGrain, blocking, false, 0, 0, 0);
             lock = new CountDownLatch(1);
         }
 
@@ -1263,14 +1268,20 @@ private static class SlowStatsDNonBlockingStatsDClientBuilder extends NonBlockin
 
         @Override
         public SlowStatsDNonBlockingStatsDClient build() throws StatsDClientException {
+            int packetSize = maxPacketSizeBytes;
+            if (packetSize == 0) {
+                packetSize = (port == 0) ? NonBlockingStatsDClient.DEFAULT_UDS_MAX_PACKET_SIZE_BYTES :
+                    NonBlockingStatsDClient.DEFAULT_UDP_MAX_PACKET_SIZE_BYTES;
+            }
+
             if (addressLookup != null) {
                 return new SlowStatsDNonBlockingStatsDClient(prefix, queueSize, constantTags, errorHandler,
-                        addressLookup, timeout, socketBufferSize, maxPacketSizeBytes, entityID, bufferPoolSize,
-                        processorWorkers, senderWorkers, blocking);
+                        addressLookup, timeout, socketBufferSize, packetSize, entityID, bufferPoolSize,
+                        processorWorkers, senderWorkers, lockShardGrain, blocking);
             } else {
                 return new SlowStatsDNonBlockingStatsDClient(prefix, queueSize, constantTags, errorHandler,
-                        staticStatsDAddressResolution(hostname, port), timeout, socketBufferSize, maxPacketSizeBytes,
-                        entityID, bufferPoolSize, processorWorkers, senderWorkers, blocking);
+                        staticStatsDAddressResolution(hostname, port), timeout, socketBufferSize, packetSize,
+                        entityID, bufferPoolSize, processorWorkers, senderWorkers, lockShardGrain, blocking);
             }
         }
     }
diff --git a/src/test/java/com/timgroup/statsd/StatsDAggregatorTest.java b/src/test/java/com/timgroup/statsd/StatsDAggregatorTest.java
index 185ce740..34bf0cdb 100644
--- a/src/test/java/com/timgroup/statsd/StatsDAggregatorTest.java
+++ b/src/test/java/com/timgroup/statsd/StatsDAggregatorTest.java
@@ -19,6 +19,7 @@ import java.util.Queue;
 import java.util.Random;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.Callable;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ConcurrentLinkedQueue;
@@ -81,22 +82,42 @@ protected void writeTo(StringBuilder builder){}
 
     // fakeProcessor store messages from the telemetry only
     public static class FakeProcessor extends StatsDProcessor {
-        private final Queue messages;
+        private final Queue[] messages;
+        private final Queue[] processorWorkQueue;
+        private final AtomicInteger[] qsize;  // qSize will not reflect actual size, but a close estimate.
 
         private final AtomicInteger messageSent = new AtomicInteger(0);
         private final AtomicInteger messageAggregated = new AtomicInteger(0);
 
         FakeProcessor(final StatsDClientErrorHandler handler) throws Exception {
-            super(0, handler, 0, 1, 1, 0, 0);
-            this.messages = new ConcurrentLinkedQueue<>();
-        }
+            super(50, handler, 0, 1, 1, 1, 0, StatsDAggregator.DEFAULT_SHARDS);
+
+            // 1 queue (lockShardGrain = 1)
+            this.qsize = new AtomicInteger[lockShardGrain];
+            this.messages = new ArrayBlockingQueue[lockShardGrain];
+            for (int i = 0 ; i < lockShardGrain ; i++) {
+                this.qsize[i] = new AtomicInteger();
+                this.messages[i] = new ArrayBlockingQueue(getQcapacity());
+                this.qsize[i].set(0);
+            }
+
+            // 1 worker
+            this.processorWorkQueue = new ArrayBlockingQueue[workers];
+            for (int i = 0 ; i < workers ; i++) {
+                this.processorWorkQueue[i] = new ArrayBlockingQueue(getQcapacity());
+            }
+        }
 
         private class FakeProcessingTask extends StatsDProcessor.ProcessingTask {
+
+            public FakeProcessingTask(int id) {
+                super(id);
+            }
+
             @Override
             public void run() {
                 while (!shutdown) {
-                    final Message message = messages.poll();
+                    final Message message = messages[0].poll();
                     if (message == null) {
                         try{
@@ -106,6 +127,7 @@ public void run() {
                         continue;
                     }
 
+                    qsize[0].decrementAndGet();
                     if (aggregator.aggregateMessage(message)) {
                         messageAggregated.incrementAndGet();
                         continue;
@@ -118,23 +140,38 @@ public void run() {
         }
 
         @Override
-        protected StatsDProcessor.ProcessingTask createProcessingTask() {
-            return new FakeProcessingTask();
+        protected StatsDProcessor.ProcessingTask createProcessingTask(int id) {
+            return new FakeProcessingTask(id);
         }
 
         @Override
         public boolean send(final Message msg) {
-            messages.offer(msg);
-            return true;
+            if (!shutdown) {
+                int threadId = getThreadId();
+                int shard = threadId % lockShardGrain;
+                int processQueue = threadId % workers;
+
+                if (qsize[shard].get() < qcapacity) {
+                    messages[shard].offer(msg);
+                    qsize[shard].incrementAndGet();
+                    processorWorkQueue[processQueue].offer(shard);
+                    return true;
+                }
+            }
+
+            return false;
         }
 
-        public Queue getMessages() {
-            return messages;
+        public Queue getMessages(int id) {
+            return messages[id];
         }
 
         public void clear() {
             try {
-                messages.clear();
+
+                for (int i = 0 ; i < lockShardGrain ; i++) {
+                    messages[i].clear();
+                }
                 highPrioMessages.clear();
             } catch (Exception e) {}
         }
@@ -181,7 +218,7 @@ public void aggregate_messages() throws Exception {
             fakeProcessor.send(new FakeAlphaMessage("some.set", Message.Type.SET, "value"));
         }
 
-        waitForQueueSize(fakeProcessor.messages, 0);
+        waitForQueueSize(fakeProcessor.messages[0], 0);
 
         // 10 gauges, 10 counts, 10 sets
         assertEquals(30, fakeProcessor.messageAggregated.get());
@@ -206,7 +243,7 @@ public void aggregation_sharding() throws Exception {
             fakeProcessor.send(gauge);
         }
 
-        waitForQueueSize(fakeProcessor.messages, 0);
+        waitForQueueSize(fakeProcessor.messages[0], 0);
 
         for (int i=0 ; i map = fakeProcessor.aggregator.aggregateMetrics.get(i);
diff --git a/src/test/java/com/timgroup/statsd/TelemetryTest.java b/src/test/java/com/timgroup/statsd/TelemetryTest.java
index 2a347114..791ec182 100644
--- a/src/test/java/com/timgroup/statsd/TelemetryTest.java
+++ b/src/test/java/com/timgroup/statsd/TelemetryTest.java
@@ -31,11 +31,16 @@ public static class FakeProcessor extends StatsDProcessor {
         public final List messages = new ArrayList<>();
 
         FakeProcessor(final StatsDClientErrorHandler handler) throws Exception {
-            super(0, handler, 0, 1, 1, 0, 0);
+            super(0, handler, 0, 1, 1, 1, 0, 0);
         }
 
         private class FakeProcessingTask extends StatsDProcessor.ProcessingTask {
+
+            public FakeProcessingTask(int id) {
+                super(id);
+            }
+
             @Override
             public void run() {}
         }
@@ -50,8 +55,8 @@ public boolean send(final Message msg) {
         public void run(){}
 
         @Override
-        protected ProcessingTask createProcessingTask() {
-            return new FakeProcessingTask();
+        protected ProcessingTask createProcessingTask(int id) {
+            return new FakeProcessingTask(id);
         }
 
         public List getMessages() {
@@ -82,12 +87,12 @@ public StatsDNonBlockingTelemetry(final String prefix, final int queueSize, Stri
                 final StatsDClientErrorHandler errorHandler, Callable addressLookup,
                 final int timeout, final int bufferSize, final int maxPacketSizeBytes, String entityID,
                 final int poolSize, final int processorWorkers,
-                final int senderWorkers, boolean blocking, final boolean enableTelemetry,
-                final int telemetryFlushInterval)
+                final int senderWorkers, final int lockShardGrain, boolean blocking,
+                final boolean enableTelemetry, final int telemetryFlushInterval)
                 throws StatsDClientException {
             super(prefix, queueSize, constantTags, errorHandler, addressLookup, addressLookup, timeout,
                     bufferSize, maxPacketSizeBytes, entityID, poolSize, processorWorkers, senderWorkers,
-                    blocking, enableTelemetry, telemetryFlushInterval, 0, 0);
+                    lockShardGrain, blocking, enableTelemetry, telemetryFlushInterval, 0, 0);
         }
     };
 
@@ -95,16 +100,23 @@ private static class StatsDNonBlockingTelemetryBuilder extends NonBlockingStatsD
 
         @Override
         public StatsDNonBlockingTelemetry build() throws StatsDClientException {
+
+            int packetSize = maxPacketSizeBytes;
+            if (packetSize == 0) {
+                packetSize = (port == 0) ? NonBlockingStatsDClient.DEFAULT_UDS_MAX_PACKET_SIZE_BYTES :
+                    NonBlockingStatsDClient.DEFAULT_UDP_MAX_PACKET_SIZE_BYTES;
+            }
+
             if (addressLookup != null) {
                 return new StatsDNonBlockingTelemetry(prefix, queueSize, constantTags, errorHandler,
-                        addressLookup, timeout, socketBufferSize, maxPacketSizeBytes, entityID,
-                        bufferPoolSize, processorWorkers, senderWorkers, blocking, enableTelemetry,
-                        telemetryFlushInterval);
+                        addressLookup, timeout, socketBufferSize, packetSize, entityID,
+                        bufferPoolSize, processorWorkers, senderWorkers, lockShardGrain, blocking,
+                        enableTelemetry, telemetryFlushInterval);
             } else {
                 return new StatsDNonBlockingTelemetry(prefix, queueSize, constantTags, errorHandler,
-                        staticStatsDAddressResolution(hostname, port), timeout, socketBufferSize, maxPacketSizeBytes,
-                        entityID, bufferPoolSize, processorWorkers, senderWorkers, blocking, enableTelemetry,
-                        telemetryFlushInterval);
+                        staticStatsDAddressResolution(hostname, port), timeout, socketBufferSize, packetSize,
+                        entityID, bufferPoolSize, processorWorkers, senderWorkers, lockShardGrain, blocking,
+                        enableTelemetry, telemetryFlushInterval);
             }
         }
     }
@@ -347,6 +359,8 @@ public void telemetry_flushInterval() throws Exception {
     public void telemetry_droppedData() throws Exception {
         clientError.telemetry.reset();
 
+        assertThat(clientError.statsDProcessor.bufferPool.getBufferSize(), equalTo(8192));
+
         clientError.gauge("gauge", 24);
 
         // leaving time to the server to flush metrics
diff --git a/src/test/java/com/timgroup/statsd/UnixSocketTest.java b/src/test/java/com/timgroup/statsd/UnixSocketTest.java
index 07543d12..0f3faeec 100644
--- a/src/test/java/com/timgroup/statsd/UnixSocketTest.java
+++ b/src/test/java/com/timgroup/statsd/UnixSocketTest.java
@@ -14,6 +14,7 @@ import static org.hamcrest.Matchers.nullValue;
 import static org.hamcrest.Matchers.containsString;
 import static org.hamcrest.Matchers.hasItem;
+import static org.junit.Assert.assertEquals;
 
 public class UnixSocketTest implements StatsDClientErrorHandler {
     private static File tmpFolder;
@@ -62,6 +63,11 @@ public void stop() throws Exception {
         server.close();
     }
 
+    @Test
+    public void assert_default_uds_size() throws Exception {
+        assertEquals(client.statsDProcessor.bufferPool.getBufferSize(), NonBlockingStatsDClient.DEFAULT_UDS_MAX_PACKET_SIZE_BYTES);
+    }
+
     @Test(timeout = 5000L)
     public void sends_to_statsd() throws Exception {
         for(long i = 0; i < 5 ; i++) {