Logging & Monitoring Best Practices

Risk Severity
๐ŸŸก Medium
Fix Effort
โšก Low (Quick Fix)
Est. Time
โฑ๏ธ 2-4 hours
Reference
A09:2021 CWE-117, CWE-532

Proper logging is critical for detecting attacks, investigating incidents, and meeting compliance requirements. However, logs can introduce vulnerabilities through injection, sensitive data exposure, and insufficient monitoring.

Security Logging Triad

Log the right things: Security events + errors
Protect the logs: No injection or sensitive data
Monitor effectively: Alerting + incident response

Log Injection Prevention

Log injection occurs when attackers insert malicious content into logs, potentially leading to log forging, CRLF injection, or exploitation of log analysis tools.

Vulnerable Logging

python
import logging

# โŒ VULNERABLE: Direct string interpolation
def vulnerable_login(username, password):
    logging.info(f"Login attempt for user: {username}")
    # Attacker username: "admin\n[2024-01-01] CRITICAL: System compromised"
    # Creates fake log entry!

# โŒ VULNERABLE: CRLF injection
def vulnerable_log(user_input):
    logger.info("User input: " + user_input)
    # Attacker input: "test\r\nINFO: Malicious entry"
    
# โŒ VULNERABLE: Format string injection (older Python)
def vulnerable_format(username):
    logging.info("Login: %s" % username)
    # Can be exploited with format specifiers

Secure Logging

python
import logging
import re
import json

# โœ… SECURE: Use structured logging with parameterized messages
def secure_login(username, password):
    # Use parameter substitution - logger handles escaping
    logging.info("Login attempt for user: %s", username)
    # OR use structured logging (preferred)
    logging.info("Login attempt", extra={
        'username': sanitize_for_log(username),
        'event': 'login_attempt',
        'source_ip': get_client_ip()
    })

# โœ… SECURE: Sanitize user input in logs
def sanitize_for_log(user_input: str) -> str:
    """Remove CRLF and control characters"""
    if not isinstance(user_input, str):
        return str(user_input)
    
    # Remove newlines, carriage returns, and control characters
    sanitized = re.sub(r'[\r\n\x00-\x1f\x7f]', '', user_input)
    
    # Limit length
    if len(sanitized) > 200:
        sanitized = sanitized[:200] + "..."
    
    return sanitized

# โœ… SECURE: Structured JSON logging
import logging.config
import json_log_formatter

class CustomJSONFormatter(json_log_formatter.JSONFormatter):
    def json_record(self, message, extra, record):
        extra['timestamp'] = record.created
        extra['level'] = record.levelname
        extra['logger'] = record.name
        
        # Sanitize all string values
        for key, value in extra.items():
            if isinstance(value, str):
                extra[key] = sanitize_for_log(value)
        
        return extra

# Configure structured logging
handler = logging.StreamHandler()
handler.setFormatter(CustomJSONFormatter())
logger = logging.getLogger()
logger.addHandler(handler)
logger.setLevel(logging.INFO)

# Usage - automatically sanitized and structured
logger.info("User action", extra={
    'event': 'file_upload',
    'username': username,
    'filename': filename,
    'size': filesize
})
# Output: {"timestamp": 1234567890, "level": "INFO", ...}

Node.js: Winston Secure Logging

javascript
const winston = require('winston');

// โŒ VULNERABLE: String concatenation
logger.info('Login attempt: ' + username);

// โœ… SECURE: Use winston with structured logging
const logger = winston.createLogger({
    level: 'info',
    format: winston.format.combine(
        winston.format.timestamp(),
        winston.format.errors({ stack: true }),
        winston.format.json()
    ),
    transports: [
        new winston.transports.File({ filename: 'error.log', level: 'error' }),
        new winston.transports.File({ filename: 'combined.log' })
    ]
});

// Sanitization function
function sanitizeForLog(input) {
    if (typeof input !== 'string') return input;
    
    // Remove CRLF and control characters
    return input
        .replace(/[\r\n\x00-\x1f\x7f]/g, '')
        .substring(0, 200);
}

