Logging & Monitoring Best Practices
Risk Severity: Medium
Fix Effort: Low (Quick Fix)
Est. Time: 2-4 hours
Reference: A09:2021, CWE-117, CWE-532
Proper logging is critical for detecting attacks, investigating incidents, and meeting compliance requirements. However, logs can introduce vulnerabilities through injection, sensitive data exposure, and insufficient monitoring.
Security Logging Triad
Log the right things: Security events + errors
Protect the logs: No injection or sensitive data
Monitor effectively: Alerting + incident response
Log Injection Prevention
Log injection occurs when attackers insert malicious content into logs, potentially leading to log forging, CRLF injection, or exploitation of log analysis tools.
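To make the forging risk concrete, here is a minimal sketch using only the standard library. The logger name `forging_demo`, the helper `build_logger`, and the plain-text format are assumptions chosen for illustration. It shows one logging call producing two lines, the second of which looks like a separate entry.

```python
import io
import logging

def build_logger(stream: io.StringIO) -> logging.Logger:
    """Return an isolated logger that writes plain-text lines to `stream`."""
    logger = logging.getLogger("forging_demo")  # hypothetical logger name
    logger.handlers.clear()
    logger.setLevel(logging.INFO)
    logger.propagate = False
    handler = logging.StreamHandler(stream)
    handler.setFormatter(logging.Formatter("%(levelname)s %(message)s"))
    logger.addHandler(handler)
    return logger

stream = io.StringIO()
logger = build_logger(stream)

# Attacker-controlled value containing a line break and a fake entry
username = "admin\nCRITICAL System compromised"
logger.info("Login attempt for user: %s", username)

# One logging call produced two lines; the second resembles a real entry
lines = stream.getvalue().splitlines()
for line in lines:
    print(line)
```

Anyone reading the raw file, or a parser that splits on line breaks, cannot tell the second line apart from a genuine entry. This is why the line break has to be removed or encoded before the value is written.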
Vulnerable Logging
python
import logging

logger = logging.getLogger(__name__)

# VULNERABLE: Direct string interpolation
def vulnerable_login(username, password):
    logging.info(f"Login attempt for user: {username}")
    # Attacker username: "admin\n[2024-01-01] CRITICAL: System compromised"
    # Creates fake log entry!

# VULNERABLE: CRLF injection
def vulnerable_log(user_input):
    logger.info("User input: " + user_input)
    # Attacker input: "test\r\nINFO: Malicious entry"

# VULNERABLE: Eager %-formatting of unsanitized input
def vulnerable_format(username):
    logging.info("Login: %s" % username)
    # Line breaks in username still reach the log unchanged
Secure Logging
python
import logging
import re
import json

# SECURE: Sanitize untrusted values and pass them as logging parameters
def secure_login(username, password):
    # Parameter substitution defers formatting but does NOT strip line
    # breaks, so sanitize untrusted values before logging them
    logging.info("Login attempt for user: %s", sanitize_for_log(username))
    # OR use structured logging (preferred)
    logging.info("Login attempt", extra={
        'username': sanitize_for_log(username),
        'event': 'login_attempt',
        'source_ip': get_client_ip()  # application-specific helper
    })

# SECURE: Sanitize user input in logs
def sanitize_for_log(user_input) -> str:
    """Remove CRLF and control characters"""
    if not isinstance(user_input, str):
        # Convert first so the string form is sanitized as well
        user_input = str(user_input)
    # Remove newlines, carriage returns, and control characters
    sanitized = re.sub(r'[\r\n\x00-\x1f\x7f]', '', user_input)
    # Limit length
    if len(sanitized) > 200:
        sanitized = sanitized[:200] + "..."
    return sanitized

# SECURE: Structured JSON logging
import logging.config
import json_log_formatter

class CustomJSONFormatter(json_log_formatter.JSONFormatter):
    def json_record(self, message, extra, record):
        extra['message'] = message
        extra['timestamp'] = record.created
        extra['level'] = record.levelname
        extra['logger'] = record.name
        # Sanitize all string values
        for key, value in extra.items():
            if isinstance(value, str):
                extra[key] = sanitize_for_log(value)
        return extra

# Configure structured logging
handler = logging.StreamHandler()
handler.setFormatter(CustomJSONFormatter())
logger = logging.getLogger()
logger.addHandler(handler)
logger.setLevel(logging.INFO)

# Usage - automatically sanitized and structured
# Keys in `extra` must not reuse LogRecord attribute names such as
# 'filename' or 'message'; logging raises KeyError if they do
logger.info("User action", extra={
    'event': 'file_upload',
    'username': username,
    'upload_filename': filename,
    'size': filesize
})
# Output: {"event": "file_upload", ..., "level": "INFO", ...}
Node.js: Winston Secure Logging
javascript
const winston = require('winston');

// SECURE: Use winston with structured logging
const logger = winston.createLogger({
  level: 'info',
  format: winston.format.combine(
    winston.format.timestamp(),
    winston.format.errors({ stack: true }),
    winston.format.json()
  ),
  transports: [
    new winston.transports.File({ filename: 'error.log', level: 'error' }),
    new winston.transports.File({ filename: 'combined.log' })
  ]
});

// VULNERABLE: String concatenation
logger.info('Login attempt: ' + username);

// Sanitization function
function sanitizeForLog(input) {
  if (typeof input !== 'string') return input;
  // Remove CRLF and control characters
  return input
    .replace(/[\r\n\x00-\x1f\x7f]/g, '')
    .substring(0, 200);
}

// SECURE: Structured logging with metadata
logger.info('Login attempt', {
  event: 'login',
  username: sanitizeForLog(username),
  sourceIP: req.ip,
  userAgent: req.get('user-agent')
});

// SECURE: Custom formatter with automatic sanitization
const sanitizingFormat = winston.format((info) => {
  Object.keys(info).forEach(key => {
    if (typeof info[key] === 'string') {
      info[key] = sanitizeForLog(info[key]);
    }
  });
  return info;
});

const secureLogger = winston.createLogger({
  format: winston.format.combine(
    sanitizingFormat(),
    winston.format.timestamp(),
    winston.format.json()
  ),
  transports: [new winston.transports.Console()]
});
Protecting Sensitive Data in Logs
Never Log These
• Passwords (even hashed!)
• Session tokens / API keys
• Credit card numbers
• Social security numbers
• Personal health information
• Encryption keys
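Keeping session tokens and API keys out of logs can make it harder to tell which session an event belonged to. One possible workaround, sketched here as an assumption rather than a prescribed method, is to log a short keyed fingerprint in place of the secret. The names `LOG_FINGERPRINT_KEY` and `token_fingerprint` are hypothetical, and the key shown is a placeholder. This does not apply to passwords, which should stay out of logs in any form.

```python
import hashlib
import hmac

# Hypothetical placeholder; a real key would come from a secrets manager
LOG_FINGERPRINT_KEY = b"replace-with-a-secret-key"

def token_fingerprint(token: str, key: bytes = LOG_FINGERPRINT_KEY) -> str:
    """Return a short keyed digest that can stand in for a secret in logs."""
    digest = hmac.new(key, token.encode("utf-8"), hashlib.sha256).hexdigest()
    # A truncated digest is enough to correlate events
    return digest[:12]

session_token = "s3cr3t-session-token-value"  # illustrative value only
fingerprint = token_fingerprint(session_token)
print(f"session={fingerprint}")
```

Because the digest is keyed, someone who only has the log file cannot confirm a guessed token without also holding the key.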
Automatic Sensitive Data Redaction
python
import logging
import re

class SensitiveDataFilter(logging.Filter):
    """Filter to redact sensitive information from logs"""
    # Patterns for sensitive data
    PATTERNS = {
        'credit_card': r'\b\d{4}[\s-]?\d{4}[\s-]?\d{4}[\s-]?\d{4}\b',
        'ssn': r'\b\d{3}-\d{2}-\d{4}\b',
        'email': r'\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}\b',
        # Triple-quoted so the pattern can contain both quote characters
        'password': r'''(password|passwd|pwd)["']?\s*[:=]\s*["']?([^"'\s]+)''',
        'api_key': r'''(api[_-]?key|apikey|access[_-]?token)["']?\s*[:=]\s*["']?([A-Za-z0-9_-]{20,})''',
        'jwt': r'eyJ[A-Za-z0-9_-]+\.eyJ[A-Za-z0-9_-]+\.[A-Za-z0-9_-]+'
    }

    def filter(self, record):
        """Redact sensitive data from the fully formatted log message"""
        # Format first so positional and dict-style args are both covered
        record.msg = self.redact(record.getMessage())
        record.args = ()
        return True

    def redact(self, text):
        """Replace sensitive patterns with [REDACTED-<NAME>]"""
        for name, pattern in self.PATTERNS.items():
            text = re.sub(pattern, f'[REDACTED-{name.upper()}]', text, flags=re.IGNORECASE)
        return text

# Attach the filter to the handler so records propagated from child
# loggers are redacted too (logger-level filters skip propagated records)
handler = logging.StreamHandler()
handler.addFilter(SensitiveDataFilter())
logger = logging.getLogger()
logger.addHandler(handler)
logger.setLevel(logging.INFO)

# Test - these will be automatically redacted
logger.info("User payment: 4532-1234-5678-9010")
# Output: User payment: [REDACTED-CREDIT_CARD]
logger.info("API request with api_key=sk_live_abc123xyz789")
# Output: API request with [REDACTED-API_KEY]

# SECURE: Explicit field masking
def mask_sensitive_fields(data: dict) -> dict:
    """Mask sensitive fields in dictionary for logging"""
    sensitive_keys = {'password', 'ssn', 'credit_card', 'api_key', 'token'}
    masked = data.copy()
    for key in masked:
        if any(sensitive in key.lower() for sensitive in sensitive_keys):
            if masked[key]:
                # Replace the whole value; even a partial secret or its
                # length gives an attacker useful information
                masked[key] = '[REDACTED]'
    return masked

# Usage
user_data = {
    'username': 'john',
    'password': 'secret123',
    'email': 'john@example.com',
    'credit_card': '4532123456789010'
}
logger.info("User data: %s", mask_sensitive_fields(user_data))
# Output: User data: {'username': 'john', 'password': '[REDACTED]', ...}
Java: Log4j2 Pattern Masking
java
import java.util.HashMap;
import java.util.Map;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

// log4j2.xml configuration
/*
<?xml version="1.0" encoding="UTF-8"?>
<Configuration status="WARN">
  <Appenders>
    <Console name="Console" target="SYSTEM_OUT">
      <PatternLayout>
        <Pattern>%d{HH:mm:ss.SSS} [%t] %-5level %logger{36} - %msg%n</Pattern>
        <!-- PatternLayout accepts a single replace element, so both patterns
             share one regex; quotes inside the attribute are XML-escaped -->
        <replace regex="(password[&quot;']?\s*[:=]\s*[&quot;']?[^&quot;'\s]+)|(\b\d{4}[\s-]?\d{4}[\s-]?\d{4}[\s-]?\d{4}\b)"
                 replacement="[REDACTED]"/>
      </PatternLayout>
    </Console>
  </Appenders>
  <Loggers>
    <Root level="info">
      <AppenderRef ref="Console"/>
    </Root>
  </Loggers>
</Configuration>
*/

// SECURE: Custom masking in Java
public class SensitiveDataMasker {
    private static final Logger logger = LogManager.getLogger();

    public static String maskCreditCard(String cc) {
        if (cc == null || cc.length() < 8) return cc;
        return "****-****-****-" + cc.substring(cc.length() - 4);
    }

    public static String maskEmail(String email) {
        if (email == null || !email.contains("@")) return email;
        String[] parts = email.split("@", 2);
        // Keep at most two leading characters; short local parts are safe
        String prefix = parts[0].substring(0, Math.min(2, parts[0].length()));
        return prefix + "***@" + parts[1];
    }

    public static void logUserAction(String username, String action, Map<String, Object> data) {
        // Mask sensitive fields
        Map<String, Object> safeData = new HashMap<>(data);
        if (safeData.containsKey("creditCard")) {
            safeData.put("creditCard", maskCreditCard((String) safeData.get("creditCard")));
        }
        if (safeData.containsKey("email")) {
            safeData.put("email", maskEmail((String) safeData.get("email")));
        }
        logger.info("User action: username={}, action={}, data={}",
                username, action, safeData);
    }
}
What to Log for Security
Always Log
- Authentication events (success/failure)
- Authorization failures (403 Forbidden)
- Input validation failures
- Session management events
- Security-relevant config changes
- Data access (especially sensitive data)
- Application errors & exceptions
- Admin actions
Include Context
- Timestamp (ISO 8601 format)
- User ID / Session ID
- Source IP address
- User agent
- Request ID (correlation)
- Action performed
- Result (success/failure)
- Resource accessed
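One way to attach this context to every entry without repeating it at each call site is a logging filter that reads request-scoped values. The sketch below uses only the standard library. The names `request_id_var`, `user_id_var`, `ContextFilter`, and `JsonContextFormatter` are assumptions for illustration; a real application would set the context variables from its framework's request hooks.

```python
import io
import json
import logging
from contextvars import ContextVar
from datetime import datetime, timezone

# Hypothetical per-request values; a web framework hook would set these
request_id_var: ContextVar[str] = ContextVar("request_id", default="-")
user_id_var: ContextVar[str] = ContextVar("user_id", default="-")

class ContextFilter(logging.Filter):
    """Copy request-scoped values onto every record that passes through."""
    def filter(self, record: logging.LogRecord) -> bool:
        record.request_id = request_id_var.get()
        record.user_id = user_id_var.get()
        return True

class JsonContextFormatter(logging.Formatter):
    """Render a record as one JSON object with an ISO 8601 UTC timestamp."""
    def format(self, record: logging.LogRecord) -> str:
        created = datetime.fromtimestamp(record.created, tz=timezone.utc)
        return json.dumps({
            "timestamp": created.isoformat(),
            "level": record.levelname,
            "request_id": record.request_id,
            "user_id": record.user_id,
            "action": record.getMessage(),
        })

stream = io.StringIO()
handler = logging.StreamHandler(stream)
handler.addFilter(ContextFilter())      # runs before the formatter
handler.setFormatter(JsonContextFormatter())

logger = logging.getLogger("context_demo")  # hypothetical logger name
logger.handlers.clear()
logger.setLevel(logging.INFO)
logger.propagate = False
logger.addHandler(handler)

# Simulate the start of a request, then log an action
request_id_var.set("req-123")
user_id_var.set("user-42")
logger.info("password_change")

entry = json.loads(stream.getvalue())
print(entry)
```

Since `json.dumps` escapes line breaks inside values, each entry stays on one line even when a field contains attacker-supplied text.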
Comprehensive Security Logging Example
python
import logging
import json
from datetime import datetime, timezone
from flask import request, g, has_request_context

class SecurityLogger:
    """Centralized security event logging"""

    def __init__(self):
        self.logger = logging.getLogger('security')
        self.logger.setLevel(logging.INFO)
        # Each message is already a JSON string, so emit it unchanged
        handler = logging.FileHandler('security.log')
        handler.setFormatter(logging.Formatter('%(message)s'))
        self.logger.addHandler(handler)

    def _get_context(self):
        """Get request context for logging"""
        # request and g are only usable while a request is being handled
        in_request = has_request_context()
        return {
            'timestamp': datetime.now(timezone.utc).isoformat(),
            'ip': request.remote_addr if in_request else None,
            'user_id': g.user.id if in_request and hasattr(g, 'user') else None,
            'session_id': g.session_id if in_request and hasattr(g, 'session_id') else None,
            'user_agent': request.headers.get('User-Agent') if in_request else None,
            'request_id': g.request_id if in_request and hasattr(g, 'request_id') else None
        }

    def log_authentication(self, username, success, reason=None):
        """Log authentication attempt"""
        event = {
            **self._get_context(),
            'event_type': 'authentication',
            'username': username,
            'success': success,
            'reason': reason
        }
        self.logger.info(json.dumps(event))

    def log_authorization_failure(self, user_id, resource, action):
        """Log authorization failure (403)"""
        event = {
            **self._get_context(),
            'event_type': 'authorization_failure',
            'user_id': user_id,
            'resource': resource,
            'action': action
        }
        self.logger.warning(json.dumps(event))

    def log_sensitive_data_access(self, user_id, data_type, record_id):
        """Log access to sensitive data"""
        event = {
            **self._get_context(),
            'event_type': 'sensitive_data_access',
            'user_id': user_id,
            'data_type': data_type,
            'record_id': record_id
        }
        self.logger.info(json.dumps(event))

    def log_input_validation_failure(self, field, value, reason):
        """Log input validation failure (potential attack)"""
        event = {
            **self._get_context(),
            'event_type': 'input_validation_failure',
            'field': field,
            'value': sanitize_for_log(value),
            'reason': reason
        }
        self.logger.warning(json.dumps(event))

    def log_admin_action(self, user_id, action, target, details=None):
        """Log administrative action"""
        event = {
            **self._get_context(),
            'event_type': 'admin_action',
            'user_id': user_id,
            'action': action,
            'target': target,
            'details': details
        }
        self.logger.info(json.dumps(event))
# Usage (assumes a Flask `app`, plus `authenticate` and `admin_required`,
# are defined elsewhere in the application)
security_logger = SecurityLogger()

@app.route('/login', methods=['POST'])
def login():
    username = request.form['username']
    password = request.form['password']
    user = authenticate(username, password)
    if user:
        security_logger.log_authentication(username, True)
        return "Login successful"
    else:
        security_logger.log_authentication(username, False, "Invalid credentials")
        return "Login failed", 401

@app.route('/admin/delete-user/<user_id>', methods=['DELETE'])
@admin_required
def delete_user(user_id):
    security_logger.log_admin_action(
        g.user.id,
        'delete_user',
        user_id,
        details={'reason': request.form.get('reason')}
    )
    # Delete user...
    return "User deleted"
Security Monitoring & Alerting
ELK Stack (Elasticsearch, Logstash, Kibana)
ruby
# Logstash filter configuration for security logs
filter {
  # Parse JSON logs
  json {
    source => "message"
  }

  # Detect brute force attempts
  if [event_type] == "authentication" and [success] == false {
    # Count failed attempts per IP
    aggregate {
      task_id => "%{ip}"
      code => "
        map['failed_attempts'] ||= 0
        map['failed_attempts'] += 1
        # Alert if > 5 failed attempts in 5 minutes
        if map['failed_attempts'] > 5
          event.set('alert', 'brute_force_detected')
        end
      "
      timeout => 300
    }
  }

  # Detect suspicious patterns
  if [event_type] == "input_validation_failure" {
    if [reason] =~ /sql_injection|xss|path_traversal/ {
      mutate {
        add_field => { "alert" => "attack_detected" }
        add_field => { "severity" => "high" }
      }
    }
  }

  # GeoIP lookup for suspicious countries
  geoip {
    source => "ip"
    target => "geoip"
  }
}

# Alert output
output {
  # Send to Elasticsearch
  elasticsearch {
    hosts => ["localhost:9200"]
    index => "security-logs-%{+YYYY.MM.dd}"
  }

  # Send alerts to SIEM/Slack/Email
  if [alert] {
    http {
      url => "https://hooks.slack.com/services/YOUR/WEBHOOK/URL"
      format => "json"
      http_method => "post"
    }
  }
}
Simple Python Alerting
python
import logging
import smtplib
from collections import defaultdict
from datetime import datetime, timedelta, timezone
from email.message import EmailMessage

class SecurityAlertManager:
    """Detect and alert on suspicious patterns"""

    def __init__(self):
        self.failed_logins = defaultdict(list)  # IP -> [timestamps]
        self.alert_threshold = 5  # failures
        self.time_window = timedelta(minutes=5)

    def check_brute_force(self, ip_address):
        """Check for brute force attack"""
        now = datetime.now(timezone.utc)
        # Add current attempt
        self.failed_logins[ip_address].append(now)
        # Remove old attempts outside time window
        cutoff = now - self.time_window
        self.failed_logins[ip_address] = [
            ts for ts in self.failed_logins[ip_address]
            if ts > cutoff
        ]
        # Check if threshold reached
        if len(self.failed_logins[ip_address]) >= self.alert_threshold:
            self.send_alert(
                f"Brute force detected from {ip_address}",
                f"{len(self.failed_logins[ip_address])} failed attempts in 5 minutes"
            )
            # Block IP or trigger other response
            return True
        return False

    def send_alert(self, subject, body):
        """Send email alert"""
        msg = EmailMessage()
        msg.set_content(body)
        msg['Subject'] = f"[SECURITY ALERT] {subject}"
        msg['From'] = "security@example.com"
        msg['To'] = "admin@example.com"
        # Send via SMTP
        with smtplib.SMTP('localhost') as s:
            s.send_message(msg)
        # Also log
        logging.critical("ALERT: %s - %s", subject, body)

# Usage with Flask (assumes `app`, `request`, and `authenticate` exist)
alert_manager = SecurityAlertManager()

@app.route('/login', methods=['POST'])
def login():
    username = request.form['username']
    password = request.form['password']
    if not authenticate(username, password):
        # Check for brute force
        if alert_manager.check_brute_force(request.remote_addr):
            return "Too many failed attempts", 429
        return "Login failed", 401
    return "Success"
Testing and Verification
Test Log Injection Prevention
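python
import logging
import unittest

# Assumes sanitize_for_log and SensitiveDataFilter from the sections above
# are importable in this module

class TestLogInjection(unittest.TestCase):
    def setUp(self):
        # Capture logs
        self.log_capture = []
        self.handler = logging.Handler()
        self.handler.emit = lambda record: self.log_capture.append(record.getMessage())
        # Redaction only happens if the filter is actually attached
        self.handler.addFilter(SensitiveDataFilter())
        self.root = logging.getLogger()
        self.previous_level = self.root.level
        # The root logger defaults to WARNING, which would drop info() calls
        self.root.setLevel(logging.INFO)
        self.root.addHandler(self.handler)

    def tearDown(self):
        self.root.removeHandler(self.handler)
        self.root.setLevel(self.previous_level)

    def test_crlf_injection(self):
        """Test that CRLF characters are removed"""
        malicious_input = "admin\r\n[CRITICAL] Fake entry"
        logging.info("User: %s", sanitize_for_log(malicious_input))
        # Check that the entry stays on a single line
        logged = self.log_capture[-1]
        self.assertNotIn('\r', logged)
        self.assertNotIn('\n', logged)
        # The text survives, but it can no longer start a line of its own
        self.assertEqual(logged, "User: admin[CRITICAL] Fake entry")

    def test_sensitive_data_redaction(self):
        """Test that credit cards are redacted"""
        message = "Payment with card 4532-1234-5678-9010"
        logging.info(message)
        logged = self.log_capture[-1]
        self.assertNotIn('4532-1234-5678-9010', logged)
        self.assertIn('[REDACTED', logged)

# Manual testing
if __name__ == '__main__':
    # Write to the file that is checked below
    logging.basicConfig(filename='app.log', level=logging.INFO, force=True)
    # Test log injection
    test_inputs = [
        "normal_input",
        "with\nnewline",
        "with\rcarriage",
        "admin\r\n[ERROR] Fake",
        "a" * 1000  # Very long input
    ]
    for test_input in test_inputs:
        logging.info("Test: %s", sanitize_for_log(test_input))
    # Verify logs don't contain injection
    with open('app.log', 'r') as f:
        logs = f.read()
    assert '\r\n[ERROR]' not in logs
    print("Log injection tests passed")
Common Mistakes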
Logging Passwords
NEVER log passwords, even if hashed. This includes debug logs and error messages.
python
# WRONG - logs password!
logging.debug(f"Auth attempt: {username}:{password}")
# CORRECT - never log password
logging.debug("Auth attempt for user: %s", sanitize_for_log(username))
Insufficient Logging
Not logging security events makes incident investigation impossible.
python
# WRONG - no security logging
if not user.has_permission(resource):
    return "Forbidden", 403
# CORRECT - log authorization failure
if not user.has_permission(resource):
    security_logger.log_authorization_failure(
        user.id, resource, request.method
    )
    return "Forbidden", 403
Not Protecting Log Files
Log files should have restricted permissions and be stored securely.
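When the application creates its own log file, the permissions can also be set at creation time rather than corrected afterwards. The following is a minimal sketch for a POSIX system, with the hypothetical helper `create_restricted_log` and a temporary directory standing in for the real log location.

```python
import os
import stat
import tempfile

def create_restricted_log(path: str, mode: int = 0o640) -> None:
    """Create the log file if needed and enforce the given permission bits."""
    # The mode passed to os.open applies only at creation and is
    # reduced by the process umask
    fd = os.open(path, os.O_WRONLY | os.O_CREAT | os.O_APPEND, mode)
    os.close(fd)
    # chmod makes the result independent of the umask and also
    # tightens a file that already existed
    os.chmod(path, mode)

log_dir = tempfile.mkdtemp()              # stand-in for /var/log/app
log_path = os.path.join(log_dir, "app.log")
create_restricted_log(log_path)

permissions = stat.S_IMODE(os.stat(log_path).st_mode)
print(oct(permissions))
```

Ownership still has to be set separately, for example with `chown` as in the shell commands for this mistake.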
bash
# WRONG - world-readable and world-writable logs
chmod 666 /var/log/app.log
# CORRECT - restricted permissions
chmod 640 /var/log/app.log
chown app:app /var/log/app.log
# Force one rotation to check the config; regular rotation runs on a schedule (cron or a systemd timer)
logrotate -f /etc/logrotate.d/app
Log Rotation & Retention
bash
# /etc/logrotate.d/app
# Comments are kept on their own lines; logrotate only guarantees
# comment handling when # is the first non-blank character
/var/log/app/*.log {
    # Rotate daily
    daily
    # OK if log file missing
    missingok
    # Keep 90 rotated files (90 days with daily rotation)
    rotate 90
    # Compress old logs
    compress
    # Leave the most recent rotated file uncompressed for one cycle
    delaycompress
    # Don't rotate if empty
    notifempty
    # Permissions for new file
    create 640 app app
    sharedscripts
    postrotate
        # Reload app to use new log file
        systemctl reload app
    endscript
}
python
# Python: Use RotatingFileHandler
import logging
from logging.handlers import RotatingFileHandler

logger = logging.getLogger(__name__)
handler = RotatingFileHandler(
    'app.log',
    maxBytes=10*1024*1024,  # 10 MB
    backupCount=10          # Keep 10 old files
)
logger.addHandler(handler)
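The size-based handler above does not match the daily, 90-file policy in the logrotate configuration. If rotation is handled inside Python instead, a time-based handler gets closer. This is a sketch under assumptions: the logger name `retention_demo` is hypothetical and a temporary directory stands in for the real log path.

```python
import logging
import os
import tempfile
from logging.handlers import TimedRotatingFileHandler

log_dir = tempfile.mkdtemp()              # stand-in for /var/log/app
log_path = os.path.join(log_dir, "app.log")

# Rotate at midnight and keep 90 rotated files, mirroring "daily" and
# "rotate 90" from the logrotate configuration
timed_handler = TimedRotatingFileHandler(
    log_path,
    when="midnight",
    backupCount=90,
    utc=True,
)
timed_handler.setFormatter(
    logging.Formatter("%(asctime)s %(levelname)s %(message)s")
)

retention_logger = logging.getLogger("retention_demo")  # hypothetical name
retention_logger.setLevel(logging.INFO)
retention_logger.propagate = False
retention_logger.addHandler(timed_handler)
retention_logger.info("rotation configured")
timed_handler.close()
```

Use either logrotate or an in-process rotating handler for a given file, not both, so that the two do not rename the same file out from under each other.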