如何在asynq中高效地传达客户端依赖关系?

问题描述 投票:0回答:1

我正在尝试在我的项目中实现 Asynq,这是一个著名的 Golang 作业调度程序,目前我对缺乏有关任务本身内部可能的具体场景的文档感到非常困惑。请注意,互联网上甚至没有任何痕迹,这让我觉得我解决这个问题的方式可能是非常错误的。

通常,我会在

main.go
中设置一个数据库连接,并通过依赖注入与所有内容进行通信。

clients := services.Clients{
  Db:           dbClient,
  Redis:        redisClient,
  // and many more
}

// In my case I use GraphQL to handle requests
srv := handler.NewDefaultServer(generated.NewExecutableSchema(generated.Config{Resolvers: &resolvers.Resolver{
  Services: &catalogue, // some services I need to use depending the request
  Clients:  &clients, // here we communicate the database, etc.
}}))

如您所见,我实际上可以将

clients
传达给我的请求处理程序(解析器),并在那里过着幸福的生活,查询数据库。

设置Asynq时,客户端是这样的

asynqClient := asynq.NewClient(asynq.RedisClientOpt{
  Addr:     os.Getenv("REDIS_ENDPOINT"),
  DB:       0, // will use the default one
  Password: os.Getenv("REDIS_PASSWORD"),
})
defer asynqClient.Close()

tsk, err := tasks.NewPing("pong")
if err != nil {
    fmt.Printf("%s", err)
    panic(err)
}
_, err = asynqClient.Enqueue(tsk)
if err != nil {
    fmt.Printf("%s", err)
    panic(err)
}

我已经在

tasks.go

中提取了ping代码
package tasks

import (
    "context"
    "encoding/json"
    "log"

    "github.com/hibiken/asynq"
)

// A list of task types.
const (
    TypePing = "misc:ping"
)

type pingTaskPayload struct {
    Test string
}

func NewPing(test string) (*asynq.Task, error) {
    payload, err := json.Marshal(pingTaskPayload{Test: test})
    if err != nil {
        return nil, err
    }
    return asynq.NewTask(TypePing, payload), nil
}

func HandlePing(ctx context.Context, t *asynq.Task) error {
    var p pingTaskPayload
    if err := json.Unmarshal(t.Payload(), &p); err != nil {
        return err
    }
    log.Printf(" [*] Ping received with arguments: %s", p.Test)
    return nil
}

您对其进行编组,它会通过 Redis 并在另一侧被捕获。

asynqServer := asynq.NewServer(
  asynq.RedisClientOpt{
    Addr:     os.Getenv("REDIS_ENDPOINT"),
    DB:       0, // will use the default one
    Password: os.Getenv("REDIS_PASSWORD"),
  },
  asynq.Config{Concurrency: 10},
)

mux := asynq.NewServeMux()
mux.HandleFunc(tasks.TypePing, tasks.HandlePing)
go asynqServer.Run(mux)

如您所见,任何地方都没有空间可以注入任何东西。依赖项在哪里?为什么它不建议以某种方式将其传达给任务?大家都是怎么用的呢? 入门从未引用任何依赖项。

理想情况下,我想使用

mux.HandleFunc
并为
tasks.HandlePing

提供一堆客户端(例如数据库连接)

目前,我看到的唯一“可行”的解决方案是在

main
中设置一个全局变量,以便从系统中的任何位置获取我的服务器,但我不想这样做。我想要一个清晰的依赖注入模式。

如何以干净的方式将我的依赖项(

clients
,包括数据库)传达给 Asynq?我避免在这里设置全局是错误的吗?

我已经广泛搜索了几个小时。就像没有人问过自己传递依赖项这个问题一样,而且这个库非常有名,所以我可能在某种程度上做错了什么。

go
1个回答
0
投票

这比我想象的要简单得多,我只需将客户端传递到我创建的

Tasks
结构中即可。

# in /tasks/
package tasks

import (
    "aquiestoy/pkg/mailer"
    "aquiestoy/pkg/tracking"

    "github.com/hibiken/asynq"
    "github.com/redis/go-redis/v9"
    "gorm.io/gorm"
)

type Clients struct {
    Db       *gorm.DB
    Redis    *redis.Client
    Tracking *tracking.Tracking
    Mailer   *mailer.Mailer
    Asynq    *asynq.Client
}

type Tasks struct {
    Clients *Clients
}

然后在

main.go

tsks := tasks.Tasks{
  Clients: &tasks.Clients{
    Db:       dbClient,
    Redis:    redisClient,
    Tracking: tkClient,
    Mailer:   mailClient,
    Asynq:    asynqClient,
  },
}

tsk, err := tsks.NewPing("pong")
if err != nil {
  fmt.Printf("%s", err)
  panic(err)
}
_, err = asynqClient.Enqueue(tsk)
if err != nil {
  fmt.Printf("%s", err)
  panic(err)
}

// NOTE : this should eventually be separated from the client
asynqServer := asynq.NewServer(
  asynq.RedisClientOpt{
    Addr:     os.Getenv("REDIS_ENDPOINT"),
    DB:       0, // will use the default one
    Password: os.Getenv("REDIS_PASSWORD"),
  },
  asynq.Config{Concurrency: 10},
)

mux := asynq.NewServeMux()
mux.HandleFunc(tasks.TypePing, tsks.HandlePing)
go asynqServer.Run(mux)

也可以将其包装在

NewTasks
中以达到良好的效果,但这对我来说有效。

© www.soinside.com 2019 - 2024. All rights reserved.