Skip to content

Commit

Permalink
fix rebase collateral
Browse files Browse the repository at this point in the history
  • Loading branch information
truthbk committed Apr 9, 2020
1 parent 805c073 commit f0f6bf8
Show file tree
Hide file tree
Showing 5 changed files with 35 additions and 88 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -142,9 +142,9 @@ public NonBlockingStatsDClient build() throws StatsDClientException {
enableTelemetry, telemetryFlushInterval);
} else {
return new NonBlockingStatsDClient(prefix, queueSize, constantTags, errorHandler,
staticStatsDAddressResolution(hostname, port), timeout, socketBufferSize, maxPacketSizeBytes,
entityID, bufferPoolSize, processorWorkers, senderWorkers, blocking, enableTelemetry,
telemetryFlushInterval);
staticStatsDAddressResolution(hostname, port), timeout, socketBufferSize,
maxPacketSizeBytes, entityID, bufferPoolSize, processorWorkers, senderWorkers,
lockShardGrain, blocking, enableTelemetry, telemetryFlushInterval);
}
}

Expand Down
13 changes: 7 additions & 6 deletions src/main/java/com/timgroup/statsd/StatsDBlockingProcessor.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,10 @@ public class StatsDBlockingProcessor extends StatsDProcessor {

private class ProcessingTask extends StatsDProcessor.ProcessingTask {

public ProcessingTask(int id) {
super(id);
}

@Override
public void run() {
boolean empty;
Expand Down Expand Up @@ -63,7 +67,7 @@ public void run() {
writeBuilderToSendBuffer(sendBuffer);
}

if (null == messages.peek()) {
if (null == processorWorkQueue[this.processorQueueId].peek()) {
outboundQueue.put(sendBuffer);
sendBuffer = bufferPool.borrow();
}
Expand All @@ -82,7 +86,6 @@ public void run() {
builder.trimToSize();
endSignal.countDown();
}

}

StatsDBlockingProcessor(final int queueSize, final StatsDClientErrorHandler handler,
Expand All @@ -100,13 +103,11 @@ public void run() {
for (int i = 0 ; i < workers ; i++) {
this.processorWorkQueue[i] = new ArrayBlockingQueue<Integer>(queueSize);
}

super(queueSize, handler, maxPacketSizeBytes, poolSize, workers);
}

@Override
protected ProcessingTask createProcessingTask() {
return new ProcessingTask();
protected ProcessingTask createProcessingTask(int id) {
return new ProcessingTask(id);
}

@Override
Expand Down
73 changes: 7 additions & 66 deletions src/main/java/com/timgroup/statsd/StatsDNonBlockingProcessor.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,13 @@ public class StatsDNonBlockingProcessor extends StatsDProcessor {
private final int qcapacity;
private final AtomicInteger[] qsize; // qSize will not reflect actual size, but a close estimate.

private class ProcessingTask implements Runnable {
private final int processorQueueId;
private class ProcessingTask extends StatsDProcessor.ProcessingTask {

public ProcessingTask(int id) {
this.processorQueueId = id;
super(id);
}

@Override
public void run() {
boolean empty;
ByteBuffer sendBuffer;
Expand All @@ -48,68 +48,9 @@ public void run() {
}

final int messageQueueIdx = processorWorkQueue[this.processorQueueId].poll();
final String message = messages[messageQueueIdx].poll();
final Message message = messages[messageQueueIdx].poll();
if (message != null) {
qsize[messageQueueIdx].decrementAndGet();
final byte[] data = message.getBytes(MESSAGE_CHARSET);
if (sendBuffer.capacity() < data.length) {
throw new InvalidMessageException(MESSAGE_TOO_LONG, message);
}
if (sendBuffer.remaining() < (data.length + 1)) {
outboundQueue.put(sendBuffer);
sendBuffer = bufferPool.borrow();
}
if (sendBuffer.position() > 0) {
sendBuffer.put((byte) '\n');
}
sendBuffer.put(data);
if (null == processorWorkQueue[this.processorQueueId].peek()) {
outboundQueue.put(sendBuffer);
sendBuffer = bufferPool.borrow();
}
}
} catch (final InterruptedException e) {
if (shutdown) {
endSignal.countDown();
return;
}
} catch (final Exception e) {
handler.handle(e);
}
}
endSignal.countDown();
}
}

private class ProcessingTask extends StatsDProcessor.ProcessingTask {

@Override
public void run() {
boolean empty;
ByteBuffer sendBuffer;

try {
sendBuffer = bufferPool.borrow();
} catch (final InterruptedException e) {
handler.handle(e);
return;
}

while (!((empty = messages.isEmpty()) && shutdown)) {

try {
if (empty) {
Thread.sleep(WAIT_SLEEP_MS);
continue;
}

if (Thread.interrupted()) {
return;
}
final Message message = messages.poll();
if (message != null) {

qsize.decrementAndGet();
builder.setLength(0);

message.writeTo(builder);
Expand Down Expand Up @@ -137,7 +78,7 @@ public void run() {
writeBuilderToSendBuffer(sendBuffer);
}

if (null == messages.peek()) {
if (null == processorWorkQueue[this.processorQueueId].peek()) {
outboundQueue.put(sendBuffer);
sendBuffer = bufferPool.borrow();
}
Expand Down Expand Up @@ -179,8 +120,8 @@ public void run() {
}

@Override
protected ProcessingTask createProcessingTask() {
return new ProcessingTask();
protected ProcessingTask createProcessingTask(int id) {
return new ProcessingTask(id);
}

@Override
Expand Down
4 changes: 2 additions & 2 deletions src/main/java/com/timgroup/statsd/StatsDProcessor.java
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ protected void writeBuilderToSendBuffer(ByteBuffer sendBuffer) {
this.endSignal = new CountDownLatch(workers);
}

protected abstract ProcessingTask createProcessingTask();
protected abstract ProcessingTask createProcessingTask(int id);

protected abstract boolean send(final Message message);

Expand All @@ -115,7 +115,7 @@ public static int getThreadId() {
public void run() {

for (int i = 0 ; i < workers ; i++) {
executor.submit(createProcessingTask());
executor.submit(createProcessingTask(i));
}

boolean done = false;
Expand Down
27 changes: 16 additions & 11 deletions src/test/java/com/timgroup/statsd/TelemetryTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -31,11 +31,16 @@ public static class FakeProcessor extends StatsDProcessor {
public final List<Message> messages = new ArrayList<>();

FakeProcessor(final StatsDClientErrorHandler handler) throws Exception {
super(0, handler, 0, 1, 1);
super(0, handler, 0, 1, 1, 1);
}


private class FakeProcessingTask extends StatsDProcessor.ProcessingTask {

public FakeProcessingTask(int id) {
super(id);
}

@Override
public void run() {}
}
Expand All @@ -50,8 +55,8 @@ public boolean send(final Message msg) {
public void run(){}

@Override
protected ProcessingTask createProcessingTask() {
return new FakeProcessingTask();
protected ProcessingTask createProcessingTask(int id) {
return new FakeProcessingTask(id);
}

public List<Message> getMessages() {
Expand Down Expand Up @@ -82,12 +87,12 @@ public StatsDNonBlockingTelemetry(final String prefix, final int queueSize, Stri
final StatsDClientErrorHandler errorHandler, Callable<SocketAddress> addressLookup,
final int timeout, final int bufferSize, final int maxPacketSizeBytes,
String entityID, final int poolSize, final int processorWorkers,
final int senderWorkers, boolean blocking, final boolean enableTelemetry,
final int telemetryFlushInterval)
final int senderWorkers, final int lockShardGrain, boolean blocking,
final boolean enableTelemetry, final int telemetryFlushInterval)
throws StatsDClientException {
super(prefix, queueSize, constantTags, errorHandler, addressLookup, timeout, bufferSize,
maxPacketSizeBytes, entityID, poolSize, processorWorkers, senderWorkers, blocking,
enableTelemetry, telemetryFlushInterval);
maxPacketSizeBytes, entityID, poolSize, processorWorkers, senderWorkers, lockShardGrain,
blocking, enableTelemetry, telemetryFlushInterval);
}
};

Expand All @@ -98,13 +103,13 @@ public StatsDNonBlockingTelemetry build() throws StatsDClientException {
if (addressLookup != null) {
return new StatsDNonBlockingTelemetry(prefix, queueSize, constantTags, errorHandler,
addressLookup, timeout, socketBufferSize, maxPacketSizeBytes, entityID,
bufferPoolSize, processorWorkers, senderWorkers, blocking, enableTelemetry,
telemetryFlushInterval);
bufferPoolSize, processorWorkers, senderWorkers, lockShardGrain, blocking,
enableTelemetry, telemetryFlushInterval);
} else {
return new StatsDNonBlockingTelemetry(prefix, queueSize, constantTags, errorHandler,
staticStatsDAddressResolution(hostname, port), timeout, socketBufferSize, maxPacketSizeBytes,
entityID, bufferPoolSize, processorWorkers, senderWorkers, blocking, enableTelemetry,
telemetryFlushInterval);
entityID, bufferPoolSize, processorWorkers, senderWorkers, lockShardGrain, blocking,
enableTelemetry, telemetryFlushInterval);
}
}
}
Expand Down

0 comments on commit f0f6bf8

Please sign in to comment.