Go 中的 Redis 队列和 Cron

在本教程中,我们将使用 `github.com/hibiken/asynq` 包与任务队列交互并将任务放入 Redis 服务器,同时使用 `github.com/robfig/cron/v3` 包执行计划任务。本分步指南解释了如何设置队列、安排任务以及处理优雅关闭。

初始化模块

首先为项目创建一个新的 Go 模块:

go mod init learn_queue_and_cron

创建 cron.go

`cron.go` 文件负责按特定时间间隔安排和运行任务。以下是实现:

package main

import (
    "fmt"
    "log"
    "time"

    "github.com/robfig/cron/v3"
)

// runCron registers the scheduled jobs on c and then runs the scheduler in the
// calling goroutine.
//
// It blocks until c.Stop is called, so callers that have other work to do
// should invoke it as `go runCron(c)`. If a job cannot be registered, the
// process is terminated via log.Fatal.
func runCron(c *cron.Cron) {
    // Schedule a task to run every minute.
    _, err := c.AddFunc("@every 1m", func() {
        fmt.Printf("Task executed every minute at: %v \n", time.Now().Local())
    })
    if err != nil {
        log.Fatal(err)
    }

    log.Println("Cron scheduler started")

    // Run the scheduler in this goroutine. Unlike Start followed by an empty
    // select, Run returns once Stop is called, so this goroutine is released
    // during shutdown instead of staying parked forever.
    c.Run()
}

此代码安排任务每分钟运行一次,并保持应用程序运行,以确保调度程序持续工作。

创建 queue.go

`queue.go` 文件使用 Asynq 管理任务处理。代码如下:

package main

import (
    "context"
    "encoding/json"
    "fmt"
    "log"

    "github.com/hibiken/asynq"
)

// runQueue builds a ServeMux that routes the "send_email" and
// "generate_report" task types to their handlers and passes it to server.Run.
// If server.Run returns an error, it is logged and the process exits.
func runQueue(server *asynq.Server) {
    handlers := asynq.NewServeMux()
    handlers.HandleFunc("send_email", emailHandler)
    handlers.HandleFunc("generate_report", reportHandler)

    err := server.Run(handlers)
    if err == nil {
        return
    }
    log.Fatalf("Failed to run Asynq server: %v", err)
}

// emailHandler decodes the task payload as JSON with a "to" field and prints
// the recipient to standard output.
//
// It returns a wrapped error when the payload is not valid JSON for that
// shape, and nil otherwise. ctx is accepted to satisfy the handler signature
// and is not used.
func emailHandler(ctx context.Context, task *asynq.Task) error {
    type emailPayload struct {
        To string `json:"to"`
    }

    var msg emailPayload
    err := json.Unmarshal(task.Payload(), &msg)
    if err != nil {
        return fmt.Errorf("failed to unmarshal payload: %w", err)
    }

    fmt.Printf("Sending email to: %s\n", msg.To)
    return nil
}

// reportHandler decodes the task payload as JSON with a "report_id" field and
// prints that ID to standard output.
//
// It returns a wrapped error when the payload is not valid JSON for that
// shape, and nil otherwise. ctx is accepted to satisfy the handler signature
// and is not used.
func reportHandler(ctx context.Context, task *asynq.Task) error {
    type reportPayload struct {
        ReportID int `json:"report_id"`
    }

    var req reportPayload
    err := json.Unmarshal(task.Payload(), &req)
    if err != nil {
        return fmt.Errorf("failed to unmarshal payload: %w", err)
    }

    fmt.Printf("Generating report for ID: %d\n", req.ReportID)
    return nil
}

