Skip to content

Commit

Permalink
create an operations procedure facade module and migrate listProgress…
Browse files Browse the repository at this point in the history
… in there
  • Loading branch information
lassewesth committed Jul 31, 2024
1 parent 958ea10 commit 1120a9e
Show file tree
Hide file tree
Showing 21 changed files with 332 additions and 196 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.neo4j.gds.api.GraphLoaderContext;
import org.neo4j.gds.api.User;
import org.neo4j.gds.core.utils.progress.TaskRegistryFactory;
import org.neo4j.gds.core.utils.progress.TaskStore;
import org.neo4j.gds.core.utils.warnings.UserLogRegistryFactory;
import org.neo4j.gds.core.utils.warnings.UserLogStore;
import org.neo4j.gds.termination.TerminationFlag;
Expand All @@ -35,6 +36,7 @@ public final class RequestScopedDependencies {
private final DatabaseId databaseId;
private final GraphLoaderContext graphLoaderContext;
private final TaskRegistryFactory taskRegistryFactory;
private final TaskStore taskStore;
private final TerminationFlag terminationFlag;
private final User user;
private final UserLogRegistryFactory userLogRegistryFactory;
Expand All @@ -49,6 +51,7 @@ private RequestScopedDependencies(
DatabaseId databaseId,
GraphLoaderContext graphLoaderContext,
TaskRegistryFactory taskRegistryFactory,
TaskStore taskStore,
TerminationFlag terminationFlag,
User user,
UserLogRegistryFactory userLogRegistryFactory,
Expand All @@ -57,13 +60,15 @@ private RequestScopedDependencies(
this.databaseId = databaseId;
this.graphLoaderContext = graphLoaderContext;
this.taskRegistryFactory = taskRegistryFactory;
this.taskStore = taskStore;
this.terminationFlag = terminationFlag;
this.user = user;
this.userLogRegistryFactory = userLogRegistryFactory;
this.userLogStore = userLogStore;
}
public static RequestScopedDependenciesBuilder builder(){
return new RequestScopedDependenciesBuilder();

public static RequestScopedDependenciesBuilder builder() {
return new RequestScopedDependenciesBuilder();
}

public DatabaseId getDatabaseId() {
Expand All @@ -79,6 +84,10 @@ public TaskRegistryFactory getTaskRegistryFactory() {
return taskRegistryFactory;
}

public TaskStore getTaskStore() {
return taskStore;
}

public TerminationFlag getTerminationFlag() {
return terminationFlag;
}
Expand All @@ -96,7 +105,6 @@ public UserLogStore getUserLogStore() {
}



/**
* A handy builder where you can include as many or as few components as you are interested in.
* We deliberately do not have defaults,
Expand All @@ -107,6 +115,7 @@ public static class RequestScopedDependenciesBuilder {
private GraphLoaderContext graphLoaderContext;
private TerminationFlag terminationFlag;
private TaskRegistryFactory taskRegistryFactory;
private TaskStore taskStore;
private User user;
private UserLogRegistryFactory userLogRegistryFactory;
private UserLogStore userLogStore;
Expand All @@ -121,13 +130,16 @@ public RequestScopedDependenciesBuilder with(GraphLoaderContext graphLoaderConte
return this;
}



public RequestScopedDependenciesBuilder with(TaskRegistryFactory taskRegistryFactory) {
this.taskRegistryFactory = taskRegistryFactory;
return this;
}

public RequestScopedDependenciesBuilder with(TaskStore taskStore) {
this.taskStore = taskStore;
return this;
}

public RequestScopedDependenciesBuilder with(TerminationFlag terminationFlag) {
this.terminationFlag = terminationFlag;
return this;
Expand All @@ -148,13 +160,12 @@ public RequestScopedDependenciesBuilder with(UserLogStore userLogStore) {
return this;
}



public RequestScopedDependencies build() {
return new RequestScopedDependencies(
databaseId,
graphLoaderContext,
taskRegistryFactory,
taskStore,
terminationFlag,
user,
userLogRegistryFactory,
Expand Down
17 changes: 0 additions & 17 deletions proc/common/src/main/java/org/neo4j/gds/BaseProc.java
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@
import org.neo4j.gds.config.BaseConfig;
import org.neo4j.gds.core.CypherMapAccess;
import org.neo4j.gds.core.Username;
import org.neo4j.gds.core.loading.GraphStoreCatalog;
import org.neo4j.gds.core.loading.GraphStoreCatalogEntry;
import org.neo4j.gds.core.utils.progress.TaskRegistryFactory;
import org.neo4j.gds.core.utils.warnings.UserLogRegistryFactory;
Expand All @@ -51,8 +50,6 @@

import java.util.function.Supplier;

import static org.neo4j.gds.utils.StringFormatting.formatWithLocale;

public abstract class BaseProc {

public static final String ESTIMATE_DESCRIPTION = "Returns an estimation of the memory consumption for that procedure.";
Expand Down Expand Up @@ -125,20 +122,6 @@ protected final void validateConfig(CypherMapAccess cypherConfig, BaseConfig con
cypherConfig.requireOnlyKeysFrom(config.configKeys());
}

protected final void validateGraphNameAndEnsureItDoesNotExist(String username, String graphName) {
validateGraphName(graphName);
if (GraphStoreCatalog.exists(username, databaseId(), graphName)) {
throw new IllegalArgumentException(formatWithLocale(
"A graph with name '%s' already exists.",
graphName
));
}
}

private void validateGraphName(String graphName) {
CypherMapAccess.failOnBlank("graphName", graphName);
}

public ExecutionContext executionContext() {
return databaseService == null
? ExecutionContext.EMPTY
Expand Down
1 change: 1 addition & 0 deletions proc/community/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ dependencies {
implementation project(':neo4j-api')
implementation project(':node-embedding-algorithms')
implementation project(':opengds-procedure-facade')
implementation project(':operations-procedure-facade')
implementation project(':progress-tracking')
implementation project(':string-formatting')
implementation project(':termination')
Expand Down
1 change: 1 addition & 0 deletions proc/machine-learning/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ dependencies {
implementation project(':neo4j-api')
implementation project(':node-embedding-algorithms')
implementation project(':opengds-procedure-facade')
implementation project(':operations-procedure-facade')
implementation project(':pipeline')
implementation project(':proc-common')
implementation project(':progress-tracking')
Expand Down
1 change: 1 addition & 0 deletions proc/misc/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ dependencies {
implementation project(':logging')
implementation project(':memory-usage')
implementation project(':opengds-procedure-facade')
implementation project(':operations-procedure-facade')
implementation project(':proc-common')
implementation project(':proc-embeddings')
implementation project(':progress-tracking')
Expand Down
171 changes: 9 additions & 162 deletions proc/misc/src/main/java/org/neo4j/gds/ListProgressProc.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,189 +19,36 @@
*/
package org.neo4j.gds;

import org.apache.commons.lang3.time.DurationFormatUtils;
import org.neo4j.gds.core.utils.ClockService;
import org.neo4j.gds.core.utils.progress.JobId;
import org.neo4j.gds.core.utils.progress.TaskStore;
import org.neo4j.gds.core.utils.progress.tasks.DepthAwareTaskVisitor;
import org.neo4j.gds.core.utils.progress.tasks.Task;
import org.neo4j.gds.core.utils.progress.tasks.TaskTraversal;
import org.neo4j.gds.procedures.GraphDataScienceProcedures;
import org.neo4j.gds.procedures.operations.ProgressResult;
import org.neo4j.procedure.Context;
import org.neo4j.procedure.Description;
import org.neo4j.procedure.Internal;
import org.neo4j.procedure.Name;
import org.neo4j.procedure.Procedure;
import org.neo4j.values.storable.LocalTimeValue;

import java.time.Instant;
import java.time.LocalTime;
import java.time.ZoneId;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import static org.neo4j.gds.utils.StringFormatting.formatWithLocale;

public class ListProgressProc extends BaseProc {

static final int PROGRESS_BAR_LENGTH = 10;
public class ListProgressProc {
private static final String DESCRIPTION = "List progress events for currently running tasks.";

@Context
public TaskStore taskStore;
public GraphDataScienceProcedures facade;

@Internal
@Deprecated(forRemoval = true)
@Procedure(value = "gds.beta.listProgress", deprecatedBy = "gds.listProgress")
@Description(DESCRIPTION)
public Stream<ProgressResult> betaListProgress(
@Name(value = "jobId", defaultValue = "") String jobId
) {
executionContext()
.metricsFacade()
.deprecatedProcedures().called("gds.beta.listProgress");

executionContext()
.log()
.warn("Procedure `gds.beta.listProgress` has been deprecated, please use `gds.listProgress`.");
public Stream<ProgressResult> betaListProgress(@Name(value = "jobId", defaultValue = "") String jobId) {
facade.deprecatedProcedures().called("gds.beta.listProgress");
facade.log().warn("Procedure `gds.beta.listProgress` has been deprecated, please use `gds.listProgress`.");

return listProgress(jobId);
}

@Procedure("gds.listProgress")
@Description(DESCRIPTION)
public Stream<ProgressResult> listProgress(
@Name(value = "jobId", defaultValue = "") String jobId
) {
return jobId.isBlank()
? jobsSummaryView()
: jobDetailView(jobId);
}

private Stream<ProgressResult> jobsSummaryView() {
if (isGdsAdmin()) {
return taskStore.query().map(ProgressResult::fromTaskStoreEntry);
} else {
return taskStore.query(username()).map(ProgressResult::fromTaskStoreEntry);
}
}

private Stream<ProgressResult> jobDetailView(String jobIdAsString) {
var jobId = new JobId(jobIdAsString);

if (isGdsAdmin()) {
var progressResults = taskStore
.query(jobId)
.flatMap(ListProgressProc::jobProgress)
.collect(Collectors.toList());

if (progressResults.isEmpty()) {
throw new IllegalArgumentException(formatWithLocale(
"No task with job id `%s` was found.",
jobIdAsString
));
}

return progressResults.stream();
} else {
return taskStore.query(username(), jobId).map(ListProgressProc::jobProgress).orElseThrow(
() -> new IllegalArgumentException(formatWithLocale(
"No task with job id `%s` was found.",
jobIdAsString
))
);
}
}

private static Stream<ProgressResult> jobProgress(TaskStore.UserTask userTask) {
var jobProgressVisitor = new JobProgressVisitor(userTask.jobId(), userTask.username());
TaskTraversal.visitPreOrderWithDepth(userTask.task(), jobProgressVisitor);
return jobProgressVisitor.progressRowsStream();
}

@SuppressWarnings("unused")
public static class ProgressResult {
public String username;
public String jobId;
public String taskName;
public String progress;
public String progressBar;
public String status;
public LocalTimeValue timeStarted;
public String elapsedTime;

static ProgressResult fromTaskStoreEntry(String username, Map.Entry<JobId, Task> taskStoreEntry) {
var jobId = taskStoreEntry.getKey();
var task = taskStoreEntry.getValue();
return new ProgressResult(username, task, jobId, task.description());
}

static ProgressResult fromTaskStoreEntry(TaskStore.UserTask userTask) {
return new ProgressResult(userTask.username(), userTask.task(), userTask.jobId(), userTask.task().description());
}

static ProgressResult fromTaskWithDepth(String username, Task task, JobId jobId, int depth) {
var treeViewTaskName = StructuredOutputHelper.treeViewDescription(task.description(), depth);
return new ProgressResult(username, task, jobId, treeViewTaskName);
}

public ProgressResult(String username, Task task, JobId jobId, String taskName) {
var progressContainer = task.getProgress();

this.jobId = jobId.asString();
this.taskName = taskName;
this.username = username;
this.progress = StructuredOutputHelper.computeProgress(progressContainer);
this.progressBar = StructuredOutputHelper.progressBar(progressContainer, PROGRESS_BAR_LENGTH);
this.status = task.status().name();
this.timeStarted = localTimeValue(task);
this.elapsedTime = prettyElapsedTime(task);
}

private LocalTimeValue localTimeValue(Task task) {
if (task.hasNotStarted()) {
return null;
}
return LocalTimeValue.localTime(LocalTime.ofInstant(
Instant.ofEpochMilli(task.startTime()),
ZoneId.systemDefault()
));
}

private String prettyElapsedTime(Task task) {
if (task.hasNotStarted()) {
return "Not yet started";
}
var finishTime = task.finishTime();
var finishTimeOrNow = finishTime != -1
? finishTime
: ClockService.clock().millis();
var elapsedTime = finishTimeOrNow - task.startTime();
return DurationFormatUtils.formatDurationWords(elapsedTime, true, true);
}
}

public static class JobProgressVisitor extends DepthAwareTaskVisitor {

private final JobId jobId;
private final String username;
private final List<ProgressResult> progressRows;

JobProgressVisitor(JobId jobId, String username) {
this.jobId = jobId;
this.username = username;
this.progressRows = new ArrayList<>();
}

Stream<ProgressResult> progressRowsStream() {
return this.progressRows.stream();
}

@Override
public void visit(Task task) {
progressRows.add(ProgressResult.fromTaskWithDepth(username, task, jobId, depth()));
}
public Stream<ProgressResult> listProgress(@Name(value = "jobId", defaultValue = "") String jobId) {
return facade.operations().listProgress(jobId);
}
}
5 changes: 3 additions & 2 deletions proc/test/src/main/java/org/neo4j/gds/ProcedureRunner.java
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import org.neo4j.gds.core.Username;
import org.neo4j.gds.core.loading.GraphStoreCatalogService;
import org.neo4j.gds.core.model.OpenModelCatalog;
import org.neo4j.gds.core.utils.progress.EmptyTaskStore;
import org.neo4j.gds.core.utils.progress.TaskRegistryFactory;
import org.neo4j.gds.core.utils.warnings.EmptyUserLogRegistryFactory;
import org.neo4j.gds.core.utils.warnings.EmptyUserLogStore;
Expand Down Expand Up @@ -152,13 +153,13 @@ private static GraphDataScienceProcedures createGraphDataScienceProcedures(
) {
var gdsLog = new LogAdapter(log);

var procedureContext = WriteContext.builder()
.build();
var procedureContext = WriteContext.builder().build();

var requestScopedDependencies = RequestScopedDependencies.builder()
.with(new DatabaseIdAccessor().getDatabaseId(graphDatabaseService))
.with(GraphLoaderContext.NULL_CONTEXT)
.with(taskRegistryFactory)
.with(EmptyTaskStore.INSTANCE)
.with(new User(username.username(), false))
.with(EmptyUserLogRegistryFactory.INSTANCE)
.with(EmptyUserLogStore.INSTANCE)
Expand Down
Loading

0 comments on commit 1120a9e

Please sign in to comment.