Skip to content

Commit

Permalink
Change the streaming mechanism
Browse files Browse the repository at this point in the history
  • Loading branch information
IoannisPanagiotas committed Aug 6, 2024
1 parent d95ee98 commit 2b3ec3a
Showing 1 changed file with 21 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ public class WeightedAllShortestPaths extends MSBFSASPAlgorithm {
private final ExecutorService executorService;
private final Graph graph;
private final AtomicInteger counter; // nodeId counter (init with nodeCount, counts down for each node)

private final AtomicInteger runningTaskCounter = new AtomicInteger(0);
private volatile boolean outputStreamOpen;

public WeightedAllShortestPaths(Graph graph, ExecutorService executorService, Concurrency concurrency, TerminationFlag terminationFlag) {
Expand Down Expand Up @@ -93,6 +93,7 @@ public Stream<AllShortestPathsStreamResult> compute() {

for (int i = 0; i < concurrency.value(); i++) {
executorService.submit(new ShortestPathTask());
runningTaskCounter.incrementAndGet();
}

return AllShortestPathsStream.stream(resultQueue, () -> {
Expand Down Expand Up @@ -125,20 +126,23 @@ public void run() {
int startNode;
while (outputStreamOpen && terminationFlag.running() && (startNode = counter.getAndIncrement()) < nodeCount) {
compute(startNode);
for (int i = 0; i < nodeCount; i++) {
var result = AllShortestPathsStreamResult.result(
graph.toOriginalNodeId(startNode),
graph.toOriginalNodeId(i),
distance[i]
);
try {
resultQueue.put(result);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RuntimeException(e);
}
}
progressTracker.logProgress();
}
if (runningTaskCounter.decrementAndGet() == 0 && outputStreamOpen) {
resultQueue.add(AllShortestPathsStreamResult.DONE);
}
}

private void streamResult(int source, int target, double distance){
var result = AllShortestPathsStreamResult.result(
graph.toOriginalNodeId(source),
graph.toOriginalNodeId(target),
distance
);
try {
resultQueue.put(result);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RuntimeException(e);
}
}

Expand All @@ -149,6 +153,7 @@ void compute(int startNode) {
while (outputStreamOpen && !queue.isEmpty()) {
final int node = queue.pop();
final double sourceDistance = distance[node];
streamResult(startNode,node,sourceDistance);
threadLocalGraph.forEachRelationship(
node,
Double.NaN,
Expand All @@ -162,6 +167,7 @@ void compute(int startNode) {
return true;
}));
}
progressTracker.logProgress();
}
}
}

0 comments on commit 2b3ec3a

Please sign in to comment.