The Complete Guide to Python Logging: Best Practices and Implementation
Why Proper Logging Matters
Before diving into the technical details, it helps to understand why proper logging matters: good logs make debugging, monitoring, and auditing possible long after the code has run.
A Quick Introduction to Python Logging
If you're new to Python logging, here's a basic example using **logging.basicConfig**:
```python
# Simple Python logging example
import logging

# Basic configuration
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)

# Create a logger
logger = logging.getLogger(__name__)

logger.info("This is an information message")
logger.warning("This is a warning message")
```
This example demonstrates the basics of the logging module and shows how to use a logger in your application.
Getting Started with the Python Logging Module
Basic Setup
Let's start with a simple logging configuration:
```python
import logging

# Basic configuration
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)

# Your first logger
logger = logging.getLogger(__name__)

# Using the logger
logger.info("Application started")
logger.warning("Watch out!")
logger.error("Something went wrong")
```
Understanding Log Levels
Python logging defines five standard levels, in increasing order of severity: DEBUG (10), INFO (20), WARNING (30), ERROR (40), and CRITICAL (50). Messages below a logger's configured level are discarded.
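As a quick, minimal sketch of how the levels behave (standard library only; the logger name is arbitrary):

```python
import logging

logging.basicConfig(level=logging.DEBUG, format='%(levelname)s: %(message)s')
logger = logging.getLogger("levels_demo")

# The five standard levels, from least to most severe
logger.debug("Detailed diagnostic information")           # DEBUG (10)
logger.info("Confirmation that things are working")       # INFO (20)
logger.warning("Something unexpected happened")           # WARNING (30)
logger.error("A function failed to complete")             # ERROR (40)
logger.critical("The program may be unable to continue")  # CRITICAL (50)

# Levels are plain integers, so severities can be compared directly
print(logging.ERROR > logging.WARNING)  # True
```

Because the root logger was configured at DEBUG, all five messages are emitted here; raising the level filters out everything below it.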
Beyond print() Statements
Why choose logging over print statements? Unlike print(), logging gives you severity levels, timestamps and source information, configurable destinations (console, files, network), and the ability to silence or redirect output without changing call sites.
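A minimal comparison sketch makes the difference concrete (the WARNING threshold here is an arbitrary choice for illustration):

```python
import logging

# One line of configuration controls every logger in the process
logging.basicConfig(
    level=logging.WARNING,
    format='%(asctime)s %(levelname)s %(name)s: %(message)s'
)
logger = logging.getLogger(__name__)

print("Processing started")        # always printed, no timestamp or severity
logger.info("Processing started")  # suppressed: INFO is below the WARNING threshold
logger.warning("Low disk space")   # printed with timestamp, level, and logger name
```

Switching the threshold back to INFO later requires changing only the configuration, not every call site.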
Configuring Your Logging System
Basic Configuration Options
```python
logging.basicConfig(
    filename='app.log',
    filemode='w',
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    level=logging.DEBUG,
    datefmt='%Y-%m-%d %H:%M:%S'
)
```
Advanced Configuration
For more complex applications:
```python
import logging.config

config = {
    'version': 1,
    'formatters': {
        'detailed': {
            'format': '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
        }
    },
    'handlers': {
        'console': {
            'class': 'logging.StreamHandler',
            'level': 'INFO',
            'formatter': 'detailed'
        },
        'file': {
            'class': 'logging.FileHandler',
            'filename': 'app.log',
            'level': 'DEBUG',
            'formatter': 'detailed'
        }
    },
    'loggers': {
        'myapp': {
            'handlers': ['console', 'file'],
            'level': 'DEBUG',
            'propagate': True
        }
    }
}

logging.config.dictConfig(config)
```
Working with Advanced Logging
Structured Logging
Structured logging produces a consistent, machine-readable format, which is essential for log analysis and monitoring. For a deeper look at structured logging patterns and best practices, see a dedicated structured logging guide. Let's implement structured logging in Python:
```python
import json
import logging


class JSONFormatter(logging.Formatter):
    def format(self, record):
        # Create the base log record
        log_obj = {
            "timestamp": self.formatTime(record, self.datefmt),
            "name": record.name,
            "level": record.levelname,
            "message": record.getMessage(),
            "module": record.module,
            "function": record.funcName,
            "line": record.lineno
        }

        # Add exception info if present
        if record.exc_info:
            log_obj["exception"] = self.formatException(record.exc_info)

        # Add custom fields passed via `extra`
        if hasattr(record, "extra_fields"):
            log_obj.update(record.extra_fields)

        return json.dumps(log_obj)


# Usage example
logger = logging.getLogger(__name__)
handler = logging.StreamHandler()
handler.setFormatter(JSONFormatter())
logger.addHandler(handler)

# Log with extra fields
logger.info(
    "User logged in",
    extra={"extra_fields": {"user_id": "123", "ip": "192.168.1.1"}}
)
```
Error Management
Proper error logging is essential for debugging production issues. Here's a comprehensive approach:
```python
import logging
import sys
import traceback
from contextlib import contextmanager


class ErrorLogger:
    def __init__(self, logger):
        self.logger = logger

    @contextmanager
    def error_context(self, operation_name, **context):
        """Context manager for error logging with additional context."""
        try:
            yield
        except Exception:
            # Capture the current exception details
            exc_type, exc_value, exc_traceback = sys.exc_info()

            # Format error details
            error_details = {
                "operation": operation_name,
                "error_type": exc_type.__name__,
                "error_message": str(exc_value),
                "context": context,
                "stack_trace": traceback.format_exception(
                    exc_type, exc_value, exc_traceback
                )
            }

            # Log the error with full context
            self.logger.error(
                f"Error in {operation_name}: {exc_value}",
                extra={"error_details": error_details}
            )

            # Re-raise the exception
            raise


# Usage example
logger = logging.getLogger(__name__)
error_logger = ErrorLogger(logger)

with error_logger.error_context("user_authentication", user_id="123", attempt=2):
    # Your code that might raise an exception
    authenticate_user(user_id)
```
Concurrent Logging
When logging from multithreaded applications, you need to ensure thread safety:
```python
import logging
import threading
from logging.handlers import QueueHandler, QueueListener
from queue import Queue


def setup_thread_safe_logging():
    """Set up thread-safe logging with a queue."""
    log_queue = Queue()

    # Create the real output handlers
    console_handler = logging.StreamHandler()
    file_handler = logging.FileHandler('app.log')

    # The queue handler enqueues records; the listener drains the queue
    queue_handler = QueueHandler(log_queue)
    listener = QueueListener(
        log_queue,
        console_handler,
        file_handler,
        respect_handler_level=True
    )

    # Configure the root logger to go through the queue
    root_logger = logging.getLogger()
    root_logger.addHandler(queue_handler)

    # Start the listener in a separate thread
    listener.start()
    return listener


# Usage
listener = setup_thread_safe_logging()


def worker_function():
    logger = logging.getLogger(__name__)
    logger.info(f"Worker thread {threading.current_thread().name} starting")
    # Do work...
    logger.info(f"Worker thread {threading.current_thread().name} finished")


# Create and start threads
threads = [threading.Thread(target=worker_function) for _ in range(3)]
for thread in threads:
    thread.start()
for thread in threads:
    thread.join()

# Stop the listener once all work is done so queued records are flushed
listener.stop()
```
Logging in Different Environments
Different application environments call for specific logging approaches. Whether you're working with web applications, microservices, or background tasks, each environment has unique logging requirements and best practices. Let's explore how to implement effective logging across various deployment scenarios.
Web Application Logging
Django Logging Configuration
Here is a comprehensive Django logging setup:
```python
# settings.py
LOGGING = {
    'version': 1,
    'disable_existing_loggers': False,
    'formatters': {
        'verbose': {
            'format': '{levelname} {asctime} {module} {process:d} {thread:d} {message}',
            'style': '{',
        },
        'simple': {
            'format': '{levelname} {message}',
            'style': '{',
        },
    },
    'filters': {
        'require_debug_true': {
            '()': 'django.utils.log.RequireDebugTrue',
        },
    },
    'handlers': {
        'console': {
            'level': 'INFO',
            'filters': ['require_debug_true'],
            'class': 'logging.StreamHandler',
            'formatter': 'simple'
        },
        'file': {
            'level': 'ERROR',
            'class': 'logging.FileHandler',
            'filename': 'django-errors.log',
            'formatter': 'verbose'
        },
        'mail_admins': {
            'level': 'ERROR',
            'class': 'django.utils.log.AdminEmailHandler',
            'include_html': True,
        }
    },
    'loggers': {
        'django': {
            'handlers': ['console'],
            'propagate': True,
        },
        'django.request': {
            'handlers': ['file', 'mail_admins'],
            'level': 'ERROR',
            'propagate': False,
        },
        'myapp': {
            'handlers': ['console', 'file'],
            'level': 'INFO',
        }
    }
}
```
Flask Logging Setup
Flask provides its own customizable logging system:
```python
import logging
from logging.handlers import RotatingFileHandler

from flask import Flask, jsonify, request

app = Flask(__name__)


def setup_logger():
    # Create formatter
    formatter = logging.Formatter(
        '[%(asctime)s] %(levelname)s in %(module)s: %(message)s'
    )

    # Rotating file handler
    file_handler = RotatingFileHandler(
        'flask_app.log',
        maxBytes=10485760,  # 10MB
        backupCount=10
    )
    file_handler.setLevel(logging.INFO)
    file_handler.setFormatter(formatter)

    # Formatter that adds request context
    class RequestFormatter(logging.Formatter):
        def format(self, record):
            record.url = request.url
            record.remote_addr = request.remote_addr
            return super().format(record)

    # Configure the app logger
    app.logger.addHandler(file_handler)
    app.logger.setLevel(logging.INFO)
    return app.logger


# Usage in routes
@app.route('/api/endpoint')
def api_endpoint():
    app.logger.info(f'Request received from {request.remote_addr}')
    # Your code here
    return jsonify({'status': 'success'})
```
FastAPI Logging Practices
FastAPI can leverage Python's logging with some middleware enhancements:
```python
import logging
import time
from typing import Callable

from fastapi import FastAPI, Request

app = FastAPI()

# Configure logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)


# Middleware for request logging
@app.middleware("http")
async def log_requests(request: Request, call_next: Callable):
    start_time = time.time()
    response = await call_next(request)
    duration = time.time() - start_time

    log_dict = {
        "url": str(request.url),
        "method": request.method,
        "client_ip": request.client.host,
        "duration": f"{duration:.2f}s",
        "status_code": response.status_code
    }
    logger.info(f"Request processed: {log_dict}")
    return response


# Example endpoint with logging
@app.get("/items/{item_id}")
async def read_item(item_id: int):
    logger.info(f"Retrieving item {item_id}")
    # Your code here
    return {"item_id": item_id}
```
Microservice Logging
For microservices, distributed tracing and correlation IDs are essential:
```python
import contextvars
import logging
from uuid import uuid4

# Context variable for the trace ID
trace_id_var = contextvars.ContextVar('trace_id', default=None)


class TraceIDFilter(logging.Filter):
    def filter(self, record):
        trace_id = trace_id_var.get()
        record.trace_id = trace_id if trace_id else 'no_trace'
        return True


def setup_microservice_logging(service_name):
    logger = logging.getLogger(service_name)

    # Formatter that includes the trace ID
    formatter = logging.Formatter(
        '%(asctime)s - %(name)s - [%(trace_id)s] - %(levelname)s - %(message)s'
    )

    handler = logging.StreamHandler()
    handler.setFormatter(formatter)
    handler.addFilter(TraceIDFilter())

    logger.addHandler(handler)
    logger.setLevel(logging.INFO)
    return logger


# Usage in a microservice
logger = setup_microservice_logging('order_service')


def process_order(order_data):
    # Generate or propagate a trace ID for this request
    trace_id_var.set(str(uuid4()))

    logger.info("Starting order processing", extra={
        'order_id': order_data['id'],
        'customer_id': order_data['customer_id']
    })
    # Process order...
    logger.info("Order processed successfully")
```
Background Task Logging
For background tasks, we need to ensure proper log handling and rotation:
```python
import logging
import os
import threading
from datetime import datetime
from logging.handlers import RotatingFileHandler


class BackgroundTaskLogger:
    def __init__(self, task_name):
        self.logger = logging.getLogger(f'background_task.{task_name}')
        self.setup_logging()

    def setup_logging(self):
        # Create the logs directory if it doesn't exist
        os.makedirs('logs', exist_ok=True)

        # Set up a rotating file handler
        handler = RotatingFileHandler(
            filename=f'logs/task_{datetime.now():%Y%m%d}.log',
            maxBytes=5 * 1024 * 1024,  # 5MB
            backupCount=5
        )

        formatter = logging.Formatter(
            '%(asctime)s - [%(threadName)s] - %(levelname)s - %(message)s'
        )
        handler.setFormatter(formatter)

        self.logger.addHandler(handler)
        self.logger.setLevel(logging.INFO)

    def log_task_status(self, status, **kwargs):
        """Log task status with additional context."""
        extra = {
            'thread_id': threading.get_ident(),
            'timestamp': datetime.now().isoformat(),
            **kwargs
        }
        self.logger.info(f"Task status: {status}", extra=extra)


# Usage example
def background_job():
    logger = BackgroundTaskLogger('data_processing')
    try:
        logger.log_task_status('started', job_id=123)
        # Do some work...
        logger.log_task_status('completed', records_processed=1000)
    except Exception as e:
        logger.logger.error(f"Task failed: {e}", exc_info=True)
```
Common Logging Patterns and Solutions
Request ID Tracking
Implement request tracking in your application:
```python
import logging
import threading
import uuid
from contextlib import contextmanager

# Store the request ID in thread-local storage
_request_id = threading.local()


class RequestIDFilter(logging.Filter):
    def filter(self, record):
        record.request_id = getattr(_request_id, 'id', 'no_request_id')
        return True


@contextmanager
def request_context(request_id=None):
    """Context manager for request tracking."""
    if request_id is None:
        request_id = str(uuid.uuid4())

    old_id = getattr(_request_id, 'id', None)
    _request_id.id = request_id
    try:
        yield request_id
    finally:
        if old_id is None:
            del _request_id.id
        else:
            _request_id.id = old_id


# Set up logging with the request ID
def setup_request_logging():
    logger = logging.getLogger()
    formatter = logging.Formatter(
        '%(asctime)s - [%(request_id)s] - %(levelname)s - %(message)s'
    )

    handler = logging.StreamHandler()
    handler.setFormatter(formatter)
    handler.addFilter(RequestIDFilter())

    logger.addHandler(handler)
    return logger


# Usage example
logger = setup_request_logging()


def process_request(data):
    with request_context() as request_id:
        logger.info("Processing request", extra={
            'data': data,
            'operation': 'process_request'
        })
        # Process the request...
        logger.info("Request processed successfully")
```
User Activity Logging
Track user actions securely:
```python
import logging
from datetime import datetime
from typing import Any, Dict


class UserActivityLogger:
    def __init__(self):
        self.logger = logging.getLogger('user_activity')
        self.setup_logger()

    def setup_logger(self):
        formatter = logging.Formatter(
            '%(asctime)s - %(levelname)s - '
            '[User: %(user_id)s] %(message)s'
        )

        handler = logging.FileHandler('user_activity.log')
        handler.setFormatter(formatter)

        self.logger.addHandler(handler)
        self.logger.setLevel(logging.INFO)

    def log_activity(
        self,
        user_id: str,
        action: str,
        resource: str,
        details: Dict[str, Any] = None
    ):
        """Log user activity with context."""
        activity_data = {
            'timestamp': datetime.utcnow().isoformat(),
            'user_id': user_id,
            'action': action,
            'resource': resource,
            'details': details or {}
        }

        self.logger.info(
            f"User performed {action} on {resource}",
            extra={
                'user_id': user_id,
                'activity_data': activity_data
            }
        )
        return activity_data


# Usage example
activity_logger = UserActivityLogger()


def update_user_profile(user_id: str, profile_data: dict):
    try:
        # Update profile logic here...
        activity_logger.log_activity(
            user_id=user_id,
            action='update_profile',
            resource='user_profile',
            details={
                'updated_fields': list(profile_data.keys()),
                'source_ip': '192.168.1.1'
            }
        )
    except Exception as e:
        activity_logger.logger.error(
            f"Profile update failed for user {user_id}",
            extra={'user_id': user_id, 'error': str(e)},
            exc_info=True
        )
```
Troubleshooting and Debugging
Troubleshooting logging issues effectively requires understanding common problems and their solutions. This section covers the most frequent challenges developers face when implementing logging, along with practical ways to debug logging configurations.
Common Logging Issues
Missing Log Entries
```python
# Common problem: logs not appearing due to an incorrect log level
import logging

# Wrong way
logger = logging.getLogger(__name__)
logger.debug("This won't appear")  # No handler, and the default level is too high


# Correct way
def setup_proper_logging():
    logger = logging.getLogger(__name__)

    # Set the base logger level
    logger.setLevel(logging.DEBUG)

    # Create and configure the handler
    handler = logging.StreamHandler()
    handler.setLevel(logging.DEBUG)

    # Add a formatter
    formatter = logging.Formatter(
        '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
    )
    handler.setFormatter(formatter)

    # Attach the handler to the logger
    logger.addHandler(handler)
    return logger
```
Performance Bottlenecks
```python
import logging
import time
from functools import wraps


class PerformanceLoggingHandler(logging.Handler):
    def __init__(self):
        super().__init__()
        self.log_times = []

    def emit(self, record):
        self.log_times.append(time.time())
        if len(self.log_times) > 1000:
            time_diff = self.log_times[-1] - self.log_times[0]
            if time_diff < 1:  # More than 1000 logs per second
                print(
                    f"Warning: high logging rate detected: "
                    f"{len(self.log_times) / time_diff:.0f} logs/second"
                )
            self.log_times = self.log_times[-100:]


def log_performance(logger):
    """Decorator to measure and log function performance."""
    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            start_time = time.perf_counter()
            result = func(*args, **kwargs)
            execution_time = time.perf_counter() - start_time

            logger.info(
                f"Function {func.__name__} took {execution_time:.4f} seconds",
                extra={
                    'execution_time': execution_time,
                    'function_name': func.__name__
                }
            )
            return result
        return wrapper
    return decorator
```
Common Logging Pitfalls and Solutions
Configuration Problems
```python
import logging

# Common mistake 1: not setting the log level properly
logger = logging.getLogger(__name__)  # Will not show DEBUG messages
logger.debug("This won't appear")

# Solution: configure both logger and handler levels
logger = logging.getLogger(__name__)
logger.setLevel(logging.DEBUG)
handler = logging.StreamHandler()
handler.setLevel(logging.DEBUG)
formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
handler.setFormatter(formatter)
logger.addHandler(handler)

# Common mistake 2: incorrect basicConfig usage
logging.basicConfig(level=logging.INFO)  # Called after a handler was added - has no effect
logger.info("Message")

# Solution: configure logging before creating loggers
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
logger.info("Message")
```
Memory and Resource Issues
```python
import logging
from contextlib import contextmanager


# Common mistake 3: creating handlers in loops
def process_item(item):
    # DON'T DO THIS
    logger = logging.getLogger(__name__)
    handler = logging.FileHandler('app.log')  # Leaks a file handle on every call!
    logger.addHandler(handler)
    logger.info(f"Processing {item}")


# Solution: create handlers outside loops
logger = logging.getLogger(__name__)
handler = logging.FileHandler('app.log')
logger.addHandler(handler)


def process_item(item):
    logger.info(f"Processing {item}")


# Common mistake 4: not closing file handlers
handler = logging.FileHandler('app.log')
logger.addHandler(handler)
# ... later
# handler.close()  # Forgotten!


# Solution: use a context manager
@contextmanager
def log_file_handler(filename):
    handler = logging.FileHandler(filename)
    logger.addHandler(handler)
    try:
        yield
    finally:
        handler.close()
        logger.removeHandler(handler)
```
Format Strings and Performance Issues
```python
# Common mistake 5: eager string formatting
logger.error("Error occurred: {}".format(error))  # Formats even if the level is disabled
logger.error(f"Error occurred: {error}")          # Same problem with f-strings

# Solution: use %-style lazy formatting
logger.error("Error occurred: %s", error)  # Formatting is deferred until a handler needs it

# Common mistake 6: expensive operations in debug statements
logger.debug(f"Complex data: {expensive_operation()}")  # Runs even if DEBUG is disabled

# Solution: guard expensive calls with a level check
if logger.isEnabledFor(logging.DEBUG):
    logger.debug("Complex data: %s", expensive_operation())
```
Handler Configuration Pitfalls
```python
import logging

# Common mistake 7: adding the same handler twice
logger = logging.getLogger(__name__)
handler = logging.StreamHandler()
logger.addHandler(handler)
logger.addHandler(handler)  # Duplicate handler - every message is logged twice!

# Solution: check for existing handlers
if not logger.handlers:
    handler = logging.StreamHandler()
    logger.addHandler(handler)

# Common mistake 8: incorrect propagation handling
child_logger = logging.getLogger('parent.child')
parent_logger = logging.getLogger('parent')
# Messages can appear twice because child records propagate to the parent

# Solution: control propagation explicitly
child_logger.propagate = False  # If you don't want propagation
```
Thread Safety Considerations
```python
import logging
import threading


# Common mistake 9: unsafe handler modification
def setup_logging():
    # DON'T DO THIS
    logger = logging.getLogger()
    logger.handlers.clear()  # Unsafe in a multi-threaded environment


# Solution: use a lock for handler modifications
_handler_lock = threading.Lock()


def setup_logging():
    with _handler_lock:
        logger = logging.getLogger()
        existing_handlers = logger.handlers.copy()
        for handler in existing_handlers:
            logger.removeHandler(handler)
            handler.close()
```
Configuration File Issues
```python
import os

# Common mistake 10: hardcoded paths in config
LOGGING = {
    'handlers': {
        'file': {
            'filename': '/absolute/path/app.log',  # Breaks portability
        }
    }
}

# Solution: use environment variables or relative paths
LOGGING = {
    'handlers': {
        'file': {
            'filename': os.getenv('LOG_FILE', 'app.log'),
        }
    }
}
```
Testing Your Logging
Unit Testing with Logs
```python
import io
import logging
import unittest


class TestLogging(unittest.TestCase):
    def setUp(self):
        # Create a logger for testing
        self.logger = logging.getLogger('test_logger')
        self.logger.setLevel(logging.DEBUG)

        # Capture log output in a string buffer
        self.log_output = io.StringIO()
        self.handler = logging.StreamHandler(self.log_output)
        self.handler.setLevel(logging.DEBUG)
        self.logger.addHandler(self.handler)

    def test_log_level_filtering(self):
        """Test that log levels are properly filtered."""
        self.logger.debug("Debug message")
        self.logger.info("Info message")

        log_contents = self.log_output.getvalue()
        self.assertIn("Debug message", log_contents)
        self.assertIn("Info message", log_contents)

    def test_exception_logging(self):
        """Test exception logging functionality."""
        try:
            raise ValueError("Test error")
        except ValueError:
            self.logger.exception("An error occurred")

        log_contents = self.log_output.getvalue()
        self.assertIn("An error occurred", log_contents)
        self.assertIn("ValueError: Test error", log_contents)
        self.assertIn("Traceback", log_contents)

    def tearDown(self):
        # Clean up
        self.handler.close()
        self.log_output.close()
```
Testing with Mocked Loggers
```python
import unittest
from unittest.mock import MagicMock, patch

from your_module import YourClass


class TestWithMockedLogger(unittest.TestCase):
    @patch('logging.getLogger')
    def test_logger_calls(self, mock_get_logger):
        # Create a mock logger
        mock_logger = MagicMock()
        mock_get_logger.return_value = mock_logger

        # Create an instance of your class
        instance = YourClass()

        # Perform some operation that should log
        instance.do_something()

        # Assert logging calls
        mock_logger.info.assert_called_with(
            "Operation completed",
            extra={'operation': 'do_something'}
        )

        # Check the number of calls
        self.assertEqual(mock_logger.error.call_count, 0)
        self.assertEqual(mock_logger.info.call_count, 1)

    @patch('logging.getLogger')
    def test_error_logging(self, mock_get_logger):
        mock_logger = MagicMock()
        mock_get_logger.return_value = mock_logger

        instance = YourClass()

        # Trigger an error condition
        instance.problematic_operation()

        # Verify the error was logged
        mock_logger.error.assert_called_once()
        args, kwargs = mock_logger.error.call_args
        self.assertIn("Operation failed", args[0])
```
Alternative Logging Solutions
Loguru
Loguru offers a simpler logging interface with powerful features out of the box:
```python
import contextlib
import sys

from loguru import logger

# Basic Loguru configuration
logger.remove()  # Remove the default handler
logger.add(
    sys.stdout,
    colorize=True,
    format="{time:YYYY-MM-DD HH:mm:ss} | {level: <8} | {name}:{function}:{line} - {message}"
)
logger.add(
    "app_{time}.log",
    rotation="500 MB",
    retention="10 days",
    compression="zip"
)

# Structured logging: serialize records as JSON
logger.add(
    "structured.json",
    format="{time} {level} {message}",
    serialize=True  # Enables JSON formatting
)


# Context manager for transaction logging
@contextlib.contextmanager
def transaction_context(transaction_id: str):
    logger.info(f"Starting transaction {transaction_id}")
    try:
        yield
        logger.info(f"Transaction {transaction_id} completed successfully")
    except Exception:
        logger.exception(f"Transaction {transaction_id} failed")
        raise


# Example usage
def process_order(order_id: str):
    with transaction_context(f"order_{order_id}"):
        logger.debug("Processing order", order_id=order_id)
        # Order processing logic
        logger.info("Order processed", status="completed", order_id=order_id)
```
Structlog
Structlog is excellent for structured logging with context:
```python
import time
from typing import Any, Callable, Dict

import structlog

# Configure structlog
structlog.configure(
    processors=[
        structlog.processors.TimeStamper(fmt="iso"),
        structlog.processors.StackInfoRenderer(),
        structlog.processors.format_exc_info,
        structlog.processors.JSONRenderer()
    ],
    context_class=dict,
    logger_factory=structlog.PrintLoggerFactory(),
    wrapper_class=structlog.BoundLogger,
    cache_logger_on_first_use=True,
)

# Create a logger instance
logger = structlog.get_logger()


class StructuredLogging:
    def __init__(self):
        self.logger = structlog.get_logger()

    def with_context(self, **context) -> 'StructuredLogging':
        """Bind additional context to this logger."""
        self.logger = self.logger.bind(**context)
        return self

    def log_operation(self, operation: Callable, **kwargs):
        """Run an operation and log its timing and outcome."""
        start_time = time.time()
        try:
            result = operation(**kwargs)
            duration = time.time() - start_time
            self.logger.info(
                "operation_completed",
                operation_name=operation.__name__,
                duration=duration,
                status="success",
                **kwargs
            )
            return result
        except Exception as e:
            duration = time.time() - start_time
            self.logger.error(
                "operation_failed",
                operation_name=operation.__name__,
                duration=duration,
                error=str(e),
                error_type=type(e).__name__,
                **kwargs
            )
            raise


# Usage example
structured_logger = StructuredLogging()


def process_user_data(user_id: str, data: Dict[str, Any]):
    logger = structured_logger.with_context(
        user_id=user_id,
        service="user_service"
    )
    logger.logger.info("processing_user_data", data_size=len(data))
    # Process data...
    logger.logger.info("user_data_processed", status="success")
```
Python JSON Logger
For JSON-formatted logging:
```python
import datetime
import logging

from pythonjsonlogger import jsonlogger


class CustomJsonFormatter(jsonlogger.JsonFormatter):
    def add_fields(self, log_record, record, message_dict):
        super().add_fields(log_record, record, message_dict)

        # Add custom fields (datetime.UTC requires Python 3.11+)
        log_record['timestamp'] = datetime.datetime.now(datetime.UTC).isoformat()
        log_record['level'] = record.levelname
        log_record['logger'] = record.name

        if hasattr(record, 'props'):
            log_record.update(record.props)


def setup_json_logging():
    logger = logging.getLogger()
    handler = logging.StreamHandler()

    formatter = CustomJsonFormatter(
        '%(timestamp)s %(level)s %(name)s %(message)s'
    )
    handler.setFormatter(formatter)

    logger.addHandler(handler)
    logger.setLevel(logging.INFO)
    return logger


# Usage example
logger = setup_json_logging()


def log_event(event_type: str, **props):
    logger.info(
        f"Event: {event_type}",
        extra={
            'props': {
                'event_type': event_type,
                'timestamp': datetime.datetime.now(datetime.UTC).isoformat(),
                **props
            }
        }
    )
```
Best Practices and Guidelines
Logging Standards
```python
# Example of implementing logging standards
import logging
from typing import Optional


class StandardizedLogger:
    def __init__(self, service_name: str):
        self.logger = logging.getLogger(service_name)
        self.service_name = service_name
        self._setup_logging()

    def _setup_logging(self):
        """Set up a standardized logging configuration."""
        formatter = logging.Formatter(
            '[%(asctime)s] - %(name)s - %(levelname)s - %(message)s'
            ' {service: %(service)s, trace_id: %(trace_id)s}'
        )

        handler = logging.StreamHandler()
        handler.setFormatter(formatter)

        self.logger.addHandler(handler)
        self.logger.setLevel(logging.INFO)

    def _log(self, level: str, message: str, trace_id: Optional[str] = None, **kwargs):
        """Standardized log method."""
        extra = {
            'service': self.service_name,
            'trace_id': trace_id or 'no_trace',
            **kwargs
        }
        getattr(self.logger, level)(message, extra=extra)

    def info(self, message: str, **kwargs):
        self._log('info', message, **kwargs)

    def error(self, message: str, **kwargs):
        self._log('error', message, **kwargs)
```
Performance Optimization
```python
import queue
import threading
from functools import lru_cache
from typing import Any, Dict


class PerformantLogger:
    def __init__(self):
        self.log_queue = queue.Queue(maxsize=1000)
        self.running = True
        self._start_worker()

    def _start_worker(self):
        """Start a background worker for asynchronous logging."""
        def worker():
            while self.running:
                try:
                    record = self.log_queue.get(timeout=1)
                    self._process_log(record)
                except queue.Empty:
                    continue

        self.worker_thread = threading.Thread(target=worker, daemon=True)
        self.worker_thread.start()

    @lru_cache(maxsize=100)
    def _format_message(self, template: str, **kwargs) -> str:
        """Cache commonly used log message templates."""
        return template.format(**kwargs)

    def _process_log(self, record: Dict[str, Any]):
        """Process a log record."""
        # Actual logging logic here
        pass

    def log(self, level: str, message: str, **kwargs):
        """Asynchronous logging method."""
        record = {
            'level': level,
            'message': self._format_message(message, **kwargs),
            'extra': kwargs
        }
        try:
            self.log_queue.put_nowait(record)
        except queue.Full:
            # Handle the queue-full scenario (e.g. drop or block)
            pass
```
Case Studies
Real-World Implementation: An E-commerce Platform
```python
import logging
from datetime import datetime
from typing import Any, Dict


class EcommerceLogger:
    def __init__(self):
        self.logger = logging.getLogger('ecommerce')
        self._setup_logging()

    def _setup_logging(self):
        # Configure handlers for the different components
        self._add_transaction_handler()
        # Similar handlers could be added for inventory and user activity

    def _add_transaction_handler(self):
        handler = logging.FileHandler('transactions.log')
        handler.setFormatter(
            logging.Formatter(
                '%(asctime)s - %(transaction_id)s - %(message)s'
            )
        )
        self.logger.addHandler(handler)

    def log_order(self, order_id: str, user_id: str, amount: float):
        """Log order processing."""
        self.logger.info(
            f"Processing order {order_id}",
            extra={
                'transaction_id': order_id,
                'user_id': user_id,
                'amount': amount,
                'timestamp': datetime.utcnow().isoformat()
            }
        )


# Example usage
class OrderProcessor:
    def __init__(self):
        self.logger = EcommerceLogger()

    def process_order(self, order_data: Dict[str, Any]):
        try:
            # Log order initiation
            self.logger.log_order(
                order_data['order_id'],
                order_data['user_id'],
                order_data['amount']
            )
            # Process order...

            # Log success
            self.logger.logger.info(
                f"Order {order_data['order_id']} processed successfully"
            )
        except Exception as e:
            self.logger.logger.error(
                f"Order processing failed: {e}",
                exc_info=True
            )
```
Microservices Architecture Example
```python
import json
import logging
from typing import Any, Dict


class MicroserviceLogger:
    def __init__(self, service_name: str):
        self.service_name = service_name
        self.logger = self._configure_logger()

    def _configure_logger(self):
        logger = logging.getLogger(self.service_name)

        # JSON formatter for structured logging
        formatter = logging.Formatter(
            json.dumps({
                'timestamp': '%(asctime)s',
                'service': '%(service_name)s',
                'level': '%(levelname)s',
                'message': '%(message)s',
                'trace_id': '%(trace_id)s',
                'additional_data': '%(additional_data)s'
            })
        )

        handler = logging.StreamHandler()
        handler.setFormatter(formatter)
        logger.addHandler(handler)
        return logger

    def log(self, level: str, message: str, trace_id: str, **kwargs):
        """Log with distributed tracing context."""
        extra = {
            'service_name': self.service_name,
            'trace_id': trace_id,
            'additional_data': json.dumps(kwargs)
        }
        getattr(self.logger, level)(message, extra=extra)


# Example usage in a microservice
class PaymentService:
    def __init__(self):
        self.logger = MicroserviceLogger('payment-service')

    def process_payment(self, payment_data: Dict[str, Any], trace_id: str):
        self.logger.log(
            'info',
            'Processing payment',
            trace_id,
            payment_id=payment_data['id'],
            amount=payment_data['amount']
        )
        # Process payment...
        self.logger.log(
            'info',
            'Payment processed successfully',
            trace_id,
            payment_id=payment_data['id']
        )
```
Conclusion
Key Takeaways
Implementation Checklist
```python
from typing import Dict


def logging_implementation_checklist() -> Dict[str, bool]:
    return {
        # Basic setup
        "basic_configuration": True,
        "log_levels_defined": True,
        "handlers_configured": True,
        # Security
        "sensitive_data_filtered": True,
        "access_controls_implemented": True,
        "compliance_checked": True,
        # Performance
        "log_rotation_configured": True,
        "async_logging_implemented": True,
        "sampling_strategy_defined": True,
        # Monitoring
        "alerts_configured": True,
        "metrics_collected": True,
        "dashboards_created": True
    }
```
Additional Resources
This guide has covered the essential aspects of Python logging, from basic setup to advanced implementations. Remember that logging is an integral part of application observability and maintenance. Implement it thoughtfully and maintain it regularly for the best results.
As your applications evolve and new requirements emerge, remember to review and update your logging implementation regularly.