// โœ… SECURE: Structured logging with metadata
logger.info('Login attempt', {
    event: 'login',
    username: sanitizeForLog(username),
    sourceIP: req.ip,
    userAgent: req.get('user-agent')
});

// โœ… SECURE: Custom formatter with automatic sanitization
const sanitizingFormat = winston.format((info) => {
    Object.keys(info).forEach(key => {
        if (typeof info[key] === 'string') {
            info[key] = sanitizeForLog(info[key]);
        }
    });
    return info;
});

const secureLogger = winston.createLogger({
    format: winston.format.combine(
        sanitizingFormat(),
        winston.format.timestamp(),
        winston.format.json()
    ),
    transports: [new winston.transports.Console()]
});

Protecting Sensitive Data in Logs

Never Log These

โ€ข Passwords (even hashed!)
โ€ข Session tokens / API keys
โ€ข Credit card numbers
โ€ข Social security numbers
โ€ข Personal health information
โ€ข Encryption keys

Automatic Sensitive Data Redaction

python
import logging
import re

class SensitiveDataFilter(logging.Filter):
    """Filter to redact sensitive information from logs"""
    
    # Patterns for sensitive data
    PATTERNS = {
        'credit_card': r'\b\d{4}[\s-]?\d{4}[\s-]?\d{4}[\s-]?\d{4}\b',
        'ssn': r'\b\d{3}-\d{2}-\d{4}\b',
        'email': r'\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Z|a-z]{2,}\b',
        'password': r'(password|passwd|pwd)["']?\s*[:=]\s*["']?([^"'\s]+)',
        'api_key': r'(api[_-]?key|apikey|access[_-]?token)["']?\s*[:=]\s*["']?([A-Za-z0-9-_]{20,})',
        'jwt': r'eyJ[A-Za-z0-9-_]+\.eyJ[A-Za-z0-9-_]+\.[A-Za-z0-9-_]+'
    }
    
    def filter(self, record):
        """Redact sensitive data from log message"""
        if isinstance(record.msg, str):
            record.msg = self.redact(record.msg)
        
        # Also redact from args
        if record.args:
            record.args = tuple(
                self.redact(str(arg)) if isinstance(arg, str) else arg
                for arg in record.args
            )
        
        return True
    
    def redact(self, text):
        """Replace sensitive patterns with [REDACTED]"""
        for name, pattern in self.PATTERNS.items():
            text = re.sub(pattern, f'[REDACTED-{name.upper()}]', text, flags=re.IGNORECASE)
        return text

# Apply filter to logger
logger = logging.getLogger()
logger.addFilter(SensitiveDataFilter())

# Test - these will be automatically redacted
logger.info("User payment: 4532-1234-5678-9010")  
# Output: User payment: [REDACTED-CREDIT_CARD]

logger.info("API request with key: sk_live_abc123xyz789")
# Output: API request with key: [REDACTED-API_KEY]

# โœ… SECURE: Explicit field masking
def mask_sensitive_fields(data: dict) -> dict:
    """Mask sensitive fields in dictionary for logging"""
    sensitive_keys = {'password', 'ssn', 'credit_card', 'api_key', 'token'}
    
    masked = data.copy()
    for key in masked:
        if any(sensitive in key.lower() for sensitive in sensitive_keys):
            if masked[key]:
                # Show first 4 chars only
                masked[key] = masked[key][:4] + '*' * (len(str(masked[key])) - 4)
    
    return masked

# Usage
user_data = {
    'username': 'john',
    'password': 'secret123',
    'email': 'john@example.com',
    'credit_card': '4532123456789010'
}

logger.info("User data: %s", mask_sensitive_fields(user_data))
# Output: User data: {'username': 'john', 'password': 'secr*****', ...}

Java: Log4j2 Pattern Masking

java
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.core.Layout;
import org.apache.logging.log4j.core.LogEvent;
import org.apache.logging.log4j.core.pattern.PatternConverter;

