Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

WX-927 Task log streaming for GCP Batch #7540

Open
wants to merge 11 commits into
base: develop
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,8 @@ be found [here](https://cromwell.readthedocs.io/en/stable/backends/HPC/#optional
- Fixes the reference disk feature.
- Fixes pulling Docker image metadata from private GCR repositories.
- Fixed `google_project` and `google_compute_service_account` workflow options not taking effect when using GCP Batch backend
- Added a way to use a custom LogsPolicy for the job execution: setting `backend.providers.batch.config.batch.logs-policy` to "CLOUD_LOGGING" (default) keeps the current behavior, or set it to "PATH" to save the logs onto the mounted disk; at the end, this log file gets copied to the Google Cloud Storage bucket with "task.log" as the name.
- When "CLOUD_LOGGING" is used, many more Cromwell / WDL labels for workflow, root workflow, call, shard etc. are now assigned to GCP Batch log entries.
- A task log file with the name "task.log" that combines standard output and standard error is now streamed to the task directory in Google Cloud Storage.
- When Cloud Logging is enabled, many more Cromwell / WDL labels for workflow, root workflow, call, shard etc. are now assigned to GCP Batch log entries.

### Improved handling of Life Sciences API quota errors

Expand Down
11 changes: 7 additions & 4 deletions backend/src/main/scala/cromwell/backend/io/JobPaths.scala
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,9 @@ object JobPaths {
val ScriptPathKey = "script"
val StdoutPathKey = "stdout"
val StdErrPathKey = "stderr"
val TaskLogPathKey = "taskLog"
val ReturnCodePathKey = "returnCode"
val CallRootPathKey = "callRootPath"
val DockerCidPathKey = "dockerCidPath"

def callPathBuilder(root: Path, jobKey: JobKey, isCallCacheCopyAttempt: Boolean) = {
val callName = jobKey.node.localName
Expand All @@ -43,7 +43,9 @@ trait JobPaths {
def memoryRetryRCFilename: String = "memory_retry_rc"
def defaultStdoutFilename = "stdout"
def defaultStderrFilename = "stderr"
def defaultTaskLogFilename = "task.log"
def isDocker: Boolean = false
def implementsTaskLogging: Boolean = false

// In this non-Docker version of `JobPaths` there is no distinction between host and container roots so this is
// just called 'rootWithSlash'.
Expand Down Expand Up @@ -73,7 +75,8 @@ trait JobPaths {
// enable dynamic standard output and error file names for languages like CWL that support this feature.
var standardPaths: StandardPaths = StandardPaths(
output = callExecutionRoot.resolve(defaultStdoutFilename),
error = callExecutionRoot.resolve(defaultStderrFilename)
error = callExecutionRoot.resolve(defaultStderrFilename),
taskLog = callExecutionRoot.resolve(defaultTaskLogFilename)
)

lazy val script = callExecutionRoot.resolve(scriptFilename)
Expand All @@ -86,7 +89,7 @@ trait JobPaths {
def standardOutputAndErrorPaths: Map[String, Path] = Map(
CallMetadataKeys.Stdout -> standardPaths.output,
CallMetadataKeys.Stderr -> standardPaths.error
)
) ++ (if (implementsTaskLogging) Map(CallMetadataKeys.TaskLog -> standardPaths.taskLog) else Map.empty)

private lazy val commonMetadataPaths: Map[String, Path] =
standardOutputAndErrorPaths + (CallMetadataKeys.CallRoot -> callRoot)
Expand All @@ -99,7 +102,7 @@ trait JobPaths {
JobPaths.StdoutPathKey -> standardPaths.output,
JobPaths.StdErrPathKey -> standardPaths.error,
JobPaths.ReturnCodePathKey -> returnCode
)
) ++ (if (implementsTaskLogging) Map(JobPaths.TaskLogPathKey -> standardPaths.taskLog) else Map.empty)

private lazy val commonLogPaths: Map[String, Path] = Map(
JobPaths.StdoutPathKey -> standardPaths.output,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -380,6 +380,7 @@ trait StandardAsyncExecutionActor
instantiatedCommand.evaluatedStdoutOverride.getOrElse(jobPaths.defaultStdoutFilename) |> absolutizeContainerPath
def executionStderr: String =
instantiatedCommand.evaluatedStderrOverride.getOrElse(jobPaths.defaultStderrFilename) |> absolutizeContainerPath
def executionTaskLog: String = jobPaths.defaultTaskLogFilename |> absolutizeContainerPath

/*
* Ensures the standard paths are correct w.r.t overridden paths. This is called in two places: when generating the command and
Expand All @@ -393,9 +394,10 @@ trait StandardAsyncExecutionActor
// .get's are safe on stdout and stderr after falling back to default names above.
jobPaths.standardPaths = StandardPaths(
output = hostPathFromContainerPath(executionStdout),
error = hostPathFromContainerPath(executionStderr)
error = hostPathFromContainerPath(executionStderr),
taskLog = hostPathFromContainerPath(executionTaskLog)
)
// Re-publish stdout and stderr paths that were possibly just updated.
// Re-publish stdout, stderr and task log paths that were possibly just updated.
tellMetadata(jobPaths.standardOutputAndErrorPaths)
jobPathsUpdated = true
}
Expand Down Expand Up @@ -423,6 +425,7 @@ trait StandardAsyncExecutionActor
val stdinRedirection = executionStdin.map("< " + _.shellQuote).getOrElse("")
val stdoutRedirection = executionStdout.shellQuote
val stderrRedirection = executionStderr.shellQuote
val taskLogRedirection = executionTaskLog.shellQuote
val rcTmpPath = rcPath.plusExt("tmp")

val errorOrDirectoryOutputs: ErrorOr[List[WomUnlistedDirectory]] =
Expand Down Expand Up @@ -471,6 +474,10 @@ trait StandardAsyncExecutionActor
}
}

val taskLoggingCommand =
if (jobPaths.implementsTaskLogging) s"tail -q -f $stdoutRedirection $stderrRedirection > $taskLogRedirection &"
else ""

// The `tee` trickery below is to be able to redirect to known filenames for CWL while also streaming
// stdout and stderr for PAPI to periodically upload to cloud storage.
// https://stackoverflow.com/questions/692000/how-do-i-write-stderr-to-a-file-while-using-tee-with-a-pipe
Expand All @@ -491,6 +498,7 @@ trait StandardAsyncExecutionActor
|touch $stdoutRedirection $stderrRedirection
|tee $stdoutRedirection < "$$$out" &
|tee $stderrRedirection < "$$$err" >&2 &
|TASK_LOGGING_COMMAND
|(
|cd ${cwd.pathAsString}
|ENVIRONMENT_VARIABLES
Expand All @@ -511,6 +519,7 @@ trait StandardAsyncExecutionActor
.replace("INSTANTIATED_COMMAND", commandString)
.replace("SCRIPT_EPILOGUE", scriptEpilogue)
.replace("DOCKER_OUTPUT_DIR_LINK", dockerOutputDir)
.replace("TASK_LOGGING_COMMAND", taskLoggingCommand)
)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ metadata {
fileSystemCheck: "gcs"
outputExpectations: {
"gs://cloud-cromwell-dev-self-cleaning/cromwell_execution/ci/exhaustive_delete/<<UUID>>/call-exhaustive/delete.txt": 0
"gs://cloud-cromwell-dev-self-cleaning/cromwell_execution/ci/exhaustive_delete/<<UUID>>/call-exhaustive/": 8
"gs://cloud-cromwell-dev-self-cleaning/cromwell_execution/ci/exhaustive_delete/<<UUID>>/call-exhaustive/": 9
"gs://cloud-cromwell-dev-self-cleaning/cromwell_execution/ci/exhaustive_delete/<<UUID>>/call-exhaustive/gcs_delocalization.sh": 1
"gs://cloud-cromwell-dev-self-cleaning/cromwell_execution/ci/exhaustive_delete/<<UUID>>/call-exhaustive/gcs_localization.sh": 1
"gs://cloud-cromwell-dev-self-cleaning/cromwell_execution/ci/exhaustive_delete/<<UUID>>/call-exhaustive/gcs_transfer.sh": 1
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ metadata {
fileSystemCheck: "gcs"
outputExpectations: {
# No current way to match on the subworkflow id, so for now just make sure the total directory count matches.
"gs://cloud-cromwell-dev-self-cleaning/cromwell_execution/ci/sub_workflow_delete/<<UUID>>/call-sub_call/sub_workflow_delete_import/": 8
"gs://cloud-cromwell-dev-self-cleaning/cromwell_execution/ci/sub_workflow_delete/<<UUID>>/call-sub_call/sub_workflow_delete_import/": 9
#"gs://cloud-cromwell-dev-self-cleaning/cromwell_execution/ci/sub_workflow_delete/<<UUID>>/call-sub_call/sub_workflow_delete_import/<<SUB_WORKFLOW_UUID>>/call-sub_workflow_task/gcs_delocalization.sh": 1
#"gs://cloud-cromwell-dev-self-cleaning/cromwell_execution/ci/sub_workflow_delete/<<UUID>>/call-sub_call/sub_workflow_delete_import/<<SUB_WORKFLOW_UUID>>/call-sub_workflow_task/gcs_localization.sh": 1
#"gs://cloud-cromwell-dev-self-cleaning/cromwell_execution/ci/sub_workflow_delete/<<UUID>>/call-sub_call/sub_workflow_delete_import/<<SUB_WORKFLOW_UUID>>/call-sub_workflow_task/gcs_transfer.sh": 1
Expand Down
2 changes: 1 addition & 1 deletion core/src/main/scala/cromwell/core/core.scala
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import mouse.boolean._
import scala.concurrent.duration.FiniteDuration
import scala.util.control.NoStackTrace

case class StandardPaths(output: Path, error: Path)
case class StandardPaths(output: Path, error: Path, taskLog: Path)

case class CallContext(root: Path, standardPaths: StandardPaths, isDocker: Boolean)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ object CallMetadataKeys {
val Failures = "failures"
val Stdout = "stdout"
val Stderr = "stderr"
val TaskLog = "taskLog"
val BackendLogsPrefix = "backendLogs"
val BackendStatus = "backendStatus"
val JobId = "jobId"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -838,16 +838,6 @@
contentType = plainTextContentType
)

val logFileOutput = GcpBatchFileOutput(
logFilename,
logGcsPath,
DefaultPathBuilder.get(logFilename),
workingDisk,
optional = true,
secondary = false,
contentType = plainTextContentType
)

val memoryRetryRCFileOutput = GcpBatchFileOutput(
memoryRetryRCFilename,
memoryRetryRCGcsPath,
Expand All @@ -864,7 +854,8 @@

val standardStreams = List(
StandardStream("stdout", _.output),
StandardStream("stderr", _.error)
StandardStream("stderr", _.error),
StandardStream("taskLog", _.taskLog)

Check warning on line 858 in supportedBackends/google/batch/src/main/scala/cromwell/backend/google/batch/actors/GcpBatchAsyncBackendJobExecutionActor.scala

View check run for this annotation

Codecov / codecov/patch

supportedBackends/google/batch/src/main/scala/cromwell/backend/google/batch/actors/GcpBatchAsyncBackendJobExecutionActor.scala#L857-L858

Added lines #L857 - L858 were not covered by tests
) map { s =>
GcpBatchFileOutput(
s.name,
Expand All @@ -888,8 +879,7 @@
DetritusOutputParameters(
monitoringScriptOutputParameter = monitoringOutput,
rcFileOutputParameter = rcFileOutput,
memoryRetryRCFileOutputParameter = memoryRetryRCFileOutput,
logFileOutputParameter = logFileOutput
memoryRetryRCFileOutputParameter = memoryRetryRCFileOutput
),
List.empty
)
Expand All @@ -908,10 +898,7 @@
runtimeAttributes = runtimeAttributes,
batchAttributes = batchAttributes,
projectId = batchAttributes.project,
region = batchAttributes.location,
logfile = createParameters.commandScriptContainerPath.sibling(
batchParameters.detritusOutputParameters.logFileOutputParameter.name
)
region = batchAttributes.location

Check warning on line 901 in supportedBackends/google/batch/src/main/scala/cromwell/backend/google/batch/actors/GcpBatchAsyncBackendJobExecutionActor.scala

View check run for this annotation

Codecov / codecov/patch

supportedBackends/google/batch/src/main/scala/cromwell/backend/google/batch/actors/GcpBatchAsyncBackendJobExecutionActor.scala#L901

Added line #L901 was not covered by tests
)

drsLocalizationManifestCloudPath = jobPaths.callExecutionRoot / GcpBatchJobPaths.DrsLocalizationManifestName
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
}
import cromwell.backend.google.batch.models._
import cromwell.backend.google.batch.runnable.WorkflowOptionKeys
import cromwell.backend.io.JobPaths
import cromwell.backend.standard.{
StandardInitializationActor,
StandardInitializationActorParams,
Expand Down Expand Up @@ -274,7 +275,7 @@
// For metadata publishing purposes default to using the name of a standard stream as the stream's filename.
def defaultStandardStreamNameToFileNameMetadataMapper(gcpBatchJobPaths: GcpBatchJobPaths,
streamName: String
): String = streamName
): String = if (streamName == JobPaths.TaskLogPathKey) gcpBatchJobPaths.batchLogFilename else streamName

Check warning on line 278 in supportedBackends/google/batch/src/main/scala/cromwell/backend/google/batch/actors/GcpBatchInitializationActor.scala

View check run for this annotation

Codecov / codecov/patch

supportedBackends/google/batch/src/main/scala/cromwell/backend/google/batch/actors/GcpBatchInitializationActor.scala#L278

Added line #L278 was not covered by tests

def encryptKms(keyName: String, credentials: OAuth2Credentials, plainText: String): String = {
val httpCredentialsAdapter = new HttpCredentialsAdapter(credentials)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,9 +34,6 @@ trait GcpBatchJobCachingActorHelper extends StandardCachingActorHelper {
lazy val memoryRetryRCFilename: String = gcpBatchCallPaths.memoryRetryRCFilename
lazy val memoryRetryRCGcsPath: Path = gcpBatchCallPaths.memoryRetryRC

lazy val logFilename: String = "task.log"
lazy val logGcsPath: Path = gcpBatchCallPaths.callExecutionRoot.resolve(logFilename)

lazy val batchAttributes: GcpBatchConfigurationAttributes = batchConfiguration.batchAttributes

lazy val defaultLabels: Labels = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,11 +41,9 @@ object GcpBatchRequestFactory {
case class DetritusOutputParameters(
monitoringScriptOutputParameter: Option[GcpBatchFileOutput],
rcFileOutputParameter: GcpBatchFileOutput,
memoryRetryRCFileOutputParameter: GcpBatchFileOutput,
logFileOutputParameter: GcpBatchFileOutput
memoryRetryRCFileOutputParameter: GcpBatchFileOutput
) {
def all: List[GcpBatchFileOutput] = memoryRetryRCFileOutputParameter ::
logFileOutputParameter ::
rcFileOutputParameter ::
monitoringScriptOutputParameter.toList
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -241,11 +241,6 @@ class GcpBatchRequestFactoryImpl()(implicit gcsTransferConfiguration: GcsTransfe
val logsPolicy = data.gcpBatchParameters.batchAttributes.logsPolicy match {
case GcpBatchLogsPolicy.CloudLogging =>
LogsPolicy.newBuilder.setDestination(Destination.CLOUD_LOGGING).build
case GcpBatchLogsPolicy.Path =>
LogsPolicy.newBuilder
.setDestination(Destination.PATH)
.setLogsPath(data.gcpBatchParameters.logfile.toString)
.build
}

val googleLabels = data.createParameters.googleLabels.map(l => Label(l.key, l.value))
Expand Down
Original file line number Diff line number Diff line change
@@ -1,12 +1,10 @@
package cromwell.backend.google.batch.models

import cromwell.backend.BackendJobDescriptor
import cromwell.core.path.Path

case class CreateGcpBatchParameters(jobDescriptor: BackendJobDescriptor,
runtimeAttributes: GcpBatchRuntimeAttributes,
batchAttributes: GcpBatchConfigurationAttributes,
projectId: String,
region: String,
logfile: Path
region: String
)
Original file line number Diff line number Diff line change
Expand Up @@ -216,7 +216,6 @@ object GcpBatchConfigurationAttributes extends GcpBatchReferenceFilesMappingOper
val logsPolicy: ErrorOr[GcpBatchLogsPolicy] = validate {
backendConfig.as[Option[String]]("batch.logs-policy").getOrElse("CLOUD_LOGGING") match {
case "CLOUD_LOGGING" => GcpBatchLogsPolicy.CloudLogging
case "PATH" => GcpBatchLogsPolicy.Path
case other =>
throw new IllegalArgumentException(
s"Unrecognized logs policy entry: $other. The only supported policy is CLOUD_LOGGING."
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,14 +21,9 @@
override val isCallCacheCopyAttempt: Boolean = false
) extends JobPaths {

def batchLogBasename = {
val index = jobKey.index
.map(s => s"-$s")
.getOrElse("")
s"${jobKey.node.localName}$index"
}

val batchLogFilename: String = s"$batchLogBasename.log"
override def implementsTaskLogging: Boolean = true

Check warning on line 24 in supportedBackends/google/batch/src/main/scala/cromwell/backend/google/batch/models/GcpBatchJobPaths.scala

View check run for this annotation

Codecov / codecov/patch

supportedBackends/google/batch/src/main/scala/cromwell/backend/google/batch/models/GcpBatchJobPaths.scala#L24

Added line #L24 was not covered by tests

val batchLogFilename: String = "task.log"

Check warning on line 26 in supportedBackends/google/batch/src/main/scala/cromwell/backend/google/batch/models/GcpBatchJobPaths.scala

View check run for this annotation

Codecov / codecov/patch

supportedBackends/google/batch/src/main/scala/cromwell/backend/google/batch/models/GcpBatchJobPaths.scala#L26

Added line #L26 was not covered by tests
lazy val batchLogPath: Path = callExecutionRoot.resolve(batchLogFilename)

val batchMonitoringLogFilename: String = s"${GcpBatchJobPaths.BatchMonitoringKey}.log"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,5 +4,4 @@ sealed trait GcpBatchLogsPolicy extends Product with Serializable

object GcpBatchLogsPolicy {
case object CloudLogging extends GcpBatchLogsPolicy
case object Path extends GcpBatchLogsPolicy
}
Original file line number Diff line number Diff line change
Expand Up @@ -1084,7 +1084,7 @@ class GcpBatchAsyncBackendJobExecutionActorSpec
"gs://path/to/gcs_root/wf_hello/e6236763-c518-41d0-9688-432549a8bf7c/call-hello/stderr"
batchBackend.gcpBatchCallPaths.batchLogPath should be(a[GcsPath])
batchBackend.gcpBatchCallPaths.batchLogPath.pathAsString shouldBe
"gs://path/to/gcs_root/wf_hello/e6236763-c518-41d0-9688-432549a8bf7c/call-hello/hello.log"
"gs://path/to/gcs_root/wf_hello/e6236763-c518-41d0-9688-432549a8bf7c/call-hello/task.log"
}

it should "return Batch log paths for scattered call" in {
Expand Down Expand Up @@ -1132,7 +1132,7 @@ class GcpBatchAsyncBackendJobExecutionActorSpec
"gs://path/to/gcs_root/w/e6236763-c518-41d0-9688-432549a8bf7d/call-B/shard-2/stderr"
batchBackend.gcpBatchCallPaths.batchLogPath should be(a[GcsPath])
batchBackend.gcpBatchCallPaths.batchLogPath.pathAsString shouldBe
"gs://path/to/gcs_root/w/e6236763-c518-41d0-9688-432549a8bf7d/call-B/shard-2/B-2.log"
"gs://path/to/gcs_root/w/e6236763-c518-41d0-9688-432549a8bf7d/call-B/shard-2/task.log"
}

it should "return the project from the workflow options in the start metadata" in {
Expand Down Expand Up @@ -1202,7 +1202,8 @@ class GcpBatchAsyncBackendJobExecutionActorSpec
"runtimeAttributes:zones" -> "us-central1-b,us-central1-a",
"runtimeAttributes:maxRetries" -> "0",
"stderr" -> s"$batchGcsRoot/wf_hello/$workflowId/call-goodbye/stderr",
"stdout" -> s"$batchGcsRoot/wf_hello/$workflowId/call-goodbye/stdout"
"stdout" -> s"$batchGcsRoot/wf_hello/$workflowId/call-goodbye/stdout",
"taskLog" -> s"$batchGcsRoot/wf_hello/$workflowId/call-goodbye/task.log"
)
)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -135,12 +135,6 @@ class GcpBatchConfigurationAttributesSpec
gcpBatchAttributes.logsPolicy should be(GcpBatchLogsPolicy.CloudLogging)
}

it should "parse logs-policy = PATH" in {
val backendConfig = ConfigFactory.parseString(configString(batch = "logs-policy = PATH"))
val gcpBatchAttributes = GcpBatchConfigurationAttributes(googleConfig, backendConfig, "batch")
gcpBatchAttributes.logsPolicy should be(GcpBatchLogsPolicy.Path)
}

it should "reject invalid logs-policy" in {
val expected =
"Google Cloud Batch configuration is not valid: Errors:\nUnrecognized logs policy entry: INVALID. The only supported policy is CLOUD_LOGGING."
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ class GcpBatchJobPathsSpec extends TestKitSuite with AnyFlatSpecLike with Matche
callPaths.returnCodeFilename should be("rc")
callPaths.stderr.getFileName.pathAsString should be("gs://my-cromwell-workflows-bucket/stderr")
callPaths.stdout.getFileName.pathAsString should be("gs://my-cromwell-workflows-bucket/stdout")
callPaths.batchLogFilename should be("hello.log")
callPaths.batchLogFilename should be("task.log")
}

it should "map the correct paths" in {
Expand Down Expand Up @@ -69,7 +69,7 @@ class GcpBatchJobPathsSpec extends TestKitSuite with AnyFlatSpecLike with Matche
callPaths.stderr.pathAsString should
be(s"gs://my-cromwell-workflows-bucket/wf_hello/${workflowDescriptor.id}/call-hello/stderr")
callPaths.batchLogPath.pathAsString should
be(s"gs://my-cromwell-workflows-bucket/wf_hello/${workflowDescriptor.id}/call-hello/hello.log")
be(s"gs://my-cromwell-workflows-bucket/wf_hello/${workflowDescriptor.id}/call-hello/task.log")
}

it should "map the correct call context" in {
Expand Down
Loading