-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathmain.py
147 lines (124 loc) · 6.64 KB
/
main.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
import os
import sys
import logging
import argparse
from analyzer import analyze_iac_file
from analyzer.security import check_security_issues
from analyzer.cost import check_cost_optimization
# Global issue ID counter
issue_id_counter = 1
# Set up logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
def main():
    """CLI entry point: analyze a Terraform file or directory tree and
    print a summary of the issues found.

    Exits with status 1 when the path is neither a directory nor a .tf file.
    """
    parser = argparse.ArgumentParser(description='IaC Configuration Analyzer')
    parser.add_argument('path', nargs='?', default='test_main.tf', help='Path to Terraform file or directory')
    parser.add_argument('-v', '--verbose', action='store_true', help='Increase output verbosity')
    args = parser.parse_args()

    # Raise verbosity only when -v/--verbose was given; basicConfig at
    # module level already set the format and a default of INFO.
    log_level = logging.DEBUG if args.verbose else logging.INFO
    logging.getLogger().setLevel(log_level)

    path = args.path
    # NOTE(review): at call time these names resolve to the module-level
    # redefinitions below, which shadow the functions imported from
    # analyzer.security / analyzer.cost at the top of the file — confirm
    # which implementation is intended.
    checks = [check_security_issues, check_cost_optimization]  # Default checks
    all_issues = []

    if os.path.isdir(path):
        # Recursively analyze every .tf file under the directory.
        for root, _, files in os.walk(path):
            for file in files:
                if file.endswith('.tf'):
                    all_issues.extend(_analyze_one(os.path.join(root, file), checks))
    elif os.path.isfile(path) and path.endswith('.tf'):
        all_issues.extend(_analyze_one(path, checks))
    else:
        logging.error("Error: Invalid path or non-Terraform file")
        sys.exit(1)

    if all_issues:
        print_issues(all_issues)
    else:
        print("No issues found.")


def _analyze_one(file_path, checks):
    """Run all checks against a single Terraform file and return its issues."""
    logging.info(f"Analyzing file: {file_path}")
    file_issues = analyze_iac_file(file_path, checks)
    logging.debug(f"Appending issues from file {file_path}: {file_issues}")
    return file_issues
def print_issues(issues):
    """Print every issue followed by a one-line summary of severity counts.

    Args:
        issues: list of ``(severity, message, issue_id)`` tuples. Severity
            is matched case-insensitively against HIGH/MEDIUM/LOW/INFO;
            unknown severities are logged as warnings and excluded from
            both the per-issue output and the total.
    """
    severity_count = {'HIGH': 0, 'MEDIUM': 0, 'LOW': 0, 'INFO': 0}
    logging.debug(f"Total issues: {len(issues)}")
    logging.debug(f"Issues list: {issues}")
    for i, (severity, message, issue_id) in enumerate(issues):
        # Normalize so 'high', ' High ' etc. are all counted as HIGH.
        severity_normalized = severity.strip().upper()
        logging.debug(f"Issue #{i + 1}: ID: {issue_id}, Normalized severity: '{severity_normalized}', Message: '{message}'")
        if severity_normalized in severity_count:
            severity_count[severity_normalized] += 1
            # Bug fix: previously only the summary line was printed and the
            # actual issue messages were visible at DEBUG log level only.
            print(f"[{severity_normalized}] #{issue_id}: {message}")
        else:
            logging.warning(f"Unknown severity level: {severity_normalized}")
    total = sum(severity_count.values())
    logging.debug(f"Final severity count: {severity_count}")
    print(f"\n{total} issues found ({severity_count['HIGH']} HIGH, {severity_count['MEDIUM']} MEDIUM, {severity_count['LOW']} LOW, {severity_count['INFO']} INFO)")
# Updated security checks with global issue ID
def check_security_issues(resources):
    """Scan parsed Terraform resources for common security misconfigurations.

    Flags security groups with SSH (port 22) open to the world, S3 buckets
    with a public-read ACL, and RDS instances without storage encryption.

    Args:
        resources: iterable of dicts shaped
            ``{resource_type: {resource_name: config}}``.

    Returns:
        List of ``(severity, message, issue_id)`` tuples; ids are drawn
        from the shared module-level ``issue_id_counter``.
    """
    global issue_id_counter
    findings = []
    logging.debug(f"Checking security issues for resources: {resources}")

    def _record(severity, message):
        # Append one finding and advance the shared issue id counter.
        global issue_id_counter
        entry = (severity, message, issue_id_counter)
        findings.append(entry)
        logging.debug(f"Identified issue: {entry}")
        issue_id_counter += 1

    for resource in resources:
        for rtype, named_configs in resource.items():
            for rname, cfg in named_configs.items():
                if rtype == 'aws_security_group':
                    for rule in cfg.get('ingress', []):
                        # NOTE(review): exact-list match — a rule listing
                        # 0.0.0.0/0 alongside other CIDRs is not flagged;
                        # confirm this is intended.
                        if rule.get('cidr_blocks') == ['0.0.0.0/0'] and rule.get('to_port') == 22:
                            _record('HIGH', f"Security issue: Open SSH access in security group {rname}")
                elif rtype == 'aws_s3_bucket':
                    if cfg.get('acl') == 'public-read':
                        _record('MEDIUM', f"Public read access enabled on S3 bucket '{cfg.get('bucket', 'Unknown')}'")
                elif rtype == 'aws_db_instance':
                    # Missing key or falsy value both count as unencrypted.
                    if not cfg.get('storage_encrypted'):
                        _record('HIGH', f"Security issue: Unencrypted RDS instance {rname}")

    logging.debug(f"Final security issues list: {findings}")
    return findings
# Similar update for cost optimization
def check_cost_optimization(resources):
    """Scan parsed Terraform resources for cost-saving opportunities.

    Flags large EC2 instance types, EBS volumes bigger than 1000 GB, and
    Elastic IPs not attached to any instance.

    Args:
        resources: iterable of dicts shaped
            ``{resource_type: {resource_name: config}}``.

    Returns:
        List of ``(severity, message, issue_id)`` tuples; ids are drawn
        from the shared module-level ``issue_id_counter``.
    """
    global issue_id_counter
    findings = []
    logging.debug(f"Checking cost optimization for resources: {resources}")

    def _record(severity, message):
        # Append one finding and advance the shared issue id counter.
        global issue_id_counter
        entry = (severity, message, issue_id_counter)
        findings.append(entry)
        logging.debug(f"Identified issue: {entry}")
        issue_id_counter += 1

    for resource in resources:
        for rtype, named_configs in resource.items():
            for rname, cfg in named_configs.items():
                if rtype == 'aws_instance':
                    itype = cfg.get('instance_type')
                    if itype in ('t3.large', 'm5.large'):
                        _record('INFO', f"Consider downsizing instance '{rname}' from {itype} to t3.medium")
                elif rtype == 'aws_ebs_volume':
                    if cfg.get('size', 0) > 1000:
                        _record('LOW', f"Consider resizing EBS volume '{rname}' to reduce cost")
                elif rtype == 'aws_eip' and 'instance' not in cfg:
                    _record('LOW', f"Unattached Elastic IP: {rname}")

    logging.debug(f"Final cost optimization issues list: {findings}")
    return findings
# Run the CLI entry point only when executed as a script, not on import.
if __name__ == "__main__":
    main()