// log4j2.xml configuration
/*
<?xml version="1.0" encoding="UTF-8"?>
<Configuration status="WARN">
    <Appenders>
        <Console name="Console" target="SYSTEM_OUT">
            <PatternLayout>
                <Pattern>%d{HH:mm:ss.SSS} [%t] %-5level %logger{36} - %msg%n</Pattern>
                <Replace regex="password["']?\s*[:=]\s*["']?([^"'\s]+)" 
                         replacement="password=[REDACTED]"/>
                <Replace regex="\b\d{4}[\s-]?\d{4}[\s-]?\d{4}[\s-]?\d{4}\b" 
                         replacement="[REDACTED-CC]"/>
            </PatternLayout>
        </Console>
    </Appenders>
    <Loggers>
        <Root level="info">
            <AppenderRef ref="Console"/>
        </Root>
    </Loggers>
</Configuration>
*/

// โœ… SECURE: Custom masking in Java
public class SensitiveDataMasker {
    private static final Logger logger = LogManager.getLogger();
    
    public static String maskCreditCard(String cc) {
        if (cc == null || cc.length() < 8) return cc;
        return "****-****-****-" + cc.substring(cc.length() - 4);
    }
    
    public static String maskEmail(String email) {
        if (email == null || !email.contains("@")) return email;
        String[] parts = email.split("@");
        return parts[0].substring(0, 2) + "***@" + parts[1];
    }
    
    public static void logUserAction(String username, String action, Map<String, Object> data) {
        // Mask sensitive fields
        Map<String, Object> safeData = new HashMap<>(data);
        if (safeData.containsKey("creditCard")) {
            safeData.put("creditCard", maskCreditCard((String) safeData.get("creditCard")));
        }
        if (safeData.containsKey("email")) {
            safeData.put("email", maskEmail((String) safeData.get("email")));
        }
        
        logger.info("User action: username={}, action={}, data={}", 
                    username, action, safeData);
    }
}

What to Log for Security

โœ… Always Log

  • โ€ข Authentication events (success/failure)
  • โ€ข Authorization failures (403 Forbidden)
  • โ€ข Input validation failures
  • โ€ข Session management events
  • โ€ข Security-relevant config changes
  • โ€ข Data access (especially sensitive data)
  • โ€ข Application errors & exceptions
  • โ€ข Admin actions

๐Ÿ“Š Include Context

  • โ€ข Timestamp (ISO 8601 format)
  • โ€ข User ID / Session ID
  • โ€ข Source IP address
  • โ€ข User agent
  • โ€ข Request ID (correlation)
  • โ€ข Action performed
  • โ€ข Result (success/failure)
  • โ€ข Resource accessed

Comprehensive Security Logging Example

python
import logging
import json
from datetime import datetime
from flask import request, g

class SecurityLogger:
    """Centralized security event logging"""
    
    def __init__(self):
        self.logger = logging.getLogger('security')
        self.logger.setLevel(logging.INFO)
        
        # JSON formatter for structured logs
        handler = logging.FileHandler('security.log')
        handler.setFormatter(logging.Formatter('%(message)s'))
        self.logger.addHandler(handler)
    
    def _get_context(self):
        """Get request context for logging"""
        return {
            'timestamp': datetime.utcnow().isoformat(),
            'ip': request.remote_addr if request else None,
            'user_id': g.user.id if hasattr(g, 'user') else None,
            'session_id': g.session_id if hasattr(g, 'session_id') else None,
            'user_agent': request.headers.get('User-Agent') if request else None,
            'request_id': g.request_id if hasattr(g, 'request_id') else None
        }
    
    def log_authentication(self, username, success, reason=None):
        """Log authentication attempt"""
        event = {
            **self._get_context(),
            'event_type': 'authentication',
            'username': username,
            'success': success,
            'reason': reason
        }
        self.logger.info(json.dumps(event))
    
    def log_authorization_failure(self, user_id, resource, action):
        """Log authorization failure (403)"""
        event = {
            **self._get_context(),
            'event_type': 'authorization_failure',
            'user_id': user_id,
            'resource': resource,
            'action': action
        }
        self.logger.warning(json.dumps(event))
    
    def log_sensitive_data_access(self, user_id, data_type, record_id):
        """Log access to sensitive data"""
        event = {
            **self._get_context(),
            'event_type': 'sensitive_data_access',
            'user_id': user_id,
            'data_type': data_type,
            'record_id': record_id
        }
        self.logger.info(json.dumps(event))
    
    def log_input_validation_failure(self, field, value, reason):
        """Log input validation failure (potential attack)"""
        event = {
            **self._get_context(),
            'event_type': 'input_validation_failure',
            'field': field,
            'value': sanitize_for_log(value),
            'reason': reason
        }
        self.logger.warning(json.dumps(event))
    
    def log_admin_action(self, user_id, action, target, details=None):
        """Log administrative action"""
        event = {
            **self._get_context(),
            'event_type': 'admin_action',
            'user_id': user_id,
            'action': action,
            'target': target,
            'details': details
        }
        self.logger.info(json.dumps(event))

