Skip to content

Commit

Permalink
Merge pull request #9381 from IoannisPanagiotas/do-not-throw-histogra…
Browse files Browse the repository at this point in the history
…m-hiccups

Refactoring to allow histogram failure testing and a  first solution
  • Loading branch information
IoannisPanagiotas authored Jul 17, 2024
2 parents 979f51d + 0905a43 commit b27050a
Show file tree
Hide file tree
Showing 31 changed files with 752 additions and 217 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@ public WRITE_RESULT build() {
var communityCountAndHistogram = communityCountAndHistogram(
nodeCount,
communityFunction,
HistogramProvider::new,
executorService,
concurrency
);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
/*
* Copyright (c) "Neo4j"
* Neo4j Sweden AB [http://neo4j.com]
*
* This file is part of Neo4j.
*
* Neo4j is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package org.neo4j.gds.result;

import org.HdrHistogram.DoubleHistogram;
import org.neo4j.gds.core.utils.partition.Partition;

import java.util.function.LongToDoubleFunction;
import java.util.function.Supplier;

class CentralityRecordTask implements Runnable {

private final DoubleHistogram histogram;
private final Partition partition;
private final LongToDoubleFunction centralityFunction;

CentralityRecordTask(
Partition partition,
LongToDoubleFunction centralityFunction,
Supplier<DoubleHistogram> histogramSupplier
) {
this.partition = partition;
this.centralityFunction = centralityFunction;
this.histogram = histogramSupplier.get();
}

@Override
public void run() {
partition.consume(id -> {
histogram.recordValue(centralityFunction.applyAsDouble(id));
});
}

DoubleHistogram histogram() {return histogram;}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@
import org.neo4j.gds.core.concurrency.Concurrency;
import org.neo4j.gds.core.concurrency.ParallelUtil;
import org.neo4j.gds.core.utils.ProgressTimer;
import org.neo4j.gds.core.utils.partition.Partition;
import org.neo4j.gds.core.utils.partition.PartitionUtils;

import java.util.Collections;
Expand All @@ -33,92 +32,117 @@
import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.LongToDoubleFunction;
import java.util.function.Supplier;

public final class CentralityStatistics {

public static DoubleHistogram histogram(
static DoubleHistogram histogram(
long nodeCount,
LongToDoubleFunction centralityFunction,
ExecutorService executorService,
Concurrency concurrency
) {
DoubleHistogram histogram;
return histogram(
nodeCount,
() -> new DoubleHistogram(ProcedureConstants.HISTOGRAM_PRECISION_DEFAULT),
centralityFunction,
executorService,
concurrency
);
}

static DoubleHistogram histogram(
long nodeCount,
Supplier<DoubleHistogram> histogramSupplier,
LongToDoubleFunction centralityFunction,
ExecutorService executorService,
Concurrency concurrency
) {
var histogram = histogramSupplier.get();
if (concurrency.value() == 1) {
histogram = new DoubleHistogram(ProcedureConstants.HISTOGRAM_PRECISION_DEFAULT);
for (long id = 0; id < nodeCount; id++) {
histogram.recordValue(centralityFunction.applyAsDouble(id));
}
} else {
var tasks = PartitionUtils.rangePartition(
concurrency,
nodeCount,
partition -> new RecordTask(partition, centralityFunction),
partition -> new CentralityRecordTask(partition, centralityFunction, histogramSupplier),
Optional.empty()
);

ParallelUtil.run(tasks, executorService);

histogram = new DoubleHistogram(ProcedureConstants.HISTOGRAM_PRECISION_DEFAULT);
for (var task : tasks) {
histogram.add(task.histogram);
histogram.add(task.histogram());
}
}
return histogram;
}

private CentralityStatistics() {}

private static class RecordTask implements Runnable {

private final DoubleHistogram histogram;
private final Partition partition;
private final LongToDoubleFunction centralityFunction;

RecordTask(Partition partition, LongToDoubleFunction centralityFunction) {
this.partition = partition;
this.centralityFunction = centralityFunction;
this.histogram = new DoubleHistogram(ProcedureConstants.HISTOGRAM_PRECISION_DEFAULT);
}

@Override
public void run() {
partition.consume(id -> {
histogram.recordValue(centralityFunction.applyAsDouble(id));
});
}
public static CentralityStats centralityStatistics(
long nodeCount,
LongToDoubleFunction centralityProvider,
ExecutorService executorService,
Concurrency concurrency,
boolean shouldCompute
) {
return computeCentralityStatistics(
nodeCount,
centralityProvider,
executorService,
concurrency,
() -> new DoubleHistogram(ProcedureConstants.HISTOGRAM_PRECISION_DEFAULT),
shouldCompute
);
}

public static CentralityStats centralityStatistics(
public static CentralityStats computeCentralityStatistics(
long nodeCount,
LongToDoubleFunction centralityProvider,
ExecutorService executorService,
Concurrency concurrency,
Supplier<DoubleHistogram> histogramSupplier,
boolean shouldCompute
) {
Optional<DoubleHistogram> maybeHistogram = Optional.empty();
var computeMilliseconds = new AtomicLong(0);

try (var ignored = ProgressTimer.start(computeMilliseconds::set)) {
if (shouldCompute) {
maybeHistogram = Optional.of(histogram(
var histogram = histogram(
nodeCount,
histogramSupplier,
centralityProvider,
executorService,
concurrency
));
);
maybeHistogram = Optional.of(histogram);
}
} catch (Exception e) {
if (e.getMessage().contains("is out of bounds for histogram, current covered range")) {
return new CentralityStats(Optional.empty(), computeMilliseconds.get(), false);
} else {
throw e;
}

}

return new CentralityStats(maybeHistogram, computeMilliseconds.get());
return new CentralityStats(maybeHistogram, computeMilliseconds.get(), true);
}

public static Map<String, Object> centralitySummary(Optional<DoubleHistogram> histogram) {
public static Map<String, Object> centralitySummary(Optional<DoubleHistogram> histogram, boolean success) {
if (!success) {
return HistogramUtils.failure();
}
return histogram
.map(HistogramUtils::centralitySummary)
.orElseGet(Collections::emptyMap);
}

public record CentralityStats(Optional<DoubleHistogram> histogram, long computeMilliseconds) {
public record CentralityStats(Optional<DoubleHistogram> histogram, long computeMilliseconds, boolean success) {
}


Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
/*
* Copyright (c) "Neo4j"
* Neo4j Sweden AB [http://neo4j.com]
*
* This file is part of Neo4j.
*
* Neo4j is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package org.neo4j.gds.result;

import com.carrotsearch.hppc.LongLongHashMap;
import com.carrotsearch.hppc.LongLongMap;
import com.carrotsearch.hppc.procedures.LongLongProcedure;
import org.neo4j.gds.collections.hsa.HugeSparseLongArray;

import java.util.function.LongUnaryOperator;

class CommunityAddTask implements Runnable {

private final HugeSparseLongArray.Builder builder;

private final LongUnaryOperator communityFunction;

private final long startId;
private final long length;

// Use local buffer to avoid contention on GrowingBuilder.add().
// This is especially useful, if the input has a skewed
// distribution, i.e. most nodes end up in the same community.
private final LongLongMap buffer;

CommunityAddTask(
HugeSparseLongArray.Builder builder,
LongUnaryOperator communityFunction,
long startId,
long length
) {
this.builder = builder;
this.communityFunction = communityFunction;
this.startId = startId;
this.length = length;
// safe cast, since max batch size less than Integer.MAX_VALUE
this.buffer = new LongLongHashMap((int) length);
}

@Override
public void run() {
var endId = startId + length;
for (long id = startId; id < endId; id++) {
buffer.addTo(communityFunction.applyAsLong(id), 1L);
}
buffer.forEach((LongLongProcedure) builder::addTo);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
/*
* Copyright (c) "Neo4j"
* Neo4j Sweden AB [http://neo4j.com]
*
* This file is part of Neo4j.
*
* Neo4j is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package org.neo4j.gds.result;

import org.HdrHistogram.Histogram;
import org.neo4j.gds.collections.hsa.HugeSparseLongArray;
import org.neo4j.gds.core.utils.partition.Partition;

class CommunityCountAndRecordTask implements Runnable {

private final HugeSparseLongArray communitySizes;

private final Partition partition;

private final Histogram histogram;

private long count;

CommunityCountAndRecordTask(
HugeSparseLongArray communitySizes,
Partition partition,
HistogramProvider histogramProvider
) {
this.communitySizes = communitySizes;
this.partition = partition;
this.histogram = histogramProvider.get();
}

@Override
public void run() {
partition.consume(id -> {
long communitySize = communitySizes.get(id);
if (communitySize != CommunityStatistics.EMPTY_COMMUNITY) {
count++;
histogram.recordValue(communitySize);
}
});
}

public long count(){
return count;
}

public Histogram histogram(){
return histogram;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
/*
* Copyright (c) "Neo4j"
* Neo4j Sweden AB [http://neo4j.com]
*
* This file is part of Neo4j.
*
* Neo4j is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package org.neo4j.gds.result;

import org.neo4j.gds.collections.hsa.HugeSparseLongArray;
import org.neo4j.gds.core.utils.partition.Partition;

class CommunityCountTask implements Runnable {

private final HugeSparseLongArray communitySizes;

private final Partition partition;

private long count;

CommunityCountTask(HugeSparseLongArray communitySizes, Partition partition) {
this.communitySizes = communitySizes;
this.partition = partition;
}

@Override
public void run() {
partition.consume(id -> {
if (communitySizes.get(id) != CommunityStatistics.EMPTY_COMMUNITY) {
count++;
}
});
}

long count() {
return count;
}
}
Loading

0 comments on commit b27050a

Please sign in to comment.