-
Notifications
You must be signed in to change notification settings - Fork 1
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Merge pull request #15 from krystianbajno/feature/cisa-kev
Add CISA-KEV, more enrichment, general refactor, more features, filte…
- Loading branch information
Showing
44 changed files
with
803 additions
and
382 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Empty file.
Empty file.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Empty file.
Empty file.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,115 @@ | ||
import os | ||
import time | ||
import json | ||
from typing import List, Dict | ||
import httpx | ||
from dateutil import parser as dateutil_parser | ||
|
||
from models.vulnerability import Vulnerability | ||
from services.api.source import Source | ||
from services.vulnerabilities.factories.vulnerability_factory import VulnerabilityFactory | ||
|
||
class CISAKEVAPI(Source): | ||
CACHE_DIR = "cache" | ||
CACHE_FILE = os.path.join(CACHE_DIR, "cisa_kev_cache.json") | ||
CACHE_DURATION = 600 | ||
|
||
def __init__(self): | ||
self.url = "https://www.cisa.gov/sites/default/files/feeds/known_exploited_vulnerabilities.json" | ||
self.ensure_cache_dir() | ||
|
||
def ensure_cache_dir(self): | ||
if not os.path.exists(self.CACHE_DIR): | ||
try: | ||
os.makedirs(self.CACHE_DIR) | ||
print(f"[*] Created cache directory at '{self.CACHE_DIR}'.") | ||
except Exception as e: | ||
print(f"[!] Failed to create cache directory '{self.CACHE_DIR}': {e}") | ||
|
||
def is_cache_valid(self) -> bool: | ||
if os.path.exists(self.CACHE_FILE): | ||
cache_mtime = os.path.getmtime(self.CACHE_FILE) | ||
current_time = time.time() | ||
if (current_time - cache_mtime) < self.CACHE_DURATION: | ||
return True | ||
return False | ||
|
||
def load_cache(self) -> Dict: | ||
try: | ||
with open(self.CACHE_FILE, 'r', encoding='utf-8') as f: | ||
data = json.load(f) | ||
print("[*] Loaded CISA KEV catalog from cache.") | ||
return data | ||
except Exception as e: | ||
print(f"[!] Error reading CISA KEV cache: {e}") | ||
return {} | ||
|
||
def update_cache(self, data: Dict): | ||
try: | ||
with open(self.CACHE_FILE, 'w', encoding='utf-8') as f: | ||
json.dump(data, f, ensure_ascii=False, indent=4) | ||
print("[+] CISA KEV catalog downloaded and cached.") | ||
except Exception as e: | ||
print(f"[!] Error updating CISA KEV cache: {e}") | ||
|
||
def fetch_data(self) -> Dict: | ||
try: | ||
print("[*] Downloading CISA KEV catalog...") | ||
response = httpx.get(self.url, timeout=15) | ||
if response.status_code == 200: | ||
data = response.json() | ||
self.update_cache(data) | ||
return data | ||
else: | ||
print(f"[!] Failed to fetch CISA KEV catalog. Status code: {response.status_code}") | ||
except Exception as e: | ||
print(f"[!] Error fetching CISA KEV data: {e}") | ||
return {} | ||
|
||
def get_data(self) -> Dict: | ||
if self.is_cache_valid(): | ||
return self.load_cache() | ||
else: | ||
return self.fetch_data() | ||
|
||
def search(self, keywords: List[str], max_results: int) -> List[Vulnerability]: | ||
|
||
vulnerabilities = [] | ||
try: | ||
data = self.get_data() | ||
kev_vulnerabilities = data.get("vulnerabilities", []) | ||
|
||
for item in kev_vulnerabilities: | ||
cve_id = item.get("cveID") | ||
|
||
if not cve_id: | ||
continue | ||
|
||
date_added = item.get("dateAdded") | ||
try: | ||
parsed_date = dateutil_parser.parse(date_added) | ||
date = parsed_date.strftime('%Y-%m-%d') | ||
except Exception: | ||
date = date_added or "N/A" | ||
|
||
notes = item.get("notes", "") | ||
reference_urls = [url.strip() for url in notes.split(" ; ") if url.strip()] | ||
weaknesses = item.get("cwes", []) | ||
|
||
vulnerability = VulnerabilityFactory.make( | ||
id=cve_id, | ||
source=self.__class__.__name__, | ||
url="https://www.cisa.gov/known-exploited-vulnerabilities-catalog", | ||
date=date, | ||
reference_urls=reference_urls, | ||
description=item.get("shortDescription", "N/A"), | ||
vulnerable_components=[item.get("product", "N/A")], | ||
tags=[item.get("vendorProject", "N/A")], | ||
weaknesses=weaknesses | ||
) | ||
vulnerabilities.append(vulnerability) | ||
|
||
except Exception as e: | ||
print(f"[!] Error processing CISA KEV data: {e}") | ||
|
||
return vulnerabilities |
This file was deleted.
Oops, something went wrong.
This file was deleted.
Oops, something went wrong.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Empty file.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,48 @@ | ||
from concurrent.futures import ThreadPoolExecutor, as_completed | ||
import time | ||
from typing import List | ||
from models.vulnerability import Vulnerability | ||
from models.vulnerability_intelligence import VulnerabilityIntelligence | ||
from services.api.source import Source | ||
from services.vulnerability_intelligence.enrichment.vulnerability_intelligence_enrichment_manager import VulnerabilityIntelligenceEnrichmentManager | ||
|
||
def collect_from_source_with_retries(manager, source: Source, keywords: List[str], max_results: int) -> List[Vulnerability]: | ||
attempts = 0 | ||
retry_delay = manager.retry_delay | ||
while attempts <= manager.max_retries: | ||
try: | ||
results = source.search(keywords, max_results) | ||
manager.progress_manager.increment_progress( | ||
source.__class__.__name__, len(results), len(manager.sources) | ||
) | ||
return results | ||
except Exception as e: | ||
attempts += 1 | ||
if attempts > manager.max_retries: | ||
raise e | ||
print(f"[!] Error with source {source.__class__.__name__}, attempt {attempts}. Retrying in {retry_delay} seconds...") | ||
time.sleep(retry_delay) | ||
retry_delay *= 2 | ||
|
||
def is_enrichment_enabled(config: dict) -> bool: | ||
return any(config.get('sources', {}).values()) | ||
|
||
def perform_enrichment(vulnerabilities: List[VulnerabilityIntelligence], config: dict) -> List[VulnerabilityIntelligence]: | ||
enrichment_manager = VulnerabilityIntelligenceEnrichmentManager(vulnerabilities, config) | ||
return enrichment_manager.enrich() | ||
|
||
def collect_results(manager, keywords: List[str], max_results: int) -> List[Vulnerability]: | ||
collected_results = [] | ||
with ThreadPoolExecutor(max_workers=256) as executor: | ||
futures = { | ||
executor.submit(collect_from_source_with_retries, manager, source, keywords, max_results): source | ||
for source in manager.sources | ||
} | ||
for future in as_completed(futures): | ||
source = futures[future] | ||
try: | ||
results = future.result() | ||
collected_results.extend(results) | ||
except Exception as e: | ||
print(f"[!] Error with source {source.__class__.__name__}: {e}") | ||
return collected_results |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,5 @@ | ||
def filter_by_severity(vulnerabilities, severities): | ||
return [ | ||
vuln for vuln in vulnerabilities | ||
if set(sev['severity'].lower() for sev in vuln.severities).intersection(severities) | ||
] |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,12 @@ | ||
def prepare_descriptions(vulnerabilities): | ||
for vuln in vulnerabilities: | ||
seen = set() | ||
unique_descriptions = [] | ||
for description in vuln.descriptions: | ||
clean_text = description["text"].replace("\n", " ") | ||
if clean_text not in seen: | ||
seen.add(clean_text) | ||
description["text"] = clean_text | ||
unique_descriptions.append(description) | ||
vuln.descriptions = unique_descriptions | ||
return vulnerabilities |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,14 @@ | ||
from threading import Lock | ||
|
||
class ProgressManager: | ||
def __init__(self): | ||
self._lock = Lock() | ||
self._progress_counter = 0 | ||
|
||
def increment_progress(self, source_name: str, result_count: int, total_sources: int): | ||
with self._lock: | ||
self._progress_counter += 1 | ||
print(f"+ Progress: [{self._progress_counter}/{total_sources}] - Source {source_name} completed with {result_count} results.") | ||
|
||
def reset_progress(self): | ||
self._progress_counter = 0 |
Oops, something went wrong.