Go 中的 Redis 队列和 Cron
在本教程中,我们将使用 `github.com/hibiken/asynq` 包与基于 Redis 服务器的任务队列交互,
并使用 `github.com/robfig/cron` 包执行计划任务。
本分步指南将说明如何设置队列、安排任务以及处理优雅关闭。
初始化模块
首先为项目创建一个新的 Go 模块:
go mod init learn_queue_and_cron
创建 cron.go
`cron.go` 文件负责按特定的时间间隔安排和运行任务。以下是实现:
package main
import (
"fmt"
"log"
"time"
"github.com/robfig/cron/v3"
)
func runCron(c *cron.Cron) {
// Schedule a task to run every minute
_, err := c.AddFunc("@every 1m", func() {
fmt.Printf("Task executed every minute at: %v \n", time.Now().Local())
})
if err != nil {
log.Fatal(err)
}
// Start the cron scheduler
c.Start()
log.Println("Cron scheduler started")
// Keep the main goroutine running
select {}
}此代码安排任务每分钟运行一次,并保持应用程序运行
确保调度程序持续工作。
创建queue.go
`queue.go` 文件使用 Asynq 管理任务处理。代码如下:
package main
import (
"context"
"encoding/json"
"fmt"
"log"
"github.com/hibiken/asynq"
)
// runQueue registers the task handlers on a new mux and starts the Asynq
// server with it.
//
// It calls server.Start rather than server.Run. Run installs its own signal
// handler and shuts the server down by itself, which competes with the
// application's own signal handling and its explicit server.Shutdown call:
// whichever Shutdown comes second returns immediately, so the process could
// exit before in-flight tasks have drained. With Start, shutdown has a single
// owner.
//
// The function returns once the server has started; a startup error
// terminates the process.
func runQueue(server *asynq.Server) {
	mux := asynq.NewServeMux()
	mux.HandleFunc("send_email", emailHandler)
	mux.HandleFunc("generate_report", reportHandler)

	if err := server.Start(mux); err != nil {
		log.Fatalf("Failed to run Asynq server: %v", err)
	}
}
// emailHandler handles a "send_email" task: it decodes the JSON payload and
// prints the recipient address.
//
// It returns nil on success. A payload that cannot be decoded will fail the
// same way on every attempt, so that error wraps asynq.SkipRetry to tell
// Asynq not to retry the task.
func emailHandler(ctx context.Context, task *asynq.Task) error {
	var payload struct {
		To string `json:"to"`
	}
	if err := json.Unmarshal(task.Payload(), &payload); err != nil {
		return fmt.Errorf("failed to unmarshal payload: %v: %w", err, asynq.SkipRetry)
	}

	fmt.Printf("Sending email to: %s\n", payload.To)
	return nil
}
func reportHandler(ctx context.Context, task *asynq.Task) error {
var payload struct {
ReportID int `json:"report_id"`
}
if err := json.Unmarshal(task.Payload(), &payload); err != nil {
return fmt.Errorf("failed to unmarshal payload: %w", err)
}
fmt.Printf("Generating report for ID: %d\n", payload.ReportID)
return nil
}解释
创建 router.go
`router.go` 文件设置用于将任务加入队列的 HTTP 端点:
package main
import (
"encoding/json"
"net/http"
"github.com/gin-gonic/gin"
"github.com/hibiken/asynq"
)
func setupRouter(client *asynq.Client) *gin.Engine {
r := gin.Default()
r.POST("/enqueue/email", func(c *gin.Context) {
var payload struct {
To string `json:"to"`
}
if err := c.ShouldBindJSON(&payload); err != nil {
c.JSON(http.StatusBadRequest, gin.H{"error": "Invalid request body"})
return
}
jsonPayload, err := json.Marshal(payload)
if err != nil {
c.JSON(http.StatusInternalServerError, gin.H{"error": "Failed to marshal payload"})
return
}
task := asynq.NewTask("send_email", jsonPayload)
_, err = client.Enqueue(task)
if err != nil {
c.JSON(http.StatusInternalServerError, gin.H{"error": "Failed to enqueue task"})
return
}
c.JSON(http.StatusOK, gin.H{"message": "Email job enqueued"})
})
r.POST("/enqueue/report", func(c *gin.Context) {
var payload struct {
ReportID int `json:"report_id"`
}
if err := c.ShouldBindJSON(&payload); err != nil {
c.JSON(http.StatusBadRequest, gin.H{"error": "Invalid request body"})
return
}
jsonPayload, err := json.Marshal(payload)
if err != nil {
c.JSON(http.StatusInternalServerError, gin.H{"error": "Failed to marshal payload"})
return
}
task := asynq.NewTask("generate_report", jsonPayload)
_, err = client.Enqueue(task)
if err != nil {
c.JSON(http.StatusInternalServerError, gin.H{"error": "Failed to enqueue task"})
return
}
c.JSON(http.StatusOK, gin.H{"message": "Report job enqueued"})
})
return r
}此代码使用 Gin 框架公开两个用于排队任务的端点。
创建main.go
`main.go` 文件将所有内容整合在一起:
package main
import (
"context"
"log"
"net/http"
"os"
"os/signal"
"syscall"
"time"
"github.com/hibiken/asynq"
"github.com/robfig/cron/v3"
)
// main wires the application together: it creates the cron scheduler, the
// Asynq server and client, and the HTTP server, starts each of them in its
// own goroutine, and then blocks in appShutdown until a termination signal
// arrives.
func main() {
	c := cron.New()

	// One Redis option value shared by the server and the client so the
	// address cannot drift between the two.
	redisOpt := asynq.RedisClientOpt{Addr: "localhost:6379"}

	server := asynq.NewServer(redisOpt, asynq.Config{
		Concurrency: 10,
	})

	client := asynq.NewClient(redisOpt)
	defer client.Close()

	router := setupRouter(client)
	httpServer := &http.Server{
		Addr:    ":8080",
		Handler: router,
		// Bound the time a client may take to send request headers; without
		// any timeout a slow client can hold a connection open indefinitely.
		ReadHeaderTimeout: 5 * time.Second,
	}

	// Prepare shutdown context
	ctx, stop := context.WithCancel(context.Background())
	defer stop()

	// Buffered so a signal delivered before appShutdown starts receiving is
	// not dropped.
	quit := make(chan os.Signal, 1)
	signal.Notify(quit, os.Interrupt, syscall.SIGTERM)

	go runQueue(server)
	go runCron(c)
	go func() {
		// ErrServerClosed is the normal result of a Shutdown call and is not
		// treated as a failure.
		if err := httpServer.ListenAndServe(); err != nil && err != http.ErrServerClosed {
			log.Fatalf("Failed to run HTTP server: %v", err)
		}
	}()

	appShutdown(ctx, httpServer, c, server, quit)
}
func appShutdown(ctx context.Context, httpServer *http.Server, c *cron.Cron, server *asynq.Server, quit chan os.Signal) {
// Wait for termination signal
<-quit
log.Println("Shutting down gracefully...")
httpCtx, httpCancel := context.WithTimeout(ctx, 5*time.Second)
defer httpCancel()
if err := httpServer.Shutdown(httpCtx); err != nil {
log.Printf("HTTP server shutdown error: %v", err)
}
server.Shutdown()
c.Stop()
log.Println("Application stopped")
}该文件结合了队列、cron、HTTP 服务器和关闭逻辑。
安装依赖项
安装所有必需的依赖项:
go mod tidy
构建并运行应用程序
使用以下命令构建并运行应用程序:
go build -o run *.go && ./run
测试应用程序
访问以下端点来加入任务: