Skip to content

Python Integration Guide

Complete Python examples for common BreachSpider workflows.

Installation

pip install requests

Client Setup

import requests
from typing import Optional, Iterator

class BreachSpiderClient:
    BASE_URL = "https://breachspider.com/api/v1"

    def __init__(self, api_key: str):
        self.session = requests.Session()
        self.session.headers.update({
            "Authorization": f"Bearer {api_key}",
            "Content-Type": "application/json",
            "Accept": "application/json",
        })

    def get_cve(self, cve_id: str) -> dict:
        response = self.session.get(f"{self.BASE_URL}/cves/{cve_id}")
        response.raise_for_status()
        return response.json()["data"]

    def search_cves(
        self,
        severity: Optional[str] = None,
        kev_only: bool = False,
        vendor: Optional[str] = None,
        limit: int = 20,
    ) -> Iterator[dict]:
        page = 1
        while True:
            params = {
                "page": page,
                "limit": limit,
                "kev_only": kev_only,
            }
            if severity:
                params["severity"] = severity
            if vendor:
                params["vendor"] = vendor

            response = self.session.get(f"{self.BASE_URL}/cves", params=params)
            response.raise_for_status()
            data = response.json()

            yield from data["data"]

            if not data["pagination"]["has_next"]:
                break
            page += 1

    def get_kev(self, limit: int = 100) -> list:
        response = self.session.get(
            f"{self.BASE_URL}/cves/kev",
            params={"limit": limit}
        )
        response.raise_for_status()
        return response.json()["data"]

    def get_dashboard(self) -> dict:
        response = self.session.get(f"{self.BASE_URL}/dashboard")
        response.raise_for_status()
        return response.json()["data"]

    def get_environments(self) -> list:
        response = self.session.get(f"{self.BASE_URL}/environments")
        response.raise_for_status()
        return response.json()["data"]

    def create_environment(self, name: str, description: str = "") -> dict:
        response = self.session.post(
            f"{self.BASE_URL}/environments",
            json={"name": name, "description": description}
        )
        response.raise_for_status()
        return response.json()["data"]

    def add_asset(self, env_id: int, asset: dict) -> dict:
        response = self.session.post(
            f"{self.BASE_URL}/environments/{env_id}/assets",
            json=asset
        )
        response.raise_for_status()
        return response.json()["data"]

Common Workflows

Fetch All KEV Entries for Your Stack

client = BreachSpiderClient("bs_live_your_key_here")

kev_entries = client.get_kev(limit=200)

critical_kev = [
    cve for cve in kev_entries
    if cve["cvss_score"] and cve["cvss_score"] >= 9.0
]

print(f"Total KEV: {len(kev_entries)}")
print(f"Critical KEV: {len(critical_kev)}")

for cve in critical_kev[:5]:
    print(f"{cve['cve_id']} | CVSS {cve['cvss_score']} | {cve['primary_vendor']}")

Monitor Your Environment for New Critical CVEs

import time
from datetime import datetime

def monitor_environment(client, env_id: int, check_interval_seconds: int = 300):
    while True:
        response = client.session.get(
            f"{client.BASE_URL}/environments/{env_id}/summary"
        )
        summary = response.json()["data"]

        critical_count = summary.get("critical_count", 0)
        kev_count = summary.get("kev_count", 0)

        print(f"[{datetime.utcnow().isoformat()}] "
              f"Critical: {critical_count} | KEV: {kev_count}")

        time.sleep(check_interval_seconds)

Export CVE Report to CSV

import csv

def export_cves_to_csv(client, output_file: str, severity: str = "CRITICAL"):
    with open(output_file, "w", newline="") as f:
        writer = csv.DictWriter(f, fieldnames=[
            "cve_id", "bsid", "severity", "cvss_score", "bcs_score",
            "epss_percentile", "kev_flagged", "exploit_maturity",
            "patch_status", "primary_vendor", "published_at"
        ])
        writer.writeheader()

        for cve in client.search_cves(severity=severity):
            writer.writerow({
                "cve_id": cve["cve_id"],
                "bsid": cve.get("bsid", ""),
                "severity": cve["severity"],
                "cvss_score": cve["cvss_score"],
                "bcs_score": cve.get("bcs_score", ""),
                "epss_percentile": cve.get("epss_percentile", ""),
                "kev_flagged": cve["kev_flagged"],
                "exploit_maturity": cve.get("exploit_maturity", ""),
                "patch_status": cve["patch_status"],
                "primary_vendor": cve.get("primary_vendor", ""),
                "published_at": cve["published_at"],
            })

    print(f"Exported to {output_file}")

Webhook Consumer Server

from fastapi import FastAPI, Request, HTTPException
import hmac
import hashlib

app = FastAPI()
WEBHOOK_SECRET = "your-webhook-secret"

def verify_signature(payload: bytes, signature: str) -> bool:
    expected = "sha256=" + hmac.new(
        WEBHOOK_SECRET.encode(),
        payload,
        hashlib.sha256
    ).hexdigest()
    return hmac.compare_digest(expected, signature)

@app.post("/breachspider-webhook")
async def handle_webhook(request: Request):
    payload = await request.body()
    signature = request.headers.get("X-BreachSpider-Signature", "")

    if not verify_signature(payload, signature):
        raise HTTPException(status_code=401, detail="Invalid signature")

    event = await request.json()
    event_type = event["event"]
    cve_data = event["data"]

    if event_type == "kev.new":
        print(f"New KEV: {cve_data['cve_id']} | {cve_data['primary_vendor']}")

    elif event_type == "cve.critical":
        print(f"Critical CVE: {cve_data['cve_id']} | CVSS {cve_data['cvss_score']}")

    return {"status": "received"}