解释

  • 处理程序:emailHandler 和 reportHandler 通过解析其有效负载并执行相应的操作来处理任务。
  • 任务队列:“send_email”和“generate_report”等任务通过 Asynq 的任务队列定义和处理。
  • 创建 router.go

    `router.go` 文件设置 HTTP 端点来排队任务:

    package main
    
    import (
        "encoding/json"
        "net/http"
    
        "github.com/gin-gonic/gin"
        "github.com/hibiken/asynq"
    )
    
    // setupRouter returns a Gin engine with two POST endpoints,
    // /enqueue/email and /enqueue/report. Each binds the JSON request body,
    // re-encodes it, and enqueues it through client as a "send_email" or
    // "generate_report" task respectively.
    //
    // Responses: 400 when the body cannot be bound, 500 when encoding or
    // enqueueing fails, 200 with a confirmation message otherwise.
    func setupRouter(client *asynq.Client) *gin.Engine {
        engine := gin.Default()

        // enqueue encodes body as JSON, submits it as a task of the given
        // type, and writes the HTTP response describing the outcome.
        enqueue := func(c *gin.Context, taskType string, body interface{}, okMessage string) {
            encoded, err := json.Marshal(body)
            if err != nil {
                c.JSON(http.StatusInternalServerError, gin.H{"error": "Failed to marshal payload"})
                return
            }

            if _, err := client.Enqueue(asynq.NewTask(taskType, encoded)); err != nil {
                c.JSON(http.StatusInternalServerError, gin.H{"error": "Failed to enqueue task"})
                return
            }

            c.JSON(http.StatusOK, gin.H{"message": okMessage})
        }

        engine.POST("/enqueue/email", func(c *gin.Context) {
            var req struct {
                To string `json:"to"`
            }
            if c.ShouldBindJSON(&req) != nil {
                c.JSON(http.StatusBadRequest, gin.H{"error": "Invalid request body"})
                return
            }

            enqueue(c, "send_email", req, "Email job enqueued")
        })

        engine.POST("/enqueue/report", func(c *gin.Context) {
            var req struct {
                ReportID int `json:"report_id"`
            }
            if c.ShouldBindJSON(&req) != nil {
                c.JSON(http.StatusBadRequest, gin.H{"error": "Invalid request body"})
                return
            }

            enqueue(c, "generate_report", req, "Report job enqueued")
        })

        return engine
    }

    此代码使用 Gin 框架公开两个用于排队任务的端点。

    创建 main.go

    `main.go` 文件将所有内容整合在一起:

    package main
    
    import (
        "context"
        "log"
        "net/http"
        "os"
        "os/signal"
        "syscall"
        "time"
    
        "github.com/hibiken/asynq"
        "github.com/robfig/cron/v3"
    )
    
    // main wires together the cron scheduler, the Asynq worker server, the
    // Asynq client used by the HTTP router, and the HTTP server on :8080. It
    // starts the three long-running components in their own goroutines and
    // then blocks in appShutdown until SIGINT or SIGTERM is received.
    func main() {
        c := cron.New()

        // The worker server and the enqueueing client share the same Redis
        // connection settings.
        redisOpt := asynq.RedisClientOpt{Addr: "localhost:6379"}

        server := asynq.NewServer(
            redisOpt,
            asynq.Config{
                Concurrency: 10,
            },
        )

        client := asynq.NewClient(redisOpt)
        defer client.Close()

        router := setupRouter(client)

        // ReadHeaderTimeout bounds how long a client may take to send its
        // request headers, so stalled connections cannot hold server resources
        // indefinitely.
        httpServer := &http.Server{
            Addr:              ":8080",
            Handler:           router,
            ReadHeaderTimeout: 10 * time.Second,
        }

        // Prepare shutdown context and the channel that receives termination
        // signals. The channel is buffered so a signal delivered before
        // appShutdown starts receiving is not dropped.
        ctx, stop := context.WithCancel(context.Background())
        defer stop()
        quit := make(chan os.Signal, 1)
        signal.Notify(quit, os.Interrupt, syscall.SIGTERM)

        go runQueue(server)
        go runCron(c)
        go func() {
            // http.ErrServerClosed is the expected result of a call to
            // Shutdown and is therefore not treated as a failure.
            if err := httpServer.ListenAndServe(); err != nil && err != http.ErrServerClosed {
                log.Fatalf("Failed to run HTTP server: %v", err)
            }
        }()

        appShutdown(ctx, httpServer, c, server, quit)
    }
    
    // appShutdown blocks until a value is received on quit and then stops the
    // HTTP server, the Asynq server and the cron scheduler, in that order.
    //
    // ctx is the parent of the timeout context passed to httpServer.Shutdown.
    // An HTTP shutdown error is logged rather than returned. After stopping
    // the cron scheduler it waits, for a bounded time, for jobs that are
    // already running to finish.
    func appShutdown(ctx context.Context, httpServer *http.Server, c *cron.Cron, server *asynq.Server, quit chan os.Signal) {
        // Upper bound applied separately to the HTTP drain and the cron drain.
        const drainTimeout = 5 * time.Second

        // Wait for termination signal.
        <-quit
        log.Println("Shutting down gracefully...")

        httpCtx, httpCancel := context.WithTimeout(ctx, drainTimeout)
        defer httpCancel()
        if err := httpServer.Shutdown(httpCtx); err != nil {
            log.Printf("HTTP server shutdown error: %v", err)
        }

        server.Shutdown()

        // Stop returns a context that is done once running jobs have
        // completed. Wait on it so in-flight jobs are not cut off by process
        // exit, but give up after drainTimeout so a stuck job cannot block
        // shutdown forever.
        select {
        case <-c.Stop().Done():
        case <-time.After(drainTimeout):
            log.Println("Timed out waiting for cron jobs to finish")
        }

        log.Println("Application stopped")
    }

    该文件结合了队列、cron、HTTP 服务器和关闭逻辑。

    安装依赖项

    安装所有必需的依赖项:

    go mod tidy

    构建并运行应用程序

    使用以下方法构建并运行应用程序:

    go build -o run *.go && ./run

    测试应用程序

    向以下端点发送 POST 请求(请求体为 JSON,例如 `{"to": "user@example.com"}` 或 `{"report_id": 1}`)来将任务加入队列:

  • http://localhost:8080/enqueue/email
  • http://localhost:8080/enqueue/report