-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathpython_cicd.py
176 lines (154 loc) · 6.87 KB
/
python_cicd.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
"""
This script is used for the CI/CD workflow of Django-based sites running
on AWS. Make sure you have the AWS boto3 Python SDK installed and your
AWS IAM credentials configured.
The overall process is as follows:
1. Collect the proper subnets and security groups to launch ECS tasks
2. Run the ECS Django migrations task to update the db and static files
3. Restart all ECS services so they use the most updated Docker image
A few assumptions are made in this script due to the nature of AWS:
- You are running your ECS tasks in the "default" VPC
- You have a security group named "ecs" that gives all proper firewall
rules (database, Redis, etc)
- Your ECS cluster is named <application>-<environment> (e.g. dash-dev)
- You have a task definition that runs Django migrations with the name of
<application>-<environment>-migrate
Usage:
python python_cicd.py <application> <environment>
Example:
python python_cicd.py dash dev
"""
import argparse
import boto3
import sys
from datetime import datetime
# Argument parser pulls in the application and environment.
# Positional args identify the deployment target; the optional -g/-p flags
# override the conventional CloudWatch log group / stream-prefix names.
parser = argparse.ArgumentParser()
parser.add_argument(
    'application',
    help='the AWS name of your application (e.g. dash)')
parser.add_argument(
    'environment',
    # Fixed typo in user-facing help text: "your are" -> "you are".
    help='which environment you are deploying (e.g. dev, prod)')
parser.add_argument(
    '-g', '--group',
    help='the name of the CloudWatch log group where logs are stored.')
parser.add_argument(
    '-p', '--prefix',
    help='the characters that precede the task ID in the CloudWatch log stream name')
# Parsed once at import time; main() reads this module-level namespace.
args = parser.parse_args()
def _network_config(vpc_id):
    """Return (subnet_ids, security_group_ids) for the given VPC.

    Collects every subnet in the VPC plus the security group named "ecs",
    which is assumed to carry the firewall rules the tasks need
    (database, Redis, etc).
    """
    vpc = boto3.resource('ec2').Vpc(vpc_id)
    subnets = [subnet.id for subnet in vpc.subnets.all()]
    security_groups = [
        sg.id for sg in vpc.security_groups.filter(GroupNames=['ecs'])
    ]
    return subnets, security_groups


def _run_migration_task(ecs, logs, cluster_name, log_group_name,
                        log_stream_prefix, subnets, security_groups):
    """Run the latest <cluster>-migrate Fargate task and wait for it.

    Streams the task's CloudWatch logs to stdout, then inspects the
    container exit code and terminates the whole script with status 1 on
    failure.  If no task definition matches the "<cluster>-migrate"
    naming convention, prints a notice and returns without running
    anything.
    """
    task_resp = ecs.list_task_definitions(
        familyPrefix='{}-migrate'.format(cluster_name), sort='DESC')
    # Narrow try: only this lookup may legitimately come up empty.  The
    # original wrapped the whole section, which also swallowed unrelated
    # IndexErrors (e.g. an empty 'tasks' list) as "no task definition".
    try:
        latest_migration_task = task_resp['taskDefinitionArns'][0]
    except IndexError:
        print('There is no task definition following the "{}-migrate"'
              ' naming convention. Skipping...'.format(cluster_name))
        return
    # Launch migration task on Fargate inside the collected network.
    response = ecs.run_task(cluster=cluster_name,
                            launchType='FARGATE',
                            taskDefinition=latest_migration_task,
                            networkConfiguration={
                                'awsvpcConfiguration': {
                                    'subnets': subnets,
                                    'securityGroups': security_groups,
                                    'assignPublicIp': 'ENABLED'
                                }
                            })
    print('Successfully started migrate task. Waiting for it to complete...')
    # Wait until the task enters tasks_stopped
    # Inspired by https://stackoverflow.com/questions/33701140/using-aws-ecs-with-boto3
    task_arn = response['tasks'][0]['taskArn']
    waiter = ecs.get_waiter('tasks_stopped')
    waiter.wait(cluster=cluster_name, tasks=[task_arn])
    # The task ID is whatever follows the cluster name in the ARN, minus
    # the separating slash; the log stream name embeds it.
    task_id = task_arn.split(cluster_name)[1].strip('/')
    log_stream_name = '{}/{}'.format(log_stream_prefix, task_id)
    response = logs.get_log_events(
        logGroupName=log_group_name,
        logStreamName=log_stream_name,
        startFromHead=True
    )
    for event in response['events']:
        print(event['message'])
    # Get the container exit code - if not zero, houston we have a problem
    response = ecs.describe_tasks(
        cluster=cluster_name,
        tasks=[task_arn]
    )
    task_exit_code = response['tasks'][0]['containers'][0]['exitCode']
    if task_exit_code != 0:
        print('A migration error has occurred: please see the above logs.',
              file=sys.stderr)
        sys.exit(1)
    print('Migrate task complete.')


def _redeploy_services(ecs, cluster_name):
    """Force a new deployment of every service in the cluster.

    Each service is pointed at the latest revision of its task-definition
    family so it picks up the most recently pushed Docker image.
    """
    services_resp = ecs.list_services(cluster=cluster_name)
    for service in services_resp['serviceArns']:
        # Resolve the service's task-definition family; updating by bare
        # family name makes ECS use the newest ACTIVE revision.
        service_info = ecs.describe_services(cluster=cluster_name,
                                             services=[service])
        service_task = service_info['services'][0]['taskDefinition']
        task_info = ecs.describe_task_definition(taskDefinition=service_task)
        task_family = task_info['taskDefinition']['family']
        ecs.update_service(
            cluster=cluster_name,
            service=service,
            taskDefinition=task_family,
            forceNewDeployment=True)


def _invalidate_cloudfront():
    """Invalidate every path of the CloudFront distributions tagged with
    this application/environment pair."""
    app_tag = {'Key': 'Application', 'Value': args.application}
    env_tag = {'Key': 'Environment', 'Value': args.environment}
    cloudfront = boto3.client('cloudfront')
    dists = cloudfront.list_distributions()['DistributionList']['Items']
    for dist in dists:
        tags = cloudfront.list_tags_for_resource(Resource=dist['ARN'])
        tags = tags['Tags']['Items']
        # Timestamp (with microseconds) keeps the CallerReference unique
        # across repeated deploys.
        time = datetime.now().strftime('%Y%m%d%H%M%S%f')
        if app_tag in tags and env_tag in tags:
            cloudfront.create_invalidation(DistributionId=dist['Id'],
                                           InvalidationBatch={
                                               'Paths': {
                                                   'Quantity': 1,
                                                   'Items': ['/*']
                                               },
                                               'CallerReference': time})


def main():
    """Run the AWS commands: migrate, redeploy services, flush the CDN."""
    # Prepare the naming convention of applications on AWS
    cluster_name = '{}-{}'.format(args.application, args.environment)
    log_group_name = '/ecs/{}/{}'.format(args.application, args.environment)
    if args.group is not None:
        log_group_name = args.group
    log_stream_prefix = 'ecs-{}-{}/{}-{}'.format(
        args.application,
        args.environment,
        args.application,
        args.environment
    )
    # BUGFIX: the original tested args.group here and assigned args.stream,
    # an attribute that does not exist (the flag is -p/--prefix), so the
    # override raised AttributeError whenever --group was supplied and
    # --prefix was silently ignored.
    if args.prefix is not None:
        log_stream_prefix = args.prefix
    # Get subnets and security groups to run migration task under
    ec2 = boto3.client('ec2')
    vpc_resp = ec2.describe_vpcs(
        Filters=[{'Name': 'isDefault', 'Values': ['true']}])
    try:
        vpc_id = vpc_resp['Vpcs'][0]['VpcId']
    except IndexError:
        print('ERROR: The default VPC does not exist. '
              'Either something very bad has happened or '
              'this script needs to be updated.')
        return
    subnets, security_groups = _network_config(vpc_id)
    # Find the migration task definition and run it under sufficient
    # security groups and subnets.
    ecs = boto3.client('ecs')
    logs = boto3.client('logs')
    _run_migration_task(ecs, logs, cluster_name, log_group_name,
                        log_stream_prefix, subnets, security_groups)
    # Update each service in cluster, forcing new deployment
    _redeploy_services(ecs, cluster_name)
    # Invalidate the Cloudfront cache as well
    _invalidate_cloudfront()
# Script entry point: run the full deployment workflow when executed
# directly (not when imported).
if __name__ == "__main__":
    main()