🔥 Advanced

Access Control Remediation

Risk Severity
🔴 Critical
Fix Effort
🏗️ High (Significant Work)
Est. Time
⏱️ 4-8 hours
Reference
A01:2021 CWE-639

Broken Access Control ranks #1 in the OWASP Top 10 (2021). It occurs when users can act outside their intended permissions, leading to unauthorized data access, modification, or privilege escalation.

Access Control Decision Flow

flowchart LR
    A[Request] --> B{Authed?}
    B -->|No| C[401]
    B -->|Yes| D{Owner?}
    D -->|Yes| E[Allow]
    D -->|No| F{Role?}
    F -->|No| G[403]
    F -->|Yes| H{Policy?}
    H -->|Fail| G
    H -->|Pass| E

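The decision flow above can be sketched as a single function. This is a minimal illustration, not a prescribed implementation: the `Decision` type, the `decide` function, and its four boolean inputs are hypothetical names introduced here, and a real application would compute each input from its own session, ownership, role, and policy checks.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class Decision:
    status: int   # HTTP status the caller should return
    reason: str   # Short explanation, useful for logs


def decide(authenticated: bool, is_owner: bool,
           has_role: bool, policy_passes: bool) -> Decision:
    """Walk the flowchart: Authed? -> Owner? -> Role? -> Policy?"""
    if not authenticated:
        return Decision(401, "not authenticated")
    if is_owner:
        return Decision(200, "owner")
    if not has_role:
        return Decision(403, "no qualifying role")
    if not policy_passes:
        return Decision(403, "policy check failed")
    return Decision(200, "role and policy")


if __name__ == "__main__":
    print(decide(True, False, True, False))
```

Note that every path that is not explicitly allowed ends in 401 or 403, so a missed case fails closed.
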
Common Vulnerabilities

Horizontal Privilege Escalation

  • IDOR: changing /api/users/123 to /api/users/456
  • Parameter manipulation: user_id=other
  • Missing ownership validation
  • Predictable resource identifiers

Vertical Privilege Escalation

  • Accessing admin endpoints without auth
  • Role parameter tampering
  • Forced browsing to restricted URLs
  • Missing function-level access control

IDOR Prevention

Always Verify Ownership

Never trust that a user should have access to a resource just because they know its ID. Always verify ownership or permissions server-side.

Vulnerable vs Secure Pattern

python
# ❌ VULNERABLE - No ownership check
@app.route('/api/documents/<int:doc_id>')
def get_document(doc_id):
    document = Document.query.get_or_404(doc_id)
    return jsonify(document.to_dict())

# ✅ SECURE - Ownership verification
@app.route('/api/documents/<int:doc_id>')
@login_required
def get_document(doc_id):
    document = Document.query.filter_by(
        id=doc_id,
        owner_id=current_user.id  # Verify ownership
    ).first_or_404()
    return jsonify(document.to_dict())

# ✅ SECURE - With shared access support
@app.route('/api/documents/<int:doc_id>')
@login_required
def get_document(doc_id):
    document = Document.query.get_or_404(doc_id)
    
    # Check if user has any form of access
    if not has_document_access(current_user, document):
        abort(403)
    
    return jsonify(document.to_dict())

def has_document_access(user, document):
    """Check if user can access document"""
    # Owner always has access
    if document.owner_id == user.id:
        return True
    
    # Check if document is shared with user
    share = DocumentShare.query.filter_by(
        document_id=document.id,
        user_id=user.id
    ).first()
    
    return share is not None

Use UUIDs Instead of Sequential IDs

python
import uuid
from sqlalchemy.dialects.postgresql import UUID

class Document(db.Model):
    # Use UUID instead of auto-increment
    id = db.Column(UUID(as_uuid=True), primary_key=True, default=uuid.uuid4)
    owner_id = db.Column(UUID(as_uuid=True), db.ForeignKey('user.id'), nullable=False)
    title = db.Column(db.String(255))
    
    # URL would be: /documents/550e8400-e29b-41d4-a716-446655440000
    # Not easily guessable like /documents/123

# Still verify ownership even with UUIDs!
# UUIDs add obscurity but are NOT a security control

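Not every UUID is unpredictable. As a rough sketch using only the standard library, the snippet below distinguishes random (version 4) UUIDs from time-based (version 1) ones, which embed a timestamp and a node identifier. The `describe` helper is a hypothetical name introduced for illustration.

```python
import uuid


def describe(identifier: uuid.UUID) -> str:
    """Report how a UUID was generated, based on its version field."""
    if identifier.version == 4:
        return "random (version 4)"
    if identifier.version == 1:
        return "time-based (version 1): embeds timestamp and node"
    return f"version {identifier.version}"


if __name__ == "__main__":
    random_id = uuid.uuid4()
    time_based_id = uuid.uuid1()
    print(describe(random_id))
    print(describe(time_based_id))
    # The 60-bit timestamp can be read back out of a version 1 UUID
    print(time_based_id.time)
```

Either way, the identifier only makes guessing harder. The ownership check remains the actual control.
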
Role-Based Access Control (RBAC)

python
from functools import wraps
from flask import abort
from flask_login import current_user

# Define permissions
class Permission:
    READ = 'read'
    WRITE = 'write'
    DELETE = 'delete'
    ADMIN = 'admin'

# Role definitions
ROLE_PERMISSIONS = {
    'viewer': [Permission.READ],
    'editor': [Permission.READ, Permission.WRITE],
    'admin': [Permission.READ, Permission.WRITE, Permission.DELETE, Permission.ADMIN],
}

def requires_permission(permission):
    """Decorator to check permissions"""
    def decorator(f):
        @wraps(f)
        def decorated_function(*args, **kwargs):
            if not current_user.is_authenticated:
                abort(401)
            
            user_permissions = ROLE_PERMISSIONS.get(current_user.role, [])
            if permission not in user_permissions:
                abort(403)
            
            return f(*args, **kwargs)
        return decorated_function
    return decorator

# Usage
@app.route('/api/users', methods=['DELETE'])
@requires_permission(Permission.ADMIN)
def delete_user():
    # Only admins can reach here
    pass

@app.route('/api/documents', methods=['POST'])
@requires_permission(Permission.WRITE)
def create_document():
    # Editors and admins can create
    pass

Attribute-Based Access Control (ABAC)

ABAC provides more granular control by evaluating attributes of the user, resource, action, and environment.

python
from dataclasses import dataclass, field
from typing import Any, Dict

@dataclass
class AccessRequest:
    subject: Dict[str, Any]    # User attributes
    resource: Dict[str, Any]   # Resource attributes
    action: str                # Requested action
    # Context (time, IP, etc.); defaults to an empty dict so .get() is always safe
    environment: Dict[str, Any] = field(default_factory=dict)

class PolicyEngine:
    def evaluate(self, request: AccessRequest) -> bool:
        """Evaluate access request against policies"""
        
        # Policy 1: Users can only access resources in their department
        if request.resource.get('department') != request.subject.get('department'):
            # Exception for admins
            if 'admin' not in request.subject.get('roles', []):
                return False
        
        # Policy 2: Sensitive documents require manager role
        if request.resource.get('classification') == 'sensitive':
            if 'manager' not in request.subject.get('roles', []):
                return False
        
        # Policy 3: Write access only during business hours
        if request.action in ['write', 'delete']:
            hour = request.environment.get('hour', 12)
            if hour < 9 or hour > 17:
                if 'admin' not in request.subject.get('roles', []):
                    return False
        
        # Policy 4: Contractors cannot delete
        if request.action == 'delete':
            if request.subject.get('user_type') == 'contractor':
                return False
        
        return True

# Usage
from datetime import datetime
from flask import request as flask_request

policy = PolicyEngine()

def check_access(user, resource, action):
    # Named access_request so it does not shadow Flask's request object
    access_request = AccessRequest(
        subject={
            'id': user.id,
            'department': user.department,
            'roles': user.roles,
            'user_type': user.user_type,
        },
        resource={
            'id': resource.id,
            'department': resource.department,
            'classification': resource.classification,
        },
        action=action,
        environment={
            'hour': datetime.now().hour,
            'ip': flask_request.remote_addr,
        }
    )
    return policy.evaluate(access_request)

API Authorization Patterns

Express.js Middleware

javascript
const express = require('express');
const app = express();

// Authorization middleware
const authorize = (...allowedRoles) => {
  return (req, res, next) => {
    if (!req.user) {
      return res.status(401).json({ error: 'Authentication required' });
    }
    
    if (!allowedRoles.includes(req.user.role)) {
      return res.status(403).json({ error: 'Insufficient permissions' });
    }
    
    next();
  };
};

// Resource ownership middleware
const ownsResource = (resourceGetter) => {
  return async (req, res, next) => {
    try {
      const resource = await resourceGetter(req);

      if (!resource) {
        return res.status(404).json({ error: 'Resource not found' });
      }

      // Admin bypass
      if (req.user.role === 'admin') {
        req.resource = resource;
        return next();
      }

      // Ownership check
      if (resource.ownerId !== req.user.id) {
        return res.status(403).json({ error: 'Access denied' });
      }

      req.resource = resource;
      next();
    } catch (err) {
      // Express 4 does not catch rejected promises from async middleware
      next(err);
    }
  };
};

// Usage
app.get('/api/admin/users', 
  authorize('admin'),
  adminController.listUsers
);

app.get('/api/documents/:id',
  authorize('user', 'admin'),
  ownsResource(req => Document.findById(req.params.id)),
  documentController.getDocument
);

app.delete('/api/documents/:id',
  authorize('user', 'admin'),
  ownsResource(req => Document.findById(req.params.id)),
  documentController.deleteDocument
);

Database-Level Controls

sql
-- PostgreSQL Row-Level Security (RLS)

-- Enable RLS on table
ALTER TABLE documents ENABLE ROW LEVEL SECURITY;

-- Policy: Users can only see their own documents
CREATE POLICY documents_owner_policy ON documents
    FOR ALL
    USING (owner_id = current_setting('app.current_user_id')::uuid);

-- Policy: Admins can see all documents
CREATE POLICY documents_admin_policy ON documents
    FOR ALL
    USING (
        EXISTS (
            SELECT 1 FROM users 
            WHERE id = current_setting('app.current_user_id')::uuid 
            AND role = 'admin'
        )
    );

-- Policy: Shared documents
CREATE POLICY documents_shared_policy ON documents
    FOR SELECT
    USING (
        EXISTS (
            SELECT 1 FROM document_shares
            WHERE document_id = documents.id
            AND user_id = current_setting('app.current_user_id')::uuid
        )
    );

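-- Set user context in application
-- Python/psycopg2, scoped to the current transaction so the value
-- does not leak to other requests through a pooled connection:
-- cursor.execute("SELECT set_config('app.current_user_id', %s, true)", [str(user_id)])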

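On the application side, the user context has to be set before any query that relies on these policies. The sketch below assumes a DB-API cursor such as psycopg2's and uses PostgreSQL's `set_config` with its third argument set to `true`, which limits the setting to the current transaction. The `set_rls_user` helper and the `RecordingCursor` stand-in are hypothetical names; the stand-in only records calls so the example runs without a database.

```python
import uuid


def set_rls_user(cursor, user_id) -> None:
    """Bind the authenticated user's ID to the current transaction for RLS."""
    cursor.execute(
        "SELECT set_config('app.current_user_id', %s, true)",
        (str(user_id),),
    )


class RecordingCursor:
    """Stand-in for a real cursor; records what would be executed."""

    def __init__(self):
        self.calls = []

    def execute(self, sql, params=None):
        self.calls.append((sql, params))


if __name__ == "__main__":
    cursor = RecordingCursor()
    set_rls_user(cursor, uuid.uuid4())
    print(cursor.calls)
```

The user ID passed in should come from the authenticated session, never from a request parameter. Also note that PostgreSQL does not apply row-level security to the table owner unless `FORCE ROW LEVEL SECURITY` is set on the table, so the application should connect as a separate, non-owner role.
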
Testing Verification

python
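import pytest

# create_user, create_document, client.login, and user.refresh are
# project-specific test helpers assumed to exist in the test suite.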

class TestDocumentAccessControl:
    """Test IDOR and access control vulnerabilities"""
    
    def test_user_cannot_access_others_document(self, client):
        """Horizontal privilege escalation test"""
        # User A creates document
        user_a = create_user()
        doc = create_document(owner=user_a)
        
        # User B tries to access
        user_b = create_user()
        client.login(user_b)
        
        response = client.get(f'/api/documents/{doc.id}')
        assert response.status_code == 403
    
    def test_user_cannot_delete_others_document(self, client):
        """Test IDOR on destructive actions"""
        user_a = create_user()
        doc = create_document(owner=user_a)
        
        user_b = create_user()
        client.login(user_b)
        
        response = client.delete(f'/api/documents/{doc.id}')
        assert response.status_code == 403
        assert Document.query.get(doc.id) is not None  # Not deleted
    
    def test_regular_user_cannot_access_admin_endpoint(self, client):
        """Vertical privilege escalation test"""
        user = create_user(role='user')
        client.login(user)
        
        response = client.get('/api/admin/users')
        assert response.status_code == 403
    
    def test_parameter_tampering_blocked(self, client):
        """Test role parameter tampering"""
        user = create_user(role='user')
        client.login(user)
        
        # Try to elevate own role
        response = client.put('/api/profile', json={'role': 'admin'})
        
        user.refresh()
        assert user.role == 'user'  # Role unchanged

Manual Testing

IDOR Test Cases

bash
# Horizontal privilege escalation tests
# Login as user A, try to access user B's resources

# Test 1: Direct object reference
GET /api/users/123/profile  # Your ID
GET /api/users/456/profile  # Other user's ID - should fail

# Test 2: Sequential ID enumeration
for id in {1..1000}; do
  curl -H "Cookie: session=your_session" \
    https://api.site.com/documents/$id
done
# Should only return your documents

# Test 3: UUID guessing (if predictable)
GET /api/orders/550e8400-e29b-41d4-a716-446655440000

Vertical Privilege Escalation Tests

bash
# Test admin endpoints as regular user
curl -X GET https://api.site.com/admin/users \
  -H "Cookie: session=regular_user_session"
# Expected: 403 Forbidden

# Test role manipulation
curl -X PUT https://api.site.com/api/profile \
  -H "Cookie: session=regular_user_session" \
  -H "Content-Type: application/json" \
  -d '{"role": "admin"}'
# Expected: Role field should be ignored

# Test hidden admin paths
/admin
/admin.php
/administrator
/manage
/console

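One common way to make the role-manipulation test pass is to accept only an explicit allowlist of fields on profile updates, so a submitted `role` value is dropped before it reaches the model. This is a minimal sketch: the field names in `ALLOWED_PROFILE_FIELDS` and the `filter_profile_update` helper are assumptions for illustration, not part of any framework.

```python
# Hypothetical set of fields a user may change on their own profile
ALLOWED_PROFILE_FIELDS = frozenset({"display_name", "email"})


def filter_profile_update(payload: dict) -> dict:
    """Keep only allowlisted keys; everything else (e.g. 'role') is ignored."""
    return {
        key: value
        for key, value in payload.items()
        if key in ALLOWED_PROFILE_FIELDS
    }


if __name__ == "__main__":
    submitted = {"email": "a@example.com", "role": "admin"}
    print(filter_profile_update(submitted))
```

An allowlist fails closed: a privileged field added to the model later stays protected unless someone deliberately adds it to the set.
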
Common Mistakes

❌ Client-Side Only Checks

javascript
// DON'T: Only hide in UI
if (user.isAdmin) {
  showAdminButton();
}
// Endpoint still accessible!

Attackers can call endpoints directly

✅ Correct Approach

python
# DO: Server-side authorization
@require_role('admin')
def admin_endpoint():
    # Decorator checks role on server
    ...

Always enforce on server side

❌ Trusting User ID from Request

python
# DON'T: User controls the ID
user_id = request.args['user_id']
return get_user_data(user_id)

Attacker changes user_id parameter

✅ Correct Approach

python
# DO: Get user ID from session
user_id = session['user_id']
return get_user_data(user_id)

Session data is set by the server and is either signed or stored server-side, so the client cannot change it undetected

Verification Checklist

  • ☐ All endpoints require authentication (except public resources)
  • ☐ Authorization checked on every request (not just UI)
  • ☐ Resource ownership verified before access
  • ☐ Role/permission checks implemented consistently
  • ☐ User IDs taken from session, not request parameters
  • ☐ Direct object references validated
  • ☐ Admin functions protected by role checks
  • ☐ API and web endpoints have matching controls
  • ☐ Deny by default policy implemented
  • ☐ Access control failures logged and monitored
  • ☐ Automated tests for access control in place
  • ☐ Rate limiting on authentication endpoints
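
Two checklist items, deny by default and logging of access control failures, can be combined in one small check. The sketch below is illustrative only: the `ALLOWED` rule table mirrors the viewer/editor/admin roles used earlier, and the `is_allowed` function and logger name are hypothetical.

```python
import logging

logger = logging.getLogger("access_control")

# Explicit allow rules as (role, action) pairs; anything absent is denied
ALLOWED = {
    ("viewer", "read"),
    ("editor", "read"),
    ("editor", "write"),
    ("admin", "read"),
    ("admin", "write"),
    ("admin", "delete"),
    ("admin", "admin"),
}


def is_allowed(user_id: str, role: str, action: str) -> bool:
    """Allow only explicitly listed (role, action) pairs and log every denial."""
    allowed = (role, action) in ALLOWED
    if not allowed:
        logger.warning(
            "access denied: user=%s role=%s action=%s", user_id, role, action
        )
    return allowed


if __name__ == "__main__":
    logging.basicConfig(level=logging.WARNING)
    print(is_allowed("u1", "editor", "write"))
    print(is_allowed("u1", "viewer", "delete"))
```

Unknown roles and unknown actions both fall through to a denial, and each denial leaves a log entry that monitoring can alert on.
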
🎯

Access Control Labs

Practice exploiting and fixing broken access control.

Access Control Vulnerabilities (PortSwigger, medium)
Topics: IDOR, horizontal privilege escalation, vertical privilege escalation