From b79fdf8a953e2e50fabab474b9b78639d2375914 Mon Sep 17 00:00:00 2001 From: Mansur Uralov Date: Thu, 23 Jan 2025 16:43:07 +0100 Subject: [PATCH] chore: enhance data sanitizer to remove last-applied-configuration - Implemented _remove_last_applied_configuration method to delete the kubectl.kubernetes.io/last-applied-configuration annotation from Kubernetes resource objects. - Added unit tests to verify the removal of the last-applied-configuration annotation from Deployment resources. --- src/services/data_sanitizer.py | 22 +++++ tests/unit/services/test_data_sanitizer.py | 110 +++++++++++++++++++++ 2 files changed, 132 insertions(+) diff --git a/src/services/data_sanitizer.py b/src/services/data_sanitizer.py index c17a56a1..089cf585 100644 --- a/src/services/data_sanitizer.py +++ b/src/services/data_sanitizer.py @@ -133,6 +133,9 @@ def _sanitize_secret(self, obj: dict) -> dict: def _sanitize_workload(self, obj: dict) -> dict: """Sanitize a workload object (Deployment, Pod, StatefulSet, DaemonSet).""" try: + # First remove last-applied-configuration + obj = self._remove_last_applied_configuration(obj) + # Handle template-based resources (Deployment, StatefulSet, DaemonSet) if "spec" in obj and "template" in obj["spec"]: containers = obj["spec"]["template"]["spec"]["containers"] @@ -172,6 +175,9 @@ def _sanitize_dict(self, data: dict) -> dict: """Recursively sanitize a dictionary by looking for sensitive data patterns.""" result = data.copy() + # First remove last-applied-configuration if exists + result = self._remove_last_applied_configuration(result) + for key, value in data.items(): # Check if the key should be excluded from sanitization if ( @@ -204,3 +210,19 @@ def _clean_personal_information(self, data: dict) -> dict: sanitized_data_str = self.scrubber.clean(data_str) return dict(json.loads(sanitized_data_str)) + + @staticmethod + def _remove_last_applied_configuration(data: dict) -> dict: + """Remove kubectl.kubernetes.io/last-applied-configuration annotation if it exists.""" + if "metadata" in data and "annotations" in data["metadata"]: + if ( + "kubectl.kubernetes.io/last-applied-configuration" + in data["metadata"]["annotations"] + ): + del data["metadata"]["annotations"][ + "kubectl.kubernetes.io/last-applied-configuration" + ] + # Remove empty annotations dict if it's the last annotation + if not data["metadata"]["annotations"]: + del data["metadata"]["annotations"] + return data diff --git a/tests/unit/services/test_data_sanitizer.py b/tests/unit/services/test_data_sanitizer.py index 9f7bf019..89bf83a5 100644 --- a/tests/unit/services/test_data_sanitizer.py +++ b/tests/unit/services/test_data_sanitizer.py @@ -1019,6 +1019,116 @@ def test_data_structures_and_pii(self, test_data, expected_results, error): "defaultRuntimePodPreset": "M", }, ), + # delete "kubectl.kubernetes.io/last-applied-configuration" field completely + ( + { + "apiVersion": "apps/v1", + "kind": "Deployment", + "metadata": { + "annotations": { + "deployment.kubernetes.io/revision": "1", + "kubectl.kubernetes.io/last-applied-configuration": '{"apiVersion":"apps/v1","kind":"Deployment","metadata":{"annotations":{},"labels":{"app":"nginx"},"name":"nginx","namespace":"test-sanit"},"spec":{"replicas":3,"selector":{"matchLabels":{"app":"nginx"}},"template":{"metadata":{"labels":{"app":"nginx"}},"spec":{"containers":[{"env":[{"name":"TOKEN","value":"test token"},{"name":"PASSWORD","value":"test password"},{"name":"CLIENT_ID","value":"test client ID"},{"name":"USER_NAME","value":"test user name"}],"image":"nginx:1.14.2","name":"nginx","ports":[{"containerPort":80}]}]}}}}\n', + }, + }, + "name": "nginx", + "namespace": "test-sanit", + "spec": { + "progressDeadlineSeconds": 600, + "replicas": 3, + "revisionHistoryLimit": 10, + "selector": {"matchLabels": {"app": "nginx"}}, + "strategy": { + "rollingUpdate": { + "maxSurge": "25%", + "maxUnavailable": "25%", + }, + "type": "RollingUpdate", + }, + "template": { + "metadata": { + "labels": {"app": "nginx"}, + }, + "spec": { + "containers": [ + { + "env": [ + {"name": "TOKEN", "value": "test token"}, + { + "name": "PASSWORD", + "value": "test password", + }, + { + "name": "CLIENT_ID", + "value": "test client ID", + }, + { + "name": "USER_NAME", + "value": "test user name", + }, + ], + "image": "nginx:1.14.2", + "name": "nginx", + "ports": [{"containerPort": 80}], + } + ], + }, + }, + }, + }, + { + "apiVersion": "apps/v1", + "kind": "Deployment", + "metadata": { + "annotations": { + "deployment.kubernetes.io/revision": "1", + } + }, + "name": "nginx", + "namespace": "test-sanit", + "spec": { + "progressDeadlineSeconds": 600, + "replicas": 3, + "revisionHistoryLimit": 10, + "selector": {"matchLabels": {"app": "nginx"}}, + "strategy": { + "rollingUpdate": { + "maxSurge": "25%", + "maxUnavailable": "25%", + }, + "type": "RollingUpdate", + }, + "template": { + "metadata": { + "labels": {"app": "nginx"}, + }, + "spec": { + "containers": [ + { + "env": [ + {"name": "TOKEN", "value": REDACTED_VALUE}, + { + "name": "PASSWORD", + "value": REDACTED_VALUE, + }, + { + "name": "CLIENT_ID", + "value": REDACTED_VALUE, + }, + { + "name": "USER_NAME", + "value": REDACTED_VALUE, + }, + ], + "image": "nginx:1.14.2", + "name": "nginx", + "ports": [{"containerPort": 80}], + } + ], + }, + }, + }, + }, + ), ], ) def test_kubernetes_resources(self, test_data, expected_results):