AI Recon
Intermediate
T1595 T1592

AI-Powered Reconnaissance

AI-powered reconnaissance transforms traditional sequential, manual enumeration into intelligent, adaptive pipelines: large language models correlate data from multiple sources, prioritize targets by exploitability, and surface findings a human analyst might miss. The result is faster, deeper, and more comprehensive attack surface mapping with less operator effort.

Chapter Scope

This chapter covers the leading AI-enhanced recon tools (BBOT, Subfinder, Katana, Caido), techniques for enriching traditional tool output with LLM analysis, and a complete automated pipeline that chains subdomain enumeration, port scanning, technology detection, vulnerability scanning, and AI-driven reporting into a single workflow.

1. AI Reconnaissance Overview

AI Reconnaissance Pipeline

graph TB
  subgraph Input["Target Input"]
    T[Target Domain / Scope]
  end
  subgraph Passive["Passive Recon Layer"]
    DNS[DNS Enumeration]
    WHOIS[WHOIS Lookup]
    CT[Certificate Transparency]
    OSINT[OSINT Collection]
  end
  subgraph Active["Active Recon Layer"]
    SUB[Subdomain Discovery]
    PORT[Port Scanning]
    TECH[Technology Detection]
    CRAWL[Web Crawling]
  end
  subgraph AI["AI Analysis Layer"]
    LLM[LLM Correlation Engine]
    RISK[Risk Scoring Model]
    PRIO[Target Prioritization]
    REPORT[Structured Report]
  end
  T --> DNS
  T --> WHOIS
  T --> CT
  T --> OSINT
  DNS --> SUB
  WHOIS --> SUB
  CT --> SUB
  OSINT --> LLM
  SUB --> PORT
  PORT --> TECH
  TECH --> CRAWL
  CRAWL --> LLM
  PORT --> LLM
  TECH --> LLM
  LLM --> RISK
  RISK --> PRIO
  PRIO --> REPORT
  style Input fill:#1a1a2e,stroke:#00ff41,color:#fff
  style Passive fill:#16213e,stroke:#0ff,color:#fff
  style Active fill:#0f3460,stroke:#e94560,color:#fff
  style AI fill:#1a1a2e,stroke:#00ff41,color:#fff

2. BBOT — AI-Enhanced Recon Framework

BBOT (Bighuge BLS OSINT Tool) is a leading open-source reconnaissance framework built for modern attack surface mapping. Its modular architecture supports 100+ modules covering subdomain enumeration, port scanning, web crawling, technology detection, and vulnerability identification — all coordinated through a single event-driven engine.

BBOT Architecture and Module System

graph LR
  subgraph Core["BBOT Core"]
    SCAN[Scan Engine]
    MOD[Module Loader]
    DB[Event Database]
  end
  subgraph Modules["Module Categories"]
    ENUM[Enumeration Modules]
    ACTIVE[Active Modules]
    PASSIVE[Passive Modules]
    OUTPUT[Output Modules]
  end
  subgraph AI_Layer["AI Integration"]
    LLMA[LLM Analysis Module]
    CLASSIFY[Classification Engine]
    ENRICH[Enrichment Pipeline]
  end
  SCAN --> MOD
  MOD --> ENUM
  MOD --> ACTIVE
  MOD --> PASSIVE
  MOD --> OUTPUT
  ENUM --> DB
  ACTIVE --> DB
  PASSIVE --> DB
  DB --> LLMA
  LLMA --> CLASSIFY
  CLASSIFY --> ENRICH
  ENRICH --> OUTPUT
  style Core fill:#1a1a2e,stroke:#00ff41,color:#fff
  style Modules fill:#0f3460,stroke:#e94560,color:#fff
  style AI_Layer fill:#16213e,stroke:#0ff,color:#fff

Installation & Setup

bbot-install.sh
bash
# Install BBOT
pipx install bbot

# Verify installation
bbot --version

# List all available modules
bbot -l

# List only the modules enabled by a given flag
bbot -f subdomain-enum -l
bbot -f active -l
bbot -f passive -l

API Keys

BBOT becomes significantly more powerful with API keys for services like Shodan, Censys, SecurityTrails, VirusTotal, and GitHub. Configure them in ~/.config/bbot/secrets.yml to unlock passive enumeration modules that require authentication.
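As a sketch, a minimal secrets.yml could look like the following. The module key names (shodan_dns, securitytrails, virustotal) follow BBOT's per-module config convention but are assumptions here; verify exact module names with bbot -l and the BBOT docs, and replace the placeholder values with real keys.

```shell
# Create BBOT's secrets file (module key names are illustrative;
# confirm them against the BBOT documentation for your version)
mkdir -p ~/.config/bbot

cat > ~/.config/bbot/secrets.yml <<'EOF'
modules:
  shodan_dns:
    api_key: YOUR_SHODAN_KEY
  securitytrails:
    api_key: YOUR_SECURITYTRAILS_KEY
  virustotal:
    api_key: YOUR_VIRUSTOTAL_KEY
EOF

# Restrict permissions: this file holds credentials
chmod 600 ~/.config/bbot/secrets.yml
```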

Scanning Profiles & Module System

BBOT organizes modules into flags (categories like subdomain-enum, web-thorough, active, passive) that let you compose scans ranging from light passive enumeration to aggressive active reconnaissance. Each module emits typed events (DNS_NAME, OPEN_TCP_PORT, URL, TECHNOLOGY, FINDING, VULNERABILITY) that flow through the pipeline and trigger downstream modules.

bbot-scanning.sh
bash
# Basic subdomain enumeration
bbot -t example.com -f subdomain-enum -o /tmp/bbot-results

# Comprehensive scan with passive + active modules
bbot -t example.com \
  -f subdomain-enum \
  -m httpx naabu gowitness \
  -c modules.naabu.top_ports=1000 \
  -o /tmp/bbot-full

# Aggressive web scan (active recon)
bbot -t example.com \
  -f subdomain-enum web-thorough \
  -m httpx nuclei gowitness wappalyzer \
  --allow-deadly \
  -o /tmp/bbot-aggressive

# BBOT with custom config file
bbot -t example.com -c /path/to/bbot.yml

# Scan multiple targets from a file
bbot -t targets.txt -f subdomain-enum -m httpx

AI-Assisted Analysis of BBOT Results

BBOT outputs structured NDJSON that maps directly to LLM analysis. The following script loads BBOT scan results and sends them to GPT-4o for automated prioritization, attack vector identification, and next-step recommendations.

bbot_ai_analysis.py
python
#!/usr/bin/env python3
"""Post-process BBOT scan results with LLM analysis."""

import json
from pathlib import Path
from openai import OpenAI

client = OpenAI()

def load_bbot_results(scan_dir: str) -> dict:
    """Load BBOT output events from JSON lines file."""
    events = []
    output_file = Path(scan_dir) / "output.ndjson"
    
    with open(output_file) as f:
        for line in f:
            if line.strip():
                events.append(json.loads(line))
    
    # Categorize events
    subdomains = [e for e in events if e.get("type") == "DNS_NAME"]
    open_ports = [e for e in events if e.get("type") == "OPEN_TCP_PORT"]
    urls = [e for e in events if e.get("type") == "URL"]
    techs = [e for e in events if e.get("type") == "TECHNOLOGY"]
    findings = [e for e in events if e.get("type") == "FINDING"]
    
    return {
        "subdomains": subdomains,
        "open_ports": open_ports,
        "urls": urls,
        "technologies": techs,
        "findings": findings
    }

def analyze_with_llm(results: dict, target: str) -> str:
    """Send BBOT results to LLM for analysis and prioritization."""
    
    summary = f"""Target: {target}
Subdomains found: {len(results['subdomains'])}
Open ports: {len(results['open_ports'])}
URLs discovered: {len(results['urls'])}
Technologies detected: {len(results['technologies'])}
Findings: {len(results['findings'])}

Key subdomains:
{json.dumps([s['data'] for s in results['subdomains'][:30]], indent=2)}

Open ports:
{json.dumps([p['data'] for p in results['open_ports'][:30]], indent=2)}

Technologies:
{json.dumps([t['data'] for t in results['technologies'][:20]], indent=2)}

Findings:
{json.dumps([f['data'] for f in results['findings'][:15]], indent=2)}"""

    response = client.chat.completions.create(
        model="gpt-4o",
        messages=[
            {"role": "system", "content": """You are an expert penetration tester 
analyzing reconnaissance results. Identify:
1. High-value targets (admin panels, APIs, staging envs)
2. Potential attack vectors based on tech stack
3. Misconfigurations or exposed services
4. Prioritized next steps for exploitation
Be specific and actionable. Reference actual findings."""},
            {"role": "user", "content": summary}
        ],
        temperature=0.3
    )
    return response.choices[0].message.content

# Usage
results = load_bbot_results("/tmp/bbot-full/scan_name")
analysis = analyze_with_llm(results, "example.com")
print(analysis)

Scope and Authorization

Always ensure your BBOT scans are within authorized scope. Active modules like nuclei and naabu send traffic to target hosts. Use --allow-deadly only with explicit written authorization. BBOT respects scope boundaries — configure them carefully.

3. Subfinder + AI Enrichment

Subfinder is ProjectDiscovery's fast passive subdomain enumeration tool. While powerful on its own, pairing Subfinder's output with an AI enrichment layer transforms raw domain lists into prioritized, actionable intelligence.

Subfinder Setup & Advanced Usage

subfinder-usage.sh
bash
# Install Subfinder
go install -v github.com/projectdiscovery/subfinder/v2/cmd/subfinder@latest

# Basic subdomain enumeration
subfinder -d example.com -o subs.txt

# With all sources and higher concurrency
subfinder -d example.com -all -t 100 -o subs.txt

# Multiple domains from file
subfinder -dL domains.txt -o all-subs.txt

# JSON output with source attribution
subfinder -d example.com -json -o subs.json

# Silent mode, pipe to other tools
subfinder -d example.com -silent | httpx -silent | nuclei -t cves/

AI Enrichment Pipeline

This pipeline runs Subfinder, probes discovered subdomains with httpx for live host detection and technology fingerprinting, then sends the enriched results to an LLM for risk-based classification and exploitation recommendations.

subfinder_ai_enrich.py
python
#!/usr/bin/env python3
"""Subfinder + AI enrichment pipeline."""

import subprocess
import json
from openai import OpenAI

client = OpenAI()

def run_subfinder(domain: str) -> list[str]:
    """Run Subfinder and return discovered subdomains."""
    result = subprocess.run(
        ["subfinder", "-d", domain, "-silent", "-all"],
        capture_output=True, text=True, timeout=300
    )
    return [s.strip() for s in result.stdout.strip().split("\n") if s.strip()]

def enrich_with_httpx(subdomains: list[str]) -> list[dict]:
    """Probe subdomains with httpx for live hosts and tech."""
    input_data = "\n".join(subdomains)
    result = subprocess.run(
        ["httpx", "-json", "-silent",
         "-status-code", "-title", "-tech-detect",
         "-follow-redirects", "-timeout", "10"],
        input=input_data,
        capture_output=True, text=True, timeout=600
    )
    hosts = []
    for line in result.stdout.strip().split("\n"):
        if line.strip():
            hosts.append(json.loads(line))
    return hosts

def ai_prioritize(domain: str, hosts: list[dict]) -> str:
    """Use LLM to prioritize discovered hosts."""
    host_summary = json.dumps(hosts[:50], indent=2)
    
    response = client.chat.completions.create(
        model="gpt-4o",
        messages=[
            {"role": "system", "content": """Analyze these subdomain results 
for a penetration test. Classify each into:
- CRITICAL: Admin panels, staging, internal tools, APIs
- HIGH: Login pages, file uploads, old software versions
- MEDIUM: Standard web apps with potential attack surface
- LOW: Static content, CDN, marketing pages
Explain your reasoning and suggest exploitation approaches."""},
            {"role": "user", "content": f"Domain: {domain}\n\nHosts:\n{host_summary}"}
        ],
        temperature=0.2
    )
    return response.choices[0].message.content

# Pipeline execution
domain = "example.com"
subs = run_subfinder(domain)
print(f"[*] Found {len(subs)} subdomains")

live_hosts = enrich_with_httpx(subs)
print(f"[*] {len(live_hosts)} live hosts detected")

analysis = ai_prioritize(domain, live_hosts)
print(analysis)

Pipeline Optimization

For large target scopes, run Subfinder with -t 200 for higher thread count and use httpx with -rl 100 to rate-limit probing. Feed only high-confidence subdomains to the LLM to reduce token usage and improve analysis quality.
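The "high-confidence subdomains only" step can be sketched as a small pre-filter over httpx's JSON records. The field names (url, status_code, title, tech) and the keyword list below are assumptions to adapt to your httpx version and target:

```python
"""Pre-filter httpx results before LLM submission (sketch)."""

# Keywords that often mark sensitive infrastructure (illustrative)
INTERESTING = ("admin", "api", "staging", "dev", "internal", "vpn", "jenkins")

def filter_for_llm(hosts: list[dict], max_hosts: int = 50) -> list[dict]:
    """Keep responsive hosts and rank likely-sensitive ones first."""
    live = [h for h in hosts if h.get("status_code") in (200, 301, 302, 401, 403)]

    def score(h: dict) -> int:
        url = h.get("url", "").lower()
        s = sum(kw in url for kw in INTERESTING)
        if h.get("status_code") in (401, 403):  # auth-gated = interesting
            s += 1
        return s

    live.sort(key=score, reverse=True)
    # Trim each record to the essentials to cut token usage
    return [
        {k: h.get(k) for k in ("url", "status_code", "title", "tech")}
        for h in live[:max_hosts]
    ]

hosts = [
    {"url": "https://www.example.com", "status_code": 200, "title": "Home", "tech": ["Nginx"]},
    {"url": "https://admin.example.com", "status_code": 403, "title": "Forbidden", "tech": []},
    {"url": "https://old.example.com", "status_code": 500, "title": "Error", "tech": []},
]
print(filter_for_llm(hosts))
```

Ranking auth-gated and sensitive-looking hosts first means the LLM spends its context window on the targets most likely to matter.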

4. Katana — Intelligent Web Crawling

Katana is ProjectDiscovery's next-generation web crawler designed for offensive security. It combines standard crawling with JavaScript rendering, making it effective against single-page applications (SPAs) and modern JS-heavy frameworks that hide endpoints from traditional crawlers.

Key Features

  • Headless browser crawling — renders JavaScript to discover dynamically generated endpoints
  • Automatic form detection — identifies input fields, hidden parameters, and form actions
  • Scope-aware crawling — respects domain boundaries and depth limits
  • Field extraction — pulls URLs, paths, FQDNs, parameters, and endpoints
  • Extension filtering — skip static assets, focus on dynamic content
  • Pipeline integration — designed to chain with Subfinder, httpx, Nuclei
katana-usage.sh
bash
# Install Katana
go install github.com/projectdiscovery/katana/cmd/katana@latest

# Basic crawl
katana -u https://example.com -o crawl.txt

# Deep crawl with JavaScript rendering
katana -u https://example.com \
  -js-crawl \
  -headless \
  -depth 5 \
  -known-files all \
  -o deep-crawl.txt

# Extract endpoints and parameters
katana -u https://example.com \
  -js-crawl \
  -headless \
  -field url,path,fqdn,endpoint \
  -o endpoints.txt

# Crawl multiple targets from Subfinder
subfinder -d example.com -silent | \
  httpx -silent | \
  katana -js-crawl -headless -depth 3 -o full-crawl.txt

# Output as JSON with all fields
katana -u https://example.com \
  -json \
  -js-crawl \
  -headless \
  -field-config /path/to/field-config.yaml \
  -o crawl.json

# Extract specific patterns (API keys, secrets)
# (-ef is shorthand for -extension-filter; list all skips once)
katana -u https://example.com \
  -js-crawl \
  -headless \
  -ef png,jpg,gif,svg,css,woff,woff2,ttf,eot \
  | grep -iE "(api[_-]?key|secret|token|password|auth)"

AI-Enhanced Crawling

Katana's output is ideal for LLM analysis. Pipe crawl results through the same AI enrichment pattern shown in the Subfinder section — the LLM can identify interesting parameter names, potential injection points, and API patterns that warrant deeper testing.
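A small local pre-filter can mirror that analysis: pull parameters out of Katana's URL output and flag names that often mark injection-prone functionality before handing the shortlist to the LLM. The keyword set below is an illustrative heuristic, not an exhaustive list:

```python
"""Extract and triage URL parameters from a Katana crawl (sketch)."""
from collections import defaultdict
from urllib.parse import parse_qsl, urlsplit

# Parameter names that frequently indicate injection-prone features
SUSPICIOUS = {"id", "file", "path", "url", "redirect", "q", "cmd", "debug"}

def triage_params(urls: list[str]) -> dict[str, set[str]]:
    """Map each parameter name to the endpoints where it appears."""
    seen: dict[str, set[str]] = defaultdict(set)
    for u in urls:
        parts = urlsplit(u)
        for name, _ in parse_qsl(parts.query, keep_blank_values=True):
            seen[name].add(parts.path or "/")
    return seen

def shortlist(seen: dict[str, set[str]]) -> dict[str, list[str]]:
    """Return only the suspicious parameters, with their endpoints."""
    return {p: sorted(eps) for p, eps in seen.items() if p.lower() in SUSPICIOUS}

crawl = [
    "https://example.com/download?file=report.pdf",
    "https://example.com/search?q=test&page=2",
    "https://example.com/login?redirect=/account",
]
params = triage_params(crawl)
print(shortlist(params))
```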

5. Caido AI for Reconnaissance

Caido is a modern web security testing proxy with built-in AI capabilities. While its full penetration testing features are covered in the PentestGPT & Caido AI chapter, its reconnaissance features deserve attention here.

Recon-Relevant Features

  • Passive spider — captures all traffic flowing through the proxy for automated endpoint cataloging
  • Technology fingerprinting — identifies frameworks, servers, and libraries from response headers and content
  • HTTPQL queries — powerful query language for filtering and analyzing captured traffic
  • Automate workflows — chainable discovery and fuzzing pipelines
  • AI assistant — natural language interface for analyzing captured traffic and suggesting next steps
caido-recon.sh
bash
# Caido AI — Automated Recon Features
# Caido provides an AI-powered web proxy with intelligent recon

# 1. Start Caido and configure target scope
# Navigate to: Settings > Scope > Add target domain

# 2. Use the HTTPQL query language for targeted recon
# Find all API endpoints:
#   req.path.regex:"^/api/" AND resp.code:200

# 3. Technology fingerprinting via response analysis
#   resp.header.regex:"(X-Powered-By|Server):" 

# 4. Automated endpoint discovery with the Automate tab
# Create a workflow:
#   - Passive spider: capture all traffic
#   - Active discovery: fuzz common paths
#   - AI analysis: classify endpoints by risk

# 5. Export results for further AI analysis
# The endpoint and response shape below are illustrative only;
# Caido's official programmatic interface is GraphQL-based, so
# consult the Caido developer docs for the current API
curl -s http://localhost:8080/api/findings \
  -H "Authorization: Bearer CAIDO_TOKEN" | \
  jq '.findings[] | {url, method, status, risk}'

Complementary Approach

Use Caido alongside command-line tools: run Subfinder and Katana for broad subdomain and crawl coverage, then route specific targets through Caido for interactive analysis with its AI assistant. This combines breadth (CLI tools) with depth (proxy-based inspection).

6. AI-Powered OSINT

Open source intelligence gathering generates vast amounts of unstructured data — DNS records, WHOIS registrations, social media profiles, code repositories, paste sites, and certificate logs. LLMs excel at correlating this data, extracting actionable intelligence, and identifying patterns that would take hours of manual analysis.

AI OSINT Pipeline Architecture

graph TB
  subgraph Sources["Data Sources"]
    LI[LinkedIn Profiles]
    GH[GitHub Repositories]
    DNS2[DNS Records]
    PASTE[Paste Sites]
    SHODAN[Shodan / Censys]
  end
  subgraph Collection["Collection Layer"]
    SCRAPE[Scraper Engine]
    API[API Integrations]
    CACHE[Local Cache]
  end
  subgraph Analysis["AI Analysis"]
    NER[Named Entity Recognition]
    SENT[Sentiment Analysis]
    GRAPH[Relationship Graphing]
    VULN[Vulnerability Correlation]
  end
  subgraph Output["Output"]
    JSON[JSON Report]
    ATTACK[Attack Surface Map]
    SE[Social Engineering Dossier]
  end
  LI --> SCRAPE
  GH --> API
  DNS2 --> API
  PASTE --> SCRAPE
  SHODAN --> API
  SCRAPE --> CACHE
  API --> CACHE
  CACHE --> NER
  CACHE --> SENT
  CACHE --> GRAPH
  CACHE --> VULN
  NER --> SE
  SENT --> SE
  GRAPH --> ATTACK
  VULN --> JSON
  style Sources fill:#1a1a2e,stroke:#00ff41,color:#fff
  style Collection fill:#0f3460,stroke:#e94560,color:#fff
  style Analysis fill:#16213e,stroke:#0ff,color:#fff
  style Output fill:#1a1a2e,stroke:#888,color:#fff

LinkedIn + LLM for Social Engineering Prep

Employee enumeration through LinkedIn provides names, roles, technologies used, and organizational structure. An LLM can synthesize this into social engineering dossiers — identifying likely phishing targets, crafting role-appropriate pretexts, and mapping reporting chains. Always ensure OSINT gathering is within your rules of engagement.

DNS / WHOIS Analysis with AI

Raw DNS and WHOIS data contains implicit information: name server patterns reveal hosting providers, TXT records expose email security posture (SPF, DKIM, DMARC), MX records identify email platforms, and WHOIS registration patterns can link related domains. LLMs can interpret all of this in seconds.
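The SPF/DMARC portion of that interpretation can be pre-computed deterministically and handed to the LLM as structured facts. A minimal sketch, assuming the record lists come from `dig TXT` on the apex and on `_dmarc.<domain>`:

```python
"""Summarize email security posture from DNS TXT records (sketch)."""

def email_posture(apex_txt: list[str], dmarc_txt: list[str]) -> dict:
    """Check SPF (apex TXT) and DMARC (_dmarc.<domain> TXT) records."""
    spf = next((r for r in apex_txt if r.startswith("v=spf1")), None)
    dmarc = next((r for r in dmarc_txt if r.startswith("v=DMARC1")), None)
    posture = {
        "spf": bool(spf),
        "spf_hard_fail": bool(spf and "-all" in spf),
        "dmarc": bool(dmarc),
        "dmarc_enforced": bool(dmarc and ("p=reject" in dmarc or "p=quarantine" in dmarc)),
    }
    # Missing or permissive policies make the domain spoofable,
    # which matters for any phishing assessment in scope
    posture["spoofable"] = not (posture["spf_hard_fail"] and posture["dmarc_enforced"])
    return posture

apex_txt = ["v=spf1 include:_spf.example.net ~all", "google-site-verification=abc"]
dmarc_txt = ["v=DMARC1; p=none; rua=mailto:dmarc@example.com"]
print(email_posture(apex_txt, dmarc_txt))
```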

GitHub / GitLab Recon

Code repositories are gold mines for reconnaissance. AI-assisted GitHub recon looks for: exposed API keys and secrets in commit history, infrastructure configuration files (Terraform, Kubernetes manifests, Docker Compose), internal domain names and IP addresses in code comments, and technology stack details from dependency files.
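A local regex pre-screen over cloned repository text can triage these patterns before escalating hits to LLM review. The rules below are illustrative, not exhaustive; dedicated scanners such as trufflehog and gitleaks ship far larger rule sets:

```python
"""Regex pre-screen for secrets in repository text (sketch)."""
import re

# Illustrative detection rules, keyed by finding type
PATTERNS = {
    "aws_access_key": re.compile(r"\bAKIA[0-9A-Z]{16}\b"),
    "generic_api_key": re.compile(
        r"(?i)\b(api[_-]?key|secret[_-]?key)\b\s*[:=]\s*['\"]?([A-Za-z0-9_\-]{16,})"
    ),
    "internal_host": re.compile(r"\b[a-z0-9.-]+\.(?:internal|corp|local)\b"),
    "private_ip": re.compile(r"\b10\.\d{1,3}\.\d{1,3}\.\d{1,3}\b"),
}

def scan_text(text: str) -> list[tuple[str, str]]:
    """Return (rule, matched_text) pairs for every hit in the text."""
    hits = []
    for rule, rx in PATTERNS.items():
        for m in rx.finditer(text):
            hits.append((rule, m.group(0)))
    return hits

sample = """
aws_access_key_id = AKIAIOSFODNN7EXAMPLE
api_key: "0123456789abcdef0123"
backup host: vault.corp
"""
print(scan_text(sample))
```

Anything the pre-screen flags goes into the LLM context with surrounding lines, letting the model judge whether a hit is a live credential, a test fixture, or architecture intelligence.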

Automated OSINT Pipeline

The following script chains DNS enumeration, WHOIS analysis, GitHub code search, and certificate transparency log queries into a single pipeline, then sends all collected data to an LLM for comprehensive analysis.

ai_osint_pipeline.py
python
#!/usr/bin/env python3
"""AI-Powered OSINT Pipeline for Reconnaissance."""

import json
import subprocess
from dataclasses import dataclass, asdict
from openai import OpenAI

client = OpenAI()

@dataclass
class OSINTResult:
    source: str
    data_type: str
    content: str
    confidence: float = 0.0
    risk_score: float = 0.0

class AIOSINTPipeline:
    def __init__(self, target_domain: str):
        self.target = target_domain
        self.results: list[OSINTResult] = []
    
    def dns_recon(self) -> list[OSINTResult]:
        """Enumerate DNS records and analyze with AI."""
        records = []
        for rtype in ["A", "AAAA", "MX", "NS", "TXT", "CNAME", "SOA"]:
            result = subprocess.run(
                ["dig", "+short", self.target, rtype],
                capture_output=True, text=True, timeout=30
            )
            if result.stdout.strip():
                records.append(OSINTResult(
                    source="DNS",
                    data_type=rtype,
                    content=result.stdout.strip()
                ))
        self.results.extend(records)
        return records
    
    def whois_analysis(self) -> OSINTResult:
        """WHOIS lookup with AI interpretation."""
        result = subprocess.run(
            ["whois", self.target],
            capture_output=True, text=True, timeout=30
        )
        whois_data = OSINTResult(
            source="WHOIS",
            data_type="registration",
            content=result.stdout[:3000]  # Truncate for LLM context
        )
        self.results.append(whois_data)
        return whois_data
    
    def github_recon(self) -> list[OSINTResult]:
        """Search GitHub for exposed secrets and architecture clues."""
        search_terms = [
            f'"{self.target}" password',
            f'"{self.target}" api_key OR secret_key',
            f'"{self.target}" internal OR staging',
            f'org:{self.target.split(".")[0]} filename:.env',
        ]
        findings = []
        for term in search_terms:
            result = subprocess.run(
                ["gh", "search", "code", term, "--json",
                 "repository,path,textMatches", "-L", "10"],
                capture_output=True, text=True, timeout=30
            )
            if result.stdout.strip():
                findings.append(OSINTResult(
                    source="GitHub",
                    data_type="code_search",
                    content=result.stdout[:2000]
                ))
        self.results.extend(findings)
        return findings
    
    def certificate_transparency(self) -> list[OSINTResult]:
        """Query Certificate Transparency logs."""
        result = subprocess.run(
            ["curl", "-s",
             f"https://crt.sh/?q=%.{self.target}&output=json"],
            capture_output=True, text=True, timeout=30
        )
        if result.stdout.strip():
            try:
                certs = json.loads(result.stdout)
                unique_names = list(set(
                    c.get("name_value", "") for c in certs
                ))
                ct_result = OSINTResult(
                    source="CertTransparency",
                    data_type="certificates",
                    content=json.dumps(unique_names[:100])
                )
                self.results.append(ct_result)
                return [ct_result]
            except json.JSONDecodeError:
                pass
        return []
    
    def ai_analyze_all(self) -> str:
        """Send all collected OSINT to LLM for analysis."""
        results_summary = json.dumps(
            [asdict(r) for r in self.results], indent=2
        )
        
        response = client.chat.completions.create(
            model="gpt-4o",
            messages=[
                {"role": "system", "content": """You are an expert OSINT 
analyst performing reconnaissance for an authorized penetration test. 
Analyze all collected data and produce:

1. ORGANIZATION PROFILE: Key facts about the target
2. ATTACK SURFACE: Identified entry points and services
3. EXPOSED SECRETS: Any credentials, keys, or sensitive data found
4. SOCIAL ENGINEERING VECTORS: Information useful for phishing/SE
5. RISK ASSESSMENT: Overall exposure rating with justification
6. RECOMMENDED NEXT STEPS: Prioritized actions for the engagement

Be thorough and reference specific findings."""},
                {"role": "user", "content": f"""Target: {self.target}
                
OSINT Collection Results:
{results_summary}"""}
            ],
            temperature=0.2
        )
        return response.choices[0].message.content

# Execute pipeline
pipeline = AIOSINTPipeline("example.com")
pipeline.dns_recon()
pipeline.whois_analysis()
pipeline.github_recon()
pipeline.certificate_transparency()
report = pipeline.ai_analyze_all()
print(report)

7. Building an AI Recon Pipeline

The complete automated reconnaissance pipeline chains five stages: subdomain enumeration (Subfinder + Amass + CT logs), live host probing (httpx), port scanning (Naabu), vulnerability scanning (Nuclei), and LLM-powered analysis (GPT-4o). Each stage feeds structured data to the next, and the final output is both a machine-readable JSON report and a human-readable AI analysis.

Operational Security

Running this full pipeline generates significant network traffic. In authorized engagements, consider: rate-limiting active scans, using distributed scanning infrastructure, running passive stages first to narrow scope, and scheduling active scans during business hours to blend with normal traffic.
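The rate-limiting advice above can be enforced at the orchestration layer rather than relying on each tool's own flags. A minimal sketch, assuming a simple token-bucket throttle shared across all active probes (the `RateLimiter` class and its parameters are illustrative, not part of any tool used in this chapter):

```python
import time
import threading

class RateLimiter:
    """Token-bucket throttle: allow at most `rate` operations per second."""
    def __init__(self, rate: float, burst: int = 5):
        self.rate = rate          # tokens refilled per second
        self.capacity = burst     # maximum bucket size (allowed burst)
        self.tokens = float(burst)
        self.last = time.monotonic()
        self.lock = threading.Lock()

    def acquire(self):
        """Block until a token is available, then consume it."""
        while True:
            with self.lock:
                now = time.monotonic()
                # Refill proportionally to elapsed time, capped at capacity
                self.tokens = min(self.capacity,
                                  self.tokens + (now - self.last) * self.rate)
                self.last = now
                if self.tokens >= 1:
                    self.tokens -= 1
                    return
                wait = (1 - self.tokens) / self.rate
            time.sleep(wait)

# Throttle active probes to ~2 requests/second across the whole pipeline
limiter = RateLimiter(rate=2.0)

def throttled_probe(host: str) -> str:
    limiter.acquire()
    # subprocess.run(["httpx", ...]) would go here in the real pipeline
    return f"probed {host}"
```

Most of the tools below also accept their own rate-limit style flags; a shared limiter matters when several tools run concurrently against the same scope and their per-tool limits would otherwise add up.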
ai_recon_pipeline.py
python
#!/usr/bin/env python3
"""Complete AI Recon Pipeline — chains multiple tools with LLM analysis."""

import json
import subprocess
from datetime import datetime
from pathlib import Path
from dataclasses import dataclass, field, asdict
from openai import OpenAI

client = OpenAI()

@dataclass
class ReconReport:
    target: str
    timestamp: str = field(
        default_factory=lambda: datetime.now().isoformat()
    )
    subdomains: list[str] = field(default_factory=list)
    live_hosts: list[dict] = field(default_factory=list)
    open_ports: list[dict] = field(default_factory=list)
    technologies: list[dict] = field(default_factory=list)
    vulnerabilities: list[dict] = field(default_factory=list)
    ai_analysis: str = ""

def run_cmd(cmd: list[str], timeout: int = 300) -> str:
    """Run a command and return stdout."""
    result = subprocess.run(
        cmd, capture_output=True, text=True, timeout=timeout
    )
    return result.stdout.strip()

def stage_1_subdomain_enum(target: str) -> list[str]:
    """Stage 1: Subdomain enumeration with multiple tools."""
    print(f"[1/5] Subdomain enumeration for {target}")
    
    # Subfinder
    sf_output = run_cmd(["subfinder", "-d", target, "-silent", "-all"])
    subs = set(sf_output.split("\n")) if sf_output else set()
    
    # Amass (passive)
    amass_output = run_cmd(
        ["amass", "enum", "-passive", "-d", target],
        timeout=600
    )
    if amass_output:
        subs.update(amass_output.split("\n"))
    
    # Certificate Transparency
    ct_output = run_cmd([
        "curl", "-s",
        f"https://crt.sh/?q=%.{target}&output=json"
    ])
    if ct_output:
        try:
            certs = json.loads(ct_output)
            for cert in certs:
                name = cert.get("name_value", "")
                for sub in name.split("\n"):
                    if sub.strip().endswith(target):
                        subs.add(sub.strip())
        except json.JSONDecodeError:
            pass
    
    return sorted(list(subs))

def stage_2_probe_hosts(subdomains: list[str]) -> list[dict]:
    """Stage 2: Probe for live hosts with httpx."""
    print(f"[2/5] Probing {len(subdomains)} subdomains")
    
    input_data = "\n".join(subdomains)
    result = subprocess.run(
        ["httpx", "-json", "-silent",
         "-status-code", "-title", "-tech-detect",
         "-content-length", "-follow-redirects"],
        input=input_data,
        capture_output=True, text=True, timeout=600
    )
    
    hosts = []
    for line in result.stdout.strip().split("\n"):
        if line.strip():
            try:
                hosts.append(json.loads(line))
            except json.JSONDecodeError:
                continue
    return hosts

def stage_3_port_scan(subdomains: list[str]) -> list[dict]:
    """Stage 3: Port scanning with Naabu."""
    print("[3/5] Port scanning top targets")
    
    # Scan top 1000 ports; cap at 50 subdomains to limit scope
    input_data = "\n".join(subdomains[:50])
    result = subprocess.run(
        ["naabu", "-json", "-silent", "-top-ports", "1000"],
        input=input_data,
        capture_output=True, text=True, timeout=600
    )
    
    ports = []
    for line in result.stdout.strip().split("\n"):
        if line.strip():
            try:
                ports.append(json.loads(line))
            except json.JSONDecodeError:
                continue
    return ports

def stage_4_vuln_scan(live_hosts: list[dict]) -> list[dict]:
    """Stage 4: Vulnerability scanning with Nuclei."""
    print(f"[4/5] Vulnerability scanning {len(live_hosts)} hosts")
    
    urls = [h.get("url", "") for h in live_hosts if h.get("url")]
    input_data = "\n".join(urls)
    
    result = subprocess.run(
        ["nuclei", "-jsonl", "-silent",  # -jsonl in nuclei >= 2.9; older releases use -json
         "-severity", "medium,high,critical",
         "-rate-limit", "50"],
        input=input_data,
        capture_output=True, text=True, timeout=900
    )
    
    vulns = []
    for line in result.stdout.strip().split("\n"):
        if line.strip():
            try:
                vulns.append(json.loads(line))
            except json.JSONDecodeError:
                continue
    return vulns

def stage_5_ai_analysis(report: ReconReport) -> str:
    """Stage 5: Comprehensive AI analysis of all findings."""
    print("[5/5] AI analysis of results")
    
    context = f"""Target: {report.target}
Scan timestamp: {report.timestamp}

SUBDOMAINS ({len(report.subdomains)} total):
{json.dumps(report.subdomains[:40], indent=2)}

LIVE HOSTS ({len(report.live_hosts)} total):
{json.dumps(report.live_hosts[:30], indent=2)}

OPEN PORTS ({len(report.open_ports)} total):
{json.dumps(report.open_ports[:30], indent=2)}

TECHNOLOGIES:
{json.dumps(report.technologies[:20], indent=2)}

VULNERABILITIES ({len(report.vulnerabilities)} total):
{json.dumps(report.vulnerabilities[:20], indent=2)}"""

    response = client.chat.completions.create(
        model="gpt-4o",
        messages=[
            {"role": "system", "content": """You are a senior penetration 
tester analyzing automated reconnaissance results. Produce a structured 
report with:

## Executive Summary
Brief overview of the target attack surface.

## Critical Findings
High-risk items requiring immediate attention.

## Attack Vectors
Specific, actionable exploitation paths based on findings.

## Technology Stack Analysis
Identified technologies and their known vulnerabilities.

## Recommended Exploitation Order
Prioritized list of targets to exploit, with reasoning.

## OPSEC Considerations
Detection risks and how to avoid them.

Be precise. Reference specific subdomains, ports, and vulnerabilities."""},
            {"role": "user", "content": context}
        ],
        temperature=0.2,
        max_tokens=4000
    )
    return response.choices[0].message.content

def run_pipeline(target: str) -> ReconReport:
    """Execute the full recon pipeline."""
    report = ReconReport(target=target)
    
    # Stage 1: Subdomain enumeration
    report.subdomains = stage_1_subdomain_enum(target)
    print(f"    Found {len(report.subdomains)} subdomains")
    
    # Stage 2: Live host detection
    report.live_hosts = stage_2_probe_hosts(report.subdomains)
    print(f"    {len(report.live_hosts)} live hosts")
    
    # Collect technology fingerprints from httpx output (its "tech" field)
    report.technologies = [
        {"url": h.get("url"), "tech": h.get("tech")}
        for h in report.live_hosts if h.get("tech")
    ]
    
    # Stage 3: Port scanning
    report.open_ports = stage_3_port_scan(report.subdomains)
    print(f"    {len(report.open_ports)} open ports")
    
    # Stage 4: Vulnerability scanning
    report.vulnerabilities = stage_4_vuln_scan(report.live_hosts)
    print(f"    {len(report.vulnerabilities)} vulnerabilities")
    
    # Stage 5: AI analysis
    report.ai_analysis = stage_5_ai_analysis(report)
    
    # Save structured report
    output_path = Path(f"recon_{target}_{report.timestamp[:10]}.json")
    with open(output_path, "w") as f:
        json.dump(asdict(report), f, indent=2)
    print(f"\n[*] Report saved to {output_path}")
    
    return report

# Execute
report = run_pipeline("example.com")
print("\n" + report.ai_analysis)

MCP Integration

Each stage of this pipeline can be wrapped as an MCP tool, allowing AI agents like HexStrike or custom LangChain agents to call them autonomously. See the MCP Security chapter for implementation patterns and security considerations when exposing recon tools via MCP.
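The wrapping pattern itself is framework-independent: each stage becomes a named tool with a description and a JSON-serializable signature that an agent can discover and invoke. A minimal registry sketch of that pattern (the `tool` decorator, `TOOLS` registry, and `dispatch` helper are illustrative; a real MCP server such as the official Python SDK's FastMCP handles transport and schema generation for you):

```python
import json
import inspect
from typing import Callable

# Registry of agent-invokable tools: name -> metadata + callable
TOOLS: dict[str, dict] = {}

def tool(fn: Callable) -> Callable:
    """Register a function as a tool, deriving a parameter schema from its signature."""
    sig = inspect.signature(fn)
    TOOLS[fn.__name__] = {
        "fn": fn,
        "description": (fn.__doc__ or "").strip(),
        "parameters": {name: p.annotation.__name__
                       for name, p in sig.parameters.items()},
    }
    return fn

@tool
def enumerate_subdomains(target: str) -> list[str]:
    """Passive subdomain enumeration for an in-scope domain."""
    # In the real pipeline this would call stage_1_subdomain_enum(target)
    return [f"www.{target}", f"api.{target}"]

def dispatch(call_json: str) -> str:
    """Execute a tool call expressed as JSON, as an agent runtime would."""
    call = json.loads(call_json)
    fn = TOOLS[call["name"]]["fn"]
    return json.dumps(fn(**call["arguments"]))

# An agent first lists TOOLS to discover capabilities, then dispatches calls:
result = dispatch(
    '{"name": "enumerate_subdomains", "arguments": {"target": "example.com"}}'
)
```

Exposing scanners this way means the agent, not the operator, decides what runs next — so scope enforcement and authorization checks belong inside each tool, not around the agent.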

8. Attack Surface Management with AI

Point-in-time reconnaissance gives a snapshot; attack surface management (ASM) provides continuous monitoring. AI enhances ASM by automatically detecting changes, classifying their risk level, and alerting operators to newly exposed assets that expand the attack surface.

Continuous Monitoring Pipeline

The following script implements a basic ASM monitor that compares current state against previous scans, detects changes (new subdomains, removed hosts, changed technologies), and uses an LLM to assess the security impact of each change.

asm_monitor.py
python
#!/usr/bin/env python3
"""AI-Powered Attack Surface Management — Continuous Monitoring."""

import json
import hashlib
import subprocess
from datetime import datetime
from pathlib import Path
from openai import OpenAI

client = OpenAI()

class AttackSurfaceMonitor:
    def __init__(self, target: str, state_dir: str = "./asm_state"):
        self.target = target
        self.state_dir = Path(state_dir)
        self.state_dir.mkdir(exist_ok=True)
        self.state_file = self.state_dir / f"{target}.json"
    
    def load_previous_state(self) -> dict:
        """Load the previous scan state for comparison."""
        if self.state_file.exists():
            return json.loads(self.state_file.read_text())
        return {"subdomains": [], "hosts": [], "ports": [], "timestamp": ""}
    
    def save_state(self, state: dict):
        """Save current scan state."""
        state["timestamp"] = datetime.now().isoformat()
        self.state_file.write_text(json.dumps(state, indent=2))
    
    def discover_assets(self) -> dict:
        """Run discovery tools and return current state."""
        # Subdomain enumeration
        sf = subprocess.run(
            ["subfinder", "-d", self.target, "-silent", "-all"],
            capture_output=True, text=True, timeout=300
        )
        subdomains = sorted(set(
            s.strip() for s in sf.stdout.split("\n") if s.strip()
        ))
        
        # Live host probing
        input_data = "\n".join(subdomains)
        httpx_out = subprocess.run(
            ["httpx", "-json", "-silent", "-status-code", "-title",
             "-tech-detect", "-content-length"],
            input=input_data,
            capture_output=True, text=True, timeout=600
        )
        hosts = []
        for line in httpx_out.stdout.strip().split("\n"):
            if line.strip():
                try:
                    hosts.append(json.loads(line))
                except json.JSONDecodeError:
                    continue
        
        return {
            "subdomains": subdomains,
            "hosts": hosts,
            "ports": [],  # Add naabu scan if needed
        }
    
    def detect_changes(self, previous: dict, current: dict) -> dict:
        """Compare states and identify changes."""
        prev_subs = set(previous.get("subdomains", []))
        curr_subs = set(current.get("subdomains", []))
        
        prev_urls = set(
            h.get("url", "") for h in previous.get("hosts", [])
        )
        curr_urls = set(
            h.get("url", "") for h in current.get("hosts", [])
        )
        
        return {
            "new_subdomains": sorted(curr_subs - prev_subs),
            "removed_subdomains": sorted(prev_subs - curr_subs),
            "new_hosts": sorted(curr_urls - prev_urls),
            "removed_hosts": sorted(prev_urls - curr_urls),
        }
    
    def ai_risk_assessment(self, changes: dict, current: dict) -> str:
        """Use AI to assess risk of detected changes."""
        if not any(changes.values()):
            return "No changes detected since last scan."
        
        response = client.chat.completions.create(
            model="gpt-4o",
            messages=[
                {"role": "system", "content": """You are an attack surface 
management analyst. Assess the security impact of infrastructure changes.
Rate each change: CRITICAL / HIGH / MEDIUM / LOW / INFO.
Recommend immediate actions for high-risk changes."""},
                {"role": "user", "content": f"""Target: {self.target}

Changes detected:
{json.dumps(changes, indent=2)}

Current state summary:
- Total subdomains: {len(current['subdomains'])}
- Live hosts: {len(current['hosts'])}"""}
            ],
            temperature=0.2
        )
        return response.choices[0].message.content
    
    def run(self):
        """Execute monitoring cycle."""
        print(f"[*] ASM scan for {self.target}")
        previous = self.load_previous_state()
        current = self.discover_assets()
        changes = self.detect_changes(previous, current)
        
        # AI risk assessment
        risk_report = self.ai_risk_assessment(changes, current)
        
        # Save new state
        self.save_state(current)
        
        print(f"[+] Subdomains: {len(current['subdomains'])}")
        print(f"[+] Live hosts: {len(current['hosts'])}")
        print(f"[+] New subdomains: {len(changes['new_subdomains'])}")
        print(f"[+] Removed subdomains: {len(changes['removed_subdomains'])}")
        print(f"\n{risk_report}")

# Usage — run on a cron schedule
monitor = AttackSurfaceMonitor("example.com")
monitor.run()
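The closing comment suggests running the monitor on a schedule. One way to do that, assuming the script is saved as `asm_monitor.py` under an illustrative `/opt/asm` directory, is a daily crontab entry:

```shell
# Run the ASM monitor every day at 03:00 and append output to a log
0 3 * * * cd /opt/asm && /usr/bin/python3 asm_monitor.py >> asm_monitor.log 2>&1
```

Because state is keyed per target under `asm_state/`, multiple domains can be monitored by separate cron lines without interfering with each other.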

Commercial ASM Platforms

Several commercial platforms provide enterprise-grade ASM with AI capabilities:

  • XM Cyber — attack path management with AI-driven risk scoring and continuous exposure assessment
  • Pentera — automated security validation that continuously discovers and tests the attack surface
  • CrowdStrike Falcon Surface — external attack surface management with adversary intelligence
  • Palo Alto Cortex Xpanse — internet-scale asset discovery and risk assessment

DIY vs. Commercial

The open-source pipeline shown above handles most recon automation needs for individual engagements. Commercial ASM tools add value for organizations needing continuous monitoring across large, dynamic asset inventories — particularly when combined with internal threat intelligence feeds.
🎯

AI Reconnaissance Labs

Hands-on exercises for building and using AI-enhanced reconnaissance pipelines.

🔧
BBOT Scan with AI Analysis Custom Lab medium
BBOT module configuration · NDJSON parsing · LLM result analysis · Target prioritization
🔧
Build a Subfinder-to-LLM Pipeline Custom Lab easy
Subfinder enumeration · httpx probing · AI enrichment · Risk classification
🔧
Full Automated Recon Pipeline Custom Lab hard
Multi-tool chaining · Structured JSON reporting · LLM-driven analysis · Pipeline orchestration
🔧
AI-Powered Attack Surface Monitoring Custom Lab hard
Change detection · State comparison · Continuous monitoring · AI risk scoring