Agent Frameworks
🔥 Advanced
T1059 T1203

AI Agent Frameworks

The landscape of autonomous AI agents has matured dramatically. Early experiments like AutoGPT and BabyAGI (now archived) have been superseded by production-grade frameworks — OpenAI Agents SDK, LangGraph, AutoGen, CrewAI, and Claude MCP — that bring structured orchestration, tool use, guardrails, and multi-agent collaboration to security research workflows.

Authorized Targets Only

Autonomous agents execute real commands. Always run in isolated environments with kill switches, human approval gates, and strict scope controls. Never deploy against systems without explicit written authorization.

The Agent Loop

Every modern agent framework implements a variation of the same core loop: Goal → Plan → Execute → Observe → Reflect. The agent continuously refines its approach based on tool outputs until the objective is met or a termination condition is reached.

Agent Execution Loop

graph TD
    A[Goal / Objective] --> B[Plan Steps]
    B --> C[Execute Tool]
    C --> D[Observe Result]
    D --> E{Reflect}
    E -->|Need more info| B
    E -->|Adjust approach| C
    E -->|Complete| F[Final Report]
    style A fill:#0d1117,stroke:#39d353,color:#39d353
    style B fill:#0d1117,stroke:#00d4ff,color:#00d4ff
    style C fill:#0d1117,stroke:#39d353,color:#39d353
    style D fill:#0d1117,stroke:#00d4ff,color:#00d4ff
    style E fill:#0d1117,stroke:#b388ff,color:#b388ff
    style F fill:#0d1117,stroke:#39d353,color:#39d353
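The loop above can be sketched framework-free in a few lines of Python. This is an illustrative toy, not any SDK's API: the `plan` function stands in for the LLM call that real frameworks make, and the tool table is a pair of stubs.

```python
# Minimal agent loop: Goal -> Plan -> Execute -> Observe -> Reflect.
# plan() is a stand-in for the model call a real framework would make.

def run_agent(goal, tools, max_steps=10):
    observations = []
    for _ in range(max_steps):
        step = plan(goal, observations)           # Plan: choose next tool + args
        if step is None:                          # Reflect: objective met, stop
            return observations
        tool_name, args = step
        result = tools[tool_name](**args)         # Execute the chosen tool
        observations.append((tool_name, result))  # Observe the result
    return observations                           # Termination: step budget hit

def plan(goal, observations):
    # Toy planner: enumerate first, then scan the first finding, then stop.
    if not observations:
        return ("enum", {"domain": goal})
    if len(observations) == 1:
        return ("scan", {"host": observations[0][1][0]})
    return None

tools = {
    "enum": lambda domain: [f"www.{domain}", f"api.{domain}"],
    "scan": lambda host: {host: [80, 443]},
}
history = run_agent("example.com", tools)
```

Real frameworks differ mainly in what `plan` looks like (single LLM, graph of nodes, conversation between agents), not in the shape of this loop.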

OpenAI Agents SDK (successor to Swarm)

🤖

OpenAI Agents SDK

github.com/openai/openai-agents-python

OpenAI's production agent framework with multi-agent orchestration, typed tool definitions, agent handoffs, and built-in guardrails. The successor to the experimental Swarm project, now a first-party SDK.

Python Multi-Agent Handoffs Requires API Key

Key capabilities: agent handoffs for multi-agent delegation, @function_tool decorator for typed tool definitions, guardrails for input/output validation, and tracing for observability. Agents can delegate sub-tasks to specialist agents and receive structured results.

openai_agents_security.py
python
#!/usr/bin/env python3
"""OpenAI Agents SDK — Multi-agent security orchestration."""
# pip install openai-agents

from agents import Agent, Runner, function_tool
import subprocess

@function_tool
def nmap_scan(target: str, ports: str = "1-1000") -> str:
    """Run an Nmap service scan against a target."""
    result = subprocess.run(
        ["nmap", "-sV", "-p", ports, "--open", target],
        capture_output=True, text=True, timeout=300
    )
    return result.stdout

@function_tool
def nuclei_scan(target: str, severity: str = "medium,high,critical") -> str:
    """Run Nuclei vulnerability scanner against a target URL."""
    result = subprocess.run(
        ["nuclei", "-u", target, "-severity", severity, "-silent", "-jsonl"],
        capture_output=True, text=True, timeout=600
    )
    return result.stdout

@function_tool
def subfinder_enum(domain: str) -> str:
    """Enumerate subdomains for a given domain."""
    result = subprocess.run(
        ["subfinder", "-d", domain, "-silent"],
        capture_output=True, text=True, timeout=120
    )
    return result.stdout

# Define specialist agents
recon_agent = Agent(
    name="ReconAgent",
    instructions="""You are a reconnaissance specialist. Enumerate subdomains
    and identify open ports/services. Be thorough but stay in scope.""",
    tools=[subfinder_enum, nmap_scan],
)

vuln_agent = Agent(
    name="VulnAgent",
    instructions="""You are a vulnerability analyst. Given recon data, run
    targeted vulnerability scans and analyze findings. Prioritize by severity.""",
    tools=[nuclei_scan],
)

# Orchestrator delegates to specialists
orchestrator = Agent(
    name="SecurityOrchestrator",
    instructions="""You coordinate a security assessment. First delegate recon
    to ReconAgent, then pass findings to VulnAgent for scanning.
    Compile a final summary with severity ratings.""",
    handoffs=[recon_agent, vuln_agent],
)

# Run the assessment
result = Runner.run_sync(
    orchestrator,
    "Assess authorized-target.com: enumerate subdomains, scan services, "
    "check for vulnerabilities, and produce a findings summary."
)
print(result.final_output)
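The guardrails mentioned above validate inputs and outputs before tools fire. The idea is worth seeing in isolation, so here is a framework-agnostic sketch of a scope gate wrapped around each tool; the `SCOPE` set, `ScopeError`, and `scoped` decorator are illustrative names, not SDK API.

```python
# Framework-agnostic scope guardrail: refuse any tool call whose target
# is not covered by the engagement's written scope.
from functools import wraps

SCOPE = {"authorized-target.com"}  # hosts/domains from the engagement letter

class ScopeError(Exception):
    pass

def in_scope(target: str) -> bool:
    # Simplistic check: exact match or subdomain of a scoped domain.
    return any(target == s or target.endswith("." + s) for s in SCOPE)

def scoped(tool):
    """Decorator that blocks out-of-scope targets before the tool runs."""
    @wraps(tool)
    def gate(target: str, *args, **kwargs):
        if not in_scope(target):
            raise ScopeError(f"{target} is outside the authorized scope")
        return tool(target, *args, **kwargs)
    return gate

@scoped
def nmap_scan(target: str) -> str:
    return f"scanned {target}"  # placeholder for the real subprocess call
```

In a real deployment the gated functions are what you hand to `tools=[...]`, so no agent reasoning path can reach an unauthorized target.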

LangGraph (LangChain)

🔗

LangGraph

github.com/langchain-ai/langgraph

Graph-based agent orchestration from LangChain. Model workflows as state machines with conditional edges, persistence, and human-in-the-loop breakpoints. Ideal for complex multi-step security assessments that need deterministic control flow.

Python State Machine Conditional Routing Persistence

LangGraph excels at workflows where order matters: recon → scan → exploit → report. Nodes are Python functions, edges define transitions, and conditional edges enable dynamic routing based on findings. Built-in persistence means interrupted workflows can resume.
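The resume behaviour that persistence enables is simple enough to sketch without LangGraph: checkpoint state after every node and skip completed nodes on restart. The node names mirror the workflow below, but the checkpoint file format and `run_pipeline` helper are illustrative, not LangGraph's checkpointer API.

```python
# Resumable pipeline sketch: persist state after each node; on restart,
# skip nodes recorded as done. Same idea as a LangGraph checkpointer.
import json
import os
import tempfile

ORDER = ["recon", "scan", "report"]

def run_pipeline(state, nodes, ckpt):
    if os.path.exists(ckpt):
        with open(ckpt) as f:
            state = json.load(f)              # resume from the last checkpoint
    for name in ORDER:
        if name in state.get("_done", []):
            continue                          # node completed on a prior run
        state = nodes[name](state)
        state.setdefault("_done", []).append(name)
        with open(ckpt, "w") as f:
            json.dump(state, f)               # persist after every node
    return state

nodes = {
    "recon":  lambda s: {**s, "hosts": ["a", "b"]},
    "scan":   lambda s: {**s, "vulns": []},
    "report": lambda s: {**s, "report": f"{len(s['hosts'])} hosts, clean"},
}
ckpt = os.path.join(tempfile.mkdtemp(), "assessment.ckpt.json")
final = run_pipeline({"target": "authorized-target.com"}, nodes, ckpt)
```

If the process dies mid-scan, rerunning with the same checkpoint path picks up after `recon` instead of re-enumerating, which matters when scans take hours.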

langgraph_security.py
python
#!/usr/bin/env python3
"""LangGraph — Graph-based security assessment workflow."""
# pip install langgraph langchain-openai

from langgraph.graph import StateGraph, END
from langchain_openai import ChatOpenAI
from typing import TypedDict
import subprocess, json

class AssessmentState(TypedDict):
    target: str
    subdomains: list[str]
    open_ports: dict
    vulnerabilities: list[dict]
    report: str

llm = ChatOpenAI(model="gpt-4o", temperature=0.1)

def recon_node(state: AssessmentState) -> AssessmentState:
    """Enumerate subdomains and open ports."""
    target = state["target"]
    subs = subprocess.run(
        ["subfinder", "-d", target, "-silent"],
        capture_output=True, text=True
    ).stdout.strip().split("\n")
    state["subdomains"] = [s for s in subs if s]

    ports = {}
    for sub in state["subdomains"][:10]:  # Limit scope
        result = subprocess.run(
            ["nmap", "-sV", "--top-ports", "100", "-T4", sub],
            capture_output=True, text=True, timeout=120
        )
        ports[sub] = result.stdout
    state["open_ports"] = ports
    return state

def scan_node(state: AssessmentState) -> AssessmentState:
    """Run vulnerability scans on discovered hosts."""
    vulns = []
    for sub in state["subdomains"][:10]:
        result = subprocess.run(
            ["nuclei", "-u", f"https://{sub}", "-severity", "medium,high,critical",
             "-silent", "-jsonl"],
            capture_output=True, text=True, timeout=300
        )
        for line in result.stdout.strip().split("\n"):
            if line:
                vulns.append(json.loads(line))
    state["vulnerabilities"] = vulns
    return state

def should_exploit(state: AssessmentState) -> str:
    """Route based on findings — only proceed if vulns found."""
    return "report" if state["vulnerabilities"] else "report_clean"

def report_node(state: AssessmentState) -> AssessmentState:
    """Generate assessment report using LLM."""
    prompt = f"""Generate a security assessment report:
Target: {state['target']}
Subdomains found: {len(state['subdomains'])}
Vulnerabilities: {json.dumps(state['vulnerabilities'], indent=2)[:4000]}

Format: Executive summary, findings table, remediation steps."""
    response = llm.invoke(prompt)
    state["report"] = response.content
    return state

def report_clean_node(state: AssessmentState) -> AssessmentState:
    state["report"] = f"No significant vulnerabilities found for {state['target']}."
    return state

# Build the graph
graph = StateGraph(AssessmentState)
graph.add_node("recon", recon_node)
graph.add_node("scan", scan_node)
graph.add_node("report", report_node)
graph.add_node("report_clean", report_clean_node)

graph.set_entry_point("recon")
graph.add_edge("recon", "scan")
graph.add_conditional_edges("scan", should_exploit, {
    "report": "report",
    "report_clean": "report_clean",
})
graph.add_edge("report", END)
graph.add_edge("report_clean", END)

workflow = graph.compile()
result = workflow.invoke({"target": "authorized-target.com"})
print(result["report"])

AutoGen v0.4 (Microsoft)

🧠

AutoGen v0.4

github.com/microsoft/autogen

Microsoft's multi-agent conversation framework, rebuilt from the ground up in v0.4. Agents communicate via structured messages, execute code in sandboxed environments, and support human-in-the-loop approval for sensitive operations.

Python Multi-Agent Chat Sandboxed Execution Human Proxy

AutoGen enables agent teams that collaborate on security tasks through structured conversation. The UserProxyAgent acts as an approval gate — critical for offensive operations. v0.4 introduced a new async-first API, better tool handling, and group chat patterns.
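The approval-gate pattern the UserProxyAgent implements is worth seeing stripped down: sensitive actions block on an operator decision, everything else proceeds. The `SENSITIVE` set and `approve` callback here are illustrative, not AutoGen API.

```python
# Human-in-the-loop gate: sensitive actions require operator sign-off.
SENSITIVE = {"exploit", "write", "delete"}  # action classes needing approval

def execute(action: str, target: str, approve) -> str:
    """Run an action, routing sensitive ones through the approve() callback."""
    if action in SENSITIVE and not approve(action, target):
        return f"DENIED: {action} on {target}"
    return f"OK: {action} on {target}"

# In production, approve() would prompt the operator (terminal input,
# Slack message, ticketing system) and block until they answer.
log = [
    execute("scan", "authorized-target.com", approve=lambda a, t: False),
    execute("exploit", "authorized-target.com", approve=lambda a, t: False),
    execute("exploit", "authorized-target.com", approve=lambda a, t: True),
]
```

The key property: denial is the default, and the agent cannot bypass the callback because the gate sits in the executor, not in the prompt.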

autogen_security_team.py
python
#!/usr/bin/env python3
"""AutoGen v0.4 — Multi-agent security team with human approval."""
# pip install autogen-agentchat autogen-ext[openai]

from autogen_agentchat.agents import AssistantAgent, UserProxyAgent
from autogen_agentchat.teams import RoundRobinGroupChat
from autogen_ext.models.openai import OpenAIChatCompletionClient

model = OpenAIChatCompletionClient(model="gpt-4o")

# Recon specialist
recon_agent = AssistantAgent(
    name="ReconSpecialist",
    model_client=model,
    system_message="""You are a reconnaissance specialist. You enumerate
    targets using subfinder, amass, and nmap. Always verify scope before
    scanning. Output structured JSON results.""",
)

# Exploit analyst
exploit_agent = AssistantAgent(
    name="ExploitAnalyst",
    model_client=model,
    system_message="""You are a vulnerability exploitation specialist.
    Given recon data, identify attack vectors and suggest exploitation
    strategies. Never execute without human approval. Rate findings
    using CVSS v4.0 scoring.""",
)

# Report writer
report_agent = AssistantAgent(
    name="ReportWriter",
    model_client=model,
    system_message="""You are a pentest report writer. Take findings from
    the team and produce a professional penetration testing report with
    executive summary, technical details, and remediation guidance.""",
)

# Human proxy — requires approval for sensitive actions
human_proxy = UserProxyAgent(
    name="HumanApprover",
    description="A human operator who reviews and approves actions.",
)

# Create the security team
team = RoundRobinGroupChat(
    participants=[recon_agent, exploit_agent, report_agent, human_proxy],
    max_turns=12,
)

# Run the team assessment
import asyncio
async def main():
    result = await team.run(
        task="Perform an authorized security assessment of target.example.com. "
             "Start with recon, analyze findings, and produce a report."
    )
    print(result)

asyncio.run(main())

CrewAI

👥

CrewAI

github.com/crewAIInc/crewAI

Role-based multi-agent framework where you define agents with roles, goals, and backstories. CrewAI supports sequential and hierarchical task execution, making it intuitive for pentest team simulations where each agent has a clear specialty.

Python Role-Based Sequential / Hierarchical

CrewAI's role-based model maps naturally to pentest teams: a recon specialist, vulnerability analyst, and report writer each bring domain expertise. Agents can use custom tools and delegate sub-tasks to teammates.
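Sequential execution means each task's output becomes context for the next. Stripped of CrewAI's classes, the pattern is a simple fold over tasks; the task names and `run_sequential` helper are illustrative, not CrewAI API.

```python
# Sequential process sketch: each task sees the accumulated context of all
# previous task outputs, mirroring Process.sequential in spirit.

def run_sequential(tasks, inputs):
    context = dict(inputs)
    for name, fn in tasks:
        context[name] = fn(context)  # output keyed by task name, fed forward
    return context

tasks = [
    ("recon",  lambda c: f"hosts for {c['target']}"),
    ("vulns",  lambda c: f"scanned: {c['recon']}"),
    ("report", lambda c: f"report on: {c['vulns']}"),
]
result = run_sequential(tasks, {"target": "authorized-target.com"})
```

Hierarchical mode replaces this fixed order with a manager agent that decides which task (and which teammate) runs next.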

crewai_pentest_crew.py
python
#!/usr/bin/env python3
"""CrewAI — Role-based pentest crew with sequential task execution."""
# pip install crewai crewai-tools

from crewai import Agent, Task, Crew, Process
from crewai_tools import tool
import subprocess

@tool
def nmap_scanner(target: str) -> str:
    """Scan target for open ports and services."""
    result = subprocess.run(
        ["nmap", "-sV", "--top-ports", "1000", target],
        capture_output=True, text=True, timeout=300
    )
    return result.stdout

@tool
def nuclei_scanner(url: str) -> str:
    """Run Nuclei vulnerability templates against a URL."""
    result = subprocess.run(
        ["nuclei", "-u", url, "-severity", "medium,high,critical", "-silent"],
        capture_output=True, text=True, timeout=600
    )
    return result.stdout

# Define the crew members
recon_agent = Agent(
    role="Reconnaissance Specialist",
    goal="Map the attack surface of the target",
    backstory="A veteran OSINT analyst with 10+ years mapping corporate "
              "infrastructure. Methodical and thorough.",
    tools=[nmap_scanner],
    verbose=True,
)

exploit_agent = Agent(
    role="Vulnerability Analyst",
    goal="Identify and validate exploitable vulnerabilities",
    backstory="A senior pentester who specializes in web app security "
              "and network exploitation. OSCP and OSWE certified.",
    tools=[nuclei_scanner],
    verbose=True,
)

report_agent = Agent(
    role="Security Report Writer",
    goal="Produce a professional penetration testing report",
    backstory="A technical writer who has authored 500+ pentest reports "
              "for Fortune 500 clients. Knows PTES and OWASP frameworks.",
    verbose=True,
)

# Define tasks
recon_task = Task(
    description="Enumerate open ports and services on {target}. "
                "Identify web services, APIs, and potential entry points.",
    expected_output="JSON list of hosts, ports, services, and versions.",
    agent=recon_agent,
)

exploit_task = Task(
    description="Using the recon data, run vulnerability scans on discovered "
                "web services. Identify CVEs and misconfigurations.",
    expected_output="Prioritized vulnerability list with CVSS scores.",
    agent=exploit_agent,
)

report_task = Task(
    description="Compile all findings into a professional pentest report "
                "with executive summary, technical details, and remediation.",
    expected_output="Markdown-formatted penetration testing report.",
    agent=report_agent,
)

# Assemble and run the crew
crew = Crew(
    agents=[recon_agent, exploit_agent, report_agent],
    tasks=[recon_task, exploit_task, report_task],
    process=Process.sequential,
    verbose=True,
)

result = crew.kickoff(inputs={"target": "authorized-target.com"})
print(result)

Claude MCP Agents (Anthropic)

🔌

Model Context Protocol (MCP)

modelcontextprotocol.io

Anthropic's Model Context Protocol turns any tool into an AI capability. Build custom MCP servers that expose security tools (nmap, nuclei, subfinder) as structured functions that Claude can invoke directly. This is the foundation of HexStrike AI's 150+ tool ecosystem.

Python / TypeScript Tool Protocol Claude Desktop HexStrike Compatible

MCP servers expose tools via a standardized protocol. Claude (or any MCP client) discovers available tools, their parameters, and invokes them as needed. Build custom servers for your security toolchain or use HexStrike AI's pre-built MCP integration.
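On the wire, MCP is JSON-RPC 2.0: a client discovers tools with a `tools/list` request and invokes one with `tools/call`. Here is a sketch of the two messages a client would write to the server's stdin; the request ids and arguments are illustrative.

```python
# MCP client-side wire format: JSON-RPC 2.0 requests over stdio.
import json

def rpc(method, params=None, id=1):
    """Build a JSON-RPC 2.0 request as MCP clients send to a server."""
    msg = {"jsonrpc": "2.0", "id": id, "method": method}
    if params is not None:
        msg["params"] = params
    return json.dumps(msg)

# Discover the server's tools, then call one by name with typed arguments.
list_req = rpc("tools/list", id=1)
call_req = rpc(
    "tools/call",
    {"name": "port_scan",
     "arguments": {"target": "authorized-target.com", "ports": "1-1000"}},
    id=2,
)
```

The server replies with matching-id responses: a tool catalogue (names, descriptions, JSON-schema parameters) for `tools/list`, and the tool's result content for `tools/call`.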

mcp_security_server.py
python
#!/usr/bin/env python3
"""Claude MCP Security Server — Custom tool server for AI agents."""
# pip install mcp

from mcp.server.fastmcp import FastMCP
import subprocess, json

mcp = FastMCP("security-tools")

@mcp.tool()
def port_scan(target: str, ports: str = "1-1000") -> str:
    """Scan a target host for open ports and services.

    Args:
        target: IP address or hostname to scan
        ports: Port range (default: 1-1000)
    """
    result = subprocess.run(
        ["nmap", "-sV", "-p", ports, "--open", "-oX", "-", target],
        capture_output=True, text=True, timeout=300
    )
    return result.stdout

@mcp.tool()
def vuln_scan(url: str, severity: str = "medium,high,critical") -> str:
    """Run Nuclei vulnerability scanner against a URL.

    Args:
        url: Target URL to scan
        severity: Comma-separated severity levels
    """
    result = subprocess.run(
        ["nuclei", "-u", url, "-severity", severity, "-jsonl", "-silent"],
        capture_output=True, text=True, timeout=600
    )
    findings = []
    for line in result.stdout.strip().split("\n"):
        if line:
            data = json.loads(line)
            findings.append({
                "template": data.get("template-id"),
                "severity": data.get("info", {}).get("severity"),
                "matched": data.get("matched-at"),
                "name": data.get("info", {}).get("name"),
            })
    return json.dumps(findings, indent=2)

@mcp.tool()
def dns_recon(domain: str) -> str:
    """Enumerate DNS records and subdomains for a domain.

    Args:
        domain: Target domain name
    """
    subs = subprocess.run(
        ["subfinder", "-d", domain, "-silent"],
        capture_output=True, text=True, timeout=120
    )
    dns = subprocess.run(
        ["dig", domain, "ANY", "+short"],
        capture_output=True, text=True, timeout=30
    )
    return json.dumps({
        "subdomains": subs.stdout.strip().split("\n"),
        "dns_records": dns.stdout.strip().split("\n"),
    }, indent=2)

@mcp.tool()
def check_headers(url: str) -> str:
    """Analyze HTTP security headers for a URL.

    Args:
        url: Target URL to check
    """
    result = subprocess.run(
        ["curl", "-sI", url],
        capture_output=True, text=True, timeout=30
    )
    headers = result.stdout
    missing = []
    for h in ["Strict-Transport-Security", "Content-Security-Policy",
              "X-Frame-Options", "X-Content-Type-Options",
              "Permissions-Policy", "Referrer-Policy"]:
        if h.lower() not in headers.lower():
            missing.append(h)
    return json.dumps({
        "raw_headers": headers,
        "missing_security_headers": missing,
    }, indent=2)

if __name__ == "__main__":
    mcp.run(transport="stdio")

# Claude Desktop config (~/Library/Application Support/Claude/claude_desktop_config.json on macOS):
# {
#   "mcpServers": {
#     "security-tools": {
#       "command": "python",
#       "args": ["mcp_security_server.py"]
#     }
#   }
# }

Semantic Kernel (Microsoft)

⚙️

Semantic Kernel

github.com/microsoft/semantic-kernel

Microsoft's enterprise-grade agent framework with a plugin architecture for tool integration. Strong .NET support alongside Python, with automatic function calling, planning, and execution. Best suited for organizations already in the Microsoft ecosystem.

.NET / Python Plugin Architecture Enterprise
SemanticKernelSecurity.cs
csharp
// Semantic Kernel — .NET security agent with plugin architecture
// dotnet add package Microsoft.SemanticKernel

using Microsoft.SemanticKernel;
using Microsoft.SemanticKernel.ChatCompletion;
using Microsoft.SemanticKernel.Connectors.OpenAI;
using System.ComponentModel;
using System.Diagnostics;

public class SecurityPlugin
{
    [KernelFunction, Description("Scan target for open ports")]
    public async Task<string> PortScan(
        [Description("Target IP or hostname")] string target,
        [Description("Port range")] string ports = "1-1000")
    {
        var process = Process.Start(new ProcessStartInfo
        {
            FileName = "nmap", Arguments = $"-sV -p {ports} --open {target}",
            RedirectStandardOutput = true, UseShellExecute = false
        });
        return await process!.StandardOutput.ReadToEndAsync();
    }

    [KernelFunction, Description("Run vulnerability scan")]
    public async Task<string> VulnScan(
        [Description("Target URL")] string url)
    {
        var process = Process.Start(new ProcessStartInfo
        {
            FileName = "nuclei",
            Arguments = $"-u {url} -severity medium,high,critical -silent",
            RedirectStandardOutput = true, UseShellExecute = false
        });
        return await process!.StandardOutput.ReadToEndAsync();
    }
}

// Usage:
var builder = Kernel.CreateBuilder();
builder.AddOpenAIChatCompletion("gpt-4o", Environment.GetEnvironmentVariable("OPENAI_API_KEY")!);
builder.Plugins.AddFromType<SecurityPlugin>();
var kernel = builder.Build();

var settings = new OpenAIPromptExecutionSettings {
    FunctionChoiceBehavior = FunctionChoiceBehavior.Auto()
};

var chat = kernel.GetRequiredService<IChatCompletionService>();
var history = new ChatHistory();
history.AddUserMessage("Scan target.example.com for open ports then check for vulns.");
var response = await chat.GetChatMessageContentAsync(history, settings, kernel);
Console.WriteLine(response);

Security-Focused LLMs

Standard LLMs often refuse security-related queries. These specialized models are tuned for offensive security work and can be used as the backbone of any agent framework via Ollama's OpenAI-compatible API.

WhiteRabbitNeo

A fine-tuned security LLM without typical safety refusals. Understands exploit development, vulnerability analysis, and offensive tooling natively. Available in multiple parameter sizes and runs locally via Ollama or LM Studio.

Uncensored Local Only Ollama

WhiteRabbitNeo · Security-specialized · Best for exploit dev & vuln analysis

Dolphin Mixtral · Uncensored reasoning · Best for complex reasoning chains

DeepSeek Coder V2 · Code analysis · Best for exploit code review

Security LLMs via Ollama
bash
# Security-Focused Local LLMs via Ollama

# WhiteRabbitNeo — uncensored security model
ollama pull whiterabbitneo
ollama run whiterabbitneo

# Llama 3.1 70B — strong general model, good for security tasks
ollama pull llama3.1:70b

# Dolphin Mixtral — uncensored, good reasoning
ollama pull dolphin-mixtral:8x7b

# DeepSeek Coder V2 — excellent for exploit code analysis
ollama pull deepseek-coder-v2:16b

# Qwen 2.5 Coder — strong code understanding
ollama pull qwen2.5-coder:32b

# Use with any framework via OpenAI-compatible API
# Ollama exposes: http://localhost:11434/v1/chat/completions
# Set base_url="http://localhost:11434/v1" in any OpenAI client
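What the frameworks send to that endpoint is standard chat-completion JSON. A minimal stdlib sketch of building such a request (the model name, prompt, and `build_chat_request` helper are illustrative; actually sending it assumes a local Ollama instance):

```python
import json
from urllib import request

# Ollama's OpenAI-compatible chat endpoint (assumes a local instance)
OLLAMA_URL = "http://localhost:11434/v1/chat/completions"

def build_chat_request(model: str, prompt: str) -> request.Request:
    """Build a standard chat-completion request targeting Ollama."""
    payload = {
        "model": model,
        "messages": [{"role": "user", "content": prompt}],
    }
    return request.Request(
        OLLAMA_URL,
        data=json.dumps(payload).encode(),
        headers={"Content-Type": "application/json"},
    )

req = build_chat_request("whiterabbitneo", "Summarize HSTS in one sentence.")
print(json.loads(req.data)["model"])  # prints: whiterabbitneo
# request.urlopen(req) would send it (requires Ollama running locally)
```

Equivalently, any OpenAI SDK client works by constructing it with `base_url="http://localhost:11434/v1"` and a placeholder API key, which Ollama does not check by default.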

Safety Controls

Critical: Implement Before Deploying

Autonomous agents can cause serious damage if misconfigured. Every agent deployment MUST include kill switches, scope validation, human approval for destructive actions, and comprehensive logging.

🛑 Kill Switch

Signal handler that immediately halts all agent execution. Bind to Ctrl+C and expose via API.

⚠️ Human Approval

Gate destructive actions (exploit, upload, modify) behind interactive human confirmation.

🔒 Sandboxing

Run agents in Docker containers or VMs with restricted network access and no host filesystem mounts.

📊 Audit Logging

Structured logging of every tool invocation, result, and decision for post-engagement review.

🌐 Network Controls

Firewall rules limiting agent egress to authorized target IPs only. Block all other outbound traffic.

⏱️ Rate Limiting

Cap tool invocations per minute to prevent runaway loops and accidental DoS against targets.

agent_safety.py
python
#!/usr/bin/env python3
"""Safety wrapper for autonomous security agents."""

import signal, sys, logging, time

# Structured logging for audit trail
logging.basicConfig(
    filename="agent_audit.log",
    format="%(asctime)s | %(levelname)s | %(message)s",
    level=logging.INFO,
)
log = logging.getLogger("agent-safety")

# ── Kill Switch ──────────────────────────────
class KillSwitch:
    _active = True

    @classmethod
    def check(cls):
        if not cls._active:
            log.critical("KILL SWITCH ACTIVATED — halting agent")
            sys.exit(1)

    @classmethod
    def kill(cls):
        cls._active = False

# Register Ctrl+C as kill switch
signal.signal(signal.SIGINT, lambda *_: KillSwitch.kill())

# ── Human Approval Gate ──────────────────────
DANGEROUS_ACTIONS = {"exploit", "upload", "delete", "modify", "execute"}

def require_approval(action: str, details: str) -> bool:
    if any(d in action.lower() for d in DANGEROUS_ACTIONS):
        log.warning(f"APPROVAL REQUIRED: {action} | {details}")
        resp = input(f"\n⚠️  Approve '{action}'? {details} [y/N]: ")
        approved = resp.strip().lower() == "y"
        log.info(f"APPROVAL {'GRANTED' if approved else 'DENIED'}: {action}")
        return approved
    return True

# ── Rate Limiter ─────────────────────────────
class RateLimiter:
    def __init__(self, max_calls: int = 10, window: int = 60):
        self.max_calls = max_calls
        self.window = window
        self.calls: list[float] = []

    def check(self) -> bool:
        now = time.time()
        self.calls = [c for c in self.calls if now - c < self.window]
        if len(self.calls) >= self.max_calls:
            log.warning("RATE LIMIT hit — throttling agent")
            return False
        self.calls.append(now)
        return True

# ── Scope Guard ──────────────────────────────
import ipaddress

ALLOWED_DOMAINS = {"authorized-target.com"}
ALLOWED_NETWORKS = [ipaddress.ip_network("10.10.10.0/24")]

def validate_scope(target: str) -> bool:
    try:  # IP targets must fall inside an allowed CIDR range
        in_scope = any(ipaddress.ip_address(target) in n for n in ALLOWED_NETWORKS)
    except ValueError:  # hostnames: exact match or subdomain of an allowed domain
        in_scope = any(target == d or target.endswith("." + d) for d in ALLOWED_DOMAINS)
    if not in_scope:
        log.critical(f"SCOPE VIOLATION: {target} is NOT in scope")
    return in_scope
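Wired together, these primitives form a single gate in front of every tool call. A self-contained sketch of that pattern (the simplified inline checks and the `safe_tool` name are illustrative stand-ins for the agent_safety.py versions):

```python
import time
from functools import wraps

# Simplified stand-ins for the scope guard and rate limiter above
ALLOWED_TARGETS = {"authorized-target.com"}
MAX_CALLS, WINDOW = 10, 60
_calls: list = []

def _in_scope(target: str) -> bool:
    return target in ALLOWED_TARGETS

def _under_rate_limit() -> bool:
    now = time.time()
    _calls[:] = [c for c in _calls if now - c < WINDOW]
    if len(_calls) >= MAX_CALLS:
        return False
    _calls.append(now)
    return True

def safe_tool(fn):
    """Run every safety check before the wrapped tool executes."""
    @wraps(fn)
    def wrapper(target: str, *args, **kwargs):
        if not _in_scope(target):
            return f"BLOCKED: {target} out of scope"
        if not _under_rate_limit():
            return "THROTTLED: rate limit exceeded"
        return fn(target, *args, **kwargs)
    return wrapper

@safe_tool
def port_scan(target: str) -> str:
    return f"scanning {target}"  # the real tool invocation goes here

print(port_scan("authorized-target.com"))  # scanning authorized-target.com
print(port_scan("victim.example"))         # BLOCKED: victim.example out of scope
```

Because the gate is a decorator, it applies uniformly to every tool the agent exposes, and an out-of-scope result is returned to the model as text rather than raising, so the agent can report the refusal instead of crashing.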

Framework Comparison

| Framework | Language | Agent Type | Tool Use | Best For | Complexity |
|---|---|---|---|---|---|
| OpenAI Agents SDK | Python | Multi-agent handoffs | @function_tool decorator | Delegating workflows | Medium |
| LangGraph | Python | State machine graph | Node functions | Complex pipelines | High |
| AutoGen v0.4 | Python | Multi-agent conversation | Code execution + tools | Collaborative research | Medium |
| CrewAI | Python | Role-based teams | @tool decorator | Team simulations | Low |
| Claude MCP | Python / TypeScript | Tool protocol | MCP server tools | Tool ecosystems | Low |
| Semantic Kernel | .NET / Python | Plugin-based agent | KernelFunction plugins | Enterprise / .NET shops | Medium |

Lab Exercises

🎯

Practice Labs

🔧 Build an OpenAI Agents Security Bot · Custom Lab · hard · T1059, T1203
🔧 LangGraph Recon Pipeline · Custom Lab · hard · T1059
🔧 CrewAI Pentest Team · Custom Lab · medium · T1059
🔧 MCP Security Server · Custom Lab · medium · T1059
📦 HTB: Agent-Assisted Box · Hack The Box · hard · T1059, T1203
🏠 THM: AI Agents Room · TryHackMe · medium · T1059