The Complete Python Logging Guide: Best Practices and Implementation

Why Proper Logging Matters

Before diving into the technical details, let's understand why proper logging matters:

  • Enables effective debugging in production
  • Provides insight into application behavior
  • Facilitates performance monitoring
  • Helps track security events
  • Supports compliance requirements
  • Improves maintenance efficiency

    Python Logging Quick Start

    If you're new to Python logging, here's a basic example using **logging.basicConfig**:

    # Simple python logging example
    import logging
    
    # Basic logger in python example
    logging.basicConfig(
        level=logging.INFO,
        format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
    )
    
    # Create a logger
    logger = logging.getLogger(__name__)
    
    # Logger in python example
    logger.info("This is an information message")
    logger.warning("This is a warning message")

    This example demonstrates the basics of Python's logging module and shows how to use a logger in your application.

    Getting Started with the Python Logging Module

    Basic Setup

    Let's start with a simple logging configuration:

    import logging
    
    # Basic configuration
    logging.basicConfig(
        level=logging.INFO,
        format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
    )
    
    # Your first logger
    logger = logging.getLogger(__name__)
    
    # Using the logger
    logger.info("Application started")
    logger.warning("Watch out!")
    logger.error("Something went wrong")

    Understanding Log Levels

    Python logging has five standard levels, in increasing order of severity:

  • DEBUG (10): detailed diagnostic information
  • INFO (20): confirmation that things are working as expected
  • WARNING (30): something unexpected happened, but the application still works
  • ERROR (40): a serious problem; some functionality failed
  • CRITICAL (50): a severe error; the program may be unable to continue
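    These thresholds can be seen in action; a minimal sketch (the `levels_demo` logger name is arbitrary) where a root configuration at WARNING silently drops lower-severity records:

```python
import logging

# Root logger configured at WARNING: DEBUG and INFO records are filtered out
logging.basicConfig(level=logging.WARNING, format='%(levelname)s:%(message)s')
logger = logging.getLogger("levels_demo")

logger.debug("detailed diagnostics")    # dropped: DEBUG (10) < WARNING (30)
logger.info("routine event")            # dropped: INFO (20) < WARNING (30)
logger.warning("something unexpected")  # emitted: meets the WARNING threshold
logger.error("operation failed")        # emitted: ERROR (40) > WARNING (30)
```

    Only the last two messages reach the console; the first two never leave the logger.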

    Beyond print() Statements

    Why choose logging over print statements?

  • Severity levels for better categorization
  • Timestamp information
  • Source information (file, line number)
  • Configurable output destinations
  • Production-ready filtering
  • Thread safety

    Configuring Your Logging System

    Basic Configuration Options

    logging.basicConfig(
        filename='app.log',
        filemode='w',
        format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
        level=logging.DEBUG,
        datefmt='%Y-%m-%d %H:%M:%S'
    )

    Advanced Configuration

    For more complex applications:

    import logging.config

    config = {
        'version': 1,
        'formatters': {
            'detailed': {
                'format': '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
            }
        },
        'handlers': {
            'console': {
                'class': 'logging.StreamHandler',
                'level': 'INFO',
                'formatter': 'detailed'
            },
            'file': {
                'class': 'logging.FileHandler',
                'filename': 'app.log',
                'level': 'DEBUG',
                'formatter': 'detailed'
            }
        },
        'loggers': {
            'myapp': {
                'handlers': ['console', 'file'],
                'level': 'DEBUG',
                'propagate': True
            }
        }
    }
    
    logging.config.dictConfig(config)

    Using Advanced Logging

    Structured Logging

    Structured logging produces a consistent, machine-readable format that is essential for log analysis and monitoring. For a comprehensive look at structured logging patterns and best practices, see the structured logging guide. Let's implement structured logging in Python:

    import json
    import logging
    from datetime import datetime
    
    class JSONFormatter(logging.Formatter):
        def __init__(self):
            super().__init__()
    
        def format(self, record):
            # Create base log record
            log_obj = {
                "timestamp": self.formatTime(record, self.datefmt),
                "name": record.name,
                "level": record.levelname,
                "message": record.getMessage(),
                "module": record.module,
                "function": record.funcName,
                "line": record.lineno
            }
    
            # Add exception info if present
            if record.exc_info:
                log_obj["exception"] = self.formatException(record.exc_info)
    
            # Add custom fields from extra
            if hasattr(record, "extra_fields"):
                log_obj.update(record.extra_fields)
    
            return json.dumps(log_obj)
    
    # Usage Example
    logger = logging.getLogger(__name__)
    handler = logging.StreamHandler()
    handler.setFormatter(JSONFormatter())
    logger.addHandler(handler)
    
    # Log with extra fields
    logger.info("User logged in", extra={"extra_fields": {"user_id": "123", "ip": "192.168.1.1"}})

    Error Management

    Proper error logging is critical for debugging production issues. Here's a comprehensive approach:

    import traceback
    import sys
    from contextlib import contextmanager
    
    class ErrorLogger:
        def __init__(self, logger):
            self.logger = logger
    
        @contextmanager
        def error_context(self, operation_name, **context):
            """Context manager for error logging with additional context"""
            try:
                yield
            except Exception as e:
                # Capture the current stack trace
                exc_type, exc_value, exc_traceback = sys.exc_info()
    
                # Format error details
                error_details = {
                    "operation": operation_name,
                    "error_type": exc_type.__name__,
                    "error_message": str(exc_value),
                    "context": context,
                    "stack_trace": traceback.format_exception(exc_type, exc_value, exc_traceback)
                }
    
                # Log the error with full context
                self.logger.error(
                    f"Error in {operation_name}: {str(exc_value)}",
                    extra={"error_details": error_details}
                )
    
                # Re-raise the exception
                raise
    
    # Usage Example
    logger = logging.getLogger(__name__)
    error_logger = ErrorLogger(logger)
    
    with error_logger.error_context("user_authentication", user_id="123", attempt=2):
        # Your code that might raise an exception
        authenticate_user(user_id)

    Concurrent Logging

    Logging in multi-threaded applications requires thread safety:

    import threading
    import logging
    from queue import Queue
    from logging.handlers import QueueHandler, QueueListener
    
    def setup_thread_safe_logging():
        """Set up thread-safe logging with a queue"""
        # Create the queue
        log_queue = Queue()
    
        # Create handlers
        console_handler = logging.StreamHandler()
        file_handler = logging.FileHandler('app.log')
    
        # Create queue handler and listener
        queue_handler = QueueHandler(log_queue)
        listener = QueueListener(
            log_queue,
            console_handler,
            file_handler,
            respect_handler_level=True
        )
    
        # Configure root logger
        root_logger = logging.getLogger()
        root_logger.addHandler(queue_handler)
    
        # Start the listener in a separate thread
        listener.start()
    
        return listener
    
    # Usage
    listener = setup_thread_safe_logging()
    
    def worker_function():
        logger = logging.getLogger(__name__)
        logger.info(f"Worker thread {threading.current_thread().name} starting")
        # Do work...
        logger.info(f"Worker thread {threading.current_thread().name} finished")
    
    # Create and start threads
    threads = [
        threading.Thread(target=worker_function)
        for _ in range(3)
    ]
    for thread in threads:
        thread.start()

    Logging in Different Environments

    Different application environments call for tailored logging approaches. Whether you are working with web applications, microservices, or background tasks, each environment has unique logging requirements and best practices. Let's explore how to implement effective logging across various deployment scenarios.

    Web Application Logging

    Django Logging Configuration

    Here's a comprehensive Django logging setup:

    # settings.py
    LOGGING = {
        'version': 1,
        'disable_existing_loggers': False,
        'formatters': {
            'verbose': {
                'format': '{levelname} {asctime} {module} {process:d} {thread:d} {message}',
                'style': '{',
            },
            'simple': {
                'format': '{levelname} {message}',
                'style': '{',
            },
        },
        'filters': {
            'require_debug_true': {
                '()': 'django.utils.log.RequireDebugTrue',
            },
        },
        'handlers': {
            'console': {
                'level': 'INFO',
                'filters': ['require_debug_true'],
                'class': 'logging.StreamHandler',
                'formatter': 'simple'
            },
            'file': {
                'level': 'ERROR',
                'class': 'logging.FileHandler',
                'filename': 'django-errors.log',
                'formatter': 'verbose'
            },
            'mail_admins': {
                'level': 'ERROR',
                'class': 'django.utils.log.AdminEmailHandler',
                'include_html': True,
            }
        },
        'loggers': {
            'django': {
                'handlers': ['console'],
                'propagate': True,
            },
            'django.request': {
                'handlers': ['file', 'mail_admins'],
                'level': 'ERROR',
                'propagate': False,
            },
            'myapp': {
                'handlers': ['console', 'file'],
                'level': 'INFO',
            }
        }
    }

    Flask Logging Setup

    Flask provides its own customizable logging system:

    import logging
    from logging.handlers import RotatingFileHandler
    from flask import Flask, request, jsonify
    
    app = Flask(__name__)
    
    def setup_logger():
        # Create formatter
        formatter = logging.Formatter(
            '[%(asctime)s] %(levelname)s in %(module)s: %(message)s'
        )
    
        # File Handler
        file_handler = RotatingFileHandler(
            'flask_app.log',
            maxBytes=10485760,  # 10MB
            backupCount=10
        )
        file_handler.setLevel(logging.INFO)
        file_handler.setFormatter(formatter)
    
        # Add request context
        class RequestFormatter(logging.Formatter):
            def format(self, record):
                record.url = request.url
                record.remote_addr = request.remote_addr
                return super().format(record)
    
        # Configure app logger
        app.logger.addHandler(file_handler)
        app.logger.setLevel(logging.INFO)
    
        return app.logger
    
    # Usage in routes
    @app.route('/api/endpoint')
    def api_endpoint():
        app.logger.info(f'Request received from {request.remote_addr}')
        # Your code here
        return jsonify({'status': 'success'})

    FastAPI Logging Practices

    FastAPI can leverage Python's logging with some middleware enhancements:

    from fastapi import FastAPI, Request
    from typing import Callable
    import logging
    import time
    
    app = FastAPI()
    
    # Configure logging
    logging.basicConfig(
        level=logging.INFO,
        format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
    )
    logger = logging.getLogger(__name__)
    
    # Middleware for request logging
    @app.middleware("http")
    async def log_requests(request: Request, call_next: Callable):
        start_time = time.time()
        response = await call_next(request)
        duration = time.time() - start_time
    
        log_dict = {
            "url": str(request.url),
            "method": request.method,
            "client_ip": request.client.host,
            "duration": f"{duration:.2f}s",
            "status_code": response.status_code
        }
    
        logger.info(f"Request processed: {log_dict}")
        return response
    
    # Example endpoint with logging
    @app.get("/items/{item_id}")
    async def read_item(item_id: int):
        logger.info(f"Retrieving item {item_id}")
        # Your code here
        return {"item_id": item_id}

    Microservice Logging

    For microservices, distributed tracing and correlation IDs are essential:

    import logging
    import contextvars
    from uuid import uuid4
    
    # Create context variable for trace ID
    trace_id_var = contextvars.ContextVar('trace_id', default=None)
    
    class TraceIDFilter(logging.Filter):
        def filter(self, record):
            trace_id = trace_id_var.get()
            record.trace_id = trace_id if trace_id else 'no_trace'
            return True
    
    def setup_microservice_logging(service_name):
        logger = logging.getLogger(service_name)
    
        # Create formatter with trace ID
        formatter = logging.Formatter(
            '%(asctime)s - %(name)s - [%(trace_id)s] - %(levelname)s - %(message)s'
        )
    
        # Add handlers with trace ID filter
        handler = logging.StreamHandler()
        handler.setFormatter(formatter)
        handler.addFilter(TraceIDFilter())
    
        logger.addHandler(handler)
        logger.setLevel(logging.INFO)
    
        return logger
    
    # Usage in microservice
    logger = setup_microservice_logging('order_service')
    
    def process_order(order_data):
        # Generate or get trace ID from request
        trace_id_var.set(str(uuid4()))
    
        logger.info("Starting order processing", extra={
            'order_id': order_data['id'],
            'customer_id': order_data['customer_id']
        })
    
        # Process order...
    
        logger.info("Order processed successfully")

    Background Task Logging

    For background tasks, we need to ensure proper log handling and rotation:

    from logging.handlers import RotatingFileHandler
    import logging
    import threading
    from datetime import datetime
    
    class BackgroundTaskLogger:
        def __init__(self, task_name):
            self.logger = logging.getLogger(f'background_task.{task_name}')
            self.setup_logging()
    
        def setup_logging(self):
            # Create logs directory if it doesn't exist
            import os
            os.makedirs('logs', exist_ok=True)
    
            # Setup rotating file handler
            handler = RotatingFileHandler(
                filename=f'logs/task_{datetime.now():%Y%m%d}.log',
                maxBytes=5*1024*1024,  # 5MB
                backupCount=5
            )
    
            # Create formatter
            formatter = logging.Formatter(
                '%(asctime)s - [%(threadName)s] - %(levelname)s - %(message)s'
            )
            handler.setFormatter(formatter)
    
            self.logger.addHandler(handler)
            self.logger.setLevel(logging.INFO)
    
        def log_task_status(self, status, **kwargs):
            """Log task status with additional context"""
            extra = {
                'thread_id': threading.get_ident(),
                'timestamp': datetime.now().isoformat(),
                **kwargs
            }
            self.logger.info(f"Task status: {status}", extra=extra)
    
    # Usage example
    def background_job():
        logger = BackgroundTaskLogger('data_processing')
        try:
            logger.log_task_status('started', job_id=123)
            # Do some work...
            logger.log_task_status('completed', records_processed=1000)
        except Exception as e:
            logger.logger.error(f"Task failed: {str(e)}", exc_info=True)

    Common Logging Patterns and Solutions

    Request ID Tracking

    Implement request tracking in your application:

    import logging
    from contextlib import contextmanager
    import threading
    import uuid
    
    # Store request ID in thread-local storage
    _request_id = threading.local()
    
    class RequestIDFilter(logging.Filter):
        def filter(self, record):
            record.request_id = getattr(_request_id, 'id', 'no_request_id')
            return True
    
    @contextmanager
    def request_context(request_id=None):
        """Context manager for request tracking"""
        if request_id is None:
            request_id = str(uuid.uuid4())
    
        old_id = getattr(_request_id, 'id', None)
        _request_id.id = request_id
        try:
            yield request_id
        finally:
            if old_id is None:
                del _request_id.id
            else:
                _request_id.id = old_id
    
    # Setup logging with request ID
    def setup_request_logging():
        logger = logging.getLogger()
        formatter = logging.Formatter(
            '%(asctime)s - [%(request_id)s] - %(levelname)s - %(message)s'
        )
    
        handler = logging.StreamHandler()
        handler.setFormatter(formatter)
        handler.addFilter(RequestIDFilter())
    
        logger.addHandler(handler)
        return logger
    
    # Usage example
    logger = setup_request_logging()
    
    def process_request(data):
        with request_context() as request_id:
            logger.info("Processing request", extra={
                'data': data,
                'operation': 'process_request'
            })
            # Process the request...
            logger.info("Request processed successfully")

    User Activity Logging

    Track user actions securely:

    import logging
    from datetime import datetime
    from typing import Dict, Any
    
    class UserActivityLogger:
        def __init__(self):
            self.logger = logging.getLogger('user_activity')
            self.setup_logger()
    
        def setup_logger(self):
            formatter = logging.Formatter(
                '%(asctime)s - %(levelname)s - '
                '[User: %(user_id)s] %(message)s'
            )
    
            handler = logging.FileHandler('user_activity.log')
            handler.setFormatter(formatter)
    
            self.logger.addHandler(handler)
            self.logger.setLevel(logging.INFO)
    
        def log_activity(
            self,
            user_id: str,
            action: str,
            resource: str,
            details: Dict[str, Any] = None
        ):
            """Log user activity with context"""
            activity_data = {
                'timestamp': datetime.utcnow().isoformat(),
                'user_id': user_id,
                'action': action,
                'resource': resource,
                'details': details or {}
            }
    
            self.logger.info(
                f"User performed {action} on {resource}",
                extra={
                    'user_id': user_id,
                    'activity_data': activity_data
                }
            )
    
            return activity_data
    
    # Usage example
    activity_logger = UserActivityLogger()
    
    def update_user_profile(user_id: str, profile_data: dict):
        try:
            # Update profile logic here...
    
            activity_logger.log_activity(
                user_id=user_id,
                action='update_profile',
                resource='user_profile',
                details={
                    'updated_fields': list(profile_data.keys()),
                    'source_ip': '192.168.1.1'
                }
            )
    
        except Exception as e:
            activity_logger.logger.error(
                f"Profile update failed for user {user_id}",
                extra={'user_id': user_id, 'error': str(e)},
                exc_info=True
            )

    Troubleshooting and Debugging

    Troubleshooting logging effectively requires understanding common problems and their solutions. This section covers the challenges developers most often face when implementing logging and provides practical solutions for debugging logging configurations.
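    One practical starting point is to inspect the logger tree itself. The sketch below (assuming CPython's standard library, where logging.Logger.manager.loggerDict is the registry of named loggers) prints each logger's effective level, handlers, and propagation flag:

```python
import logging

def dump_logger_tree():
    """Print every registered logger with its effective level, handlers, and propagate flag."""
    loggers = [logging.getLogger()]  # the root logger first
    loggers += [
        lg for lg in logging.Logger.manager.loggerDict.values()
        if isinstance(lg, logging.Logger)  # skip PlaceHolder entries
    ]
    for lg in loggers:
        level = logging.getLevelName(lg.getEffectiveLevel())
        handlers = [type(h).__name__ for h in lg.handlers]
        print(f"{lg.name}: level={level} handlers={handlers} propagate={lg.propagate}")

# A logger with no handlers of its own still inherits an effective level
logging.getLogger("myapp.db")
dump_logger_tree()
```

    If a logger shows no handlers and propagate=False, its records are silently discarded, which is the most common cause of missing log entries.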

    Common Logging Issues

    Missing Log Entries

    # Common problem: Logs not appearing due to incorrect log level
    import logging
    
    # Wrong way
    logger = logging.getLogger(__name__)
    logger.debug("This won't appear")  # No handler and wrong level
    
    # Correct way
    def setup_proper_logging():
        logger = logging.getLogger(__name__)
    
        # Set the base logger level
        logger.setLevel(logging.DEBUG)
    
        # Create and configure handler
        handler = logging.StreamHandler()
        handler.setLevel(logging.DEBUG)
    
        # Add formatter
        formatter = logging.Formatter(
            '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
        )
        handler.setFormatter(formatter)
    
        # Add handler to logger
        logger.addHandler(handler)
    
        return logger

    Performance Bottlenecks

    import logging
    import time
    from functools import wraps
    
    class PerformanceLoggingHandler(logging.Handler):
        def __init__(self):
            super().__init__()
            self.log_times = []
    
        def emit(self, record):
            self.log_times.append(time.time())
            if len(self.log_times) > 1000:
                time_diff = self.log_times[-1] - self.log_times[0]
                if time_diff < 1:  # More than 1000 logs per second
                    print(f"Warning: High logging rate detected: {len(self.log_times)/time_diff} logs/second")
                self.log_times = self.log_times[-100:]
    
    def log_performance(logger):
        """Decorator to measure and log function performance"""
        def decorator(func):
            @wraps(func)
            def wrapper(*args, **kwargs):
                start_time = time.perf_counter()
                result = func(*args, **kwargs)
                end_time = time.perf_counter()
    
                execution_time = end_time - start_time
                logger.info(
                    f"Function {func.__name__} took {execution_time:.4f} seconds",
                    extra={
                        'execution_time': execution_time,
                        'function_name': func.__name__
                    }
                )
                return result
            return wrapper
        return decorator

    Common Logging Pitfalls and Solutions

    Configuration Issues

    # Common Mistake 1: Not setting the log level properly
    logger = logging.getLogger(__name__)  # Will not show DEBUG messages
    logger.debug("This won't appear")
    
    # Solution: Configure both logger and handler levels
    logger = logging.getLogger(__name__)
    logger.setLevel(logging.DEBUG)
    handler = logging.StreamHandler()
    handler.setLevel(logging.DEBUG)
    formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
    handler.setFormatter(formatter)
    logger.addHandler(handler)
    
    # Common Mistake 2: Incorrect basicConfig usage
    logging.getLogger().addHandler(logging.StreamHandler())
    logging.basicConfig(level=logging.INFO)  # No-op: the root logger already has a handler
    
    # Solution: Call basicConfig once, before any handlers are added
    logging.basicConfig(level=logging.INFO)
    logger = logging.getLogger(__name__)
    logger.info("Message")

    Memory and Resource Issues

    # Common Mistake 3: Creating handlers in loops
    def process_item(item):  # DON'T DO THIS
        logger = logging.getLogger(__name__)
        handler = logging.FileHandler('app.log')  # Memory leak!
        logger.addHandler(handler)
        logger.info(f"Processing {item}")
    
    # Solution: Create handlers outside loops
    logger = logging.getLogger(__name__)
    handler = logging.FileHandler('app.log')
    logger.addHandler(handler)
    
    def process_item(item):
        logger.info(f"Processing {item}")
    
    # Common Mistake 4: Not closing file handlers
    handler = logging.FileHandler('app.log')
    logger.addHandler(handler)
    # ... later
    # handler.close()  # Forgotten!
    
    # Solution: Use context manager
    from contextlib import contextmanager
    
    @contextmanager
    def log_file_handler(filename):
        handler = logging.FileHandler(filename)
        logger.addHandler(handler)
        try:
            yield
        finally:
            handler.close()
            logger.removeHandler(handler)

    Format Strings and Performance Issues

    # Common Mistake 5: Inefficient string formatting
    logger.error("Error occurred: {}".format(error))  # Formats even when the level is disabled
    logger.error(f"Error occurred: {error}")  # Same problem with f-strings
    
    # Solution: Use %-style arguments so formatting is deferred until needed
    logger.error("Error occurred: %s", error)
    
    # Common Mistake 6: Expensive operations in debug statements
    logger.debug(f"Complex data: {expensive_operation()}")  # Runs even if DEBUG is disabled
    
    # Solution: Guard expensive calls with isEnabledFor()
    if logger.isEnabledFor(logging.DEBUG):
        logger.debug("Complex data: %s", expensive_operation())

    Handler Configuration Pitfalls

    # Common Mistake 7: Multiple handler addition
    logger = logging.getLogger(__name__)
    handler = logging.StreamHandler()
    logger.addHandler(handler)
    logger.addHandler(handler)  # Duplicate handler!
    
    # Solution: Check for existing handlers
    if not logger.handlers:
        handler = logging.StreamHandler()
        logger.addHandler(handler)
    
    # Common Mistake 8: Incorrect propagation handling
    child_logger = logging.getLogger('parent.child')
    parent_logger = logging.getLogger('parent')
    # Messages appear twice due to propagation
    
    # Solution: Control propagation explicitly
    child_logger.propagate = False  # If you don't want propagation

    Thread Safety Considerations

    # Common Mistake 9: Unsafe handler modification
    def setup_logging():  # DON'T DO THIS
        logger = logging.getLogger()
        logger.handlers.clear()  # Unsafe in multi-threaded environment
    
    # Solution: Use lock for handler modifications
    import threading
    _handler_lock = threading.Lock()
    
    def setup_logging():
        with _handler_lock:
            logger = logging.getLogger()
            existing_handlers = logger.handlers.copy()
            for handler in existing_handlers:
                logger.removeHandler(handler)
                handler.close()

    Configuration File Issues

    # Common Mistake 10: Hardcoded paths in config
    LOGGING = {
        'handlers': {
            'file': {
                'filename': '/absolute/path/app.log',  # Breaks portability
            }
        }
    }
    
    # Solution: Use environment variables or relative paths
    import os
    
    LOGGING = {
        'handlers': {
            'file': {
                'filename': os.getenv('LOG_FILE', 'app.log'),
            }
        }
    }

    Testing Your Logging

    Unit Testing with Logs

    import unittest
    from unittest.mock import patch
    import logging
    import io
    
    class TestLogging(unittest.TestCase):
        def setUp(self):
            # Create logger for testing
            self.logger = logging.getLogger('test_logger')
            self.logger.setLevel(logging.DEBUG)
    
            # Create string IO for capturing log output
            self.log_output = io.StringIO()
    
            # Create handler that writes to string IO
            self.handler = logging.StreamHandler(self.log_output)
            self.handler.setLevel(logging.DEBUG)
    
            # Add handler to logger
            self.logger.addHandler(self.handler)
    
        def test_log_level_filtering(self):
            """Test that log levels are properly filtered"""
            self.logger.debug("Debug message")
            self.logger.info("Info message")
    
            log_contents = self.log_output.getvalue()
    
            self.assertIn("Debug message", log_contents)
            self.assertIn("Info message", log_contents)
    
        def test_exception_logging(self):
            """Test exception logging functionality"""
            try:
                raise ValueError("Test error")
            except ValueError:
                self.logger.exception("An error occurred")
    
            log_contents = self.log_output.getvalue()
    
            self.assertIn("An error occurred", log_contents)
            self.assertIn("ValueError: Test error", log_contents)
            self.assertIn("Traceback", log_contents)
    
        def tearDown(self):
            # Clean up
            self.handler.close()
            self.log_output.close()
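    As an alternative to wiring up a StreamHandler by hand, unittest's built-in assertLogs captures records emitted on a logger for the duration of a with block (a minimal sketch; `assertlogs_demo` is an arbitrary logger name):

```python
import logging
import unittest

class TestWithAssertLogs(unittest.TestCase):
    def test_warning_is_logged(self):
        logger = logging.getLogger("assertlogs_demo")
        # assertLogs fails the test if no matching record is emitted
        with self.assertLogs("assertlogs_demo", level="WARNING") as captured:
            logger.warning("disk usage at %d%%", 91)
        # captured.output holds "LEVEL:logger_name:message" strings
        self.assertEqual(captured.output,
                         ["WARNING:assertlogs_demo:disk usage at 91%"])
```

    Run it with python -m unittest; no handler setup or teardown code is needed.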

    Testing with Mocked Loggers

    import unittest
    from unittest.mock import patch, MagicMock
    from your_module import YourClass
    
    class TestWithMockedLogger(unittest.TestCase):
        @patch('logging.getLogger')
        def test_logger_calls(self, mock_get_logger):
            # Create mock logger
            mock_logger = MagicMock()
            mock_get_logger.return_value = mock_logger
    
            # Create instance of your class
            instance = YourClass()
    
            # Perform some operation that should log
            instance.do_something()
    
            # Assert logging calls
            mock_logger.info.assert_called_with(
                "Operation completed",
                extra={'operation': 'do_something'}
            )
    
            # Check number of calls
            self.assertEqual(mock_logger.error.call_count, 0)
            self.assertEqual(mock_logger.info.call_count, 1)
    
        @patch('logging.getLogger')
        def test_error_logging(self, mock_get_logger):
            mock_logger = MagicMock()
            mock_get_logger.return_value = mock_logger
    
            instance = YourClass()
    
            # Trigger an error condition
            instance.problematic_operation()
    
            # Verify error was logged
            mock_logger.error.assert_called_once()
            args, kwargs = mock_logger.error.call_args
            self.assertIn("Operation failed", args[0])

    Alternative Logging Solutions

    Loguru

    Loguru offers a simpler logging interface with powerful features out of the box:

    from loguru import logger
    import sys
    
    # Basic Loguru configuration
    logger.remove()  # Remove default handler
    logger.add(
        sys.stdout,
        colorize=True,
        format="{time:YYYY-MM-DD HH:mm:ss} | {level: <8} | {name}:{function}:{line} - {message}"
    )
    logger.add(
        "app_{time}.log",
        rotation="500 MB",
        retention="10 days",
        compression="zip"
    )
    
    # Advanced Loguru usage
    from loguru import logger
    import contextlib
    
    # Custom format for structured logging
    logger.add(
        "structured.json",
        format="{time} {level} {message}",
        serialize=True  # Enables JSON formatting
    )
    
    # Context manager for transaction logging
    @contextlib.contextmanager
    def transaction_context(transaction_id: str):
        logger.info(f"Starting transaction {transaction_id}")
        try:
            yield
            logger.info(f"Transaction {transaction_id} completed successfully")
        except Exception as e:
            logger.exception(f"Transaction {transaction_id} failed")
            raise
    
    # Example usage
    def process_order(order_id: str):
        with transaction_context(f"order_{order_id}"):
            logger.debug("Processing order", order_id=order_id)
            # Order processing logic
            logger.info("Order processed", status="completed", order_id=order_id)

    Structlog

    Structlog excels at structured logging with bound context:

    import structlog
    import time
    from typing import Any, Dict
    
    # Configure structlog
    structlog.configure(
        processors=[
            structlog.processors.TimeStamper(fmt="iso"),
            structlog.processors.StackInfoRenderer(),
            structlog.processors.format_exc_info,
            structlog.processors.JSONRenderer()
        ],
        context_class=dict,
        logger_factory=structlog.PrintLoggerFactory(),
        wrapper_class=structlog.BoundLogger,
        cache_logger_on_first_use=True,
    )
    
    # Create logger instance
    logger = structlog.get_logger()
    
    class StructuredLogging:
        def __init__(self):
            self.logger = structlog.get_logger()
    
        def with_context(self, **context) -> 'StructuredLogging':
            """Create a new logger with additional context"""
            self.logger = self.logger.bind(**context)
            return self
    
        def log_operation(self, operation, **kwargs):
            """Log an operation with timing and context"""
            start_time = time.time()
    
            try:
                result = operation(**kwargs)
                duration = time.time() - start_time
    
                self.logger.info(
                    "operation_completed",
                    operation_name=operation.__name__,
                    duration=duration,
                    status="success",
                    **kwargs
                )
                return result
    
            except Exception as e:
                duration = time.time() - start_time
                self.logger.error(
                    "operation_failed",
                    operation_name=operation.__name__,
                    duration=duration,
                    error=str(e),
                    error_type=type(e).__name__,
                    **kwargs
                )
                raise
    
    # Usage example
    structured_logger = StructuredLogging()
    
    def process_user_data(user_id: str, data: Dict[str, Any]):
        logger = structured_logger.with_context(
            user_id=user_id,
            service="user_service"
        )
    
        logger.logger.info("processing_user_data", data_size=len(data))
        # Process data...
        logger.logger.info("user_data_processed", status="success")

    Python JSON Logger

    For JSON-formatted logging with the python-json-logger package:

    from pythonjsonlogger import jsonlogger
    import logging
    import datetime
    
    class CustomJsonFormatter(jsonlogger.JsonFormatter):
        def add_fields(self, log_record, record, message_dict):
            super().add_fields(log_record, record, message_dict)
    
            # Add custom fields
            log_record['timestamp'] = datetime.datetime.now(datetime.UTC).isoformat()
            log_record['level'] = record.levelname
            log_record['logger'] = record.name
    
            if hasattr(record, 'props'):
                log_record.update(record.props)
    
    def setup_json_logging():
        logger = logging.getLogger()
        handler = logging.StreamHandler()
    
        formatter = CustomJsonFormatter(
            '%(timestamp)s %(level)s %(name)s %(message)s'
        )
        handler.setFormatter(formatter)
        logger.addHandler(handler)
        logger.setLevel(logging.INFO)
    
        return logger
    
    # Usage example
    logger = setup_json_logging()
    
    def log_event(event_type: str, **props):
        logger.info(
            f"Event: {event_type}",
            extra={
                'props': {
                    'event_type': event_type,
                    'timestamp': datetime.datetime.now(datetime.UTC).isoformat(),
                    **props
                }
            }
        )
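
    If adding the python-json-logger dependency is not an option, the same idea can be approximated with a stdlib-only formatter. This sketch reuses the `props` convention from above (the `StdlibJsonFormatter` name and `json_demo` logger are illustrative):

```python
import json
import logging
import datetime

class StdlibJsonFormatter(logging.Formatter):
    """Render each record as one JSON object per line, stdlib only."""
    def format(self, record):
        payload = {
            "timestamp": datetime.datetime.now(datetime.timezone.utc).isoformat(),
            "level": record.levelname,
            "logger": record.name,
            "message": record.getMessage(),
        }
        # Merge the same optional `props` convention used above
        payload.update(getattr(record, "props", {}))
        return json.dumps(payload)

handler = logging.StreamHandler()
handler.setFormatter(StdlibJsonFormatter())
json_logger = logging.getLogger("json_demo")
json_logger.addHandler(handler)
json_logger.setLevel(logging.INFO)

json_logger.info("Event: login", extra={"props": {"event_type": "login"}})
```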

    Best Practices and Guidelines

    Logging Standards

    # Example of implementing logging standards
    import logging
    from typing import Optional, Dict, Any
    
    class StandardizedLogger:
        def __init__(self, service_name: str):
            self.logger = logging.getLogger(service_name)
            self.service_name = service_name
            self._setup_logging()
    
        def _setup_logging(self):
            """Setup standardized logging configuration"""
            formatter = logging.Formatter(
                '[%(asctime)s] - %(name)s - %(levelname)s - %(message)s'
                ' {service: %(service)s, trace_id: %(trace_id)s}'
            )
    
            handler = logging.StreamHandler()
            handler.setFormatter(formatter)
    
            self.logger.addHandler(handler)
            self.logger.setLevel(logging.INFO)
    
        def _log(self, level: str, message: str, trace_id: Optional[str] = None, **kwargs):
            """Standardized log method"""
            extra = {
                'service': self.service_name,
                'trace_id': trace_id or 'no_trace',
                **kwargs
            }
    
            getattr(self.logger, level)(
                message,
                extra=extra
            )
    
        def info(self, message: str, **kwargs):
            self._log('info', message, **kwargs)
    
        def error(self, message: str, **kwargs):
            self._log('error', message, **kwargs)
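
    The trace_id plumbing above relies on a stdlib detail worth making explicit: every key passed via `extra` becomes an attribute on the LogRecord, so the format string can reference it directly, and a record missing an expected key triggers a logging error at emit time. A standalone sketch (the `checkout` logger name is illustrative):

```python
import logging

# Custom %-style placeholders are satisfied by `extra` keys at emit time
formatter = logging.Formatter(
    "[%(asctime)s] - %(name)s - %(levelname)s - %(message)s"
    " {service: %(service)s, trace_id: %(trace_id)s}"
)
handler = logging.StreamHandler()
handler.setFormatter(formatter)
logger = logging.getLogger("checkout")
logger.addHandler(handler)
logger.setLevel(logging.INFO)

# Each `extra` key becomes a LogRecord attribute the formatter can read;
# omitting one that the format string expects fails at formatting time.
logger.info("Order placed", extra={"service": "checkout", "trace_id": "abc-123"})
```

    This is why `StandardizedLogger._log` always supplies a `trace_id` fallback: it keeps the format string satisfiable on every call path.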

    Performance Optimization

    import logging
    from functools import lru_cache
    from typing import Dict, Any
    import queue
    import threading
    
    class PerformantLogger:
        def __init__(self):
            self.log_queue = queue.Queue(maxsize=1000)
            self.running = True
            self._start_worker()
    
        def _start_worker(self):
            """Start background worker for async logging"""
            def worker():
                while self.running:
                    try:
                        record = self.log_queue.get(timeout=1)
                        self._process_log(record)
                    except queue.Empty:
                        continue
    
            self.worker_thread = threading.Thread(target=worker, daemon=True)
            self.worker_thread.start()

        def stop(self):
            """Signal the worker to exit and wait for it to finish"""
            self.running = False
            self.worker_thread.join()
    
        @lru_cache(maxsize=100)
        def _format_message(self, template: str, **kwargs) -> str:
            """Cache commonly used log message templates"""
            return template.format(**kwargs)
    
        def _process_log(self, record: Dict[str, Any]):
            """Process log record"""
            # Actual logging logic here
            pass
    
        def log(self, level: str, message: str, **kwargs):
            """Asynchronous logging method"""
            record = {
                'level': level,
                'message': self._format_message(message, **kwargs),
                'extra': kwargs
            }
    
            try:
                self.log_queue.put_nowait(record)
            except queue.Full:
                # Handle queue full scenario
                pass
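
    Since Python 3.2 the standard library ships this producer/consumer pattern directly as `QueueHandler` and `QueueListener`, which avoids hand-rolling the worker thread. A minimal sketch (the `async_app` logger name is illustrative):

```python
import logging
import logging.handlers
import queue

log_queue = queue.Queue(maxsize=1000)

# The application logger only enqueues records: a cheap, non-blocking path
app_logger = logging.getLogger("async_app")
app_logger.setLevel(logging.INFO)
app_logger.addHandler(logging.handlers.QueueHandler(log_queue))

# A background thread drains the queue into the real (slow) handlers
listener = logging.handlers.QueueListener(log_queue, logging.StreamHandler())
listener.start()

app_logger.info("handled without blocking the request path")

listener.stop()  # flushes remaining records and joins the worker thread
```

    Compared to the hand-rolled version above, the stdlib pair also handles flushing on shutdown for you via `stop()`.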

    Case Studies

    Real-World Implementation: E-commerce Platform

    import logging
    from datetime import datetime
    from typing import Dict, Any
    
    class EcommerceLogger:
        def __init__(self):
            self.logger = logging.getLogger('ecommerce')
            self._setup_logging()
    
        def _setup_logging(self):
            # Configure handlers for different components; inventory and
            # user-activity handlers would follow the same pattern as below
            self._add_transaction_handler()
    
        def _add_transaction_handler(self):
            handler = logging.FileHandler('transactions.log')
            handler.setFormatter(
                logging.Formatter(
                    '%(asctime)s - %(transaction_id)s - %(message)s'
                )
            )
            self.logger.addHandler(handler)
    
        def log_order(self, order_id: str, user_id: str, amount: float):
            """Log order processing"""
            self.logger.info(
                f"Processing order {order_id}",
                extra={
                    'transaction_id': order_id,
                    'user_id': user_id,
                    'amount': amount,
                    'timestamp': datetime.utcnow().isoformat()
                }
            )
    
    # Example usage
    class OrderProcessor:
        def __init__(self):
            self.logger = EcommerceLogger()
    
        def process_order(self, order_data: Dict[str, Any]):
            try:
                # Log order initiation
                self.logger.log_order(
                    order_data['order_id'],
                    order_data['user_id'],
                    order_data['amount']
                )
    
                # Process order...
    
                # Log success
                self.logger.logger.info(
                    f"Order {order_data['order_id']} processed successfully",
                    extra={'transaction_id': order_data['order_id']}
                )
    
            except Exception as e:
                self.logger.logger.error(
                    f"Order processing failed: {str(e)}",
                    exc_info=True,
                    extra={'transaction_id': order_data.get('order_id', 'unknown')}
                )
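
    One production caveat with the case study above: transactions.log grows without bound. `RotatingFileHandler` caps the file size and keeps a fixed number of backups. A sketch of the same transaction handler with rotation (the helper name and size limits are illustrative):

```python
import logging
import logging.handlers

def add_rotating_transaction_handler(logger: logging.Logger,
                                     path: str = "transactions.log"):
    # Roll over at ~5 MB, keeping the 3 most recent backup files
    handler = logging.handlers.RotatingFileHandler(
        path, maxBytes=5 * 1024 * 1024, backupCount=3
    )
    handler.setFormatter(
        logging.Formatter("%(asctime)s - %(transaction_id)s - %(message)s")
    )
    logger.addHandler(handler)
    return handler
```

    For time-based policies (e.g. one file per day), `TimedRotatingFileHandler` is the drop-in alternative.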

    Microservices Architecture Example

    import logging
    import json
    from typing import Dict, Any
    from datetime import datetime
    
    class MicroserviceLogger:
        def __init__(self, service_name: str):
            self.service_name = service_name
            self.logger = self._configure_logger()
    
        def _configure_logger(self):
            logger = logging.getLogger(self.service_name)
    
            # JSON-shaped format string for structured logging
            # (note: quotes inside a message will break the emitted JSON)
            formatter = logging.Formatter(
                json.dumps({
                    'timestamp': '%(asctime)s',
                    'service': '%(service_name)s',
                    'level': '%(levelname)s',
                    'message': '%(message)s',
                    'trace_id': '%(trace_id)s',
                    'additional_data': '%(additional_data)s'
                })
            )
    
            handler = logging.StreamHandler()
            handler.setFormatter(formatter)
            logger.addHandler(handler)
            logger.setLevel(logging.INFO)

            return logger
    
        def log(self, level: str, message: str, trace_id: str, **kwargs):
            """Log with distributed tracing context"""
            extra = {
                'service_name': self.service_name,
                'trace_id': trace_id,
                'additional_data': json.dumps(kwargs)
            }
    
            getattr(self.logger, level)(
                message,
                extra=extra
            )
    
    # Example usage in a microservice
    class PaymentService:
        def __init__(self):
            self.logger = MicroserviceLogger('payment-service')
    
        def process_payment(self, payment_data: Dict[str, Any], trace_id: str):
            self.logger.log(
                'info',
                'Processing payment',
                trace_id,
                payment_id=payment_data['id'],
                amount=payment_data['amount']
            )
    
            # Process payment...
    
            self.logger.log(
                'info',
                'Payment processed successfully',
                trace_id,
                payment_id=payment_data['id']
            )
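
    One fragility in the formatter above: because the JSON shape is a printf-style format string, a message containing a quote character produces invalid JSON. Serializing a dict per record avoids that entirely. A sketch (the `SafeJsonFormatter` name is illustrative):

```python
import json
import logging
from datetime import datetime, timezone

class SafeJsonFormatter(logging.Formatter):
    """Serialize the whole record as a dict so quoting is always valid JSON."""
    def format(self, record):
        return json.dumps({
            "timestamp": datetime.now(timezone.utc).isoformat(),
            "service": getattr(record, "service_name", "unknown"),
            "level": record.levelname,
            # May contain quotes or newlines; json.dumps escapes them safely
            "message": record.getMessage(),
            "trace_id": getattr(record, "trace_id", "no_trace"),
        })
```

    `MicroserviceLogger._configure_logger` could swap this in for the `json.dumps`-built format string without changing any call sites.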

    Conclusion

    Key Takeaways

  • Foundations first: start with a correct basic configuration
  • Set appropriate log levels
  • Configure meaningful formats
  • Choose suitable handlers
  • Structured approach: use structured logging for better analysis
  • Consistent log formats
  • Contextual information
  • Machine-parsable output
  • Performance matters: optimize production logging
  • Implement log rotation
  • Use asynchronous logging where needed
  • Consider sampling strategies
  • Security awareness: protect sensitive information, filter sensitive data, implement proper access controls, and meet compliance requirements

    Implementation Checklist

    from typing import Dict

    def logging_implementation_checklist() -> Dict[str, bool]:
        return {
            # Basic Setup
            "basic_configuration": True,
            "log_levels_defined": True,
            "handlers_configured": True,
    
            # Security
            "sensitive_data_filtered": True,
            "access_controls_implemented": True,
            "compliance_checked": True,
    
            # Performance
            "log_rotation_configured": True,
            "async_logging_implemented": True,
            "sampling_strategy_defined": True,
    
            # Monitoring
            "alerts_configured": True,
            "metrics_collected": True,
            "dashboards_created": True
        }
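
    The checklist can double as a simple gate in a CI job or readiness review. A small sketch that reports any unmet items (the `unmet_items` helper is illustrative):

```python
from typing import Dict, List

def unmet_items(checklist: Dict[str, bool]) -> List[str]:
    """Return the names of checklist items that are not yet satisfied."""
    return [name for name, done in checklist.items() if not done]

# Example with one item still open
status = {"basic_configuration": True, "sensitive_data_filtered": False}
print(unmet_items(status))  # → ['sensitive_data_filtered']
```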

    Additional Resources

  • Official documentation:
  • Python logging documentation
  • Logging Cookbook
  • Tools and libraries:
  • Loguru documentation
  • Structlog documentation
  • python-json-logger

    This guide has covered the essential aspects of Python logging, from basic setup to advanced implementations. Remember that logging is an integral part of application observability and maintenance. Implement it carefully and maintain it regularly for the best results.

    As your application evolves and new requirements emerge, remember to review and update your logging implementation regularly.