Skip to content

Commit

Permalink
Fixes for internal error reattempts (#1436)
Browse files Browse the repository at this point in the history
* magic links on span event errors

* prevent task monitor from processing errors handled elsewhere

* exclusively use internal error code enum for completion data

* add complete attempt service opts

* reattempts need to go via the queue for task controllers that may have exited

* only infer retry config if completed via crash or system failure

* enhance error before deciding if retriable

* retry on SIGTERM

* enable retry config helper for latest sdk

* don't retry heartbeat timeouts for now

* enable task monitor to update fatal errors

* add missing service

* update retry config since package version

* don't alter completion time when updating existing error
  • Loading branch information
nicktrn authored Oct 28, 2024
1 parent cbe5170 commit d2c779e
Show file tree
Hide file tree
Showing 14 changed files with 212 additions and 95 deletions.
13 changes: 7 additions & 6 deletions apps/kubernetes-provider/src/taskMonitor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,10 @@ export class TaskMonitor {

let reason = rawReason || "Unknown error";
let logs = rawLogs || "";
let overrideCompletion = false;

/** This will only override existing task errors. It will not crash the run. */
let onlyOverrideExistingError = exitCode === EXIT_CODE_CHILD_NONZERO;

let errorCode: TaskRunInternalError["code"] = TaskRunErrorCodes.POD_UNKNOWN_ERROR;

switch (rawReason) {
Expand All @@ -185,10 +188,8 @@ export class TaskMonitor {
}
break;
case "OOMKilled":
overrideCompletion = true;
reason = `${
exitCode === EXIT_CODE_CHILD_NONZERO ? "Child process" : "Parent process"
} ran out of memory! Try choosing a machine preset with more memory for this task.`;
reason =
"[TaskMonitor] Your task ran out of memory. Try increasing the machine specs. If this doesn't fix it there might be a memory leak.";
errorCode = TaskRunErrorCodes.TASK_PROCESS_OOM_KILLED;
break;
default:
Expand All @@ -199,7 +200,7 @@ export class TaskMonitor {
exitCode,
reason,
logs,
overrideCompletion,
overrideCompletion: onlyOverrideExistingError,
errorCode,
} satisfies FailureDetails;

Expand Down
28 changes: 23 additions & 5 deletions apps/webapp/app/components/runs/v3/SpanEvents.tsx
Original file line number Diff line number Diff line change
@@ -1,10 +1,13 @@
import { EnvelopeIcon } from "@heroicons/react/20/solid";
import {
exceptionEventEnhancer,
isExceptionSpanEvent,
type ExceptionEventProperties,
type SpanEvent as OtelSpanEvent,
} from "@trigger.dev/core/v3";
import { CodeBlock } from "~/components/code/CodeBlock";
import { Feedback } from "~/components/Feedback";
import { Button } from "~/components/primitives/Buttons";
import { Callout } from "~/components/primitives/Callout";
import { DateTimeAccurate } from "~/components/primitives/DateTime";
import { Header2, Header3 } from "~/components/primitives/Headers";
Expand Down Expand Up @@ -75,11 +78,26 @@ export function SpanEventError({
titleClassName="text-rose-500"
/>
{enhancedException.message && <Callout variant="error">{enhancedException.message}</Callout>}
{enhancedException.link && (
<Callout variant="docs" to={enhancedException.link.href}>
{enhancedException.link.name}
</Callout>
)}
{enhancedException.link &&
(enhancedException.link.magic === "CONTACT_FORM" ? (
<Feedback
button={
<Button
variant="tertiary/medium"
LeadingIcon={EnvelopeIcon}
leadingIconClassName="text-blue-400"
fullWidth
textAlignLeft
>
{enhancedException.link.name}
</Button>
}
/>
) : (
<Callout variant="docs" to={enhancedException.link.href}>
{enhancedException.link.name}
</Callout>
))}
{enhancedException.stacktrace && (
<CodeBlock
showCopyButton={false}
Expand Down
6 changes: 3 additions & 3 deletions apps/webapp/app/models/taskRun.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ import type {
TaskRunFailedExecutionResult,
TaskRunSuccessfulExecutionResult,
} from "@trigger.dev/core/v3";
import { TaskRunError } from "@trigger.dev/core/v3";
import { TaskRunError, TaskRunErrorCodes } from "@trigger.dev/core/v3";

import type {
TaskRun,
Expand Down Expand Up @@ -62,7 +62,7 @@ export function executionResultForTaskRun(
id: taskRun.friendlyId,
error: {
type: "INTERNAL_ERROR",
code: "TASK_RUN_CANCELLED",
code: TaskRunErrorCodes.TASK_RUN_CANCELLED,
},
} satisfies TaskRunFailedExecutionResult;
}
Expand Down Expand Up @@ -94,7 +94,7 @@ export function executionResultForTaskRun(
id: taskRun.friendlyId,
error: {
type: "INTERNAL_ERROR",
code: "CONFIGURED_INCORRECTLY",
code: TaskRunErrorCodes.CONFIGURED_INCORRECTLY,
},
} satisfies TaskRunFailedExecutionResult;
}
Expand Down
23 changes: 7 additions & 16 deletions apps/webapp/app/v3/failedTaskRun.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -112,23 +112,15 @@ export class FailedTaskRunRetryHelper extends BaseService {

logger.debug("[FailedTaskRunRetryHelper] Completing attempt", { taskRun, completion });

const executionRetry =
completion.retry ??
(await FailedTaskRunRetryHelper.getExecutionRetry({
run: taskRun,
execution: retriableExecution,
}));

const completeAttempt = new CompleteAttemptService(this._prisma);
const completeResult = await completeAttempt.call({
completion: {
...completion,
retry: executionRetry,
},
execution: retriableExecution,
const completeAttempt = new CompleteAttemptService({
prisma: this._prisma,
isSystemFailure: !isCrash,
isCrash,
});
const completeResult = await completeAttempt.call({
completion,
execution: retriableExecution,
});

return completeResult;
}
Expand Down Expand Up @@ -280,6 +272,5 @@ export class FailedTaskRunRetryHelper extends BaseService {
}
}

// TODO: update this to the correct version
static DEFAULT_RETRY_CONFIG_SINCE_VERSION = "3.0.14";
static DEFAULT_RETRY_CONFIG_SINCE_VERSION = "3.1.0";
}
18 changes: 11 additions & 7 deletions apps/webapp/app/v3/handleSocketIo.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import { Redis } from "ioredis";
import { createAdapter } from "@socket.io/redis-adapter";
import { CrashTaskRunService } from "./services/crashTaskRun.server";
import { CreateTaskRunAttemptService } from "./services/createTaskRunAttempt.server";
import { UpdateFatalRunErrorService } from "./services/updateFatalRunError.server";

export const socketIo = singleton("socketIo", initalizeIoServer);

Expand Down Expand Up @@ -123,12 +124,13 @@ function createCoordinatorNamespace(io: Server) {
await resumeAttempt.call(message);
},
TASK_RUN_COMPLETED: async (message) => {
const completeAttempt = new CompleteAttemptService();
const completeAttempt = new CompleteAttemptService({
supportsRetryCheckpoints: message.version === "v1",
});
await completeAttempt.call({
completion: message.completion,
execution: message.execution,
checkpoint: message.checkpoint,
supportsRetryCheckpoints: message.version === "v1",
});
},
TASK_RUN_FAILED_TO_RUN: async (message) => {
Expand Down Expand Up @@ -301,11 +303,13 @@ function createProviderNamespace(io: Server) {
handlers: {
WORKER_CRASHED: async (message) => {
try {
const service = new CrashTaskRunService();

await service.call(message.runId, {
...message,
});
if (message.overrideCompletion) {
const updateErrorService = new UpdateFatalRunErrorService();
await updateErrorService.call(message.runId, { ...message });
} else {
const crashRunService = new CrashTaskRunService();
await crashRunService.call(message.runId, { ...message });
}
} catch (error) {
logger.error("Error while handling crashed worker", { error });
}
Expand Down
3 changes: 2 additions & 1 deletion apps/webapp/app/v3/requeueTaskRun.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import { BaseService } from "./services/baseService.server";
import { PrismaClientOrTransaction } from "~/db.server";
import { workerQueue } from "~/services/worker.server";
import { socketIo } from "./handleSocketIo.server";
import { TaskRunErrorCodes } from "@trigger.dev/core/v3";

export class RequeueTaskRunService extends BaseService {
public async call(runId: string) {
Expand Down Expand Up @@ -59,7 +60,7 @@ export class RequeueTaskRunService extends BaseService {
retry: undefined,
error: {
type: "INTERNAL_ERROR",
code: "TASK_RUN_HEARTBEAT_TIMEOUT",
code: TaskRunErrorCodes.TASK_RUN_HEARTBEAT_TIMEOUT,
message: "Did not receive a heartbeat from the worker in time",
},
});
Expand Down
Loading

0 comments on commit d2c779e

Please sign in to comment.