From 12395b420c82d6411d01c8a42efee414e9c47b21 Mon Sep 17 00:00:00 2001 From: Jaime Fullaondo Date: Fri, 31 Jan 2020 15:44:34 -0500 Subject: [PATCH 1/6] [shards] shard locks+queues. --- .../statsd/NonBlockingStatsDClient.java | 16 ++++-- .../NonBlockingStatsDClientBuilder.java | 10 +++- .../statsd/StatsDBlockingProcessor.java | 30 +++++++--- .../statsd/StatsDNonBlockingProcessor.java | 56 ++++++++++++++----- .../com/timgroup/statsd/StatsDProcessor.java | 10 +++- .../statsd/NonBlockingStatsDClientTest.java | 8 +-- 6 files changed, 97 insertions(+), 33 deletions(-) diff --git a/src/main/java/com/timgroup/statsd/NonBlockingStatsDClient.java b/src/main/java/com/timgroup/statsd/NonBlockingStatsDClient.java index dbe9c17b..f60e464d 100644 --- a/src/main/java/com/timgroup/statsd/NonBlockingStatsDClient.java +++ b/src/main/java/com/timgroup/statsd/NonBlockingStatsDClient.java @@ -89,6 +89,7 @@ String tag() { public static final int DEFAULT_PROCESSOR_WORKERS = 1; public static final int DEFAULT_SENDER_WORKERS = 1; public static final int DEFAULT_DOGSTATSD_PORT = 8125; + public static final int DEFAULT_LOCK_SHARD_GRAIN = 4; public static final int SOCKET_TIMEOUT_MS = 100; public static final int SOCKET_BUFFER_BYTES = -1; public static final boolean DEFAULT_BLOCKING = false; @@ -213,6 +214,9 @@ private static String format(ThreadLocal formatter, Number value) * The number of processor worker threads assembling buffers for submission. * @param senderWorkers * The number of sender worker threads submitting buffers to the socket. + * @param lockShardGrain + * The granularity for the lock sharding - sharding is based of thread id + * so value should not be greater than the application thread count.. * @param blocking * Blocking or non-blocking implementation for statsd message queue. * @param enableTelemetry @@ -230,7 +234,7 @@ public NonBlockingStatsDClient(final String prefix, final int queueSize, String[ final StatsDClientErrorHandler errorHandler, Callable addressLookup, Callable telemetryAddressLookup, final int timeout, final int bufferSize, final int maxPacketSizeBytes, String entityID, final int poolSize, final int processorWorkers, - final int senderWorkers, boolean blocking, final boolean enableTelemetry, + final int senderWorkers, final int lockShardGrain, boolean blocking, final boolean enableTelemetry, final int telemetryFlushInterval, final int aggregationFlushInterval, final int aggregationShards) throws StatsDClientException { if ((prefix != null) && (!prefix.isEmpty())) { @@ -288,7 +292,7 @@ public NonBlockingStatsDClient(final String prefix, final int queueSize, String[ } statsDProcessor = createProcessor(queueSize, handler, maxPacketSizeBytes, poolSize, - processorWorkers, blocking, aggregationFlushInterval, aggregationShards); + processorWorkers, lockShardGrain, blocking, aggregationFlushInterval, aggregationShards); telemetryStatsDProcessor = statsDProcessor; Properties properties = new Properties(); @@ -1001,14 +1005,14 @@ public NonBlockingStatsDClient(final String prefix, final int queueSize, String[ } protected StatsDProcessor createProcessor(final int queueSize, final StatsDClientErrorHandler handler, - final int maxPacketSizeBytes, final int bufferPoolSize, final int workers, final boolean blocking, - final int aggregationFlushInterval, final int aggregationShards) throws Exception { + final int maxPacketSizeBytes, final int bufferPoolSize, final int workers, final int lockShardGrain, + final boolean blocking, final int aggregationFlushInterval, final int aggregationShards) throws Exception { if (blocking) { return new StatsDBlockingProcessor(queueSize, handler, maxPacketSizeBytes, bufferPoolSize, - workers, aggregationFlushInterval, aggregationShards); + workers, lockShardGrain, aggregationFlushInterval, aggregationShards); } else { return new StatsDNonBlockingProcessor(queueSize, handler, maxPacketSizeBytes, bufferPoolSize, - workers, aggregationFlushInterval, aggregationShards); + workers, lockShardGrain, aggregationFlushInterval, aggregationShards); } } diff --git a/src/main/java/com/timgroup/statsd/NonBlockingStatsDClientBuilder.java b/src/main/java/com/timgroup/statsd/NonBlockingStatsDClientBuilder.java index 87c5cfdc..a462b941 100644 --- a/src/main/java/com/timgroup/statsd/NonBlockingStatsDClientBuilder.java +++ b/src/main/java/com/timgroup/statsd/NonBlockingStatsDClientBuilder.java @@ -26,6 +26,7 @@ public class NonBlockingStatsDClientBuilder { public int processorWorkers = NonBlockingStatsDClient.DEFAULT_PROCESSOR_WORKERS; public int senderWorkers = NonBlockingStatsDClient.DEFAULT_SENDER_WORKERS; public boolean blocking = NonBlockingStatsDClient.DEFAULT_BLOCKING; + public int lockShardGrain = NonBlockingStatsDClient.DEFAULT_LOCK_SHARD_GRAIN; public boolean enableTelemetry = NonBlockingStatsDClient.DEFAULT_ENABLE_TELEMETRY; public boolean enableAggregation = NonBlockingStatsDClient.DEFAULT_ENABLE_AGGREGATION; public int telemetryFlushInterval = Telemetry.DEFAULT_FLUSH_INTERVAL; @@ -90,6 +91,11 @@ public NonBlockingStatsDClientBuilder senderWorkers(int val) { return this; } + public NonBlockingStatsDClientBuilder lockShardGrain(int val) { + lockShardGrain = val; + return this; + } + public NonBlockingStatsDClientBuilder blocking(boolean val) { blocking = val; return this; @@ -182,8 +188,8 @@ public NonBlockingStatsDClient build() throws StatsDClientException { return new NonBlockingStatsDClient(prefix, queueSize, constantTags, errorHandler, lookup, telemetryLookup, timeout, socketBufferSize, maxPacketSizeBytes, - entityID, bufferPoolSize, processorWorkers, senderWorkers, blocking, - enableTelemetry, telemetryFlushInterval, + entityID, bufferPoolSize, processorWorkers, senderWorkers, lockShardGrain, + blocking, enableTelemetry, telemetryFlushInterval, (enableAggregation ? aggregationFlushInterval : 0), aggregationShards); } diff --git a/src/main/java/com/timgroup/statsd/StatsDBlockingProcessor.java b/src/main/java/com/timgroup/statsd/StatsDBlockingProcessor.java index a1b05fed..980dee01 100644 --- a/src/main/java/com/timgroup/statsd/StatsDBlockingProcessor.java +++ b/src/main/java/com/timgroup/statsd/StatsDBlockingProcessor.java @@ -11,7 +11,8 @@ public class StatsDBlockingProcessor extends StatsDProcessor { - private final BlockingQueue messages; + private final BlockingQueue[] messages; + private final BlockingQueue[] processorWorkQueue; private class ProcessingTask extends StatsDProcessor.ProcessingTask { @@ -30,7 +31,7 @@ public void run() { aggregator.start(); - while (!((emptyHighPrio = highPrioMessages.isEmpty()) && (empty = messages.isEmpty()) && shutdown)) { + while (!((emptyHighPrio = highPrioMessages.isEmpty()) && processorWorkQueue[this.processorQueueId].isEmpty() && shutdown)) { try { @@ -38,7 +39,8 @@ public void run() { if (!emptyHighPrio) { message = highPrioMessages.poll(); } else { - message = messages.poll(WAIT_SLEEP_MS, TimeUnit.MILLISECONDS); + final int messageQueueIdx = processorWorkQueue[this.processorQueueId].poll(); + message = messages[messageQueueIdx].poll(WAIT_SLEEP_MS, TimeUnit.MILLISECONDS); } if (message != null) { @@ -98,11 +100,18 @@ public void run() { } StatsDBlockingProcessor(final int queueSize, final StatsDClientErrorHandler handler, - final int maxPacketSizeBytes, final int poolSize, final int workers, + final int maxPacketSizeBytes, final int poolSize, final int workers, final int lockShardGrain, final int aggregatorFlushInterval, final int aggregatorShards) throws Exception { - super(queueSize, handler, maxPacketSizeBytes, poolSize, workers, aggregatorFlushInterval, aggregatorShards); - this.messages = new ArrayBlockingQueue<>(queueSize); + super(queueSize, handler, maxPacketSizeBytes, poolSize, workers, lockShardGrain, aggregatorFlushInterval, aggregatorShards); + this.messages = new ArrayBlockingQueue[lockShardGrain]; + for (int i = 0 ; i < lockShardGrain ; i++) { + this.messages[i] = new ArrayBlockingQueue(queueSize); + } + this.processorWorkQueue = new ArrayBlockingQueue[workers]; + for (int i = 0 ; i < workers ; i++) { + this.processorWorkQueue[i] = new ArrayBlockingQueue(queueSize); + } } @Override @@ -120,8 +129,15 @@ protected ProcessingTask createProcessingTask() { @Override protected boolean send(final Message message) { try { + long threadId = Thread.currentThread().getId(); + // modulo reduction alternative to: long shard = threadID % this.lockShardGrain; + // ref: https://lemire.me/blog/2016/06/27/a-fast-alternative-to-the-modulo-reduction/ + int shard = (int)((threadId * (long)this.lockShardGrain) >> 32); + int processQueue = (int)((threadId * (long)this.workers) >> 32); + if (!shutdown) { - messages.put(message); + messages[shard].put(message); + processorWorkQueue[processQueue].put(shard); return true; } } catch (InterruptedException e) { diff --git a/src/main/java/com/timgroup/statsd/StatsDNonBlockingProcessor.java b/src/main/java/com/timgroup/statsd/StatsDNonBlockingProcessor.java index 9c6bd074..cdfa11d2 100644 --- a/src/main/java/com/timgroup/statsd/StatsDNonBlockingProcessor.java +++ b/src/main/java/com/timgroup/statsd/StatsDNonBlockingProcessor.java @@ -11,11 +11,19 @@ public class StatsDNonBlockingProcessor extends StatsDProcessor { - private final Queue messages; - private final AtomicInteger qsize; // qSize will not reflect actual size, but a close estimate. + private final Queue[] messages; + private final Queue[] processorWorkQueue; + private final AtomicInteger[] qsize; // qSize will not reflect actual size, but a close estimate. private class ProcessingTask extends StatsDProcessor.ProcessingTask { + private final int processorQueueId; + + public ProcessingTask(int id) { + super(); + this.processorQueueId = id; + } + @Override public void run() { ByteBuffer sendBuffer; @@ -31,7 +39,8 @@ public void run() { aggregator.start(); - while (!((emptyHighPrio = highPrioMessages.isEmpty()) && (empty = messages.isEmpty()) && shutdown)) { + while (!((emptyHighPrio = highPrioMessages.isEmpty()) && + (empty = processorWorkQueue[this.processorQueueId].isEmpty()) && shutdown)) { try { @@ -48,12 +57,14 @@ public void run() { if (!emptyHighPrio) { message = highPrioMessages.poll(); } else { - message = messages.poll(); + + final int messageQueueIdx = processorWorkQueue[this.processorQueueId].poll(); + message = messages[messageQueueIdx].poll(); } if (message != null) { - qsize.decrementAndGet(); + qsize[messageQueueIdx].decrementAndGet(); // TODO: Aggregate and fix, there's some duplicate logic if (aggregator.aggregateMessage(message)) { @@ -88,7 +99,7 @@ public void run() { writeBuilderToSendBuffer(sendBuffer); } - if (null == messages.peek()) { + if (null == processorWorkQueue[this.processorQueueId].peek()) { outboundQueue.put(sendBuffer); sendBuffer = bufferPool.borrow(); } @@ -112,12 +123,24 @@ public void run() { StatsDNonBlockingProcessor(final int queueSize, final StatsDClientErrorHandler handler, final int maxPacketSizeBytes, final int poolSize, final int workers, - final int aggregatorFlushInterval, final int aggregatorShards) - throws Exception { + final int lockShardGrain, final int aggregatorFlushInterval, + final int aggregatorShards) throws Exception { - super(queueSize, handler, maxPacketSizeBytes, poolSize, workers, aggregatorFlushInterval, aggregatorShards); + super(queueSize, handler, maxPacketSizeBytes, poolSize, workers, lockShardGrain, aggregatorFlushInterval, aggregatorShards); this.qsize = new AtomicInteger(0); - this.messages = new ConcurrentLinkedQueue<>(); + + this.qsize = new AtomicInteger[lockShardGrain]; + this.messages = new ConcurrentLinkedQueue[lockShardGrain]; + for (int i = 0 ; i < lockShardGrain ; i++) { + this.qsize[i] = new AtomicInteger(); + this.messages[i] = new ConcurrentLinkedQueue(); + this.qsize[i].set(0); + } + + this.processorWorkQueue = new ConcurrentLinkedQueue[workers]; + for (int i = 0 ; i < workers ; i++) { + this.processorWorkQueue[i] = new ConcurrentLinkedQueue(); + } } @Override @@ -134,9 +157,16 @@ protected ProcessingTask createProcessingTask() { @Override protected boolean send(final Message message) { if (!shutdown) { - if (qsize.get() < qcapacity) { - messages.offer(message); - qsize.incrementAndGet(); + long threadId = Thread.currentThread().getId(); + // modulo reduction alternative to: long shard = threadID % [shard]this.lockShardGrain; + // ref: https://lemire.me/blog/2016/06/27/a-fast-alternative-to-the-modulo-reduction/ + int shard = (int)((threadId * (long)this.lockShardGrain) >> 32); + int processQueue = (int)((threadId * (long)this.workers) >> 32); + + if (qsize[shard].get() < qcapacity) { + messages[shard].offer(message); + qsize[shard].incrementAndGet(); + processorWorkQueue[processQueue].offer(shard); return true; } } diff --git a/src/main/java/com/timgroup/statsd/StatsDProcessor.java b/src/main/java/com/timgroup/statsd/StatsDProcessor.java index 67f40d0e..57e3bd95 100644 --- a/src/main/java/com/timgroup/statsd/StatsDProcessor.java +++ b/src/main/java/com/timgroup/statsd/StatsDProcessor.java @@ -36,6 +36,7 @@ public abstract class StatsDProcessor implements Runnable { protected final int workers; protected final int qcapacity; + protected final int lockShardGrain; protected StatsDAggregator aggregator; protected volatile Telemetry telemetry; @@ -48,6 +49,11 @@ protected abstract class ProcessingTask implements Runnable { protected final CharsetEncoder utf8Encoder = MESSAGE_CHARSET.newEncoder() .onMalformedInput(CodingErrorAction.REPLACE) .onUnmappableCharacter(CodingErrorAction.REPLACE); + protected final int processorQueueId; + + public ProcessingTask(int id) { + this.processorQueueId = id; + } public abstract void run(); @@ -69,12 +75,14 @@ protected void writeBuilderToSendBuffer(ByteBuffer sendBuffer) { StatsDProcessor(final int queueSize, final StatsDClientErrorHandler handler, final int maxPacketSizeBytes, final int poolSize, final int workers, - final int aggregatorFlushInterval, final int aggregatorShards) + final int lockShardGrain, final int aggregatorFlushInterval, + final int aggregatorShards) throws Exception { this.handler = handler; this.workers = workers; this.qcapacity = queueSize; + this.lockShardGrain = lockShardGrain; this.executor = Executors.newFixedThreadPool(workers, new ThreadFactory() { final ThreadFactory delegate = Executors.defaultThreadFactory(); diff --git a/src/test/java/com/timgroup/statsd/NonBlockingStatsDClientTest.java b/src/test/java/com/timgroup/statsd/NonBlockingStatsDClientTest.java index a11ab52f..4ede8cb0 100644 --- a/src/test/java/com/timgroup/statsd/NonBlockingStatsDClientTest.java +++ b/src/test/java/com/timgroup/statsd/NonBlockingStatsDClientTest.java @@ -1240,10 +1240,10 @@ private static class SlowStatsDNonBlockingStatsDClient extends NonBlockingStatsD String[] constantTags, final StatsDClientErrorHandler errorHandler, Callable addressLookup, final int timeout, final int bufferSize, final int maxPacketSizeBytes, String entityID, final int poolSize, final int processorWorkers, - final int senderWorkers, boolean blocking) throws StatsDClientException { + final int senderWorkers, final int lockShardGrain, boolean blocking) throws StatsDClientException { super(prefix, queueSize, constantTags, errorHandler, addressLookup, addressLookup, timeout,bufferSize, - maxPacketSizeBytes, entityID, poolSize, processorWorkers, senderWorkers, blocking, false, 0, 0, 0); + maxPacketSizeBytes, entityID, poolSize, processorWorkers, senderWorkers, lockShardGrain, blocking, false, 0, 0, 0); lock = new CountDownLatch(1); } @@ -1266,11 +1266,11 @@ public SlowStatsDNonBlockingStatsDClient build() throws StatsDClientException { if (addressLookup != null) { return new SlowStatsDNonBlockingStatsDClient(prefix, queueSize, constantTags, errorHandler, addressLookup, timeout, socketBufferSize, maxPacketSizeBytes, entityID, bufferPoolSize, - processorWorkers, senderWorkers, blocking); + processorWorkers, senderWorkers, lockShardGrain, blocking); } else { return new SlowStatsDNonBlockingStatsDClient(prefix, queueSize, constantTags, errorHandler, staticStatsDAddressResolution(hostname, port), timeout, socketBufferSize, maxPacketSizeBytes, - entityID, bufferPoolSize, processorWorkers, senderWorkers, blocking); + entityID, bufferPoolSize, processorWorkers, senderWorkers, lockShardGrain, blocking); } } } From 784f099e9f84089fabc0eb38210537ef7d2e4417 Mon Sep 17 00:00:00 2001 From: Jaime Fullaondo Date: Thu, 12 Mar 2020 16:18:10 -0400 Subject: [PATCH 2/6] Cache threadId for quicker lookup, use modulo --- .../timgroup/statsd/StatsDBlockingProcessor.java | 8 +++----- .../statsd/StatsDNonBlockingProcessor.java | 8 +++----- .../java/com/timgroup/statsd/StatsDProcessor.java | 14 ++++++++++++++ 3 files changed, 20 insertions(+), 10 deletions(-) diff --git a/src/main/java/com/timgroup/statsd/StatsDBlockingProcessor.java b/src/main/java/com/timgroup/statsd/StatsDBlockingProcessor.java index 980dee01..1f9c73c4 100644 --- a/src/main/java/com/timgroup/statsd/StatsDBlockingProcessor.java +++ b/src/main/java/com/timgroup/statsd/StatsDBlockingProcessor.java @@ -129,11 +129,9 @@ protected ProcessingTask createProcessingTask() { @Override protected boolean send(final Message message) { try { - long threadId = Thread.currentThread().getId(); - // modulo reduction alternative to: long shard = threadID % this.lockShardGrain; - // ref: https://lemire.me/blog/2016/06/27/a-fast-alternative-to-the-modulo-reduction/ - int shard = (int)((threadId * (long)this.lockShardGrain) >> 32); - int processQueue = (int)((threadId * (long)this.workers) >> 32); + int threadId = getThreadId(); + int shard = threadId % lockShardGrain; + int processQueue = threadId % workers; if (!shutdown) { messages[shard].put(message); diff --git a/src/main/java/com/timgroup/statsd/StatsDNonBlockingProcessor.java b/src/main/java/com/timgroup/statsd/StatsDNonBlockingProcessor.java index cdfa11d2..e31bb3a6 100644 --- a/src/main/java/com/timgroup/statsd/StatsDNonBlockingProcessor.java +++ b/src/main/java/com/timgroup/statsd/StatsDNonBlockingProcessor.java @@ -157,11 +157,9 @@ protected ProcessingTask createProcessingTask() { @Override protected boolean send(final Message message) { if (!shutdown) { - long threadId = Thread.currentThread().getId(); - // modulo reduction alternative to: long shard = threadID % [shard]this.lockShardGrain; - // ref: https://lemire.me/blog/2016/06/27/a-fast-alternative-to-the-modulo-reduction/ - int shard = (int)((threadId * (long)this.lockShardGrain) >> 32); - int processQueue = (int)((threadId * (long)this.workers) >> 32); + int threadId = getThreadId(); + int shard = threadId % lockShardGrain; + int processQueue = threadId % workers; if (qsize[shard].get() < qcapacity) { messages[shard].offer(message); diff --git a/src/main/java/com/timgroup/statsd/StatsDProcessor.java b/src/main/java/com/timgroup/statsd/StatsDProcessor.java index 57e3bd95..b11a5919 100644 --- a/src/main/java/com/timgroup/statsd/StatsDProcessor.java +++ b/src/main/java/com/timgroup/statsd/StatsDProcessor.java @@ -26,6 +26,9 @@ public abstract class StatsDProcessor implements Runnable { protected static final String MESSAGE_TOO_LONG = "Message longer than size of sendBuffer"; protected static final int WAIT_SLEEP_MS = 10; // 10 ms would be a 100HZ slice + // Atomic integer containing the next thread ID to be assigned + private static final AtomicInteger nextId = new AtomicInteger(0); + protected final StatsDClientErrorHandler handler; protected final BufferPool bufferPool; @@ -33,6 +36,12 @@ public abstract class StatsDProcessor implements Runnable { protected final BlockingQueue outboundQueue; // FIFO queue with outbound buffers protected final ExecutorService executor; protected final CountDownLatch endSignal; + protected static final ThreadLocal threadId = new ThreadLocal() { + @Override + protected Integer initialValue() { + return nextId.getAndIncrement(); + } + }; protected final int workers; protected final int qcapacity; @@ -154,6 +163,11 @@ public int getQcapacity() { return this.qcapacity; } + // Returns the current thread's unique ID, assigning it if necessary + public static int getThreadId() { + return threadId.get().intValue(); + } + @Override public void run() { From 183d3682e2cc274e52f847af6ef64367c64a3b05 Mon Sep 17 00:00:00 2001 From: Jaime Fullaondo Date: Wed, 8 Apr 2020 23:42:54 -0400 Subject: [PATCH 3/6] fix rebase collateral --- .../statsd/StatsDBlockingProcessor.java | 11 +++++--- .../statsd/StatsDNonBlockingProcessor.java | 7 +++--- .../com/timgroup/statsd/StatsDProcessor.java | 4 +-- .../com/timgroup/statsd/TelemetryTest.java | 25 +++++++++++-------- 4 files changed, 27 insertions(+), 20 deletions(-) diff --git a/src/main/java/com/timgroup/statsd/StatsDBlockingProcessor.java b/src/main/java/com/timgroup/statsd/StatsDBlockingProcessor.java index 1f9c73c4..39a5836a 100644 --- a/src/main/java/com/timgroup/statsd/StatsDBlockingProcessor.java +++ b/src/main/java/com/timgroup/statsd/StatsDBlockingProcessor.java @@ -16,6 +16,10 @@ public class StatsDBlockingProcessor extends StatsDProcessor { private class ProcessingTask extends StatsDProcessor.ProcessingTask { + public ProcessingTask(int id) { + super(id); + } + @Override public void run() { ByteBuffer sendBuffer; @@ -76,7 +80,7 @@ public void run() { writeBuilderToSendBuffer(sendBuffer); } - if (null == messages.peek()) { + if (null == processorWorkQueue[this.processorQueueId].peek()) { outboundQueue.put(sendBuffer); sendBuffer = bufferPool.borrow(); } @@ -96,7 +100,6 @@ public void run() { aggregator.stop(); endSignal.countDown(); } - } StatsDBlockingProcessor(final int queueSize, final StatsDClientErrorHandler handler, @@ -115,8 +118,8 @@ public void run() { } @Override - protected ProcessingTask createProcessingTask() { - return new ProcessingTask(); + protected ProcessingTask createProcessingTask(int id) { + return new ProcessingTask(id); } StatsDBlockingProcessor(final StatsDBlockingProcessor processor) diff --git a/src/main/java/com/timgroup/statsd/StatsDNonBlockingProcessor.java b/src/main/java/com/timgroup/statsd/StatsDNonBlockingProcessor.java index e31bb3a6..f90036aa 100644 --- a/src/main/java/com/timgroup/statsd/StatsDNonBlockingProcessor.java +++ b/src/main/java/com/timgroup/statsd/StatsDNonBlockingProcessor.java @@ -20,8 +20,7 @@ private class ProcessingTask extends StatsDProcessor.ProcessingTask { private final int processorQueueId; public ProcessingTask(int id) { - super(); - this.processorQueueId = id; + super(id); } @Override @@ -144,8 +143,8 @@ public void run() { } @Override - protected ProcessingTask createProcessingTask() { - return new ProcessingTask(); + protected ProcessingTask createProcessingTask(int id) { + return new ProcessingTask(id); } StatsDNonBlockingProcessor(final StatsDNonBlockingProcessor processor) throws Exception { diff --git a/src/main/java/com/timgroup/statsd/StatsDProcessor.java b/src/main/java/com/timgroup/statsd/StatsDProcessor.java index b11a5919..94043c97 100644 --- a/src/main/java/com/timgroup/statsd/StatsDProcessor.java +++ b/src/main/java/com/timgroup/statsd/StatsDProcessor.java @@ -137,7 +137,7 @@ public Thread newThread(final Runnable runnable) { processor.getAggregator().getFlushInterval()); } - protected abstract ProcessingTask createProcessingTask(); + protected abstract ProcessingTask createProcessingTask(int id); protected abstract boolean send(final Message message); @@ -172,7 +172,7 @@ public static int getThreadId() { public void run() { for (int i = 0 ; i < workers ; i++) { - executor.submit(createProcessingTask()); + executor.submit(createProcessingTask(i)); } boolean done = false; diff --git a/src/test/java/com/timgroup/statsd/TelemetryTest.java b/src/test/java/com/timgroup/statsd/TelemetryTest.java index 2a347114..d1fd40e8 100644 --- a/src/test/java/com/timgroup/statsd/TelemetryTest.java +++ b/src/test/java/com/timgroup/statsd/TelemetryTest.java @@ -31,11 +31,16 @@ public static class FakeProcessor extends StatsDProcessor { public final List messages = new ArrayList<>(); FakeProcessor(final StatsDClientErrorHandler handler) throws Exception { - super(0, handler, 0, 1, 1, 0, 0); + super(0, handler, 0, 1, 1, 1, 0, 0); } private class FakeProcessingTask extends StatsDProcessor.ProcessingTask { + + public FakeProcessingTask(int id) { + super(id); + } + @Override public void run() {} } @@ -50,8 +55,8 @@ public boolean send(final Message msg) { public void run(){} @Override - protected ProcessingTask createProcessingTask() { - return new FakeProcessingTask(); + protected ProcessingTask createProcessingTask(int id) { + return new FakeProcessingTask(id); } public List getMessages() { @@ -82,12 +87,12 @@ public StatsDNonBlockingTelemetry(final String prefix, final int queueSize, Stri final StatsDClientErrorHandler errorHandler, Callable addressLookup, final int timeout, final int bufferSize, final int maxPacketSizeBytes, String entityID, final int poolSize, final int processorWorkers, - final int senderWorkers, boolean blocking, final boolean enableTelemetry, - final int telemetryFlushInterval) + final int senderWorkers, final int lockShardGrain, boolean blocking, + final boolean enableTelemetry, final int telemetryFlushInterval) throws StatsDClientException { super(prefix, queueSize, constantTags, errorHandler, addressLookup, addressLookup, timeout, bufferSize, maxPacketSizeBytes, entityID, poolSize, processorWorkers, senderWorkers, - blocking, enableTelemetry, telemetryFlushInterval, 0, 0); + lockShardGrain, blocking, enableTelemetry, telemetryFlushInterval, 0, 0); } }; @@ -98,13 +103,13 @@ public StatsDNonBlockingTelemetry build() throws StatsDClientException { if (addressLookup != null) { return new StatsDNonBlockingTelemetry(prefix, queueSize, constantTags, errorHandler, addressLookup, timeout, socketBufferSize, maxPacketSizeBytes, entityID, - bufferPoolSize, processorWorkers, senderWorkers, blocking, enableTelemetry, - telemetryFlushInterval); + bufferPoolSize, processorWorkers, senderWorkers, lockShardGrain, blocking, + enableTelemetry, telemetryFlushInterval); } else { return new StatsDNonBlockingTelemetry(prefix, queueSize, constantTags, errorHandler, staticStatsDAddressResolution(hostname, port), timeout, socketBufferSize, maxPacketSizeBytes, - entityID, bufferPoolSize, processorWorkers, senderWorkers, blocking, enableTelemetry, - telemetryFlushInterval); + entityID, bufferPoolSize, processorWorkers, senderWorkers, lockShardGrain, blocking, + enableTelemetry, telemetryFlushInterval); } } } From bcc863fcd05cd332dc495991a995f74bf1284728 Mon Sep 17 00:00:00 2001 From: Jaime Fullaondo Date: Thu, 29 Oct 2020 18:00:38 -0400 Subject: [PATCH 4/6] [test] fixing test to account for sharding --- .../statsd/NonBlockingStatsDClient.java | 4 +- .../statsd/StatsDBlockingProcessor.java | 13 +++- .../statsd/StatsDNonBlockingProcessor.java | 21 ++++-- .../com/timgroup/statsd/StatsDProcessor.java | 1 + .../timgroup/statsd/StatsDAggregatorTest.java | 65 +++++++++++++++---- 5 files changed, 81 insertions(+), 23 deletions(-) diff --git a/src/main/java/com/timgroup/statsd/NonBlockingStatsDClient.java b/src/main/java/com/timgroup/statsd/NonBlockingStatsDClient.java index f60e464d..c1f07c7b 100644 --- a/src/main/java/com/timgroup/statsd/NonBlockingStatsDClient.java +++ b/src/main/java/com/timgroup/statsd/NonBlockingStatsDClient.java @@ -324,7 +324,7 @@ public NonBlockingStatsDClient(final String prefix, final int queueSize, String[ // similar settings, but a single worker and non-blocking. telemetryStatsDProcessor = createProcessor(queueSize, handler, maxPacketSizeBytes, - poolSize, 1, false, 0, aggregationShards); + poolSize, 1, 1, false, 0, aggregationShards); } this.telemetry = new Telemetry(telemetrytags, telemetryStatsDProcessor); @@ -1001,7 +1001,7 @@ public NonBlockingStatsDClient(final String prefix, final int queueSize, String[ this(prefix, queueSize, constantTags, errorHandler, addressLookup, addressLookup, timeout, bufferSize, maxPacketSizeBytes, entityID, poolSize, processorWorkers, senderWorkers, - blocking, enableTelemetry, telemetryFlushInterval, 0, 0); + DEFAULT_LOCK_SHARD_GRAIN, blocking, enableTelemetry, telemetryFlushInterval, 0, 0); } protected StatsDProcessor createProcessor(final int queueSize, final StatsDClientErrorHandler handler, diff --git a/src/main/java/com/timgroup/statsd/StatsDBlockingProcessor.java b/src/main/java/com/timgroup/statsd/StatsDBlockingProcessor.java index 39a5836a..95efb816 100644 --- a/src/main/java/com/timgroup/statsd/StatsDBlockingProcessor.java +++ b/src/main/java/com/timgroup/statsd/StatsDBlockingProcessor.java @@ -107,10 +107,12 @@ public void run() { final int aggregatorFlushInterval, final int aggregatorShards) throws Exception { super(queueSize, handler, maxPacketSizeBytes, poolSize, workers, lockShardGrain, aggregatorFlushInterval, aggregatorShards); + this.messages = new ArrayBlockingQueue[lockShardGrain]; for (int i = 0 ; i < lockShardGrain ; i++) { this.messages[i] = new ArrayBlockingQueue(queueSize); } + this.processorWorkQueue = new ArrayBlockingQueue[workers]; for (int i = 0 ; i < workers ; i++) { this.processorWorkQueue[i] = new ArrayBlockingQueue(queueSize); @@ -126,7 +128,16 @@ protected ProcessingTask createProcessingTask(int id) { throws Exception { super(processor); - this.messages = new ArrayBlockingQueue<>(processor.getQcapacity()); + + this.messages = new ArrayBlockingQueue[lockShardGrain]; + for (int i = 0 ; i < lockShardGrain ; i++) { + this.messages[i] = new ArrayBlockingQueue(getQcapacity()); + } + + this.processorWorkQueue = new ArrayBlockingQueue[workers]; + for (int i = 0 ; i < workers ; i++) { + this.processorWorkQueue[i] = new ArrayBlockingQueue(getQcapacity()); + } } @Override diff --git a/src/main/java/com/timgroup/statsd/StatsDNonBlockingProcessor.java b/src/main/java/com/timgroup/statsd/StatsDNonBlockingProcessor.java index f90036aa..7d4282de 100644 --- a/src/main/java/com/timgroup/statsd/StatsDNonBlockingProcessor.java +++ b/src/main/java/com/timgroup/statsd/StatsDNonBlockingProcessor.java @@ -17,8 +17,6 @@ public class StatsDNonBlockingProcessor extends StatsDProcessor { private class ProcessingTask extends StatsDProcessor.ProcessingTask { - private final int processorQueueId; - public ProcessingTask(int id) { super(id); } @@ -28,6 +26,7 @@ public void run() { ByteBuffer sendBuffer; boolean empty = true; boolean emptyHighPrio = true; + int messageQueueIdx = 0; try { sendBuffer = bufferPool.borrow(); @@ -57,7 +56,7 @@ public void run() { message = highPrioMessages.poll(); } else { - final int messageQueueIdx = processorWorkQueue[this.processorQueueId].poll(); + messageQueueIdx = processorWorkQueue[this.processorQueueId].poll(); message = messages[messageQueueIdx].poll(); } @@ -126,7 +125,6 @@ public void run() { final int aggregatorShards) throws Exception { super(queueSize, handler, maxPacketSizeBytes, poolSize, workers, lockShardGrain, aggregatorFlushInterval, aggregatorShards); - this.qsize = new AtomicInteger(0); this.qsize = new AtomicInteger[lockShardGrain]; this.messages = new ConcurrentLinkedQueue[lockShardGrain]; @@ -149,8 +147,19 @@ protected ProcessingTask createProcessingTask(int id) { StatsDNonBlockingProcessor(final StatsDNonBlockingProcessor processor) throws Exception { super(processor); - this.qsize = new AtomicInteger(0); - this.messages = new ConcurrentLinkedQueue<>(); + + this.qsize = new AtomicInteger[lockShardGrain]; + this.messages = new ConcurrentLinkedQueue[lockShardGrain]; + for (int i = 0 ; i < lockShardGrain ; i++) { + this.qsize[i] = new AtomicInteger(); + this.messages[i] = new ConcurrentLinkedQueue(); + this.qsize[i].set(0); + } + + this.processorWorkQueue = new ConcurrentLinkedQueue[workers]; + for (int i = 0 ; i < workers ; i++) { + this.processorWorkQueue[i] = new ConcurrentLinkedQueue(); + } } @Override diff --git a/src/main/java/com/timgroup/statsd/StatsDProcessor.java b/src/main/java/com/timgroup/statsd/StatsDProcessor.java index 94043c97..0ced985c 100644 --- a/src/main/java/com/timgroup/statsd/StatsDProcessor.java +++ b/src/main/java/com/timgroup/statsd/StatsDProcessor.java @@ -116,6 +116,7 @@ public Thread newThread(final Runnable runnable) { this.handler = processor.handler; this.workers = processor.workers; + this.lockShardGrain = processor.lockShardGrain; this.qcapacity = processor.getQcapacity(); this.executor = Executors.newFixedThreadPool(workers, new ThreadFactory() { diff --git a/src/test/java/com/timgroup/statsd/StatsDAggregatorTest.java b/src/test/java/com/timgroup/statsd/StatsDAggregatorTest.java index 185ce740..34bf0cdb 100644 --- a/src/test/java/com/timgroup/statsd/StatsDAggregatorTest.java +++ b/src/test/java/com/timgroup/statsd/StatsDAggregatorTest.java @@ -19,6 +19,7 @@ import java.util.Queue; import java.util.Random; import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.Callable; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ConcurrentLinkedQueue; @@ -81,22 +82,42 @@ protected void writeTo(StringBuilder builder){} // fakeProcessor store messages from the telemetry only public static class FakeProcessor extends StatsDProcessor { - private final Queue messages; + private final Queue[] messages; + private final Queue[] processorWorkQueue; + private final AtomicInteger[] qsize; // qSize will not reflect actual size, but a close estimate. private final AtomicInteger messageSent = new AtomicInteger(0); private final AtomicInteger messageAggregated = new AtomicInteger(0); FakeProcessor(final StatsDClientErrorHandler handler) throws Exception { - super(0, handler, 0, 1, 1, 0, 0); - this.messages = new ConcurrentLinkedQueue<>(); - } + super(50, handler, 0, 1, 1, 1, 0, StatsDAggregator.DEFAULT_SHARDS); + + // 1 queue (lockShardGrain = 1) + this.qsize = new AtomicInteger[lockShardGrain]; + this.messages = new ArrayBlockingQueue[lockShardGrain]; + for (int i = 0 ; i < lockShardGrain ; i++) { + this.qsize[i] = new AtomicInteger(); + this.messages[i] = new ArrayBlockingQueue(getQcapacity()); + this.qsize[i].set(0); + } + // 1 worker + this.processorWorkQueue = new ArrayBlockingQueue[workers]; + for (int i = 0 ; i < workers ; i++) { + this.processorWorkQueue[i] = new ArrayBlockingQueue(getQcapacity()); + } + } private class FakeProcessingTask extends StatsDProcessor.ProcessingTask { + + public FakeProcessingTask(int id) { + super(id); + } + @Override public void run() { while (!shutdown) { - final Message message = messages.poll(); + final Message message = messages[0].poll(); if (message == null) { try{ @@ -106,6 +127,7 @@ public void run() { continue; } + qsize[0].decrementAndGet(); if (aggregator.aggregateMessage(message)) { messageAggregated.incrementAndGet(); continue; @@ -118,23 +140,38 @@ public void run() { } @Override - protected StatsDProcessor.ProcessingTask createProcessingTask() { - return new FakeProcessingTask(); + protected StatsDProcessor.ProcessingTask createProcessingTask(int id) { + return new FakeProcessingTask(id); } @Override public boolean send(final Message msg) { - messages.offer(msg); - return true; + if (!shutdown) { + int threadId = getThreadId(); + int shard = threadId % lockShardGrain; + int processQueue = threadId % workers; + + if (qsize[shard].get() < qcapacity) { + messages[shard].offer(msg); + qsize[shard].incrementAndGet(); + processorWorkQueue[processQueue].offer(shard); + return true; + } + } + + return false; } - public Queue getMessages() { - return messages; + public Queue getMessages(int id) { + return messages[id]; } public void clear() { try { - messages.clear(); + + for (int i = 0 ; i < lockShardGrain ; i++) { + messages[i].clear(); + } highPrioMessages.clear(); } catch (Exception e) {} } @@ -181,7 +218,7 @@ public void aggregate_messages() throws Exception { fakeProcessor.send(new FakeAlphaMessage("some.set", Message.Type.SET, "value")); } - waitForQueueSize(fakeProcessor.messages, 0); + waitForQueueSize(fakeProcessor.messages[0], 0); // 10 gauges, 10 counts, 10 sets assertEquals(30, fakeProcessor.messageAggregated.get()); @@ -206,7 +243,7 @@ public void aggregation_sharding() throws Exception { fakeProcessor.send(gauge); } - waitForQueueSize(fakeProcessor.messages, 0); + waitForQueueSize(fakeProcessor.messages[0], 0); for (int i=0 ; i map = fakeProcessor.aggregator.aggregateMetrics.get(i); From d00bc54d8a50114dd3bf625344fd911906b34e68 Mon Sep 17 00:00:00 2001 From: Jaime Fullaondo Date: Fri, 30 Oct 2020 13:59:50 +0100 Subject: [PATCH 5/6] [style] address cops --- .../java/com/timgroup/statsd/StatsDBlockingProcessor.java | 6 ++++-- .../com/timgroup/statsd/StatsDNonBlockingProcessor.java | 7 ++++--- 2 files changed, 8 insertions(+), 5 deletions(-) diff --git a/src/main/java/com/timgroup/statsd/StatsDBlockingProcessor.java b/src/main/java/com/timgroup/statsd/StatsDBlockingProcessor.java index 95efb816..7fb76bc1 100644 --- a/src/main/java/com/timgroup/statsd/StatsDBlockingProcessor.java +++ b/src/main/java/com/timgroup/statsd/StatsDBlockingProcessor.java @@ -35,7 +35,8 @@ public void run() { aggregator.start(); - while (!((emptyHighPrio = highPrioMessages.isEmpty()) && processorWorkQueue[this.processorQueueId].isEmpty() && shutdown)) { + while (!((emptyHighPrio = highPrioMessages.isEmpty()) + && processorWorkQueue[this.processorQueueId].isEmpty() && shutdown)) { try { @@ -106,7 +107,8 @@ public void run() { final int maxPacketSizeBytes, final int poolSize, final int workers, final int lockShardGrain, final int aggregatorFlushInterval, final int aggregatorShards) throws Exception { - super(queueSize, handler, maxPacketSizeBytes, poolSize, workers, lockShardGrain, aggregatorFlushInterval, aggregatorShards); + super(queueSize, handler, maxPacketSizeBytes, poolSize, workers, + lockShardGrain, aggregatorFlushInterval, aggregatorShards); this.messages = new ArrayBlockingQueue[lockShardGrain]; for (int i = 0 ; i < lockShardGrain ; i++) { diff --git a/src/main/java/com/timgroup/statsd/StatsDNonBlockingProcessor.java b/src/main/java/com/timgroup/statsd/StatsDNonBlockingProcessor.java index 7d4282de..2aee5bb9 100644 --- a/src/main/java/com/timgroup/statsd/StatsDNonBlockingProcessor.java +++ b/src/main/java/com/timgroup/statsd/StatsDNonBlockingProcessor.java @@ -37,8 +37,8 @@ public void run() { aggregator.start(); - while (!((emptyHighPrio = highPrioMessages.isEmpty()) && - (empty = processorWorkQueue[this.processorQueueId].isEmpty()) && shutdown)) { + while (!((emptyHighPrio = highPrioMessages.isEmpty()) + && (empty = processorWorkQueue[this.processorQueueId].isEmpty()) && shutdown)) { try { @@ -124,7 +124,8 @@ public void run() { final int lockShardGrain, final int aggregatorFlushInterval, final int aggregatorShards) throws Exception { - super(queueSize, handler, maxPacketSizeBytes, poolSize, workers, lockShardGrain, aggregatorFlushInterval, aggregatorShards); + super(queueSize, handler, maxPacketSizeBytes, poolSize, workers, + lockShardGrain, aggregatorFlushInterval, aggregatorShards); this.qsize = new AtomicInteger[lockShardGrain]; this.messages = new ConcurrentLinkedQueue[lockShardGrain]; From f3165a7b38b4dc903e33e231cc293d11cbcb0356 Mon Sep 17 00:00:00 2001 From: Jaime Fullaondo Date: Mon, 2 Nov 2020 14:34:29 +0100 Subject: [PATCH 6/6] [uds] set better defaults for mac packet size --- src/main/java/com/timgroup/statsd/BufferPool.java | 4 ++++ .../timgroup/statsd/NonBlockingStatsDClient.java | 3 ++- .../statsd/NonBlockingStatsDClientBuilder.java | 12 ++++++++++-- .../statsd/NonBlockingStatsDClientTest.java | 15 +++++++++++++-- .../java/com/timgroup/statsd/TelemetryTest.java | 13 +++++++++++-- .../java/com/timgroup/statsd/UnixSocketTest.java | 6 ++++++ 6 files changed, 46 insertions(+), 7 deletions(-) diff --git a/src/main/java/com/timgroup/statsd/BufferPool.java b/src/main/java/com/timgroup/statsd/BufferPool.java index 07e41654..298236de 100644 --- a/src/main/java/com/timgroup/statsd/BufferPool.java +++ b/src/main/java/com/timgroup/statsd/BufferPool.java @@ -54,6 +54,10 @@ int getSize() { return size; } + int getBufferSize() { + return bufferSize; + } + int available() { return pool.size(); } diff --git a/src/main/java/com/timgroup/statsd/NonBlockingStatsDClient.java b/src/main/java/com/timgroup/statsd/NonBlockingStatsDClient.java index c1f07c7b..5a8c4282 100644 --- a/src/main/java/com/timgroup/statsd/NonBlockingStatsDClient.java +++ b/src/main/java/com/timgroup/statsd/NonBlockingStatsDClient.java @@ -83,7 +83,8 @@ String tag() { } } - public static final int DEFAULT_MAX_PACKET_SIZE_BYTES = 1400; + public static final int DEFAULT_UDP_MAX_PACKET_SIZE_BYTES = 1432; + public static final int DEFAULT_UDS_MAX_PACKET_SIZE_BYTES = 8192; public static final int DEFAULT_QUEUE_SIZE = 4096; public static final int DEFAULT_POOL_SIZE = 512; public static final int DEFAULT_PROCESSOR_WORKERS = 1; diff --git a/src/main/java/com/timgroup/statsd/NonBlockingStatsDClientBuilder.java b/src/main/java/com/timgroup/statsd/NonBlockingStatsDClientBuilder.java index a462b941..8e45a3c6 100644 --- a/src/main/java/com/timgroup/statsd/NonBlockingStatsDClientBuilder.java +++ b/src/main/java/com/timgroup/statsd/NonBlockingStatsDClientBuilder.java @@ -16,13 +16,13 @@ public class NonBlockingStatsDClientBuilder { * See https://github.com/DataDog/java-dogstatsd-client/pull/17 for discussion. */ + public int maxPacketSizeBytes = 0; public int port = NonBlockingStatsDClient.DEFAULT_DOGSTATSD_PORT; public int telemetryPort = NonBlockingStatsDClient.DEFAULT_DOGSTATSD_PORT; public int queueSize = NonBlockingStatsDClient.DEFAULT_QUEUE_SIZE; public int timeout = NonBlockingStatsDClient.SOCKET_TIMEOUT_MS; public int bufferPoolSize = NonBlockingStatsDClient.DEFAULT_POOL_SIZE; public int socketBufferSize = NonBlockingStatsDClient.SOCKET_BUFFER_BYTES; - public int maxPacketSizeBytes = NonBlockingStatsDClient.DEFAULT_MAX_PACKET_SIZE_BYTES; public int processorWorkers = NonBlockingStatsDClient.DEFAULT_PROCESSOR_WORKERS; public int senderWorkers = NonBlockingStatsDClient.DEFAULT_SENDER_WORKERS; public boolean blocking = NonBlockingStatsDClient.DEFAULT_BLOCKING; @@ -171,6 +171,8 @@ public NonBlockingStatsDClientBuilder aggregationShards(int val) { * @return the built NonBlockingStatsDClient. */ public NonBlockingStatsDClient build() throws StatsDClientException { + + int packetSize = maxPacketSizeBytes; Callable lookup = addressLookup; Callable telemetryLookup = telemetryAddressLookup; @@ -178,6 +180,12 @@ public NonBlockingStatsDClient build() throws StatsDClientException { lookup = staticStatsDAddressResolution(hostname, port); } + if (packetSize == 0) { + packetSize = (port == 0) ? NonBlockingStatsDClient.DEFAULT_UDS_MAX_PACKET_SIZE_BYTES : + NonBlockingStatsDClient.DEFAULT_UDP_MAX_PACKET_SIZE_BYTES; + } + + if (telemetryLookup == null) { if (telemetryHostname == null) { telemetryLookup = lookup; @@ -187,7 +195,7 @@ public NonBlockingStatsDClient build() throws StatsDClientException { } return new NonBlockingStatsDClient(prefix, queueSize, constantTags, errorHandler, - lookup, telemetryLookup, timeout, socketBufferSize, maxPacketSizeBytes, + lookup, telemetryLookup, timeout, socketBufferSize, packetSize, entityID, bufferPoolSize, processorWorkers, senderWorkers, lockShardGrain, blocking, enableTelemetry, telemetryFlushInterval, (enableAggregation ? aggregationFlushInterval : 0), aggregationShards); diff --git a/src/test/java/com/timgroup/statsd/NonBlockingStatsDClientTest.java b/src/test/java/com/timgroup/statsd/NonBlockingStatsDClientTest.java index 4ede8cb0..b41df345 100644 --- a/src/test/java/com/timgroup/statsd/NonBlockingStatsDClientTest.java +++ b/src/test/java/com/timgroup/statsd/NonBlockingStatsDClientTest.java @@ -61,6 +61,11 @@ public void clear() { server.clear(); } + @Test + public void assert_default_udp_size() throws Exception { + assertEquals(client.statsDProcessor.bufferPool.getBufferSize(), NonBlockingStatsDClient.DEFAULT_UDP_MAX_PACKET_SIZE_BYTES); + } + @Test(timeout = 5000L) public void sends_counter_value_to_statsd() throws Exception { @@ -1263,13 +1268,19 @@ private static class SlowStatsDNonBlockingStatsDClientBuilder extends NonBlockin @Override public SlowStatsDNonBlockingStatsDClient build() throws StatsDClientException { + int packetSize = maxPacketSizeBytes; + if (packetSize == 0) { + packetSize = (port == 0) ? NonBlockingStatsDClient.DEFAULT_UDS_MAX_PACKET_SIZE_BYTES : + NonBlockingStatsDClient.DEFAULT_UDP_MAX_PACKET_SIZE_BYTES; + } + if (addressLookup != null) { return new SlowStatsDNonBlockingStatsDClient(prefix, queueSize, constantTags, errorHandler, - addressLookup, timeout, socketBufferSize, maxPacketSizeBytes, entityID, bufferPoolSize, + addressLookup, timeout, socketBufferSize, packetSize, entityID, bufferPoolSize, processorWorkers, senderWorkers, lockShardGrain, blocking); } else { return new SlowStatsDNonBlockingStatsDClient(prefix, queueSize, constantTags, errorHandler, - staticStatsDAddressResolution(hostname, port), timeout, socketBufferSize, maxPacketSizeBytes, + staticStatsDAddressResolution(hostname, port), timeout, socketBufferSize, packetSize, entityID, bufferPoolSize, processorWorkers, senderWorkers, lockShardGrain, blocking); } } diff --git a/src/test/java/com/timgroup/statsd/TelemetryTest.java b/src/test/java/com/timgroup/statsd/TelemetryTest.java index d1fd40e8..791ec182 100644 --- a/src/test/java/com/timgroup/statsd/TelemetryTest.java +++ b/src/test/java/com/timgroup/statsd/TelemetryTest.java @@ -100,14 +100,21 @@ private static class StatsDNonBlockingTelemetryBuilder extends NonBlockingStatsD @Override public StatsDNonBlockingTelemetry build() throws StatsDClientException { + + int packetSize = maxPacketSizeBytes; + if (packetSize == 0) { + packetSize = (port == 0) ? NonBlockingStatsDClient.DEFAULT_UDS_MAX_PACKET_SIZE_BYTES : + NonBlockingStatsDClient.DEFAULT_UDP_MAX_PACKET_SIZE_BYTES; + } + if (addressLookup != null) { return new StatsDNonBlockingTelemetry(prefix, queueSize, constantTags, errorHandler, - addressLookup, timeout, socketBufferSize, maxPacketSizeBytes, entityID, + addressLookup, timeout, socketBufferSize, packetSize, entityID, bufferPoolSize, processorWorkers, senderWorkers, lockShardGrain, blocking, enableTelemetry, telemetryFlushInterval); } else { return new StatsDNonBlockingTelemetry(prefix, queueSize, constantTags, errorHandler, - staticStatsDAddressResolution(hostname, port), timeout, socketBufferSize, maxPacketSizeBytes, + staticStatsDAddressResolution(hostname, port), timeout, socketBufferSize, packetSize, entityID, bufferPoolSize, processorWorkers, senderWorkers, lockShardGrain, blocking, enableTelemetry, telemetryFlushInterval); } @@ -352,6 +359,8 @@ public void telemetry_flushInterval() throws Exception { public void telemetry_droppedData() throws Exception { clientError.telemetry.reset(); + assertThat(clientError.statsDProcessor.bufferPool.getBufferSize(), equalTo(8192)); + clientError.gauge("gauge", 24); // leaving time to the server to flush metrics diff --git a/src/test/java/com/timgroup/statsd/UnixSocketTest.java b/src/test/java/com/timgroup/statsd/UnixSocketTest.java index 07543d12..0f3faeec 100644 --- a/src/test/java/com/timgroup/statsd/UnixSocketTest.java +++ b/src/test/java/com/timgroup/statsd/UnixSocketTest.java @@ -14,6 +14,7 @@ import static org.hamcrest.Matchers.nullValue; import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.hasItem; +import static org.junit.Assert.assertEquals; public class UnixSocketTest implements StatsDClientErrorHandler { private static File tmpFolder; @@ -62,6 +63,11 @@ public void stop() throws Exception { server.close(); } + @Test + public void assert_default_uds_size() throws Exception { + assertEquals(client.statsDProcessor.bufferPool.getBufferSize(), NonBlockingStatsDClient.DEFAULT_UDS_MAX_PACKET_SIZE_BYTES); + } + @Test(timeout = 5000L) public void sends_to_statsd() throws Exception { for(long i = 0; i < 5 ; i++) {