-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathapp.py
67 lines (55 loc) · 2.33 KB
/
app.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
import platform
import sys
from os import system, environ
from signal import signal, SIGTERM
from time import sleep
import requests
def monitor(namespace_name, pod_name, excluded_container_names):
kubernetes_service_host = environ.get("KUBERNETES_SERVICE_HOST", "localhost")
kubernetes_service_port = environ.get("KUBERNETES_SERVICE_PORT", "8001")
with open('/run/secrets/kubernetes.io/serviceaccount/token') as f:
token = f.read().strip()
path = f"/api/v1/namespaces/{namespace_name}/pods/{pod_name}"
endpoint = f"https://{kubernetes_service_host}:{kubernetes_service_port}{path}"
headers = {"Authorization": f"Bearer {token}"}
while True:
sleep(3)
response = requests.get(endpoint, verify="/run/secrets/kubernetes.io/serviceaccount/ca.crt", headers=headers)
response.raise_for_status()
pod = response.json()
status = pod.get("status", {})
container_statuses = status.get("containerStatuses", [])
#print(json.dumps(pod, indent=2))
all_terminated = True
for container in container_statuses:
if container.get("name") not in excluded_container_names:
# print(f'{container.get("name"):20s}', end='')
for state in container.get("state", {}).keys():
if state == "terminated":
# print('terminated')
break
else:
# print('running')
all_terminated = False
if all_terminated:
break
if __name__ == '__main__':
namespace_name = environ.get("NAMESPACE_NAME")
if namespace_name is None:
try:
with open('/run/secrets/kubernetes.io/serviceaccount/namespace') as f:
namespace_name = f.read().strip()
except:
namespace_name = "default"
pod_name = environ.get("POD_NAME", platform.node())
excluded_container_names = sys.argv[1:]
print(f"NAMESPACE_NAME={namespace_name}")
print(f"POD_NAME={pod_name}")
print(f"EXCLUDED_CONTAINER_NAMES={' '.join(excluded_container_names)}")
def dummy_handler(_signum, _frame):
pass
signal(SIGTERM, dummy_handler)
monitor(namespace_name, pod_name, excluded_container_names)
print("Signalling all processes to terminate")
system("kill -15 -1")
sys.exit(0)