Skip to content

Commit

Permalink
fix rebase collateral
Browse files Browse the repository at this point in the history
  • Loading branch information
truthbk committed Oct 13, 2020
1 parent 784f099 commit 183d368
Show file tree
Hide file tree
Showing 4 changed files with 27 additions and 20 deletions.
11 changes: 7 additions & 4 deletions src/main/java/com/timgroup/statsd/StatsDBlockingProcessor.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,10 @@ public class StatsDBlockingProcessor extends StatsDProcessor {

private class ProcessingTask extends StatsDProcessor.ProcessingTask {

public ProcessingTask(int id) {
super(id);
}

@Override
public void run() {
ByteBuffer sendBuffer;
Expand Down Expand Up @@ -76,7 +80,7 @@ public void run() {
writeBuilderToSendBuffer(sendBuffer);
}

if (null == messages.peek()) {
if (null == processorWorkQueue[this.processorQueueId].peek()) {
outboundQueue.put(sendBuffer);
sendBuffer = bufferPool.borrow();
}
Expand All @@ -96,7 +100,6 @@ public void run() {
aggregator.stop();
endSignal.countDown();
}

}

StatsDBlockingProcessor(final int queueSize, final StatsDClientErrorHandler handler,
Expand All @@ -115,8 +118,8 @@ public void run() {
}

@Override
protected ProcessingTask createProcessingTask() {
return new ProcessingTask();
protected ProcessingTask createProcessingTask(int id) {
return new ProcessingTask(id);
}

StatsDBlockingProcessor(final StatsDBlockingProcessor processor)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,7 @@ private class ProcessingTask extends StatsDProcessor.ProcessingTask {
private final int processorQueueId;

public ProcessingTask(int id) {
super();
this.processorQueueId = id;
super(id);
}

@Override
Expand Down Expand Up @@ -144,8 +143,8 @@ public void run() {
}

@Override
protected ProcessingTask createProcessingTask() {
return new ProcessingTask();
protected ProcessingTask createProcessingTask(int id) {
return new ProcessingTask(id);
}

StatsDNonBlockingProcessor(final StatsDNonBlockingProcessor processor) throws Exception {
Expand Down
4 changes: 2 additions & 2 deletions src/main/java/com/timgroup/statsd/StatsDProcessor.java
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@ public Thread newThread(final Runnable runnable) {
processor.getAggregator().getFlushInterval());
}

protected abstract ProcessingTask createProcessingTask();
protected abstract ProcessingTask createProcessingTask(int id);

protected abstract boolean send(final Message message);

Expand Down Expand Up @@ -172,7 +172,7 @@ public static int getThreadId() {
public void run() {

for (int i = 0 ; i < workers ; i++) {
executor.submit(createProcessingTask());
executor.submit(createProcessingTask(i));
}

boolean done = false;
Expand Down
25 changes: 15 additions & 10 deletions src/test/java/com/timgroup/statsd/TelemetryTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -31,11 +31,16 @@ public static class FakeProcessor extends StatsDProcessor {
public final List<Message> messages = new ArrayList<>();

FakeProcessor(final StatsDClientErrorHandler handler) throws Exception {
super(0, handler, 0, 1, 1, 0, 0);
super(0, handler, 0, 1, 1, 1, 0, 0);
}


private class FakeProcessingTask extends StatsDProcessor.ProcessingTask {

public FakeProcessingTask(int id) {
super(id);
}

@Override
public void run() {}
}
Expand All @@ -50,8 +55,8 @@ public boolean send(final Message msg) {
public void run(){}

@Override
protected ProcessingTask createProcessingTask() {
return new FakeProcessingTask();
protected ProcessingTask createProcessingTask(int id) {
return new FakeProcessingTask(id);
}

public List<Message> getMessages() {
Expand Down Expand Up @@ -82,12 +87,12 @@ public StatsDNonBlockingTelemetry(final String prefix, final int queueSize, Stri
final StatsDClientErrorHandler errorHandler, Callable<SocketAddress> addressLookup,
final int timeout, final int bufferSize, final int maxPacketSizeBytes,
String entityID, final int poolSize, final int processorWorkers,
final int senderWorkers, boolean blocking, final boolean enableTelemetry,
final int telemetryFlushInterval)
final int senderWorkers, final int lockShardGrain, boolean blocking,
final boolean enableTelemetry, final int telemetryFlushInterval)
throws StatsDClientException {
super(prefix, queueSize, constantTags, errorHandler, addressLookup, addressLookup, timeout,
bufferSize, maxPacketSizeBytes, entityID, poolSize, processorWorkers, senderWorkers,
blocking, enableTelemetry, telemetryFlushInterval, 0, 0);
lockShardGrain, blocking, enableTelemetry, telemetryFlushInterval, 0, 0);
}
};

Expand All @@ -98,13 +103,13 @@ public StatsDNonBlockingTelemetry build() throws StatsDClientException {
if (addressLookup != null) {
return new StatsDNonBlockingTelemetry(prefix, queueSize, constantTags, errorHandler,
addressLookup, timeout, socketBufferSize, maxPacketSizeBytes, entityID,
bufferPoolSize, processorWorkers, senderWorkers, blocking, enableTelemetry,
telemetryFlushInterval);
bufferPoolSize, processorWorkers, senderWorkers, lockShardGrain, blocking,
enableTelemetry, telemetryFlushInterval);
} else {
return new StatsDNonBlockingTelemetry(prefix, queueSize, constantTags, errorHandler,
staticStatsDAddressResolution(hostname, port), timeout, socketBufferSize, maxPacketSizeBytes,
entityID, bufferPoolSize, processorWorkers, senderWorkers, blocking, enableTelemetry,
telemetryFlushInterval);
entityID, bufferPoolSize, processorWorkers, senderWorkers, lockShardGrain, blocking,
enableTelemetry, telemetryFlushInterval);
}
}
}
Expand Down

0 comments on commit 183d368

Please sign in to comment.