我正在编写一个工作池实现,在停止
Worker
时遇到了问题。运行一个循环后我立即感到恐慌for _, worker := range p.workers {...}
。添加debug
后我开始感到恐慌,为什么这是我无法理解的。在此之前 workersShutdown
忽略了循环,我将 fmt.Print 进行调试,但代码不起作用,我设置了 defer func()
并恢复,什么也没有。添加了调试并能够得到恐慌?????????我无法弄清楚如何以及为什么。当我在测试中调用 Stop
方法来终止 Pool
时,我感到恐慌,下面是运行测试后的日志。
{"level":"info","service":"workerPool","workerPoolName":"test-pool","service":"Worker","method":"Start","message":"worker finished processing the task"}
{"level":"debug","service":"workerPool","workerPoolName":"test-pool","method":"Stop","message":"stop worker pool with all worker"}
{"level":"debug","service":"workerPool","workerPoolName":"test-pool","method":"shutdown","message":"shutdown worker pool"}
{"level":"debug","service":"workerPool","workerPoolName":"test-pool","method":"workersShutdown","message":"stop all workers"}
{"level":"debug","service":"workerPool","workerPoolName":"test-pool","method":"workersShutdown","message":"stop each worker"}
{"level":"debug","service":"workerPool","workerPoolName":"test-pool","method":"workersShutdown","message":"worker count all: 8"}
{"level":"error","service":"workerPool","workerPoolName":"test-pool","method":"workersShutdown","error":"runtime error: invalid memory address or nil pointer dereference","message":"workersShutdown panic"}
如何重写
Pool.Stop()
和 Worker.Stop
方法,但仍然保证知道 worker
已完成?
矿池源代码:
// Option represents an option that can be passed when instantiating the work pool
type Option func(pool *Pool)
// CreateContext initializes the context and contextCancelFunc function for the pool.
func CreateContext(ctx context.Context) Option {
return func(pool *Pool) {
pool.ctx, pool.contextCancelFunc = context.WithCancel(ctx)
}
}
// Pool worker controller
type Pool struct {
// Parent context for the pool.
parentCtx context.Context
// Pool's context.
ctx context.Context
// Cancel function for the pool's context.
contextCancelFunc context.CancelFunc
// Name of the worker pool. More convenient for logging.
name string
// Channel to signal stopping the pool.
stopCh chan struct{}
// Channel for queueing jobs.
collector chan *Job
// Slice to hold worker instances.
workers []*Worker
// Maximum number of workers in the pool.
maxWorkersCount int
// Number of currently running workers.
workerConcurrency int32
// Timeout for individual workers.
workerTimeOut time.Duration
// Mutex for locking access to the pool.
mutex sync.Mutex
// Used for a one-time action (starting the pool).
onceStart sync.Once
// Used for a one-time action (stopping the pool).
onceStop sync.Once
// Wait group for tracking running workers.
wg sync.WaitGroup
// Logger for the pool.
logger *zerolog.Logger
stopped bool
}
// NewPool creates a new worker pool.
func NewPool(parentCtx context.Context, queue chan *Job, workerConcurrency int, name string, logger *zerolog.Logger, workerTimeOut time.Duration) *Pool {
l := logger.With().Str("service", "workerPool").Str("workerPoolName", name).Logger()
if workerConcurrency == 0 {
workerConcurrency = runtime.NumCPU() * 2
}
pool := &Pool{
parentCtx: parentCtx,
name: name,
stopCh: make(chan struct{}, 1),
collector: queue,
workers: make([]*Worker, workerConcurrency),
maxWorkersCount: workerConcurrency,
workerTimeOut: workerTimeOut,
logger: &l,
stopped: false,
}
CreateContext(context.Background())(pool)
return pool
}
// Run starts the worker pool and worker goroutines. It creates and launches a specified number of worker goroutines,
// each of which is responsible for processing jobs from a shared collector. This method also continuously listens for stop signals
// or context cancellation and reacts accordingly, ensuring a clean and controlled shutdown of the worker pool.
func (p *Pool) Run() {
p.onceStart.Do(func() {
// Add one to the wait group for the main pool loop.
p.wg.Add(1)
// Start the main pool loop in a goroutine.
go p.loop()
})
}
func (p *Pool) AddWorker(collector chan *Job, workerID int64, workerTimeout time.Duration) (err error) {
l := p.logger.With().Str("method", "AddWorker").Logger()
defer func() {
// Recover from any panic in the job and report it.
if rec := recover(); rec != nil {
err = common.GetRecoverError(rec)
if err != nil {
l.Error().Err(err).Msgf("add worker panic, pool name: %s", p.name)
}
}
}()
if !p.incrementWorkerCount() {
return fmt.Errorf("can't create more worker")
}
p.mutex.Lock()
defer p.mutex.Unlock()
worker := NewWorker(p.ctx, p, collector, workerID, workerTimeout, p.logger)
// Add the worker to the workers slice.
p.workers = append(p.workers, worker)
// Add one to the wait group to track the new worker.
p.wg.Add(1)
go worker.Start(&p.wg)
return nil
}
// loop is the main worker pool loop. It creates and launches worker goroutines, listens for stop signals,
// and ensures a clean and controlled shutdown of the worker pool.
func (p *Pool) loop() {
l := p.logger.With().Str("method", "loop").Logger()
defer func() {
// Recover from any panic in the job and report it.
if rec := recover(); rec != nil {
err := common.GetRecoverError(rec)
if err != nil {
l.Error().Err(err).Msgf("pool catch panic, pool name: %s", p.name)
}
}
p.wg.Done()
// Cancel the pool's context.
p.contextCancelFunc()
}()
for {
select {
case _, ok := <-p.stopCh:
if !ok {
l.Error().Msg("stop channel is close")
}
// This option suits us as it guarantees that the channel was closed when the Stop method was called,
// which means that the pool should stop working.
l.Info().Msg("stop pool")
return
case <-p.parentCtx.Done():
// Log that the pool's context is closing.
l.Info().Msg("parent context is close")
// Acquire a read lock on the pool to prevent other modifications.
p.mutex.Lock()
// Trigger the pool shutdown sequence.
p.shutdown()
// Release the read lock.
p.mutex.Unlock()
return
case <-p.ctx.Done():
l.Info().Msg("context is close") // Log that the pool's context is closing.
return
}
}
}
// RunningWorkers returns the number of running worker goroutines.
func (p *Pool) RunningWorkers() int {
return int(atomic.LoadInt32(&p.workerConcurrency))
}
// incrementWorkerCount attempts to increment the worker count if it's below the maximum limit.
// It protects the worker count and associated wait group using a mutex to ensure
// thread-safety while managing the pool's worker count.
// If the current worker count is less than the maximum allowed workers, it increments
// the worker count and adds one to the wait group, signifying that a new worker is being started.
// If the maximum worker limit has been reached, it returns false to indicate that no more workers can be started.
// NOTE: Perhaps in the next iteration we will add simple workers to not create a large number of workers,
// but to make them flexible, to have a minimum number of workers,
// and a maximum number of workers to be able to automatically manage the pool of workers,
// without downtime or oversupply of workers.
func (p *Pool) incrementWorkerCount() bool {
// Lock the mutex to protect the worker count and wait group.
p.mutex.Lock()
defer p.mutex.Unlock()
// Get the current count of running workers.
counter := p.RunningWorkers()
if counter >= p.maxWorkersCount {
// The maximum worker limit has been reached, no more workers can be started.
return false
}
// Increment the worker counter.
atomic.AddInt32(&p.workerConcurrency, 1)
return true
}
func (p *Pool) decrementWorkerCount() {
p.logger.Info().Msg("decrement worker count")
atomic.AddInt32(&p.workerConcurrency, -1)
}
// Stop stops the worker pool.
func (p *Pool) Stop() {
l := p.logger.With().Str("method", "Stop").Logger()
l.Debug().Msg("stop worker pool with all worker")
p.mutex.Lock()
p.mutex.Unlock()
defer func() {
// Wait for all workers to finish.
p.wg.Wait()
}()
p.shutdown()
// Cancel the pool's context.
p.contextCancelFunc()
// Signal for stopping the pool.
p.stopCh <- struct{}{}
// Close the stop channel.
close(p.stopCh)
// Trigger the pool shutdown sequence.
}
// shutdown stops the worker pool.
func (p *Pool) shutdown() {
l := p.logger.With().Str("method", "shutdown").Logger()
l.Debug().Msg("shutdown worker pool")
// Stop all workers.
<-p.workersShutdown()
return
}
// workersShutdown stops all worker goroutines.
func (p *Pool) workersShutdown() <-chan struct{} {
l := p.logger.With().Str("method", "workersShutdown").Logger()
l.Debug().Msg("stop all workers")
doneCh := make(chan struct{})
defer func() {
if r := recover(); r != nil {
l.Error().Err(common.GetRecoverError(r)).Msg("workersShutdown panic")
return
}
}()
// If we are already stopped, then there is nothing to do
l.Debug().Msg("stop each worker")
l.Debug().Msgf("worker count all: %d", p.workerConcurrency)
doneChs := make([]<-chan struct{}, p.workerConcurrency)
for _, worker := range p.workers {
l.Info().Msgf("stop worker: %d", worker.id)
doneChs = append(doneChs, worker.Stop())
l.Debug().Msgf("status: %d", worker.GetStatus())
}
go func() {
l.Info().Msg("wait stop worker")
// wait for all channels to be closed
for _, ch := range doneChs {
<-ch
p.decrementWorkerCount()
}
close(doneCh)
}()
return doneCh
}
Worker源码:
// Status represents the current state of a worker.
type Status int
// The possible values for WorkerStatus.
const (
WorkerIdle Status = iota // Worker is waiting for a job.
WorkerRunning // Worker is running a job.
WorkerStopped // Worker has stopped.
)
type Worker struct {
// The worker pool that this worker belongs to.
pool *Pool
// worker id
id int64
// The context of the parent worker pool.
workerContext context.Context
// A mutex used to protect the worker's state.
mutex sync.Mutex
// A channel used to signal to the worker that it should stop.
stopCh chan struct{}
// A channel from which the worker receives jobs.
collector <-chan *Job
// The job that the worker is currently processing.
process *Job
// The current status of the worker.
status Status
timeout time.Duration
logger *zerolog.Logger
}
// NewWorker creates a new worker.
func NewWorker(parentCtx context.Context, pool *Pool, queue chan *Job, id int64, timeout time.Duration, logger *zerolog.Logger) *Worker {
l := logger.With().Str("service", "Worker").Logger()
return &Worker{
pool: pool,
workerContext: parentCtx,
collector: queue,
id: id,
timeout: timeout,
logger: &l,
stopCh: make(chan struct{}, 1),
}
}
// Start method is a goroutine that continuously extracts and processes tasks assigned to a worker, ensuring it's always ready to handle incoming tasks.
// We create a specific logger for the worker to track its activities.
// The method operates in a loop, constantly handling various scenarios to exit gracefully, such as receiving stop signals or context termination.
// When a task is available, the worker retrieves and logs it.
// The worker sets its status to 'WorkerRunning,' indicating active task processing.
// It executes the task using the Run method, passing worker-specific context.
// Any task execution errors are logged for debugging.
// After task completion, the worker resets its status to 'WorkerIdle,' indicating readiness for more tasks.
// A log message signals the task processing completion.
// This method guarantees continuous task processing while maintaining detailed logs for debugging and monitoring.
func (w *Worker) Start(wg *sync.WaitGroup) {
// Create a logger for the worker.
l := w.logger.With().Str("method", "Start").Logger()
// Log that the worker is starting.
l.Debug().Msg("worker runner")
// As soon as a worker is created, it is necessarily in the status of WorkerIdle.
w.setStatus(WorkerIdle)
// Start a goroutine to process jobs.
defer func() {
if rec := recover(); rec != nil {
err := common.GetRecoverError(rec)
if err != nil {
l.Error().Err(err).Msgf("worker pool panic: workerID [%d], workerStatus [%d], workerPoolName [%s]", w.id, w.status, w.pool.name)
}
}
w.setStatus(WorkerStopped)
wg.Done()
}()
for {
select {
// If the worker receives a stop signal, exit the goroutine.
case <-w.stopCh:
// The worker has completed from outside, which means the worker pool completes the job.
w.setStatus(WorkerStopped)
l.Info().Msg("stop channel is close")
return
// If the parent context is done, exit the goroutine.
case <-w.workerContext.Done():
w.setStatus(WorkerStopped)
l.Info().Msg("parent context is close")
return
default:
}
select {
// If the worker receives a stop signal, exit the goroutine.
// https://stackoverflow.com/questions/16105325/how-to-check-a-channel-is-closed-or-not-without-reading-it
// I couldn't figure out why, but in some tests, I was getting an error that the channel is closed,
// for what reason I don't know, so I added this check in case the channel is closed, I should clearly return an error in the log that the channel is closed.
case _, ok := <-w.stopCh:
if !ok {
w.setStatus(WorkerStopped)
l.Error().Msgf("stop channel is close, workerID [%d], workerStatus [%d], workerPoolName [%s]", w.id, w.status, w.pool.name)
return
}
w.setStatus(WorkerStopped)
l.Error().Msgf("stop channel is close, workerID [%d], workerStatus [%d], workerPoolName [%s]", w.id, w.status, w.pool.name)
return
case <-w.workerContext.Done():
w.setStatus(WorkerStopped)
l.Info().Msg("parent context is close")
return
// If a job is available, process it.
case task, ok := <-w.collector:
if ok {
if task != nil {
l.Info().Msgf("worker: [%d] get task: [%s]", w.id, task.name)
err := w.processTask(task)
if err != nil {
l.Error().Err(err).Msgf("failed run task: workerID [%d], workerStatus [%d], workerPoolName [%s]", w.id, w.status, w.pool.name)
}
// Log that the worker finished processing the task.
l.Info().Msg("worker finished processing the task")
}
} else {
// if the job channel is closed, there is no point in further work of the worker
w.setStatus(WorkerStopped)
l.Info().Msgf("job collector is close: workerID [%d], workerStatus [%d], workerPoolName [%s]", w.id, w.status, w.pool.name)
return
}
}
}
}
// processTask processes the given task.
func (w *Worker) processTask(task *Job) error {
w.setStatus(WorkerRunning)
w.process = task
// Process the job.
task.Run(w.timeout, w.id, w.pool.name)
taskErr := task.GetError()
if taskErr != nil {
return taskErr
}
// Set the worker status to idle.
w.process = nil
w.setStatus(WorkerIdle)
return nil
}
func (w *Worker) Stop() <-chan struct{} {
l := w.logger.With().Str("method", "Stop").Logger()
l.Debug().Msg("stop worker")
defer func() {
if r := recover(); r != nil {
l.Error().Err(common.GetRecoverError(r)).Msg("stop worker panic")
return
}
}()
doneCh := make(chan struct{})
go func() {
w.mutex.Lock()
defer w.mutex.Unlock()
select {
case <-w.stopCh:
close(doneCh)
close(w.errCh)
return
default:
// Close the stopCh to signal to the worker that it should stop.
// This will cause the worker to exit its loop and finish processing the current job.
w.stopCh <- struct{}{}
close(w.stopCh)
close(w.errCh)
// Set the worker status to stopped.
w.setStatus(WorkerStopped)
// If there is a task in processing at the time of worker termination, it must be terminated
if w.process != nil {
w.process.Stop()
}
close(doneCh)
}
}()
return doneCh
}
// setStatus sets the worker's status.
func (w *Worker) setStatus(status Status) {
// Lock the worker's mutex to protect its state.
w.mutex.Lock()
defer w.mutex.Unlock()
// Set the worker's status.
w.status = status
}
// GetStatus returns the current status of the worker.
func (w *Worker) GetStatus() Status {
return w.status
}
测试用例代码:
t.Run("worker pool", func(t *testing.T) {
counter := 0
workerCount := runtime.NumCPU()
mut := new(sync.Mutex)
mockMainFunc := func(ctx context.Context, task *model.Task) {
mut.Lock()
defer mut.Unlock()
counter++
fmt.Printf("Main func fileID: %d, volumeID: %d\n", task.FileID, task.VolumeID)
}
// Create a mock error function for the job.
mockErrFunc := func(ctx context.Context, task *model.Task) {
// Simulate error function logic here.
fmt.Printf("Err func fileID: %d, volumeID: %d\n", task.FileID, task.VolumeID)
}
var wg sync.WaitGroup
parentCtx := context.Background()
logger := zerolog.New(os.Stdout)
task := make(chan *Job, 100)
pool := NewPool(parentCtx, task, workerCount, "test-pool", &logger, 3*time.Second)
assert.Equal(t, 0, pool.RunningWorkers())
go func() {
pool.Run()
}()
for w := 1; w <= workerCount; w++ {
_ = pool.AddWorker(task, int64(w), 3*time.Second)
}
time.Sleep(1 * time.Second)
assert.Equal(t, workerCount, pool.RunningWorkers())
for i := 0; i < 100; i++ {
wg.Add(1)
j := NewJob(parentCtx, &model.Task{FileID: 900000, VolumeID: int32(890 + i)}, -1*time.Second, fmt.Sprintf("job:%d", i))
j.SetMainFunc(mockMainFunc)
j.SetErrorFunc(mockErrFunc)
j.SetWaitGroup(&wg)
task <- j
}
time.Sleep(2 * time.Second)
wg.Add(1)
go func() {
defer wg.Done()
pool.Stop()
}()
wg.Wait()
assert.Equal(t, 100, counter)
fmt.Print("COUNT worker: ", pool.RunningWorkers())
})
这个问题有点难以回答,因为您提供了相当多的代码,但它不完整(因此我们无法轻松地自己运行它) - 一个最小的、可重现的示例会让回答更容易。话虽如此,根据您提供的日志,问题出在两个日志条目之间:
l.Debug().Msgf("worker count all: %d", p.workerConcurrency)
doneChs := make([]<-chan struct{}, p.workerConcurrency)
for _, worker := range p.workers {
l.Info().Msgf("stop worker: %d", worker.id)
这使得问题很可能是
nil
中存在 p.workers
条目。看看你如何初始化它,我们发现:
pool := &Pool{
...
workers: make([]*Worker, workerConcurrency),
...
}
并且添加项目:
func (p *Pool) AddWorker(collector chan *Job, workerID int64, workerTimeout time.Duration) (err error) {
... l := p.logger.With().Str("method", "AddWorker").Logger()
p.workers = append(p.workers, worker)
...
make
的语法是make([]T, length, capacity)
。因此,您正在创建一个包含 workerConcurrency
元素的切片(将设置为默认值 nil
),然后 AddWorker
将新条目附加到切片的末尾。
通过一个基本示例可能最容易理解这一点:
func main() {
a := 8
x := make([]*int, 3)
x = append(x, &a)
fmt.Println(x) // sample output [<nil> <nil> <nil> 0xc000118000]
}
请注意,有三个
nil
条目,后面跟着添加了 append
的条目。
这样做的结果是,
worker
中的第一个for _, worker := range p.workers {
将为零(导致访问worker.id
时出现恐慌)。
最简单的解决方案是将
workers
初始化为 nil
(或 []*Worker{}
,如果您愿意)。如果你真的想提前分配内存,你可以使用make([]*Worker, 0, workerConcurrency)
(大小= 0,容量=workerConcurrency)。
注意:我并不是说这是唯一的问题(例如,它不是
Pool.Stop()
锁定互斥锁,然后在我怀疑您打算使用延迟的地方立即解锁它),但希望这能让您继续前进.