完整的 Python 日志指南:最佳实践与实施
为什么正确的日志记录很重要
在深入探讨技术细节之前,让我们先了解一下为什么正确的日志记录很重要:
Python 日志快速入门
对于 Python 日志记录的新手,这里有一个使用 **logging.basicConfig** 的基本示例:
# Simple python logging example
import logging
# Basic logger in python example
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
# Create a logger
logger = logging.getLogger(__name__)
# Logger in python example
logger.info("This is an information message")
logger.warning("This is a warning message")此示例演示了 python 中日志模块的基础知识,并展示了如何在应用程序中使用 python 记录器日志。
Python 日志模块入门
基本设置
让我们从一个简单的日志配置开始:
import logging
# Basic configuration
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
# Your first logger
logger = logging.getLogger(__name__)
# Using the logger
logger.info("Application started")
logger.warning("Watch out!")
logger.error("Something went wrong")了解日志级别
Python 日志记录有五个标准级别:
超越 print() 语句
为什么选择记录而不是打印语句?
配置你的日志系统
基本配置选项
logging.basicConfig(
filename='app.log',
filemode='w',
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
level=logging.DEBUG,
datefmt='%Y-%m-%d %H:%M:%S'
)高级配置
对于更复杂的应用程序:
config = {
'version': 1,
'formatters': {
'detailed': {
'format': '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
}
},
'handlers': {
'console': {
'class': 'logging.StreamHandler',
'level': 'INFO',
'formatter': 'detailed'
},
'file': {
'class': 'logging.FileHandler',
'filename': 'app.log',
'level': 'DEBUG',
'formatter': 'detailed'
}
},
'loggers': {
'myapp': {
'handlers': ['console', 'file'],
'level': 'DEBUG',
'propagate': True
}
}
}
logging.config.dictConfig(config)使用高级日志记录
结构化日志
结构化日志记录提供了一致的、机器可读的格式,这对于日志分析和监控至关重要。如需全面了解结构化日志记录模式和最佳实践,请查看结构化日志记录指南。让我们用 Python 实现结构化日志记录:
import json
import logging
from datetime import datetime
class JSONFormatter(logging.Formatter):
def __init__(self):
super().__init__()
def format(self, record):
# Create base log record
log_obj = {
"timestamp": self.formatTime(record, self.datefmt),
"name": record.name,
"level": record.levelname,
"message": record.getMessage(),
"module": record.module,
"function": record.funcName,
"line": record.lineno
}
# Add exception info if present
if record.exc_info:
log_obj["exception"] = self.formatException(record.exc_info)
# Add custom fields from extra
if hasattr(record, "extra_fields"):
log_obj.update(record.extra_fields)
return json.dumps(log_obj)
# Usage Example
logger = logging.getLogger(__name__)
handler = logging.StreamHandler()
handler.setFormatter(JSONFormatter())
logger.addHandler(handler)
# Log with extra fields
logger.info("User logged in", extra={"extra_fields": {"user_id": "123", "ip": "192.168.1.1"}})错误管理
正确的错误日志记录对于调试生产问题至关重要。以下是一种全面的方法:
import traceback
import sys
from contextlib import contextmanager
class ErrorLogger:
def __init__(self, logger):
self.logger = logger
@contextmanager
def error_context(self, operation_name, **context):
"""Context manager for error logging with additional context"""
try:
yield
except Exception as e:
# Capture the current stack trace
exc_type, exc_value, exc_traceback = sys.exc_info()
# Format error details
error_details = {
"operation": operation_name,
"error_type": exc_type.__name__,
"error_message": str(exc_value),
"context": context,
"stack_trace": traceback.format_exception(exc_type, exc_value, exc_traceback)
}
# Log the error with full context
self.logger.error(
f"Error in {operation_name}: {str(exc_value)}",
extra={"error_details": error_details}
)
# Re-raise the exception
raise
# Usage Example
logger = logging.getLogger(__name__)
error_logger = ErrorLogger(logger)
with error_logger.error_context("user_authentication", user_id="123", attempt=2):
# Your code that might raise an exception
authenticate_user(user_id)并发日志
在多线程应用中登录时,需要保证线程安全:
import threading
import logging
from queue import Queue
from logging.handlers import QueueHandler, QueueListener
def setup_thread_safe_logging():
"""Set up thread-safe logging with a queue"""
# Create the queue
log_queue = Queue()
# Create handlers
console_handler = logging.StreamHandler()
file_handler = logging.FileHandler('app.log')
# Create queue handler and listener
queue_handler = QueueHandler(log_queue)
listener = QueueListener(
log_queue,
console_handler,
file_handler,
respect_handler_level=True
)
# Configure root logger
root_logger = logging.getLogger()
root_logger.addHandler(queue_handler)
# Start the listener in a separate thread
listener.start()
return listener
# Usage
listener = setup_thread_safe_logging()
def worker_function():
logger = logging.getLogger(__name__)
logger.info(f"Worker thread {threading.current_thread().name} starting")
# Do work...
logger.info(f"Worker thread {threading.current_thread().name} finished")
# Create and start threads
threads = [
threading.Thread(target=worker_function)
for _ in range(3)
]
for thread in threads:
thread.start()不同环境中的日志记录
不同的应用环境需要特定的日志记录方法。无论您使用的是 Web 应用程序、微服务还是后台任务,每个环境都有独特的日志记录要求和最佳实践。让我们探索如何在各种部署场景中实现有效的日志记录。
Web 应用程序日志记录
Django 日志配置
以下是全面的 Django 日志设置:
# settings.py
LOGGING = {
'version': 1,
'disable_existing_loggers': False,
'formatters': {
'verbose': {
'format': '{levelname} {asctime} {module} {process:d} {thread:d} {message}',
'style': '{',
},
'simple': {
'format': '{levelname} {message}',
'style': '{',
},
},
'filters': {
'require_debug_true': {
'()': 'django.utils.log.RequireDebugTrue',
},
},
'handlers': {
'console': {
'level': 'INFO',
'filters': ['require_debug_true'],
'class': 'logging.StreamHandler',
'formatter': 'simple'
},
'file': {
'level': 'ERROR',
'class': 'logging.FileHandler',
'filename': 'django-errors.log',
'formatter': 'verbose'
},
'mail_admins': {
'level': 'ERROR',
'class': 'django.utils.log.AdminEmailHandler',
'include_html': True,
}
},
'loggers': {
'django': {
'handlers': ['console'],
'propagate': True,
},
'django.request': {
'handlers': ['file', 'mail_admins'],
'level': 'ERROR',
'propagate': False,
},
'myapp': {
'handlers': ['console', 'file'],
'level': 'INFO',
}
}
}Flask 日志设置
Flask 提供了自己的可定制的日志系统:
import logging
from logging.handlers import RotatingFileHandler
from flask import Flask, request
app = Flask(__name__)
def setup_logger():
# Create formatter
formatter = logging.Formatter(
'[%(asctime)s] %(levelname)s in %(module)s: %(message)s'
)
# File Handler
file_handler = RotatingFileHandler(
'flask_app.log',
maxBytes=10485760, # 10MB
backupCount=10
)
file_handler.setLevel(logging.INFO)
file_handler.setFormatter(formatter)
# Add request context
class RequestFormatter(logging.Formatter):
def format(self, record):
record.url = request.url
record.remote_addr = request.remote_addr
return super().format(record)
# Configure app logger
app.logger.addHandler(file_handler)
app.logger.setLevel(logging.INFO)
return app.logger
# Usage in routes
@app.route('/api/endpoint')
def api_endpoint():
app.logger.info(f'Request received from {request.remote_addr}')
# Your code here
return jsonify({'status': 'success'})FastAPI 日志记录实践
FastAPI 可以通过一些中间件增强功能来利用 Python 的日志记录:
from fastapi import FastAPI, Request
from typing import Callable
import logging
import time
app = FastAPI()
# Configure logging
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)
# Middleware for request logging
@app.middleware("http")
async def log_requests(request: Request, call_next: Callable):
start_time = time.time()
response = await call_next(request)
duration = time.time() - start_time
log_dict = {
"url": str(request.url),
"method": request.method,
"client_ip": request.client.host,
"duration": f"{duration:.2f}s",
"status_code": response.status_code
}
logger.info(f"Request processed: {log_dict}")
return response
# Example endpoint with logging
@app.get("/items/{item_id}")
async def read_item(item_id: int):
logger.info(f"Retrieving item {item_id}")
# Your code here
return {"item_id": item_id}微服务日志
对于微服务,分布式跟踪和关联ID至关重要:
import logging
import contextvars
from uuid import uuid4
# Create context variable for trace ID
trace_id_var = contextvars.ContextVar('trace_id', default=None)
class TraceIDFilter(logging.Filter):
def filter(self, record):
trace_id = trace_id_var.get()
record.trace_id = trace_id if trace_id else 'no_trace'
return True
def setup_microservice_logging(service_name):
logger = logging.getLogger(service_name)
# Create formatter with trace ID
formatter = logging.Formatter(
'%(asctime)s - %(name)s - [%(trace_id)s] - %(levelname)s - %(message)s'
)
# Add handlers with trace ID filter
handler = logging.StreamHandler()
handler.setFormatter(formatter)
handler.addFilter(TraceIDFilter())
logger.addHandler(handler)
logger.setLevel(logging.INFO)
return logger
# Usage in microservice
logger = setup_microservice_logging('order_service')
def process_order(order_data):
# Generate or get trace ID from request
trace_id_var.set(str(uuid4()))
logger.info("Starting order processing", extra={
'order_id': order_data['id'],
'customer_id': order_data['customer_id']
})
# Process order...
logger.info("Order processed successfully")后台任务日志记录
对于后台任务,我们需要确保正确的日志处理和轮换:
from logging.handlers import RotatingFileHandler
import logging
import threading
from datetime import datetime
class BackgroundTaskLogger:
def __init__(self, task_name):
self.logger = logging.getLogger(f'background_task.{task_name}')
self.setup_logging()
def setup_logging(self):
# Create logs directory if it doesn't exist
import os
os.makedirs('logs', exist_ok=True)
# Setup rotating file handler
handler = RotatingFileHandler(
filename=f'logs/task_{datetime.now():%Y%m%d}.log',
maxBytes=5*1024*1024, # 5MB
backupCount=5
)
# Create formatter
formatter = logging.Formatter(
'%(asctime)s - [%(threadName)s] - %(levelname)s - %(message)s'
)
handler.setFormatter(formatter)
self.logger.addHandler(handler)
self.logger.setLevel(logging.INFO)
def log_task_status(self, status, **kwargs):
"""Log task status with additional context"""
extra = {
'thread_id': threading.get_ident(),
'timestamp': datetime.now().isoformat(),
**kwargs
}
self.logger.info(f"Task status: {status}", extra=extra)
# Usage example
def background_job():
logger = BackgroundTaskLogger('data_processing')
try:
logger.log_task_status('started', job_id=123)
# Do some work...
logger.log_task_status('completed', records_processed=1000)
except Exception as e:
logger.logger.error(f"Task failed: {str(e)}", exc_info=True)常见的日志记录模式和解决方案
请求 ID 跟踪
在您的应用程序中实施请求跟踪:
import logging
from contextlib import contextmanager
import threading
import uuid
# Store request ID in thread-local storage
_request_id = threading.local()
class RequestIDFilter(logging.Filter):
def filter(self, record):
record.request_id = getattr(_request_id, 'id', 'no_request_id')
return True
@contextmanager
def request_context(request_id=None):
"""Context manager for request tracking"""
if request_id is None:
request_id = str(uuid.uuid4())
old_id = getattr(_request_id, 'id', None)
_request_id.id = request_id
try:
yield request_id
finally:
if old_id is None:
del _request_id.id
else:
_request_id.id = old_id
# Setup logging with request ID
def setup_request_logging():
logger = logging.getLogger()
formatter = logging.Formatter(
'%(asctime)s - [%(request_id)s] - %(levelname)s - %(message)s'
)
handler = logging.StreamHandler()
handler.setFormatter(formatter)
handler.addFilter(RequestIDFilter())
logger.addHandler(handler)
return logger
# Usage example
logger = setup_request_logging()
def process_request(data):
with request_context() as request_id:
logger.info("Processing request", extra={
'data': data,
'operation': 'process_request'
})
# Process the request...
logger.info("Request processed successfully")用户活动记录
安全地跟踪用户操作:
import logging
from datetime import datetime
from typing import Dict, Any
class UserActivityLogger:
def __init__(self):
self.logger = logging.getLogger('user_activity')
self.setup_logger()
def setup_logger(self):
formatter = logging.Formatter(
'%(asctime)s - %(levelname)s - '
'[User: %(user_id)s] %(message)s'
)
handler = logging.FileHandler('user_activity.log')
handler.setFormatter(formatter)
self.logger.addHandler(handler)
self.logger.setLevel(logging.INFO)
def log_activity(
self,
user_id: str,
action: str,
resource: str,
details: Dict[str, Any] = None
):
"""Log user activity with context"""
activity_data = {
'timestamp': datetime.utcnow().isoformat(),
'user_id': user_id,
'action': action,
'resource': resource,
'details': details or {}
}
self.logger.info(
f"User performed {action} on {resource}",
extra={
'user_id': user_id,
'activity_data': activity_data
}
)
return activity_data
# Usage example
activity_logger = UserActivityLogger()
def update_user_profile(user_id: str, profile_data: dict):
try:
# Update profile logic here...
activity_logger.log_activity(
user_id=user_id,
action='update_profile',
resource='user_profile',
details={
'updated_fields': list(profile_data.keys()),
'source_ip': '192.168.1.1'
}
)
except Exception as e:
activity_logger.logger.error(
f"Profile update failed for user {user_id}",
extra={'user_id': user_id, 'error': str(e)},
exc_info=True
)故障排除和调试
有效地排除日志问题需要了解常见问题及其解决方案。本节介绍开发人员在实施日志时面临的最常见挑战,并提供了调试日志配置的实用解决方案。
常见日志记录问题
缺少日志条目
# Common problem: Logs not appearing due to incorrect log level
import logging
# Wrong way
logger = logging.getLogger(__name__)
logger.debug("This won't appear") # No handler and wrong level
# Correct way
def setup_proper_logging():
logger = logging.getLogger(__name__)
# Set the base logger level
logger.setLevel(logging.DEBUG)
# Create and configure handler
handler = logging.StreamHandler()
handler.setLevel(logging.DEBUG)
# Add formatter
formatter = logging.Formatter(
'%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
handler.setFormatter(formatter)
# Add handler to logger
logger.addHandler(handler)
return logger性能瓶颈
import logging
import time
from functools import wraps
class PerformanceLoggingHandler(logging.Handler):
def __init__(self):
super().__init__()
self.log_times = []
def emit(self, record):
self.log_times.append(time.time())
if len(self.log_times) > 1000:
time_diff = self.log_times[-1] - self.log_times[0]
if time_diff < 1: # More than 1000 logs per second
print(f"Warning: High logging rate detected: {len(self.log_times)/time_diff} logs/second")
self.log_times = self.log_times[-100:]
def log_performance(logger):
"""Decorator to measure and log function performance"""
def decorator(func):
@wraps(func)
def wrapper(*args, **kwargs):
start_time = time.perf_counter()
result = func(*args, **kwargs)
end_time = time.perf_counter()
execution_time = end_time - start_time
logger.info(
f"Function {func.__name__} took {execution_time:.4f} seconds",
extra={
'execution_time': execution_time,
'function_name': func.__name__
}
)
return result
return wrapper
return decorator常见的日志记录陷阱和解决方案
配置问题
# Common Mistake 1: Not setting the log level properly
logger = logging.getLogger(__name__) # Will not show DEBUG messages
logger.debug("This won't appear")
# Solution: Configure both logger and handler levels
logger = logging.getLogger(__name__)
logger.setLevel(logging.DEBUG)
handler = logging.StreamHandler()
handler.setLevel(logging.DEBUG)
formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
handler.setFormatter(formatter)
logger.addHandler(handler)
# Common Mistake 2: Incorrect basicConfig usage
logging.basicConfig(level=logging.INFO) # Called after handler was added - won't work
logger.info("Message")
# Solution: Configure logging before creating loggers
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
logger.info("Message")内存和资源问题
# Common Mistake 3: Creating handlers in loops
def process_item(item): # DON'T DO THIS
logger = logging.getLogger(__name__)
handler = logging.FileHandler('app.log') # Memory leak!
logger.addHandler(handler)
logger.info(f"Processing {item}")
# Solution: Create handlers outside loops
logger = logging.getLogger(__name__)
handler = logging.FileHandler('app.log')
logger.addHandler(handler)
def process_item(item):
logger.info(f"Processing {item}")
# Common Mistake 4: Not closing file handlers
handler = logging.FileHandler('app.log')
logger.addHandler(handler)
# ... later
# handler.close() # Forgotten!
# Solution: Use context manager
from contextlib import contextmanager
@contextmanager
def log_file_handler(filename):
handler = logging.FileHandler(filename)
logger.addHandler(handler)
try:
yield
finally:
handler.close()
logger.removeHandler(handler)格式字符串和性能问题
# Common Mistake 5: Inefficient string formatting
logger.error("Error occurred: {}".format(error)) # Inefficient
logger.error(f"Error occurred: {error}") # Less efficient for logging
# Solution: Use %-formatting for better performance
logger.error("Error occurred: %s", error)
# Common Mistake 6: Expensive operations in debug statements
logger.debug(f"Complex data: {expensive_operation()}") # Operation runs even if debug is disabled
# Solution: Use lazy evaluation
logger.debug("Complex data: %s", expensive_operation) if logger.isEnabledFor(logging.DEBUG) else None处理程序配置陷阱
# Common Mistake 7: Multiple handler addition
logger = logging.getLogger(__name__)
handler = logging.StreamHandler()
logger.addHandler(handler)
logger.addHandler(handler) # Duplicate handler!
# Solution: Check for existing handlers
if not logger.handlers:
handler = logging.StreamHandler()
logger.addHandler(handler)
# Common Mistake 8: Incorrect propagation handling
child_logger = logging.getLogger('parent.child')
parent_logger = logging.getLogger('parent')
# Messages appear twice due to propagation
# Solution: Control propagation explicitly
child_logger.propagate = False # If you don't want propagation线程安全注意事项
# Common Mistake 9: Unsafe handler modification
def setup_logging(): # DON'T DO THIS
logger = logging.getLogger()
logger.handlers.clear() # Unsafe in multi-threaded environment
# Solution: Use lock for handler modifications
import threading
_handler_lock = threading.Lock()
def setup_logging():
with _handler_lock:
logger = logging.getLogger()
existing_handlers = logger.handlers.copy()
for handler in existing_handlers:
logger.removeHandler(handler)
handler.close()配置文件问题
# Common Mistake 10: Hardcoded paths in config
LOGGING = {
'handlers': {
'file': {
'filename': '/absolute/path/app.log', # Breaks portability
}
}
}
# Solution: Use environment variables or relative paths
import os
LOGGING = {
'handlers': {
'file': {
'filename': os.getenv('LOG_FILE', 'app.log'),
}
}
}测试你的日志记录
使用日志进行单元测试
import unittest
from unittest.mock import patch
import logging
import io
class TestLogging(unittest.TestCase):
def setUp(self):
# Create logger for testing
self.logger = logging.getLogger('test_logger')
self.logger.setLevel(logging.DEBUG)
# Create string IO for capturing log output
self.log_output = io.StringIO()
# Create handler that writes to string IO
self.handler = logging.StreamHandler(self.log_output)
self.handler.setLevel(logging.DEBUG)
# Add handler to logger
self.logger.addHandler(self.handler)
def test_log_level_filtering(self):
"""Test that log levels are properly filtered"""
self.logger.debug("Debug message")
self.logger.info("Info message")
log_contents = self.log_output.getvalue()
self.assertIn("Debug message", log_contents)
self.assertIn("Info message", log_contents)
def test_exception_logging(self):
"""Test exception logging functionality"""
try:
raise ValueError("Test error")
except ValueError:
self.logger.exception("An error occurred")
log_contents = self.log_output.getvalue()
self.assertIn("An error occurred", log_contents)
self.assertIn("ValueError: Test error", log_contents)
self.assertIn("Traceback", log_contents)
def tearDown(self):
# Clean up
self.handler.close()
self.log_output.close()使用模拟记录器进行测试
import unittest
from unittest.mock import patch, MagicMock
from your_module import YourClass
class TestWithMockedLogger(unittest.TestCase):
@patch('logging.getLogger')
def test_logger_calls(self, mock_get_logger):
# Create mock logger
mock_logger = MagicMock()
mock_get_logger.return_value = mock_logger
# Create instance of your class
instance = YourClass()
# Perform some operation that should log
instance.do_something()
# Assert logging calls
mock_logger.info.assert_called_with(
"Operation completed",
extra={'operation': 'do_something'}
)
# Check number of calls
self.assertEqual(mock_logger.error.call_count, 0)
self.assertEqual(mock_logger.info.call_count, 1)
@patch('logging.getLogger')
def test_error_logging(self, mock_get_logger):
mock_logger = MagicMock()
mock_get_logger.return_value = mock_logger
instance = YourClass()
# Trigger an error condition
instance.problematic_operation()
# Verify error was logged
mock_logger.error.assert_called_once()
args, kwargs = mock_logger.error.call_args
self.assertIn("Operation failed", args[0])替代日志解决方案
洛古鲁
Loguru 提供了一个更简单的日志记录界面,具有强大的开箱即用功能:
from loguru import logger
import sys
# Basic Loguru configuration
logger.remove() # Remove default handler
logger.add(
sys.stdout,
colorize=True,
format="{time:YYYY-MM-DD HH:mm:ss} | {level: <8} | {name} :{function} :{line} - {message} "
)
logger.add(
"app_{time}.log",
rotation="500 MB",
retention="10 days",
compression="zip"
)
# Advanced Loguru usage
from loguru import logger
import contextlib
# Custom format for structured logging
logger.add(
"structured.json",
format="{time} {level} {message}",
serialize=True # Enables JSON formatting
)
# Context manager for transaction logging
@contextlib.contextmanager
def transaction_context(transaction_id: str):
logger.info(f"Starting transaction {transaction_id}")
try:
yield
logger.info(f"Transaction {transaction_id} completed successfully")
except Exception as e:
logger.exception(f"Transaction {transaction_id} failed")
raise
# Example usage
def process_order(order_id: str):
with transaction_context(f"order_{order_id}"):
logger.debug("Processing order", order_id=order_id)
# Order processing logic
logger.info("Order processed", status="completed", order_id=order_id)结构日志
Structlog 非常适合具有上下文的结构化日志记录:
import structlog
import time
from typing import Any, Dict
# Configure structlog
structlog.configure(
processors=[
structlog.processors.TimeStamper(fmt="iso"),
structlog.processors.StackInfoRenderer(),
structlog.processors.format_exc_info,
structlog.processors.JSONRenderer()
],
context_class=dict,
logger_factory=structlog.PrintLoggerFactory(),
wrapper_class=structlog.BoundLogger,
cache_logger_on_first_use=True,
)
# Create logger instance
logger = structlog.get_logger()
class StructuredLogging:
def __init__(self):
self.logger = structlog.get_logger()
def with_context(self, **context) -> 'StructuredLogging':
"""Create a new logger with additional context"""
self.logger = self.logger.bind(**context)
return self
def log_operation(self, operation: str, **kwargs):
"""Log an operation with timing and context"""
start_time = time.time()
try:
result = operation(**kwargs)
duration = time.time() - start_time
self.logger.info(
"operation_completed",
operation_name=operation.__name__,
duration=duration,
status="success",
**kwargs
)
return result
except Exception as e:
duration = time.time() - start_time
self.logger.error(
"operation_failed",
operation_name=operation.__name__,
duration=duration,
error=str(e),
error_type=type(e).__name__,
**kwargs
)
raise
# Usage example
structured_logger = StructuredLogging()
def process_user_data(user_id: str, data: Dict[str, Any]):
logger = structured_logger.with_context(
user_id=user_id,
service="user_service"
)
logger.logger.info("processing_user_data", data_size=len(data))
# Process data...
logger.logger.info("user_data_processed", status="success")Python JSON 记录器
对于 JSON 格式的日志记录:
from pythonjsonlogger import jsonlogger
import logging
import datetime
class CustomJsonFormatter(jsonlogger.JsonFormatter):
def add_fields(self, log_record, record, message_dict):
super(CustomJsonFormatter, self).add_fields(log_record, record, message_dict)
# Add custom fields
log_record['timestamp'] = datetime.datetime.now(datetime.UTC).isoformat()
log_record['level'] = record.levelname
log_record['logger'] = record.name
if hasattr(record, 'props'):
log_record.update(record.props)
def setup_json_logging():
logger = logging.getLogger()
handler = logging.StreamHandler()
formatter = CustomJsonFormatter(
'%(timestamp)s %(level)s %(name)s %(message)s'
)
handler.setFormatter(formatter)
logger.addHandler(handler)
logger.setLevel(logging.INFO)
return logger
# Usage example
logger = setup_json_logging()
def log_event(event_type: str, **props):
logger.info(
f"Event: {event_type}",
extra={
'props': {
'event_type': event_type,
'timestamp': datetime.datetime.now(datetime.UTC).isoformat(),
**props
}
}
)最佳实践和指南
伐木标准
# Example of implementing logging standards
import logging
from typing import Optional, Dict, Any
class StandardizedLogger:
def __init__(self, service_name: str):
self.logger = logging.getLogger(service_name)
self.service_name = service_name
self._setup_logging()
def _setup_logging(self):
"""Setup standardized logging configuration"""
formatter = logging.Formatter(
'[%(asctime)s] - %(name)s - %(levelname)s - %(message)s'
' {service: %(service)s, trace_id: %(trace_id)s}'
)
handler = logging.StreamHandler()
handler.setFormatter(formatter)
self.logger.addHandler(handler)
self.logger.setLevel(logging.INFO)
def _log(self, level: str, message: str, trace_id: Optional[str] = None, **kwargs):
"""Standardized log method"""
extra = {
'service': self.service_name,
'trace_id': trace_id or 'no_trace',
**kwargs
}
getattr(self.logger, level)(
message,
extra=extra
)
def info(self, message: str, **kwargs):
self._log('info', message, **kwargs)
def error(self, message: str, **kwargs):
self._log('error', message, **kwargs)性能优化
import logging
from functools import lru_cache
from typing import Dict, Any
import queue
import threading
class PerformantLogger:
def __init__(self):
self.log_queue = queue.Queue(maxsize=1000)
self.running = True
self._start_worker()
def _start_worker(self):
"""Start background worker for async logging"""
def worker():
while self.running:
try:
record = self.log_queue.get(timeout=1)
self._process_log(record)
except queue.Empty:
continue
self.worker_thread = threading.Thread(target=worker, daemon=True)
self.worker_thread.start()
@lru_cache(maxsize=100)
def _format_message(self, template: str, **kwargs) -> str:
"""Cache commonly used log message templates"""
return template.format(**kwargs)
def _process_log(self, record: Dict[str, Any]):
"""Process log record"""
# Actual logging logic here
pass
def log(self, level: str, message: str, **kwargs):
"""Asynchronous logging method"""
record = {
'level': level,
'message': self._format_message(message, **kwargs),
'extra': kwargs
}
try:
self.log_queue.put_nowait(record)
except queue.Full:
# Handle queue full scenario
pass案例研究
实际实施:电子商务平台
import logging
from datetime import datetime
from typing import Dict, Any
class EcommerceLogger:
def __init__(self):
self.logger = logging.getLogger('ecommerce')
self._setup_logging()
def _setup_logging(self):
# Configure handlers for different components
self._add_transaction_handler()
self._add_inventory_handler()
self._add_user_activity_handler()
def _add_transaction_handler(self):
handler = logging.FileHandler('transactions.log')
handler.setFormatter(
logging.Formatter(
'%(asctime)s - %(transaction_id)s - %(message)s'
)
)
self.logger.addHandler(handler)
def log_order(self, order_id: str, user_id: str, amount: float):
"""Log order processing"""
self.logger.info(
f"Processing order {order_id}",
extra={
'transaction_id': order_id,
'user_id': user_id,
'amount': amount,
'timestamp': datetime.utcnow().isoformat()
}
)
# Example usage
class OrderProcessor:
def __init__(self):
self.logger = EcommerceLogger()
def process_order(self, order_data: Dict[str, Any]):
try:
# Log order initiation
self.logger.log_order(
order_data['order_id'],
order_data['user_id'],
order_data['amount']
)
# Process order...
# Log success
self.logger.logger.info(
f"Order {order_data['order_id']} processed successfully"
)
except Exception as e:
self.logger.logger.error(
f"Order processing failed: {str(e)}",
exc_info=True
)微服务架构示例
import logging
import json
from typing import Dict, Any
from datetime import datetime
class MicroserviceLogger:
def __init__(self, service_name: str):
self.service_name = service_name
self.logger = self._configure_logger()
def _configure_logger(self):
logger = logging.getLogger(self.service_name)
# JSON formatter for structured logging
formatter = logging.Formatter(
json.dumps({
'timestamp': '%(asctime)s',
'service': '%(service_name)s',
'level': '%(levelname)s',
'message': '%(message)s',
'trace_id': '%(trace_id)s',
'additional_data': '%(additional_data)s'
})
)
handler = logging.StreamHandler()
handler.setFormatter(formatter)
logger.addHandler(handler)
return logger
def log(self, level: str, message: str, trace_id: str, **kwargs):
"""Log with distributed tracing context"""
extra = {
'service_name': self.service_name,
'trace_id': trace_id,
'additional_data': json.dumps(kwargs)
}
getattr(self.logger, level)(
message,
extra=extra
)
# Example usage in a microservice
class PaymentService:
def __init__(self):
self.logger = MicroserviceLogger('payment-service')
def process_payment(self, payment_data: Dict[str, Any], trace_id: str):
self.logger.log(
'info',
'Processing payment',
trace_id,
payment_id=payment_data['id'],
amount=payment_data['amount']
)
# Process payment...
self.logger.log(
'info',
'Payment processed successfully',
trace_id,
payment_id=payment_data['id']
)结论
关键要点
实施检查表
def logging_implementation_checklist() -> Dict[str, bool]:
return {
# Basic Setup
"basic_configuration": True,
"log_levels_defined": True,
"handlers_configured": True,
# Security
"sensitive_data_filtered": True,
"access_controls_implemented": True,
"compliance_checked": True,
# Performance
"log_rotation_configured": True,
"async_logging_implemented": True,
"sampling_strategy_defined": True,
# Monitoring
"alerts_configured": True,
"metrics_collected": True,
"dashboards_created": True
}其他资源
本指南涵盖了 Python 日志记录的基本方面,从基本设置到高级实现。请记住,日志记录是应用程序可观察性和维护不可或缺的一部分。请谨慎实施并定期维护以获得最佳效果。
随着应用程序的发展和新需求的出现,请记住定期检查和更新您的日志记录实现。