# Usage
security_logger = SecurityLogger()

@app.route('/login', methods=['POST'])
def login():
    username = request.form['username']
    password = request.form['password']
    
    user = authenticate(username, password)
    if user:
        security_logger.log_authentication(username, True)
        return "Login successful"
    else:
        security_logger.log_authentication(username, False, "Invalid credentials")
        return "Login failed", 401

@app.route('/admin/delete-user/<user_id>', methods=['DELETE'])
@admin_required
def delete_user(user_id):
    security_logger.log_admin_action(
        g.user.id, 
        'delete_user', 
        user_id,
        details={'reason': request.form.get('reason')}
    )
    # Delete user...
    return "User deleted"

Security Monitoring & Alerting

ELK Stack (Elasticsearch, Logstash, Kibana)

ruby
# Logstash filter configuration for security logs
filter {
    # Parse JSON logs
    json {
        source => "message"
    }
    
    # Detect brute force attempts
    if [event_type] == "authentication" and [success] == false {
        # Count failed attempts per IP
        aggregate {
            task_id => "%{ip}"
            code => "
                map['failed_attempts'] ||= 0
                map['failed_attempts'] += 1
                
                # Alert if > 5 failed attempts in 5 minutes
                if map['failed_attempts'] > 5
                    event.set('alert', 'brute_force_detected')
                end
            "
            timeout => 300
        }
    }
    
    # Detect suspicious patterns
    if [event_type] == "input_validation_failure" {
        if [reason] =~ /sql_injection|xss|path_traversal/ {
            mutate {
                add_field => { "alert" => "attack_detected" }
                add_field => { "severity" => "high" }
            }
        }
    }
    
    # GeoIP lookup for suspicious countries
    geoip {
        source => "ip"
        target => "geoip"
    }
}

# Alert output
output {
    # Send to Elasticsearch
    elasticsearch {
        hosts => ["localhost:9200"]
        index => "security-logs-%{+YYYY.MM.dd}"
    }
    
    # Send alerts to SIEM/Slack/Email
    if [alert] {
        http {
            url => "https://hooks.slack.com/services/YOUR/WEBHOOK/URL"
            format => "json"
            http_method => "post"
        }
    }
}

Simple Python Alerting

python
from collections import defaultdict
from datetime import datetime, timedelta
import smtplib
from email.message import EmailMessage

class SecurityAlertManager:
    """Detect and alert on suspicious patterns"""
    
    def __init__(self):
        self.failed_logins = defaultdict(list)  # IP -> [timestamps]
        self.alert_threshold = 5  # failures
        self.time_window = timedelta(minutes=5)
    
    def check_brute_force(self, ip_address):
        """Check for brute force attack"""
        now = datetime.utcnow()
        
        # Add current attempt
        self.failed_logins[ip_address].append(now)
        
        # Remove old attempts outside time window
        cutoff = now - self.time_window
        self.failed_logins[ip_address] = [
            ts for ts in self.failed_logins[ip_address] 
            if ts > cutoff
        ]
        
        # Check if threshold exceeded
        if len(self.failed_logins[ip_address]) >= self.alert_threshold:
            self.send_alert(
                f"Brute force detected from {ip_address}",
                f"{len(self.failed_logins[ip_address])} failed attempts in 5 minutes"
            )
            # Block IP or trigger other response
            return True
        
        return False
    
    def send_alert(self, subject, body):
        """Send email alert"""
        msg = EmailMessage()
        msg.set_content(body)
        msg['Subject'] = f"[SECURITY ALERT] {subject}"
        msg['From'] = "security@example.com"
        msg['To'] = "admin@example.com"
        
        # Send via SMTP
        with smtplib.SMTP('localhost') as s:
            s.send_message(msg)
        
        # Also log
        logging.critical(f"ALERT: {subject} - {body}")

