-
Notifications
You must be signed in to change notification settings - Fork 0
/
terraform_discovery.py
executable file
·275 lines (242 loc) · 11.6 KB
/
terraform_discovery.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
#!/usr/bin/env python
'''Github discovery - queries the github API for info about hmpps services and stores the results in the service catalogue'''
import os
import http.server
import socketserver
import threading
import logging
import re
# import json
from time import sleep
from git import Repo
import requests
from tfparse import load_from_path
SC_API_ENDPOINT = os.getenv("SERVICE_CATALOGUE_API_ENDPOINT")
SC_API_TOKEN = os.getenv("SERVICE_CATALOGUE_API_KEY")
REFRESH_INTERVAL_HOURS = int(os.getenv("REFRESH_INTERVAL_HOURS", "6"))
# Set maximum number of concurrent threads to run, try to avoid secondary github api limits.
MAX_THREADS = 10
LOG_LEVEL = os.environ.get('LOG_LEVEL', 'INFO').upper()
TEMP_DIR = os.getenv("TEMP_DIR", "/tmp/cp_envs")
# limit results for testing/dev
# See Strapi filter syntax https://docs.strapi.io/dev-docs/api/rest/filters-locale-publication
# Example filter string = '&filters[name][$contains]=example'
SC_FILTER = os.getenv("SC_FILTER", '')
SC_PAGE_SIZE=10
SC_PAGINATION_PAGE_SIZE=f"&pagination[pageSize]={SC_PAGE_SIZE}"
# Example Sort filter
#SC_SORT='&sort=updatedAt:asc'
SC_SORT = ''
SC_ENDPOINT = f"{SC_API_ENDPOINT}/v1/components?populate=environments{SC_FILTER}{SC_PAGINATION_PAGE_SIZE}{SC_SORT}"
class HealthHttpRequestHandler(http.server.SimpleHTTPRequestHandler):
def do_GET(self):
self.send_response(200)
self.send_header("Content-type", "text/plain")
self.end_headers()
self.wfile.write(bytes("UP", "utf8"))
return
def update_sc_namespace(ns_id, data):
try:
log.debug(data)
if not ns_id:
x = requests.post(f"{SC_API_ENDPOINT}/v1/namespaces", headers=sc_api_headers, json = {"data": data}, timeout=10)
else:
x = requests.put(f"{SC_API_ENDPOINT}/v1/namespaces/{ns_id}", headers=sc_api_headers, json = {"data": data}, timeout=10)
if x.status_code == 200:
log.info(f"Successfully updated namespace id {ns_id}: {x.status_code}")
else:
log.info(f"Received non-200 response from service catalogue for namespace id {ns_id}: {x.status_code} {x.content}")
except Exception as e:
log.error(f"Error updating namespace in the SC: {e}")
def get_sc_product_id(product_id):
try:
r = requests.get(f"{SC_API_ENDPOINT}/v1/products?filters[p_id][$eq]={product_id}", headers=sc_api_headers, timeout=10)
if r.status_code == 200:
log.info(f"Successfully found product with internal ID {product_id}: {r.status_code}")
return r.json()['data'][0]['id']
else:
log.info(f"Received non-200 response from service catalogue searching for internal product ID: {product_id}: {r.status_code} {r.content}")
return False
except Exception as e:
log.error(f"Error getting product ID from SC: {e}")
return False
def process_repo(**component):
for e in component["attributes"]["environments"]:
namespace = e['namespace']
if not namespace in namespaces:
# Add namespace to list of namespaces being done.
namespaces.append(namespace)
else:
# Skip this namespace as it's already processed.
log.debug(f"skipping {namespace} namespace - already been processed")
continue
sc_filter_namespace = f"&filters[name][$eq]={namespace}&populate=*"
r = requests.get(f"{SC_API_ENDPOINT}/v1/namespaces?{sc_filter_namespace}", headers=sc_api_headers, timeout=10)
if r.status_code == 200:
sc_data = r.json()["data"]
if sc_data and sc_data[0]['id']:
log.debug("Found namespace ID")
namespace_id = sc_data[0]['id']
else:
log.debug("No namespace ID found")
namespace_id = None
data = { "name": namespace }
resources_dir = f"{TEMP_DIR}/namespaces/live.cloud-platform.service.justice.gov.uk/{namespace}/resources"
if os.path.isdir(resources_dir):
# tfparse is not thread-safe!
with lock:
log.debug(f"Thread locked for tfparse: {resources_dir}")
parsed = load_from_path(resources_dir)
# log.debug(json.dumps(parsed, indent=2))
#print(json.dumps(parsed, indent=2))
for m in parsed['module']:
# Get terraform module version
tf_mod_version = str()
try:
regex = r"(?<=[\\?]ref=)[0-9]+(\.[0-9])?(\.[0-9])?$"
tf_mod_version = re.search(regex, m['source'])[0]
except TypeError:
pass
# Look for RDS instances.
if "cloud-platform-terraform-rds-instance" in m['source']:
rds_instance = m
# Delete ID that is generated by tfparse
del rds_instance['id']
# Process fields
rds_instance.update({'tf_label': rds_instance['__tfmeta']['label']})
rds_instance.update({'tf_filename': rds_instance['__tfmeta']['filename']})
rds_instance.update({'tf_path': rds_instance['__tfmeta']['path']})
rds_instance.update({'tf_line_end': rds_instance['__tfmeta']['line_end']})
rds_instance.update({'tf_line_start': rds_instance['__tfmeta']['line_start']})
rds_instance.update({'tf_mod_version': tf_mod_version})
# Check for existing instance in SC and update same ID if so.
try:
# If there are any rds instances in the existing SC data
if sc_data[0]["attributes"]["rds_instance"]:
# Find the RDS instance SC ID that matches
rds_id = list(filter(lambda rds: rds['tf_path'] == rds_instance['__tfmeta']['path'], sc_data[0]["attributes"]["rds_instance"]))[0]['id']
rds_instance.update({'id': rds_id})
except IndexError:
pass
# Clean up field not used in post to SC
del rds_instance['__tfmeta']
data.update({"rds_instance": [rds_instance]})
# Look for elasticache instances.
if "cloud-platform-terraform-elasticache-cluster" in m['source']:
elasticache_cluster = m
# Delete ID that is generated by tfparse
del elasticache_cluster['id']
# Process fields
elasticache_cluster.update({'tf_label': elasticache_cluster['__tfmeta']['label']})
elasticache_cluster.update({'tf_filename': elasticache_cluster['__tfmeta']['filename']})
elasticache_cluster.update({'tf_path': elasticache_cluster['__tfmeta']['path']})
elasticache_cluster.update({'tf_line_end': elasticache_cluster['__tfmeta']['line_end']})
elasticache_cluster.update({'tf_line_start': elasticache_cluster['__tfmeta']['line_start']})
elasticache_cluster.update({'tf_mod_version': tf_mod_version})
# Check for existing instance in SC and update same ID if so.
try:
# If there are any rds instances in the existing SC data
if sc_data[0]["attributes"]["elasticache_cluster"]:
# Find the elasticache cluster SC ID that matches
elasticache_id = list(filter(lambda elasticache: elasticache['tf_path'] == elasticache_cluster['__tfmeta']['path'], sc_data[0]["attributes"]["elasticache_cluster"]))[0]['id']
elasticache_cluster.update({'id': elasticache_id})
except (IndexError,KeyError):
pass
# Clean up field not used in post to SC
del elasticache_cluster['__tfmeta']
data.update({"elasticache_cluster": [elasticache_cluster]})
if 'pingdom_check' in parsed.keys() :
for r in parsed['pingdom_check']:
# Look for RDS instances.
if "http" in r['type']:
pingdom_check = r
# Delete ID that is generated by tfparse
del pingdom_check['id']
# Process fields
pingdom_check.update({'tf_label': pingdom_check['__tfmeta']['label']})
pingdom_check.update({'tf_filename': pingdom_check['__tfmeta']['filename']})
pingdom_check.update({'tf_path': pingdom_check['__tfmeta']['path']})
pingdom_check.update({'tf_line_end': pingdom_check['__tfmeta']['line_end']})
pingdom_check.update({'tf_line_start': pingdom_check['__tfmeta']['line_start']})
# pingdom_check.update({'tf_mod_version': tf_mod_version})
# Check for existing instance in SC and update same ID if so.
try:
# If there are any rds instances in the existing SC data
if sc_data[0]["attributes"]["pingdom_check"]:
# Find the Pingdom check SC ID that matches
pingdom_id = list(filter(lambda pingdom: pingdom['tf_path'] == pingdom_check['__tfmeta']['path'], sc_data[0]["attributes"]["pingdom_check"]))[0]['id']
pingdom_check.update({'id': pingdom_id})
except IndexError:
pass
# Clean up field not used in post to SC
del pingdom_check['__tfmeta']
data.update({"pingdom_check": [pingdom_check]})
print(data)
update_sc_namespace(namespace_id, data)
return True
def startHttpServer():
handler_object = HealthHttpRequestHandler
with socketserver.TCPServer(("", 8080), handler_object) as httpd:
httpd.serve_forever()
def process_components(data):
log.info(f"Processing batch of {len(data)} components...")
for component in data:
t_repo = threading.local()
t_repo = threading.Thread(target=process_repo, kwargs=component, daemon=True)
# Apply limit on total active threads
while threading.active_count() > (MAX_THREADS-1):
log.debug(f"Active Threads={threading.active_count()}, Max Threads={MAX_THREADS}")
sleep(10)
t_repo.start()
component_name = component["attributes"]["name"]
log.info(f"Started thread for {component_name}")
if __name__ == '__main__':
logging.basicConfig(
format='[%(asctime)s] %(levelname)s %(threadName)s %(message)s', level=LOG_LEVEL)
log = logging.getLogger(__name__)
sc_api_headers = {"Authorization": f"Bearer {SC_API_TOKEN}", "Content-Type":"application/json","Accept": "application/json"}
# Test connection to Service Catalogue
try:
r = requests.head(f"{SC_API_ENDPOINT}/_health", headers=sc_api_headers, timeout=10)
log.info(f"Successfully connected to the Service Catalogue. {r.status_code}")
except Exception as e:
log.critical("Unable to connect to the Service Catalogue.")
raise SystemExit(e) from e
# Start health endpoint.
httpHealth = threading.Thread(target=startHttpServer, daemon=True)
httpHealth.start()
lock = threading.Lock()
while True:
# Start with an empty list.
namespaces = []
if not os.path.isdir(TEMP_DIR):
cp_envs_repo = Repo.clone_from("https://github.com/ministryofjustice/cloud-platform-environments.git", TEMP_DIR)
else:
cp_envs_repo = Repo(TEMP_DIR)
origin = cp_envs_repo.remotes.origin
origin.pull()
try:
r = requests.get(SC_ENDPOINT, headers=sc_api_headers, timeout=10)
log.debug(r)
if r.status_code == 200:
j_meta = r.json()["meta"]["pagination"]
log.debug(f"Got result page: {j_meta['page']} from SC")
sc_data = r.json()["data"]
process_components(sc_data)
else:
raise Exception(f"Received non-200 response from Service Catalogue: {r.status_code}")
# Loop over the remaining pages and return one at a time
num_pages = j_meta['pageCount']
for p in range(2, num_pages+1):
page=f"&pagination[page]={p}"
r = requests.get(f"{SC_ENDPOINT}{page}", headers=sc_api_headers, timeout=10)
if r.status_code == 200:
j_meta = r.json()["meta"]["pagination"]
log.debug(f"Got result page: {j_meta['page']} from SC")
sc_data = r.json()["data"]
process_components(sc_data)
else:
raise Exception(f"Received non-200 response from Service Catalogue: {r.status_code}")
except Exception as e:
log.error(f"Problem with Service Catalogue API. {e}")
sleep((REFRESH_INTERVAL_HOURS * 60 * 60))