Python Integration Guide
Complete Python examples for common BreachSpider workflows.
Installation
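pip install requests
# The "Webhook Consumer Server" example additionally needs FastAPI and an ASGI server to run it:
pip install fastapi uvicorn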
Client Setup
import requests
from typing import Optional, Iterator
class BreachSpiderClient:
    """Small HTTP client for the BreachSpider REST API (v1).

    Every call goes through one shared ``requests.Session`` that carries the
    bearer token and JSON headers. Each method calls ``raise_for_status()``
    on the response, so HTTP error statuses surface as exceptions, and then
    returns the ``"data"`` member of the JSON body.

    The client can be used as a context manager so the session is closed
    when the ``with`` block exits.
    """

    BASE_URL = "https://breachspider.com/api/v1"
    # requests applies no timeout unless one is passed explicitly, so a
    # stalled connection would otherwise block the caller forever.
    DEFAULT_TIMEOUT = 30

    def __init__(self, api_key: str, timeout: float = DEFAULT_TIMEOUT):
        """Create a session authenticated with *api_key*.

        Args:
            api_key: Token sent as ``Authorization: Bearer <api_key>``.
            timeout: Seconds to wait on each request before giving up.
        """
        self.timeout = timeout
        self.session = requests.Session()
        self.session.headers.update({
            "Authorization": f"Bearer {api_key}",
            "Content-Type": "application/json",
            "Accept": "application/json",
        })

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        self.close()

    def close(self) -> None:
        """Close the underlying HTTP session."""
        self.session.close()

    def _request(self, method: str, path: str, **kwargs) -> dict:
        """Send one request to ``BASE_URL + path`` and return the decoded JSON body.

        The client-wide timeout is applied unless the caller overrides it.
        """
        kwargs.setdefault("timeout", self.timeout)
        response = self.session.request(method, f"{self.BASE_URL}{path}", **kwargs)
        response.raise_for_status()
        return response.json()

    def get_cve(self, cve_id: str) -> dict:
        """Return the record for a single CVE identified by *cve_id*."""
        return self._request("GET", f"/cves/{cve_id}")["data"]

    def search_cves(
        self,
        severity: Optional[str] = None,
        kev_only: bool = False,
        vendor: Optional[str] = None,
        limit: int = 20,
    ) -> Iterator[dict]:
        """Lazily iterate over every CVE matching the filters, page by page.

        Pages are requested only as the iterator is consumed, so HTTP errors
        are raised during iteration rather than at call time.

        Args:
            severity: Sent as the ``severity`` query parameter when truthy.
            kev_only: Always sent as the ``kev_only`` query parameter.
            vendor: Sent as the ``vendor`` query parameter when truthy.
            limit: Page size sent as the ``limit`` query parameter.

        Yields:
            Each item of every page's ``"data"`` list, in order.
        """
        filters = {"limit": limit, "kev_only": kev_only}
        if severity:
            filters["severity"] = severity
        if vendor:
            filters["vendor"] = vendor

        page = 1
        while True:
            # Build a fresh dict per page so earlier requests are never mutated.
            body = self._request("GET", "/cves", params={"page": page, **filters})
            yield from body["data"]
            if not body["pagination"]["has_next"]:
                break
            page += 1

    def get_kev(self, limit: int = 100) -> list:
        """Return up to *limit* entries from the ``/cves/kev`` endpoint."""
        return self._request("GET", "/cves/kev", params={"limit": limit})["data"]

    def get_dashboard(self) -> dict:
        """Return the payload of the ``/dashboard`` endpoint."""
        return self._request("GET", "/dashboard")["data"]

    def get_environments(self) -> list:
        """Return the environments listed by the ``/environments`` endpoint."""
        return self._request("GET", "/environments")["data"]

    def create_environment(self, name: str, description: str = "") -> dict:
        """Create an environment and return the ``"data"`` of the response."""
        return self._request(
            "POST",
            "/environments",
            json={"name": name, "description": description},
        )["data"]

    def add_asset(self, env_id: int, asset: dict) -> dict:
        """POST *asset* to environment *env_id* and return the response ``"data"``."""
        return self._request(
            "POST",
            f"/environments/{env_id}/assets",
            json=asset,
        )["data"]
Common Workflows
Fetch KEV Entries and List the Critical Ones
# Build a client and pull up to 200 entries from the KEV endpoint.
client = BreachSpiderClient("bs_live_your_key_here")
kev_entries = client.get_kev(limit=200)

# Keep only entries that have a CVSS score and whose score is at least 9.0.
critical_kev = []
for entry in kev_entries:
    if not entry["cvss_score"]:
        continue
    if entry["cvss_score"] >= 9.0:
        critical_kev.append(entry)

print(f"Total KEV: {len(kev_entries)}")
print(f"Critical KEV: {len(critical_kev)}")

# Show the first five critical entries, one per line.
top_five = critical_kev[:5]
for cve in top_five:
    print(f"{cve['cve_id']} | CVSS {cve['cvss_score']} | {cve['primary_vendor']}")
Monitor Your Environment for New Critical CVEs
import time
from datetime import datetime
def monitor_environment(
    client,
    env_id: int,
    check_interval_seconds: int = 300,
    max_checks: Optional[int] = None,
):
    """Poll an environment's summary endpoint and print its critical/KEV counts.

    Args:
        client: Object exposing ``BASE_URL`` and a ``session`` with a ``get``
            method (e.g. a ``BreachSpiderClient``).
        env_id: Identifier interpolated into ``/environments/{env_id}/summary``.
        check_interval_seconds: Pause between consecutive checks.
        max_checks: Stop after this many checks; ``None`` (the default) polls
            forever, matching the previous behaviour.

    Raises:
        Whatever ``response.raise_for_status()`` raises for an HTTP error
        status; the loop does not swallow failures.
    """
    # Imported locally so the snippet only relies on the names it already had.
    from datetime import timezone

    summary_url = f"{client.BASE_URL}/environments/{env_id}/summary"
    checks_done = 0
    while max_checks is None or checks_done < max_checks:
        if checks_done:
            # Sleep between checks only, never after the final one.
            time.sleep(check_interval_seconds)

        # An explicit timeout keeps a stalled connection from freezing the monitor.
        response = client.session.get(summary_url, timeout=30)
        # Fail loudly on HTTP errors instead of hitting a confusing KeyError below.
        response.raise_for_status()
        summary = response.json()["data"]

        critical_count = summary.get("critical_count", 0)
        kev_count = summary.get("kev_count", 0)
        # Timezone-aware UTC timestamp; datetime.utcnow() is deprecated.
        timestamp = datetime.now(timezone.utc).isoformat()
        print(f"[{timestamp}] Critical: {critical_count} | KEV: {kev_count}")
        checks_done += 1
Export CVE Report to CSV
import csv
def export_cves_to_csv(client, output_file: str, severity: str = "CRITICAL"):
    """Write every CVE of the given severity to *output_file* as CSV.

    Args:
        client: Object whose ``search_cves(severity=...)`` yields CVE dicts.
        output_file: Destination path; an existing file is overwritten.
        severity: Severity filter forwarded to ``search_cves``.

    Raises:
        KeyError: If a CVE record lacks one of the required fields.
    """
    fieldnames = [
        "cve_id", "bsid", "severity", "cvss_score", "bcs_score",
        "epss_percentile", "kev_flagged", "exploit_maturity",
        "patch_status", "primary_vendor", "published_at",
    ]
    # These must be present in each record; every other column falls back to "".
    required = {
        "cve_id", "severity", "cvss_score",
        "kev_flagged", "patch_status", "published_at",
    }

    # Explicit UTF-8 so non-ASCII values do not depend on the platform's
    # default encoding.
    with open(output_file, "w", newline="", encoding="utf-8") as f:
        writer = csv.DictWriter(f, fieldnames=fieldnames)
        writer.writeheader()
        for cve in client.search_cves(severity=severity):
            writer.writerow({
                name: cve[name] if name in required else cve.get(name, "")
                for name in fieldnames
            })
    print(f"Exported to {output_file}")
Webhook Consumer Server
from fastapi import FastAPI, Request, HTTPException
import hmac
import hashlib
# FastAPI application instance; the webhook route is registered on it.
app = FastAPI()
# Shared secret used as the HMAC-SHA256 key when verifying webhook signatures.
# NOTE(review): this looks like a placeholder value; presumably it should be
# replaced with the real secret and loaded from configuration - confirm.
WEBHOOK_SECRET = "your-webhook-secret"
def verify_signature(payload: bytes, signature: str) -> bool:
    """Return True when *signature* matches the HMAC-SHA256 of *payload*.

    The expected value is ``"sha256="`` followed by the hex digest of the
    payload keyed with ``WEBHOOK_SECRET``.

    Args:
        payload: Raw request body exactly as received.
        signature: Signature value supplied by the sender.

    Returns:
        True if the signatures are equal, False otherwise.
    """
    expected = "sha256=" + hmac.new(
        WEBHOOK_SECRET.encode(),
        payload,
        hashlib.sha256,
    ).hexdigest()
    # hmac.compare_digest raises TypeError for str arguments that contain
    # non-ASCII characters. The signature is attacker-controlled, so compare
    # bytes: a malformed header is then rejected instead of crashing.
    return hmac.compare_digest(expected.encode(), signature.encode())
@app.post("/breachspider-webhook")
async def handle_webhook(request: Request):
    """Receive a BreachSpider webhook, check its signature and log the event.

    Responds with 401 when the ``X-BreachSpider-Signature`` header does not
    match the request body; otherwise prints a line for ``kev.new`` and
    ``cve.critical`` events and acknowledges receipt.
    """
    raw_body = await request.body()
    provided_signature = request.headers.get("X-BreachSpider-Signature", "")

    # Reject anything whose signature does not match before parsing the body.
    is_authentic = verify_signature(raw_body, provided_signature)
    if not is_authentic:
        raise HTTPException(status_code=401, detail="Invalid signature")

    message = await request.json()
    event_type, cve_data = message["event"], message["data"]

    # Only these two event types produce output; others are acknowledged silently.
    if event_type == "cve.critical":
        print(f"Critical CVE: {cve_data['cve_id']} | CVSS {cve_data['cvss_score']}")
    elif event_type == "kev.new":
        print(f"New KEV: {cve_data['cve_id']} | {cve_data['primary_vendor']}")

    return {"status": "received"}