# Usage with Flask
alert_manager = SecurityAlertManager()

@app.route('/login', methods=['POST'])
def login():
    username = request.form['username']
    password = request.form['password']
    
    if not authenticate(username, password):
        # Check for brute force
        if alert_manager.check_brute_force(request.remote_addr):
            return "Too many failed attempts", 429
        
        return "Login failed", 401
    
    return "Success"

๐Ÿงช Testing Verification

Test Log Injection Prevention

python
import logging
import unittest

class TestLogInjection(unittest.TestCase):
    def setUp(self):
        # Capture logs
        self.log_capture = []
        handler = logging.Handler()
        handler.emit = lambda record: self.log_capture.append(record.getMessage())
        logging.getLogger().addHandler(handler)
    
    def test_crlf_injection(self):
        """Test that CRLF characters are removed"""
        malicious_input = "admin\r\n[CRITICAL] Fake entry"
        
        logging.info("User: %s", sanitize_for_log(malicious_input))
        
        # Check that newline was removed
        logged = self.log_capture[-1]
        self.assertNotIn('\r', logged)
        self.assertNotIn('\n', logged)
        self.assertNotIn('CRITICAL', logged)
    
    def test_sensitive_data_redaction(self):
        """Test that credit cards are redacted"""
        message = "Payment with card 4532-1234-5678-9010"
        
        logging.info(message)
        
        logged = self.log_capture[-1]
        self.assertNotIn('4532-1234-5678-9010', logged)
        self.assertIn('[REDACTED', logged)

# Manual testing
if __name__ == '__main__':
    # Test log injection
    test_inputs = [
        "normal_input",
        "with\nnewline",
        "with\rcarriage",
        "admin\r\n[ERROR] Fake",
        "a" * 1000  # Very long input
    ]
    
    for test_input in test_inputs:
        logging.info("Test: %s", sanitize_for_log(test_input))
    
    # Verify logs don't contain injection
    with open('app.log', 'r') as f:
        logs = f.read()
        assert '\r\n[ERROR]' not in logs
        print("โœ“ Log injection tests passed")

โš ๏ธ Common Mistakes

โŒ Logging Passwords

NEVER log passwords, even if hashed. This includes debug logs and error messages.

python
# โŒ WRONG - logs password!
logging.debug(f"Auth attempt: {username}:{password}")

# โœ… CORRECT - never log password
logging.debug(f"Auth attempt for user: {username}")

โŒ Insufficient Logging

Not logging security events makes incident investigation impossible.

python
# โŒ WRONG - no security logging
if not user.has_permission(resource):
    return "Forbidden", 403

# โœ… CORRECT - log authorization failure
if not user.has_permission(resource):
    security_logger.log_authorization_failure(
        user.id, resource, request.method
    )
    return "Forbidden", 403

โŒ Not Protecting Log Files

Log files should have restricted permissions and be stored securely.

bash
# โŒ WRONG - world-readable logs
chmod 666 /var/log/app.log

# โœ… CORRECT - restricted permissions
chmod 640 /var/log/app.log
chown app:app /var/log/app.log

# Rotate logs regularly
logrotate -f /etc/logrotate.d/app

Log Rotation & Retention

bash
# /etc/logrotate.d/app
/var/log/app/*.log {
    daily                  # Rotate daily
    missingok             # OK if log file missing
    rotate 90             # Keep 90 days
    compress              # Compress old logs
    delaycompress         # Compress after 2nd rotation
    notifempty           # Don't rotate if empty
    create 640 app app   # Permissions for new file
    sharedscripts
    postrotate
        # Reload app to use new log file
        systemctl reload app
    endscript
}

# Python: Use RotatingFileHandler
import logging
from logging.handlers import RotatingFileHandler

handler = RotatingFileHandler(
    'app.log',
    maxBytes=10*1024*1024,  # 10 MB
    backupCount=10           # Keep 10 old files
)
logger.addHandler(handler)