Skip to content

Commit

Permalink
Cache threadId for quicker lookup, use modulo
Browse files Browse the repository at this point in the history
  • Loading branch information
truthbk committed Apr 9, 2020
1 parent 56a3b7e commit 805c073
Show file tree
Hide file tree
Showing 3 changed files with 20 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -112,11 +112,9 @@ protected ProcessingTask createProcessingTask() {
@Override
protected boolean send(final Message message) {
try {
long threadId = Thread.currentThread().getId();
// modulo reduction alternative to: long shard = threadID % this.lockShardGrain;
// ref: https://lemire.me/blog/2016/06/27/a-fast-alternative-to-the-modulo-reduction/
int shard = (int)((threadId * (long)this.lockShardGrain) >> 32);
int processQueue = (int)((threadId * (long)this.workers) >> 32);
int threadId = getThreadId();
int shard = threadId % lockShardGrain;
int processQueue = threadId % workers;

if (!shutdown) {
messages[shard].put(message);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -186,11 +186,9 @@ protected ProcessingTask createProcessingTask() {
@Override
protected boolean send(final Message message) {
if (!shutdown) {
long threadId = Thread.currentThread().getId();
// modulo reduction alternative to: long shard = threadID % [shard]this.lockShardGrain;
// ref: https://lemire.me/blog/2016/06/27/a-fast-alternative-to-the-modulo-reduction/
int shard = (int)((threadId * (long)this.lockShardGrain) >> 32);
int processQueue = (int)((threadId * (long)this.workers) >> 32);
int threadId = getThreadId();
int shard = threadId % lockShardGrain;
int processQueue = threadId % workers;

if (qsize[shard].get() < qcapacity) {
messages[shard].offer(message);
Expand Down
14 changes: 14 additions & 0 deletions src/main/java/com/timgroup/statsd/StatsDProcessor.java
Original file line number Diff line number Diff line change
Expand Up @@ -27,12 +27,21 @@ public abstract class StatsDProcessor implements Runnable {
protected static final String MESSAGE_TOO_LONG = "Message longer than size of sendBuffer";
protected static final int WAIT_SLEEP_MS = 10; // 10 ms would be a 100HZ slice

// Atomic integer containing the next thread ID to be assigned
private static final AtomicInteger nextId = new AtomicInteger(0);

protected final StatsDClientErrorHandler handler;

protected final BufferPool bufferPool;
protected final BlockingQueue<ByteBuffer> outboundQueue; // FIFO queue with outbound buffers
protected final ExecutorService executor;
protected final CountDownLatch endSignal;
protected static final ThreadLocal<Integer> threadId = new ThreadLocal<Integer>() {
@Override
protected Integer initialValue() {
return nextId.getAndIncrement();
}
};

protected final int workers;
protected final int lockShardGrain;
Expand Down Expand Up @@ -97,6 +106,11 @@ public BlockingQueue<ByteBuffer> getOutboundQueue() {
return this.outboundQueue;
}

// Returns the current thread's unique ID, assigning it if necessary
public static int getThreadId() {
return threadId.get().intValue();
}

@Override
public void run() {

Expand Down

0 comments on commit 805c073

Please sign in to comment.