Skip to content

Commit

Permalink
migrate list progress
Browse files Browse the repository at this point in the history
  • Loading branch information
lassewesth committed Aug 13, 2024
1 parent 39d1750 commit 6764132
Show file tree
Hide file tree
Showing 2 changed files with 10 additions and 163 deletions.
171 changes: 9 additions & 162 deletions proc/misc/src/main/java/org/neo4j/gds/ListProgressProc.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,189 +19,36 @@
*/
package org.neo4j.gds;

import org.apache.commons.lang3.time.DurationFormatUtils;
import org.neo4j.gds.core.utils.ClockService;
import org.neo4j.gds.core.utils.progress.JobId;
import org.neo4j.gds.core.utils.progress.TaskStore;
import org.neo4j.gds.core.utils.progress.tasks.DepthAwareTaskVisitor;
import org.neo4j.gds.core.utils.progress.tasks.Task;
import org.neo4j.gds.core.utils.progress.tasks.TaskTraversal;
import org.neo4j.gds.procedures.operations.StructuredOutputHelper;
import org.neo4j.gds.procedures.GraphDataScienceProcedures;
import org.neo4j.gds.procedures.operations.ProgressResult;
import org.neo4j.procedure.Context;
import org.neo4j.procedure.Description;
import org.neo4j.procedure.Internal;
import org.neo4j.procedure.Name;
import org.neo4j.procedure.Procedure;
import org.neo4j.values.storable.LocalTimeValue;

import java.time.Instant;
import java.time.LocalTime;
import java.time.ZoneId;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import static org.neo4j.gds.utils.StringFormatting.formatWithLocale;

public class ListProgressProc extends BaseProc {
static final int PROGRESS_BAR_LENGTH = 10;
public class ListProgressProc {
private static final String DESCRIPTION = "List progress events for currently running tasks.";

@Context
public TaskStore taskStore;
public GraphDataScienceProcedures facade;

@Internal
@Deprecated(forRemoval = true)
@Procedure(value = "gds.beta.listProgress", deprecatedBy = "gds.listProgress")
@Description(DESCRIPTION)
public Stream<ProgressResult> betaListProgress(
@Name(value = "jobId", defaultValue = "") String jobId
) {
executionContext()
.metricsFacade()
.deprecatedProcedures().called("gds.beta.listProgress");

executionContext()
.log()
.warn("Procedure `gds.beta.listProgress` has been deprecated, please use `gds.listProgress`.");
public Stream<ProgressResult> betaListProgress(@Name(value = "jobId", defaultValue = "") String jobId) {
facade.deprecatedProcedures().called("gds.beta.listProgress");
facade.log().warn("Procedure `gds.beta.listProgress` has been deprecated, please use `gds.listProgress`.");

return listProgress(jobId);
}

@Procedure("gds.listProgress")
@Description(DESCRIPTION)
public Stream<ProgressResult> listProgress(
@Name(value = "jobId", defaultValue = "") String jobId
) {
return jobId.isBlank()
? jobsSummaryView()
: jobDetailView(jobId);
}

private Stream<ProgressResult> jobsSummaryView() {
if (isGdsAdmin()) {
return taskStore.query().map(ProgressResult::fromTaskStoreEntry);
} else {
return taskStore.query(username()).map(ProgressResult::fromTaskStoreEntry);
}
}

private Stream<ProgressResult> jobDetailView(String jobIdAsString) {
var jobId = new JobId(jobIdAsString);

if (isGdsAdmin()) {
var progressResults = taskStore
.query(jobId)
.flatMap(ListProgressProc::jobProgress)
.collect(Collectors.toList());

if (progressResults.isEmpty()) {
throw new IllegalArgumentException(formatWithLocale(
"No task with job id `%s` was found.",
jobIdAsString
));
}

return progressResults.stream();
} else {
return taskStore.query(username(), jobId).map(ListProgressProc::jobProgress).orElseThrow(
() -> new IllegalArgumentException(formatWithLocale(
"No task with job id `%s` was found.",
jobIdAsString
))
);
}
}

private static Stream<ProgressResult> jobProgress(TaskStore.UserTask userTask) {
var jobProgressVisitor = new JobProgressVisitor(userTask.jobId(), userTask.username());
TaskTraversal.visitPreOrderWithDepth(userTask.task(), jobProgressVisitor);
return jobProgressVisitor.progressRowsStream();
}

@SuppressWarnings("unused")
public static class ProgressResult {
public String username;
public String jobId;
public String taskName;
public String progress;
public String progressBar;
public String status;
public LocalTimeValue timeStarted;
public String elapsedTime;

static ProgressResult fromTaskStoreEntry(String username, Map.Entry<JobId, Task> taskStoreEntry) {
var jobId = taskStoreEntry.getKey();
var task = taskStoreEntry.getValue();
return new ProgressResult(username, task, jobId, task.description());
}

static ProgressResult fromTaskStoreEntry(TaskStore.UserTask userTask) {
return new ProgressResult(userTask.username(), userTask.task(), userTask.jobId(), userTask.task().description());
}

static ProgressResult fromTaskWithDepth(String username, Task task, JobId jobId, int depth) {
var treeViewTaskName = StructuredOutputHelper.treeViewDescription(task.description(), depth);
return new ProgressResult(username, task, jobId, treeViewTaskName);
}

public ProgressResult(String username, Task task, JobId jobId, String taskName) {
var progressContainer = task.getProgress();

this.jobId = jobId.asString();
this.taskName = taskName;
this.username = username;
this.progress = StructuredOutputHelper.computeProgress(progressContainer);
this.progressBar = StructuredOutputHelper.progressBar(progressContainer, PROGRESS_BAR_LENGTH);
this.status = task.status().name();
this.timeStarted = localTimeValue(task);
this.elapsedTime = prettyElapsedTime(task);
}

private LocalTimeValue localTimeValue(Task task) {
if (task.hasNotStarted()) {
return null;
}
return LocalTimeValue.localTime(LocalTime.ofInstant(
Instant.ofEpochMilli(task.startTime()),
ZoneId.systemDefault()
));
}

private String prettyElapsedTime(Task task) {
if (task.hasNotStarted()) {
return "Not yet started";
}
var finishTime = task.finishTime();
var finishTimeOrNow = finishTime != -1
? finishTime
: ClockService.clock().millis();
var elapsedTime = finishTimeOrNow - task.startTime();
return DurationFormatUtils.formatDurationWords(elapsedTime, true, true);
}
}

public static class JobProgressVisitor extends DepthAwareTaskVisitor {

private final JobId jobId;
private final String username;
private final List<ProgressResult> progressRows;

JobProgressVisitor(JobId jobId, String username) {
this.jobId = jobId;
this.username = username;
this.progressRows = new ArrayList<>();
}

Stream<ProgressResult> progressRowsStream() {
return this.progressRows.stream();
}

@Override
public void visit(Task task) {
progressRows.add(ProgressResult.fromTaskWithDepth(username, task, jobId, depth()));
}
public Stream<ProgressResult> listProgress(@Name(value = "jobId", defaultValue = "") String jobId) {
return facade.operations().listProgress(jobId);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ private IllegalArgumentException createException() {
return new IllegalArgumentException(
formatWithLocale(
"No task with job id `%s` was found.",
jobId
jobId.asString()
)
);
}
Expand Down

0 comments on commit 6764132

Please sign in to comment.