Merge #16
16: overhauled to use AWS.jl r=mattBrzezinski a=ExpandingMan

As you probably know, this package was using the legacy AWSCore.jl.  This was a problem since attempting to use AWSCore.jl with AWS.jl results in a downgrade catastrophe, so it was impossible to use this package with AWS.jl or anything based on it (e.g. AWSS3.jl).

This was a much more extensive change than I was expecting.  My original intent was to make this upgrade non-breaking, but given the significant changes in the underlying API this seemed impractical.  What I have here now is *mostly* non-breaking.

Tests with Mocking.jl pass and everything seems to be working in my AWS environment.

My primary motivation for doing this was to upgrade AWSClusterManagers.jl to use this.  The changes there should be smaller, but there may also be some significant changes for anything that's not contained within AWSBatch.jl.

For the most part, the breaking changes are confined to keyword arguments.  In particular, following the convention of AWS.jl, the AWS config is now always given by the `aws_config` keyword argument where applicable.  This in turn causes a few other arguments (e.g. `region`) to become deprecated.  The one interface change that was not strictly necessary is that the 3rd argument of `submit` is now a `JobQueue` instead of a string, which allows it to be handled in a more uniform way.  It's unclear whether `submit` was intended to be public facing, so I don't think it should matter that much.
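
To make the keyword-argument convention concrete, here is a minimal, self-contained sketch of the pattern. It does not use AWS.jl or AWSBatch.jl: `AbstractConfig`, `StubConfig`, `default_config`, `StubQueue`, `describe_job`, `job_status` and `submit_job` are hypothetical stand-ins invented for illustration, and the ARN layout is a placeholder rather than a real lookup. The idea it tries to show is that every call takes a config keyword with a global default, forwards it to whatever it calls, and receives the queue as an object rather than a string.

```julia
# Self-contained sketch with stand-in types: no AWS.jl, no network calls.
abstract type AbstractConfig end   # stand-in for AWS.jl's AbstractAWSConfig

struct StubConfig <: AbstractConfig
    region::String
end

# Stand-in for global_aws_config(): a process-wide default configuration.
const DEFAULT_CONFIG = Ref(StubConfig("us-east-1"))
default_config() = DEFAULT_CONFIG[]

# Stand-in for JobQueue: resolves a name to an ARN once, at construction,
# so that `submit_job` only ever sees one representation of a queue.
struct StubQueue
    arn::String

    function StubQueue(name::AbstractString; config::AbstractConfig=default_config())
        # The account id and ARN layout below are placeholders, not a real lookup.
        arn = startswith(name, "arn:") ? name :
            "arn:aws:batch:$(config.region):000000000000:job-queue/$name"
        return new(arn)
    end
end

# Lowest-level call: the only place where the config is actually consumed.
function describe_job(job_id::AbstractString; config::AbstractConfig=default_config())
    return Dict("jobId" => job_id, "status" => "RUNNING", "region" => config.region)
end

# Higher-level call: accepts the same keyword and forwards it unchanged.
function job_status(job_id::AbstractString; config::AbstractConfig=default_config())
    return describe_job(job_id; config=config)["status"]
end

# The queue is passed as an object, so no name-vs-ARN handling is needed here.
function submit_job(name::AbstractString, queue::StubQueue;
                    config::AbstractConfig=default_config())
    return Dict("jobName" => name, "jobQueue" => queue.arn, "region" => config.region)
end

west = StubConfig("eu-west-1")
queue = StubQueue("my-queue"; config=west)
job = submit_job("example", queue; config=west)
```

Callers that omit the keyword get the global default; callers that need another region or set of credentials pass one config object instead of a separate `region` argument. Against the real package the corresponding keyword is `aws_config`, as the diff below shows.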

Co-authored-by: ExpandingMan <[email protected]>
Co-authored-by: ExpandingMan <[email protected]>
Co-authored-by: ExpandingMan <[email protected]>
Co-authored-by: mattBrzezinski <[email protected]>
Co-authored-by: Expanding Man <[email protected]>
5 people authored Aug 11, 2021
2 parents 329af22 + b152f81 commit de9fec6
Showing 15 changed files with 285 additions and 185 deletions.
11 changes: 3 additions & 8 deletions Project.toml
@@ -2,12 +2,10 @@ name = "AWSBatch"
uuid = "dcae83d4-2881-5875-9d49-e5534165e9c0"
license = "MIT"
authors = ["Invenia Technical Computing"]
version = "1.4.2"
version = "2.0.0"

[deps]
AWSCore = "4f1ea46c-232b-54a6-9b17-cc2d0f3e6598"
AWSSDK = "0d499d91-6ae5-5d63-9313-12987b87d5ad"
AWSTools = "83bcdc74-1232-581c-948a-f29122bf8259"
AWS = "fbe9abb3-538b-5e4e-ba9e-bc94f4f92ebc"
AutoHashEquals = "15f4f7f2-30c1-5605-9d31-71845cf9641f"
Dates = "ade2ca70-3891-5945-98fb-dc099432e06a"
Memento = "f28f55f0-a522-5efc-85c2-fe41dfb9b2d9"
@@ -16,11 +14,8 @@ OrderedCollections = "bac558e1-5e72-5ebc-8fee-abe8a469f55d"
Pkg = "44cfe95a-1eb2-52ea-b672-e2afdf69b78f"

[compat]
AWSCore = "0.5.2, 0.6"
AWSSDK = "0.2, 0.3, 0.4, 0.5"
AWSTools = "0.3.1, 0.4, 0.5, 0.6, 0.7, 0.8, 0.9, 1"
AWS = "1"
AutoHashEquals = "0.2.0"
HTTP = "0.8.1"
Memento = "0.7, 0.8, 0.9, 0.10, 0.11, 0.12, 0.13, 1"
Mocking = "0.7"
OrderedCollections = "1.4"
13 changes: 12 additions & 1 deletion docs/src/index.md
@@ -54,7 +54,7 @@ run_batch()

```@docs
AWSBatch.BatchJob
AWSBatch.submit(::AbstractString, ::JobDefinition, ::AbstractString)
AWSBatch.submit
AWSBatch.describe(::BatchJob)
AWSBatch.JobDefinition(::BatchJob)
AWSBatch.status(::BatchJob)
@@ -67,13 +67,24 @@ AWSBatch.log_events(::BatchJob)

```@docs
AWSBatch.JobDefinition
AWSBatch.ComputeEnvironment
AWSBatch.create_compute_environment
AWSBatch.list_job_definitions
AWSBatch.job_definition_arn(::AbstractString)
AWSBatch.register(::AbstractString)
AWSBatch.deregister(::JobDefinition)
AWSBatch.isregistered(::JobDefinition)
AWSBatch.describe(::JobDefinition)
```

### JobQueue

```@docs
AWSBatch.JobQueue
AWSBatch.create_job_queue
AWSBatch.list_job_queues
```

### JobState

```@docs
26 changes: 13 additions & 13 deletions src/AWSBatch.jl
@@ -1,18 +1,19 @@
module AWSBatch

using AWS
using AutoHashEquals
using AWSCore: AWSException, aws_config
using AWSSDK.Batch
using AWSSDK.CloudWatchLogs
using AWSTools.EC2: instance_region
using OrderedCollections: OrderedDict
using Dates
using Memento
using Mocking

@service Batch
@service Cloudwatch_Logs

export BatchJob, ComputeEnvironment, BatchEnvironmentError, BatchJobError
export JobQueue, JobDefinition, JobState
export JobQueue, JobDefinition, JobState, LogEvent
export run_batch, describe, status, status_reason, wait, log_events, isregistered, register, deregister
export list_job_queues, list_job_definitions, create_compute_environment, create_job_queue


const logger = getlogger(@__MODULE__)
@@ -22,7 +23,6 @@ __init__() = Memento.register(logger)


include("exceptions.jl")
include("version.jl")
include("log_event.jl")
include("compute_environment.jl")
include("job_queue.jl")
@@ -72,14 +72,15 @@ function run_batch(;
num_jobs::Integer=1,
parameters::Dict{String, String}=Dict{String, String}(),
allow_job_registration::Bool=true,
aws_config::AbstractAWSConfig=global_aws_config(),
)
if isa(definition, AbstractString)
definition = isempty(definition) ? nothing : definition
end

# Determine if the job definition already exists and update the default job parameters
if definition !== nothing
response = describe_job_definition(definition)
response = describe_job_definition(definition; aws_config=aws_config)
if !isempty(response["jobDefinitions"])
details = first(response["jobDefinitions"])

@@ -101,11 +102,11 @@ function run_batch(;
job_id = ENV["AWS_BATCH_JOB_ID"]
job_queue = ENV["AWS_BATCH_JQ_NAME"]

# Get the zone information from the EC2 instance metadata.
isempty(region) && (region = @mock instance_region())
# if not specified, get region from the aws_config
isempty(region) && (region = aws_config.region)

# Requires permissions to access to "batch:DescribeJobs"
response = @mock describe_jobs(Dict("jobs" => [job_id]))
response = @mock Batch.describe_jobs([job_id]; aws_config=aws_config)

# Use the job's description to only update fields that are using the default
# values since explict arguments passed in via `kwargs` have higher priority
@@ -165,8 +166,8 @@ function run_batch(;
vcpus=vcpus,
memory=memory,
cmd=cmd,
region=region,
parameters=parameters,
aws_config=aws_config,
)
else
throw(BatchEnvironmentError(string(
@@ -193,10 +194,9 @@ function run_batch(;
return submit(
name,
definition,
queue;
JobQueue(queue);
container=container_overrides,
parameters=parameters,
region=region,
num_jobs=num_jobs,
)
end
46 changes: 18 additions & 28 deletions src/batch_job.jl
@@ -1,6 +1,3 @@
using AWSSDK.Batch: describe_jobs, submit_job
using AWSSDK.CloudWatchLogs: get_log_events


"""
BatchJob
@@ -23,10 +20,9 @@ end
submit(
name::AbstractString,
definition::JobDefinition,
queue::AbstractString;
queue::JobQueue;
container::AbstractDict=Dict(),
parameters::Dict{String,String}=Dict{String, String}(),
region::AbstractString="",
num_jobs::Integer=1,
) -> BatchJob
@@ -35,23 +31,17 @@ Handles submitting the batch job. Returns a `BatchJob` wrapper for the id.
function submit(
name::AbstractString,
definition::JobDefinition,
queue::AbstractString;
queue::JobQueue;
container::AbstractDict=Dict(),
parameters::Dict{String,String}=Dict{String, String}(),
region::AbstractString="",
num_jobs::Integer=1,
aws_config::AbstractAWSConfig=global_aws_config(),
)
region = isempty(region) ? "us-east-1" : region
config = aws_config(region = region)

debug(logger, "Submitting job \"$name\"")
input = [
"jobName" => name,
"jobQueue" => queue,
"jobDefinition" => definition.arn,
input = OrderedDict(
"parameters" => parameters,
"containerOverrides" => container,
]
)

if num_jobs > 1
# https://docs.aws.amazon.com/batch/latest/userguide/array_jobs.html
@@ -61,7 +51,7 @@ function submit(

debug(logger, "Input: $input")

response = @mock submit_job(config, input)
response = @mock Batch.submit_job(definition.arn, name, queue.arn, input; aws_config=aws_config)
job = BatchJob(response["jobId"])

if num_jobs > 1
@@ -78,8 +68,8 @@ end
Provides details about the AWS batch job.
"""
function describe(job::BatchJob)
response = @mock describe_jobs(; jobs=[job.id])
function describe(job::BatchJob; aws_config::AbstractAWSConfig=global_aws_config())
response = @mock Batch.describe_jobs([job.id]; aws_config=aws_config)
isempty(response["jobs"]) && error(logger, "Job $(job.id) not found.")
debug(logger, "Job $(job.id): $response")
return first(response["jobs"])
@@ -90,15 +80,17 @@ end
Returns the job definition corresponding to a batch job.
"""
JobDefinition(job::BatchJob) = JobDefinition(describe(job)["jobDefinition"])
function JobDefinition(job::BatchJob; aws_config::AbstractAWSConfig=global_aws_config())
JobDefinition(describe(job)["jobDefinition"]; aws_config=aws_config)
end

"""
status(job::BatchJob) -> JobState
Returns the current status of a job.
"""
function status(job::BatchJob)::JobState
details = describe(job)
function status(job::BatchJob; aws_config::AbstractAWSConfig=global_aws_config())::JobState
details = describe(job; aws_config=aws_config)
return parse(JobState, details["status"])
end

@@ -108,8 +100,8 @@ end
A short, human-readable string to provide additional details about the current status of the
job.
"""
function status_reason(job::BatchJob)
details = describe(job)
function status_reason(job::BatchJob; aws_config::AbstractAWSConfig=global_aws_config())
details = describe(job; aws_config=aws_config)
return get(details, "statusReason", nothing)
end

@@ -136,15 +128,16 @@ function Base.wait(
cond::Function,
job::BatchJob;
timeout=600,
delay=5
delay=5,
aws_config::AbstractAWSConfig=global_aws_config(),
)
completed = false
last_state = PENDING
initial = true

start_time = time() # System time in seconds since epoch
while time() - start_time < timeout
state = status(job)
state = status(job; aws_config=aws_config)

if state != last_state || initial
info(logger, "$(job.id) status $state")
@@ -229,6 +222,3 @@ function log_events(job::BatchJob)
return log_events("/aws/batch/job", stream)
end

if @__VERSION__() < v"1.5"
@deprecate log_events(job::BatchJob, ::Union{Type{Vector{LogEvent}},Type{Nothing}}) log_events(job)
end
64 changes: 54 additions & 10 deletions src/compute_environment.jl
@@ -1,30 +1,74 @@
using AWSSDK.Batch: describe_compute_environments

"""
ComputeEnvironment
An object representing an AWS batch compute environment.
See [`AWSBatch.create_compute_environment`](@ref).
"""
struct ComputeEnvironment
arn::String

function ComputeEnvironment(ce::AbstractString)
arn = compute_environment_arn(ce)
function ComputeEnvironment(ce::AbstractString; aws_config::AbstractAWSConfig=global_aws_config())
arn = compute_environment_arn(ce; aws_config=aws_config)
arn === nothing && error("No compute environment ARN found for $ce")
new(arn)
end
end

Base.:(==)(a::ComputeEnvironment, b::ComputeEnvironment) = a.arn == b.arn

describe(ce::ComputeEnvironment) = describe_compute_environment(ce)
max_vcpus(ce::ComputeEnvironment) = describe(ce)["computeResources"]["maxvCpus"]
function describe(ce::ComputeEnvironment; aws_config::AbstractAWSConfig=global_aws_config())
describe_compute_environment(ce; aws_config=aws_config)
end
function max_vcpus(ce::ComputeEnvironment; aws_config::AbstractAWSConfig=global_aws_config())
describe(ce; aws_config=aws_config)["computeResources"]["maxvCpus"]
end

"""
create_compute_environment(name;
managed=true,
role="",
resources=Dict(),
enabled=true,
tags=Dict(),
aws_config=global_aws_config())
function compute_environment_arn(ce::AbstractString)
Create a compute environment of type `type` with name `name`.
See the AWS docs [here](https://docs.aws.amazon.com/batch/latest/APIReference/API_CreateComputeEnvironment.html).
"""
function create_compute_environment(name::AbstractString;
managed::Bool=true,
role::AbstractString="",
resources::AbstractDict=Dict{String,Any}(),
enabled::Bool=true,
tags::AbstractDict=Dict{String,Any}(),
aws_config::AbstractAWSConfig=global_aws_config())
type = managed ? "MANAGED" : "UNMANAGED"
args = Dict{String,Any}()
isempty(role) || (args["serviceRole"] = role)
isempty(resources) || (args["computeResources"] = resources)
isempty(tags) || (args["tags"] = tags)
enabled || (args["state"] = "DISABLED")
return @mock Batch.create_compute_environment(name, type, args; aws_config=aws_config)
end

function compute_environment_arn(ce::AbstractString; aws_config::AbstractAWSConfig=global_aws_config())
startswith(ce, "arn:") && return ce
json = describe_compute_environment(ce)
json = describe_compute_environment(ce; aws_config=aws_config)
isempty(json) ? nothing : json["computeEnvironmentArn"]
end

describe_compute_environment(ce::ComputeEnvironment) = describe_compute_environment(ce.arn)
function describe_compute_environment(ce::ComputeEnvironment;
aws_config::AbstractAWSConfig=global_aws_config())
describe_compute_environment(ce.arn; aws_config=aws_config)
end

function describe_compute_environment(ce::AbstractString)::OrderedDict
json = @mock describe_compute_environments(Dict("computeEnvironments" => [ce]))
function describe_compute_environment(ce::AbstractString;
aws_config::AbstractAWSConfig=global_aws_config())::OrderedDict
json = @mock Batch.describe_compute_environments(Dict("computeEnvironments" => [ce]);
aws_config=aws_config)
envs = json["computeEnvironments"]
len = length(envs)::Int
@assert len <= 1

2 comments on commit de9fec6

@mattBrzezinski

@JuliaRegistrator

Registration pull request created: JuliaRegistries/General/42668

After the above pull request is merged, it is recommended that a tag is created on this repository for the registered package version.

This will be done automatically if the Julia TagBot GitHub Action is installed, or can be done manually through the github interface, or via:

git tag -a v2.0.0 -m "<description of version>" de9fec6ab077b753a1b34583881fa201a169ae8d
git push origin v2.0.0
