Skip to content

Commit

Permalink
Merge pull request #20 from krystianbajno/feature/caching-more-source…
Browse files Browse the repository at this point in the history
…s-refactor

Feature/caching more sources refactor
  • Loading branch information
krystianbajno authored Nov 24, 2024
2 parents fc68f8b + 2fa2091 commit 5265f9e
Show file tree
Hide file tree
Showing 29 changed files with 881 additions and 205 deletions.
28 changes: 25 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,25 @@
[![FOSSA Status](https://app.fossa.com/api/projects/git%2Bgithub.com%2Fkrystianbajno%2Fcveseeker.svg?type=shield)](https://app.fossa.com/projects/git%2Bgithub.com%2Fkrystianbajno%2Fcveseeker?ref=badge_shield)
[![FOSSA Status](https://app.fossa.com/api/projects/git%2Bgithub.com%2Fkrystianbajno%2Fcveseeker.svg?type=shield&issueType=security)](https://app.fossa.com/projects/git%2Bgithub.com%2Fkrystianbajno%2Fcveseeker?ref=badge_shield&issueType=security)

This tool functions similarly to SearchSploit, allowing to search for known vulnerabilities by utilizing keywords and integrating multiple online services.
A powerful, modular, and extensible vulnerability assessment and vulnerability intelligence tool searching for CVEs and exploits using keywords across multiple sources. It collects, analyzes, and enriches CVE data from multiple trusted sources, empowering security researchers, and organizations to keep vulnerabilities close and actions proactive.

<img src="https://raw.githubusercontent.com/krystianbajno/krystianbajno/main/img/cveseekerino.png"/>
# Features

- **Multi-Source Aggregation**: Fetch data from a variety of sources.
- **Enrichment Capabilities**: Enhance CVE details with severity metrics, reference URLs, available exploits, and mitigations, enabling you to produce actionable intelligence.
- **Caching and Optimization**: Uses smart caching to minimize API requests and optimize performance, enabling you to use it in air-gapped networks.
- **Flexible Configuration**: Enable or disable providers and enrichment sources as needed.
- **CLI Simplicity**: Intuitive command-line interface for streamlined operations.
- **Reports**: Create vulnerability reports with ease.

# Use Case Scenarios

- Research vulnerabilities and associated metadata.
- Product vulnerability background checking.
- Automate vulnerability triaging and reporting.
- Gain insights for security monitoring and proactive threat mitigation.

<img src="https://raw.githubusercontent.com/krystianbajno/krystianbajno/main/img/cveseekerino-6.png"/>

# How to use
```bash
Expand All @@ -19,20 +35,22 @@ python3 cveseeker.py cve-2024
python3 cveseeker.py cve-2024 --max-per-provider 2000 # max results per provider, default 100
python3 cveseeker.py cve-2024 --report # generate CSV, JSON and HTML report
python3 cveseeker.py cve-2024 --critical --high --medium --low # include critical, high, medium, and low severities

```

# Sources
- [www.exploit-db.com](https://www.exploit-db.com) (IMPLEMENTED)
- [services.nvd.nist.gov](https://services.nvd.nist.gov/rest/json/cves/2.0?noRejected) (IMPLEMENTED)
- [services.nvd.nist.gov/cached:mirror/fkie-cad/nvd-json-data-feeds](https://github.com/fkie-cad/nvd-json-data-feeds/releases/latest/download/CVE-all.json.xz) (IMPLEMENTED)
- [www.opencve.io](https://www.opencve.io) (IMPLEMENTED)
- [www.packetstormsecurity.com](https://packetstormsecurity.com) (IMPLEMENTED)
- [vulners.com](https://vulners.com/search) (IMPLEMENTED)
- [www.cisa.gov - KEV](https://www.cisa.gov/known-exploited-vulnerabilities-catalog) (IMPLEMENTED)
- [www.rapid7.com](https://www.rapid7.com) (IMPLEMENTED)
- [cve.mitre.org](https://cve.mitre.org/cve/search_cve_list.html) (WIP)
- [github.com PoC](https://github.com/nomi-sec/PoC-in-GitHub) (IMPLEMENTED)
- [github.com PoC - Cached](https://github.com/nomi-sec/PoC-in-GitHub) (IMPLEMENTED)
- [github.com advisories](https://github.com/advisories) (IMPLEMENTED)
- [github.com/trickest/cve - Cached](https://github.com/search?q=repo%3Atrickest%2Fcve%20cve-2024&type=code) (IMPLEMENTED)
- [github.com/trickest/cve](https://github.com/search?q=repo%3Atrickest%2Fcve%20cve-2024&type=code) (IMPLEMENTED)

# Reporting
Expand All @@ -47,3 +65,7 @@ python3 cveseeker.py smbghost --report

## License
[![FOSSA Status](https://app.fossa.com/api/projects/git%2Bgithub.com%2Fkrystianbajno%2Fcveseeker.svg?type=large)](https://app.fossa.com/projects/git%2Bgithub.com%2Fkrystianbajno%2Fcveseeker?ref=badge_large)


## TODO:
Full CVSSv2 support
8 changes: 6 additions & 2 deletions config.yaml
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
providers:
NistAPI: false
NistCachedAPI: true
PacketStormSecurityAPI: true
OpenCVEAPI: true
ExploitDBAPI: true
Expand All @@ -11,6 +12,9 @@ providers:
enrichment:
sources:
vulners: true
github: true
github: false
github_cached: true
cisa_kev: true
github_poc: true
github_poc: false
github_poc_cached: true
nist_cached: true
9 changes: 6 additions & 3 deletions providers/search_provider.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,15 +4,17 @@
from services.api.sources.exploitdb import ExploitDBAPI
from services.api.sources.github_advisories import GitHubAdvisoryAPI
from services.api.sources.nist import NistAPI
from services.api.sources.nist_cached import NistCachedAPI
from services.api.sources.opencve import OpenCVEAPI
from services.api.sources.packetstormsecurity import PacketStormSecurityAPI
from services.api.sources.rapid7 import RAPID7
from services.api.sources.vulners import VulnersAPI

from typing import Dict

from services.search.engine.progress_factory import ProgressManagerFactory
from services.search.search_manager import SearchManager
from services.search.engine.progress import ProgressManager
from services.search.engine.progress_manager import ProgressManager

class SearchProvider:
def __init__(self, playwright_enabled=False, config_file='config.yaml'):
Expand All @@ -22,6 +24,7 @@ def __init__(self, playwright_enabled=False, config_file='config.yaml'):

self.provider_registry = {
'NistAPI': NistAPI,
"NistCachedAPI": NistCachedAPI,
'PacketStormSecurityAPI': PacketStormSecurityAPI,
'OpenCVEAPI': OpenCVEAPI,
'ExploitDBAPI': ExploitDBAPI,
Expand Down Expand Up @@ -57,8 +60,8 @@ def boot(self):
playwright_providers = []
providers.extend(playwright_providers)

progress_manager = ProgressManager()
self.search_service = SearchManager(providers, enrichment_config, progress_manager=progress_manager)
progress_manager_factory = ProgressManagerFactory()
self.search_service = SearchManager(providers, enrichment_config, progress_manager_factory=progress_manager_factory)

def load_config(self) -> Dict:
try:
Expand Down
2 changes: 2 additions & 0 deletions requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -4,3 +4,5 @@ Jinja2==3.1.4
python_dateutil==2.8.2
PyYAML==6.0.1
Requests==2.32.3
lzma
zipfile
56 changes: 31 additions & 25 deletions services/api/sources/cisa_kev.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,50 +12,48 @@
class CISAKEVAPI(Source):
CACHE_DIR = "cache"
CACHE_FILE = os.path.join(CACHE_DIR, "cisa_kev_cache.json")
CACHE_DURATION = 600
CACHE_DURATION = 600 # 10 minutes
CISA_URL = "https://www.cisa.gov/sites/default/files/feeds/known_exploited_vulnerabilities.json"

def __init__(self):
self.url = "https://www.cisa.gov/sites/default/files/feeds/known_exploited_vulnerabilities.json"
self.ensure_cache_dir()

def ensure_cache_dir(self):
if not os.path.exists(self.CACHE_DIR):
try:
os.makedirs(self.CACHE_DIR)
print(f"[*] Created cache directory at '{self.CACHE_DIR}'.")
print(f"Created cache directory at '{self.CACHE_DIR}'.")
except Exception as e:
print(f"[!] Failed to create cache directory '{self.CACHE_DIR}': {e}")

def is_cache_valid(self) -> bool:
if os.path.exists(self.CACHE_FILE):
cache_mtime = os.path.getmtime(self.CACHE_FILE)
current_time = time.time()
if (current_time - cache_mtime) < self.CACHE_DURATION:
return True
return (current_time - cache_mtime) < self.CACHE_DURATION
return False

def load_cache(self) -> Dict:
try:
with open(self.CACHE_FILE, 'r', encoding='utf-8') as f:
data = json.load(f)
print("[*] Loaded CISA KEV catalog from cache.")
return data
return json.load(f)
except Exception as e:
print(f"[!] Error reading CISA KEV cache: {e}")
print(f"[!] Error reading cache file '{self.CACHE_FILE}': {e}")
return {}

def update_cache(self, data: Dict):
try:
with open(self.CACHE_FILE, 'w', encoding='utf-8') as f:
json.dump(data, f, ensure_ascii=False, indent=4)
print("[+] CISA KEV catalog downloaded and cached.")
print(f"[+] Cache file '{self.CACHE_FILE}' updated successfully.")
except Exception as e:
print(f"[!] Error updating CISA KEV cache: {e}")
print(f"[!] Error updating cache file '{self.CACHE_FILE}': {e}")

def fetch_data(self) -> Dict:
try:
print("[*] Downloading CISA KEV catalog...")
response = httpx.get(self.url, timeout=15)
response = httpx.get(self.CISA_URL, timeout=15)
if response.status_code == 200:
data = response.json()
self.update_cache(data)
Expand All @@ -72,19 +70,23 @@ def get_data(self) -> Dict:
else:
return self.fetch_data()

def search(self, keywords: List[str], max_results: int) -> List[Vulnerability]:

def search(self, keywords: List[str], max_results: int = 10) -> List[Vulnerability]:
vulnerabilities = []

try:
data = self.get_data()
kev_vulnerabilities = data.get("vulnerabilities", [])
keyword_set = set(keyword.lower() for keyword in keywords)

for item in kev_vulnerabilities:
cve_id = item.get("cveID")

if not cve_id:
continue

description = item.get("shortDescription", "N/A").lower()
if not any(keyword in description for keyword in keyword_set):
continue

date_added = item.get("dateAdded")
try:
parsed_date = dateutil_parser.parse(date_added)
Expand All @@ -96,18 +98,22 @@ def search(self, keywords: List[str], max_results: int) -> List[Vulnerability]:
reference_urls = [url.strip() for url in notes.split(" ; ") if url.strip()]
weaknesses = item.get("cwes", [])

vulnerability = VulnerabilityFactory.make(
id=cve_id,
source=self.__class__.__name__,
url="https://www.cisa.gov/known-exploited-vulnerabilities-catalog",
date=date,
reference_urls=reference_urls,
description=item.get("shortDescription", "N/A"),
vulnerable_components=[item.get("product", "N/A")],
tags=[item.get("vendorProject", "N/A")],
weaknesses=weaknesses
vulnerabilities.append(
VulnerabilityFactory.make(
id=cve_id,
source=self.__class__.__name__,
url="https://www.cisa.gov/known-exploited-vulnerabilities-catalog",
date=date,
reference_urls=reference_urls,
description=item.get("shortDescription", "N/A"),
vulnerable_components=[item.get("product", "N/A")],
tags=[item.get("vendorProject", "N/A")],
weaknesses=weaknesses
)
)
vulnerabilities.append(vulnerability)

if max_results and len(vulnerabilities) >= max_results:
break

except Exception as e:
print(f"[!] Error processing CISA KEV data: {e}")
Expand Down
4 changes: 3 additions & 1 deletion services/api/sources/cveproject.py
Original file line number Diff line number Diff line change
@@ -1 +1,3 @@
# https://github.com/CVEProject
# https://github.com/CVEProject
# https://github.com/CVEProject/cvelistV5/releases/download/cve_2024-11-24_1300Z/2024-11-24_all_CVEs_at_midnight.zip.zip
# caching
77 changes: 77 additions & 0 deletions services/api/sources/nist_cached.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
from models.vulnerability import Vulnerability
from services.search.nist_cache_manager import get_cve_data_cache
from services.vulnerabilities.factories.vulnerability_factory import VulnerabilityFactory, DEFAULT_VALUES
from dateutil import parser as dateutil_parser
from typing import List

class NistCachedAPI:
def search(self, keywords: List[str], max_results: int) -> List[Vulnerability]:
cve_data_cache = get_cve_data_cache()

if not cve_data_cache:
print("[!] CVE data is not available. Returning empty results.")
return []

vulnerabilities = []
cve_items = cve_data_cache.get("cve_items", [])

cve_items.sort(key=lambda item: item.get('published', ''), reverse=True)
keyword_set = {keyword.lower() for keyword in keywords}

for item in cve_items:
cve_id = item.get("id", DEFAULT_VALUES["id"])
descriptions = item.get("descriptions", [])
description = next(
(desc.get("value") for desc in descriptions if desc.get("lang") == "en"), DEFAULT_VALUES["description"]
)

published_date = item.get("published", DEFAULT_VALUES["date"])
parsed_date = dateutil_parser.parse(published_date)
date = parsed_date.strftime('%Y-%m-%d')

reference_urls = [ref.get("url", DEFAULT_VALUES["url"]) for ref in item.get("references", [])]

metrics = item.get("metrics", {}).get("cvssMetricV2", [])
if metrics:
metric = metrics[0]
base_score = str(metric.get("cvssData", {}).get("baseScore", DEFAULT_VALUES["base_score"]))
base_severity = metric.get("baseSeverity", DEFAULT_VALUES["base_severity"])
else:
base_score = DEFAULT_VALUES["base_score"]
base_severity = DEFAULT_VALUES["base_severity"]

vulnerable_components = []
configurations = item.get("configurations", [])

for config in configurations:
for node in config.get("nodes", []):
for cpe_match in node.get("cpeMatch", []):
if cpe_match.get("vulnerable", False):
vulnerable_components.append(cpe_match.get("criteria", DEFAULT_VALUES["url"]))

if not (
any(keyword in description.lower() for keyword in keyword_set)
or any(keyword in cve_id.lower() for keyword in keyword_set)
or any(keyword in url.lower() for keyword in keyword_set for url in reference_urls)
or any(keyword in component.lower() for keyword in keyword_set for component in vulnerable_components)
):
continue

vulnerabilities.append(
VulnerabilityFactory.make(
id=cve_id,
url="https://github.com/fkie-cad/nvd-json-data-feeds/releases/latest/download/CVE-all.json.xz",
source=self.__class__.__name__,
date=date,
reference_urls=reference_urls,
base_score=base_score,
base_severity=base_severity,
description=description,
vulnerable_components=vulnerable_components
)
)

if max_results and len(vulnerabilities) >= max_results:
break

return vulnerabilities
Loading

0 comments on commit 5265f9e

Please sign in to comment.