工作池使用通道停止的恐慌

问题描述 投票:0回答:1

我正在编写一个工作池实现,在停止

Worker
时遇到了问题。运行一个循环后我立即感到恐慌
for _, worker := range p.workers {...}
。添加
debug
后我开始感到恐慌,为什么这是我无法理解的。在此之前
workersShutdown
忽略了循环,我将 fmt.Print 进行调试,但代码不起作用,我设置了
defer func()
并恢复,什么也没有。添加了调试并能够得到恐慌?????????我无法弄清楚如何以及为什么。当我在测试中调用
Stop
方法来终止
Pool
时,我感到恐慌,下面是运行测试后的日志。

{"level":"info","service":"workerPool","workerPoolName":"test-pool","service":"Worker","method":"Start","message":"worker finished processing the task"}
{"level":"debug","service":"workerPool","workerPoolName":"test-pool","method":"Stop","message":"stop worker pool with all worker"}
{"level":"debug","service":"workerPool","workerPoolName":"test-pool","method":"shutdown","message":"shutdown worker pool"}
{"level":"debug","service":"workerPool","workerPoolName":"test-pool","method":"workersShutdown","message":"stop all workers"}
{"level":"debug","service":"workerPool","workerPoolName":"test-pool","method":"workersShutdown","message":"stop each worker"}
{"level":"debug","service":"workerPool","workerPoolName":"test-pool","method":"workersShutdown","message":"worker count all: 8"}
{"level":"error","service":"workerPool","workerPoolName":"test-pool","method":"workersShutdown","error":"runtime error: invalid memory address or nil pointer dereference","message":"workersShutdown panic"}

如何重写

Pool.Stop()
Worker.Stop
方法,但仍然保证知道
worker
已完成?

矿池源代码:

// Option represents an option that can be passed when instantiating the work pool
type Option func(pool *Pool)

// CreateContext initializes the context and contextCancelFunc function for the pool.
func CreateContext(ctx context.Context) Option {
    return func(pool *Pool) {
        pool.ctx, pool.contextCancelFunc = context.WithCancel(ctx)
    }
}

// Pool worker controller
type Pool struct {
    // Parent context for the pool.
    parentCtx context.Context
    // Pool's context.
    ctx context.Context
    // Cancel function for the pool's context.
    contextCancelFunc context.CancelFunc
    // Name of the worker pool. More convenient for logging.
    name string
    // Channel to signal stopping the pool.
    stopCh chan struct{}
    // Channel for queueing jobs.
    collector chan *Job
    // Slice to hold worker instances.
    workers []*Worker
    // Maximum number of workers in the pool.
    maxWorkersCount int
    // Number of currently running workers.
    workerConcurrency int32
    // Timeout for individual workers.
    workerTimeOut time.Duration
    // Mutex for locking access to the pool.
    mutex sync.Mutex
    // Used for a one-time action (starting the pool).
    onceStart sync.Once
    // Used for a one-time action (stopping the pool).
    onceStop sync.Once
    // Wait group for tracking running workers.
    wg sync.WaitGroup
    // Logger for the pool.
    logger *zerolog.Logger

    stopped bool
}

// NewPool creates a new worker pool.
func NewPool(parentCtx context.Context, queue chan *Job, workerConcurrency int, name string, logger *zerolog.Logger, workerTimeOut time.Duration) *Pool {
    l := logger.With().Str("service", "workerPool").Str("workerPoolName", name).Logger()

    if workerConcurrency == 0 {
        workerConcurrency = runtime.NumCPU() * 2
    }

    pool := &Pool{
        parentCtx:       parentCtx,
        name:            name,
        stopCh:          make(chan struct{}, 1),
        collector:       queue,
        workers:         make([]*Worker, workerConcurrency),
        maxWorkersCount: workerConcurrency,
        workerTimeOut:   workerTimeOut,
        logger:          &l,
        stopped:         false,
    }

    CreateContext(context.Background())(pool)

    return pool
}

// Run starts the worker pool and worker goroutines. It creates and launches a specified number of worker goroutines,
// each of which is responsible for processing jobs from a shared collector. This method also continuously listens for stop signals
// or context cancellation and reacts accordingly, ensuring a clean and controlled shutdown of the worker pool.
func (p *Pool) Run() {
    p.onceStart.Do(func() {
        // Add one to the wait group for the main pool loop.
        p.wg.Add(1)
        // Start the main pool loop in a goroutine.
        go p.loop()
    })
}

func (p *Pool) AddWorker(collector chan *Job, workerID int64, workerTimeout time.Duration) (err error) {
    l := p.logger.With().Str("method", "AddWorker").Logger()

    defer func() {
        // Recover from any panic in the job and report it.
        if rec := recover(); rec != nil {
            err = common.GetRecoverError(rec)
            if err != nil {
                l.Error().Err(err).Msgf("add worker panic, pool name: %s", p.name)
            }
        }
    }()

    if !p.incrementWorkerCount() {
        return fmt.Errorf("can't create more worker")
    }

    p.mutex.Lock()
    defer p.mutex.Unlock()
    worker := NewWorker(p.ctx, p, collector, workerID, workerTimeout, p.logger)
    // Add the worker to the workers slice.
    p.workers = append(p.workers, worker)
    // Add one to the wait group to track the new worker.
    p.wg.Add(1)
    go worker.Start(&p.wg)

    return nil
}

// loop is the main worker pool loop. It creates and launches worker goroutines, listens for stop signals,
// and ensures a clean and controlled shutdown of the worker pool.
func (p *Pool) loop() {
    l := p.logger.With().Str("method", "loop").Logger()

    defer func() {
        // Recover from any panic in the job and report it.
        if rec := recover(); rec != nil {
            err := common.GetRecoverError(rec)
            if err != nil {
                l.Error().Err(err).Msgf("pool catch panic, pool name: %s", p.name)
            }
        }

        p.wg.Done()
        // Cancel the pool's context.
        p.contextCancelFunc()
    }()

    for {
        select {
        case _, ok := <-p.stopCh:
            if !ok {
                l.Error().Msg("stop channel is close")
            }
            // This option suits us as it guarantees that the channel was closed when the Stop method was called,
            // which means that the pool should stop working.
            l.Info().Msg("stop pool")
            return

        case <-p.parentCtx.Done():
            // Log that the pool's context is closing.
            l.Info().Msg("parent context is close")
            // Acquire a read lock on the pool to prevent other modifications.
            p.mutex.Lock()
            // Trigger the pool shutdown sequence.
            p.shutdown()
            // Release the read lock.
            p.mutex.Unlock()
            return

        case <-p.ctx.Done():
            l.Info().Msg("context is close") // Log that the pool's context is closing.
            return
        }
    }
}

// RunningWorkers returns the number of running worker goroutines.
func (p *Pool) RunningWorkers() int {
    return int(atomic.LoadInt32(&p.workerConcurrency))
}

// incrementWorkerCount attempts to increment the worker count if it's below the maximum limit.
// It protects the worker count and associated wait group using a mutex to ensure
// thread-safety while managing the pool's worker count.
// If the current worker count is less than the maximum allowed workers, it increments
// the worker count and adds one to the wait group, signifying that a new worker is being started.
// If the maximum worker limit has been reached, it returns false to indicate that no more workers can be started.
// NOTE: Perhaps in the next iteration we will add simple workers to not create a large number of workers,
// but to make them flexible, to have a minimum number of workers,
// and a maximum number of workers to be able to automatically manage the pool of workers,
// without downtime or oversupply of workers.
func (p *Pool) incrementWorkerCount() bool {
    // Lock the mutex to protect the worker count and wait group.
    p.mutex.Lock()
    defer p.mutex.Unlock()
    // Get the current count of running workers.
    counter := p.RunningWorkers()

    if counter >= p.maxWorkersCount {
        // The maximum worker limit has been reached, no more workers can be started.
        return false
    }
    // Increment the worker counter.
    atomic.AddInt32(&p.workerConcurrency, 1)

    return true
}

func (p *Pool) decrementWorkerCount() {
    p.logger.Info().Msg("decrement worker count")
    atomic.AddInt32(&p.workerConcurrency, -1)
}

// Stop stops the worker pool.
func (p *Pool) Stop() {
    l := p.logger.With().Str("method", "Stop").Logger()
    l.Debug().Msg("stop worker pool with all worker")

    p.mutex.Lock()
    p.mutex.Unlock()
    defer func() {
        // Wait for all workers to finish.
        p.wg.Wait()
    }()

    p.shutdown()
    // Cancel the pool's context.
    p.contextCancelFunc()
    // Signal for stopping the pool.
    p.stopCh <- struct{}{}
    // Close the stop channel.
    close(p.stopCh)
    // Trigger the pool shutdown sequence.
}

// shutdown stops the worker pool.
func (p *Pool) shutdown() {
    l := p.logger.With().Str("method", "shutdown").Logger()
    l.Debug().Msg("shutdown worker pool")
    // Stop all workers.
    <-p.workersShutdown()
    return
}

// workersShutdown stops all worker goroutines.
func (p *Pool) workersShutdown() <-chan struct{} {
    l := p.logger.With().Str("method", "workersShutdown").Logger()
    l.Debug().Msg("stop all workers")

    doneCh := make(chan struct{})

    defer func() {
        if r := recover(); r != nil {
            l.Error().Err(common.GetRecoverError(r)).Msg("workersShutdown panic")
            return
        }
    }()

    // If we are already stopped, then there is nothing to do

    l.Debug().Msg("stop each worker")
    l.Debug().Msgf("worker count all: %d", p.workerConcurrency)
    doneChs := make([]<-chan struct{}, p.workerConcurrency)
    for _, worker := range p.workers {
        l.Info().Msgf("stop worker: %d", worker.id)
        doneChs = append(doneChs, worker.Stop())
        l.Debug().Msgf("status: %d", worker.GetStatus())
    }

    go func() {
        l.Info().Msg("wait stop worker")
        // wait for all channels to be closed
        for _, ch := range doneChs {
            <-ch
            p.decrementWorkerCount()
        }
        close(doneCh)
    }()

    return doneCh
}

Worker源码:


// Status represents the current state of a worker.
type Status int

// The possible values for WorkerStatus.
const (
    WorkerIdle    Status = iota // Worker is waiting for a job.
    WorkerRunning               // Worker is running a job.
    WorkerStopped               // Worker has stopped.
)

type Worker struct {
    // The worker pool that this worker belongs to.
    pool *Pool

    // worker id
    id int64

    // The context of the parent worker pool.
    workerContext context.Context

    // A mutex used to protect the worker's state.
    mutex sync.Mutex

    // A channel used to signal to the worker that it should stop.
    stopCh chan struct{}

    // A channel from which the worker receives jobs.
    collector <-chan *Job

    // The job that the worker is currently processing.
    process *Job

    // The current status of the worker.
    status Status

    timeout time.Duration

    logger *zerolog.Logger
}

// NewWorker creates a new worker.
func NewWorker(parentCtx context.Context, pool *Pool, queue chan *Job, id int64, timeout time.Duration, logger *zerolog.Logger) *Worker {
    l := logger.With().Str("service", "Worker").Logger()
    return &Worker{
        pool:          pool,
        workerContext: parentCtx,
        collector:     queue,
        id:            id,
        timeout:       timeout,
        logger:        &l,
        stopCh:        make(chan struct{}, 1),
    }
}

// Start method is a goroutine that continuously extracts and processes tasks assigned to a worker, ensuring it's always ready to handle incoming tasks.
// We create a specific logger for the worker to track its activities.
// The method operates in a loop, constantly handling various scenarios to exit gracefully, such as receiving stop signals or context termination.
// When a task is available, the worker retrieves and logs it.
// The worker sets its status to 'WorkerRunning,' indicating active task processing.
// It executes the task using the Run method, passing worker-specific context.
// Any task execution errors are logged for debugging.
// After task completion, the worker resets its status to 'WorkerIdle,' indicating readiness for more tasks.
// A log message signals the task processing completion.
// This method guarantees continuous task processing while maintaining detailed logs for debugging and monitoring.
func (w *Worker) Start(wg *sync.WaitGroup) {
    // Create a logger for the worker.
    l := w.logger.With().Str("method", "Start").Logger()
    // Log that the worker is starting.
    l.Debug().Msg("worker runner")

    // As soon as a worker is created, it is necessarily in the status of WorkerIdle.
    w.setStatus(WorkerIdle)

    // Start a goroutine to process jobs.
    defer func() {
        if rec := recover(); rec != nil {
            err := common.GetRecoverError(rec)
            if err != nil {
                l.Error().Err(err).Msgf("worker pool panic: workerID [%d], workerStatus [%d], workerPoolName [%s]", w.id, w.status, w.pool.name)
            }
        }

        w.setStatus(WorkerStopped)
        wg.Done()
    }()

    for {
        select {
        // If the worker receives a stop signal, exit the goroutine.
        case <-w.stopCh:
            // The worker has completed from outside, which means the worker pool completes the job.
            w.setStatus(WorkerStopped)
            l.Info().Msg("stop channel is close")
            return

        // If the parent context is done, exit the goroutine.
        case <-w.workerContext.Done():
            w.setStatus(WorkerStopped)
            l.Info().Msg("parent context is close")
            return
        default:
        }

        select {
        // If the worker receives a stop signal, exit the goroutine.
        // https://stackoverflow.com/questions/16105325/how-to-check-a-channel-is-closed-or-not-without-reading-it
        // I couldn't figure out why, but in some tests, I was getting an error that the channel is closed,
        // for what reason I don't know, so I added this check in case the channel is closed, I should clearly return an error in the log that the channel is closed.
        case _, ok := <-w.stopCh:
            if !ok {
                w.setStatus(WorkerStopped)
                l.Error().Msgf("stop channel is close, workerID [%d], workerStatus [%d], workerPoolName [%s]", w.id, w.status, w.pool.name)
                return
            }

            w.setStatus(WorkerStopped)
            l.Error().Msgf("stop channel is close, workerID [%d], workerStatus [%d], workerPoolName [%s]", w.id, w.status, w.pool.name)
            return

        case <-w.workerContext.Done():
            w.setStatus(WorkerStopped)
            l.Info().Msg("parent context is close")
            return

        // If a job is available, process it.
        case task, ok := <-w.collector:
            if ok {
                if task != nil {
                    l.Info().Msgf("worker: [%d] get task: [%s]", w.id, task.name)
                    err := w.processTask(task)
                    if err != nil {
                        l.Error().Err(err).Msgf("failed run task: workerID [%d], workerStatus [%d], workerPoolName [%s]", w.id, w.status, w.pool.name)
                    }

                    // Log that the worker finished processing the task.
                    l.Info().Msg("worker finished processing the task")
                }
            } else {
                // if the job channel is closed, there is no point in further work of the worker
                w.setStatus(WorkerStopped)
                l.Info().Msgf("job collector is close: workerID [%d], workerStatus [%d], workerPoolName [%s]", w.id, w.status, w.pool.name)
                return
            }
        }
    }

}

// processTask processes the given task.
func (w *Worker) processTask(task *Job) error {
    w.setStatus(WorkerRunning)
    w.process = task

    // Process the job.
    task.Run(w.timeout, w.id, w.pool.name)
    taskErr := task.GetError()
    if taskErr != nil {
        return taskErr
    }

    // Set the worker status to idle.
    w.process = nil
    w.setStatus(WorkerIdle)

    return nil
}

func (w *Worker) Stop() <-chan struct{} {
    l := w.logger.With().Str("method", "Stop").Logger()
    l.Debug().Msg("stop worker")

    defer func() {
        if r := recover(); r != nil {
            l.Error().Err(common.GetRecoverError(r)).Msg("stop worker panic")
            return
        }
    }()

    doneCh := make(chan struct{})
    go func() {
        w.mutex.Lock()
        defer w.mutex.Unlock()

        select {
        case <-w.stopCh:
            close(doneCh)
            close(w.errCh)
            return

        default:
            // Close the stopCh to signal to the worker that it should stop.
            // This will cause the worker to exit its loop and finish processing the current job.
            w.stopCh <- struct{}{}
            close(w.stopCh)
            close(w.errCh)
            // Set the worker status to stopped.
            w.setStatus(WorkerStopped)
            // If there is a task in processing at the time of worker termination, it must be terminated
            if w.process != nil {
                w.process.Stop()
            }

            close(doneCh)
        }
    }()

    return doneCh
}

// setStatus sets the worker's status.
func (w *Worker) setStatus(status Status) {
    // Lock the worker's mutex to protect its state.
    w.mutex.Lock()
    defer w.mutex.Unlock()

    // Set the worker's status.
    w.status = status
}

// GetStatus returns the current status of the worker.
func (w *Worker) GetStatus() Status {
    return w.status
}

测试用例代码:

t.Run("worker pool", func(t *testing.T) {
        counter := 0
        workerCount := runtime.NumCPU()
        mut := new(sync.Mutex)
        mockMainFunc := func(ctx context.Context, task *model.Task) {
            mut.Lock()
            defer mut.Unlock()
            counter++
            fmt.Printf("Main func fileID: %d, volumeID: %d\n", task.FileID, task.VolumeID)
        }

        // Create a mock error function for the job.
        mockErrFunc := func(ctx context.Context, task *model.Task) {
            // Simulate error function logic here.
            fmt.Printf("Err func fileID: %d, volumeID: %d\n", task.FileID, task.VolumeID)
        }

        var wg sync.WaitGroup
        parentCtx := context.Background()

        logger := zerolog.New(os.Stdout)
        task := make(chan *Job, 100)
        pool := NewPool(parentCtx, task, workerCount, "test-pool", &logger, 3*time.Second)
        assert.Equal(t, 0, pool.RunningWorkers())
        go func() {
            pool.Run()
        }()

        for w := 1; w <= workerCount; w++ {
            _ = pool.AddWorker(task, int64(w), 3*time.Second)
        }

        time.Sleep(1 * time.Second)
        assert.Equal(t, workerCount, pool.RunningWorkers())

        for i := 0; i < 100; i++ {
            wg.Add(1)
            j := NewJob(parentCtx, &model.Task{FileID: 900000, VolumeID: int32(890 + i)}, -1*time.Second, fmt.Sprintf("job:%d", i))
            j.SetMainFunc(mockMainFunc)
            j.SetErrorFunc(mockErrFunc)
            j.SetWaitGroup(&wg)
            task <- j
        }

        time.Sleep(2 * time.Second)
        wg.Add(1)
        go func() {
            defer wg.Done()
            pool.Stop()
        }()
        wg.Wait()
        assert.Equal(t, 100, counter)
        fmt.Print("COUNT worker: ", pool.RunningWorkers())
    })
go channel goroutine
1个回答
0
投票

这个问题有点难以回答,因为您提供了相当多的代码,但它不完整(因此我们无法轻松地自己运行它) - 一个最小的、可重现的示例会让回答更容易。话虽如此,根据您提供的日志,问题出在两个日志条目之间:

l.Debug().Msgf("worker count all: %d", p.workerConcurrency)
    doneChs := make([]<-chan struct{}, p.workerConcurrency)
    for _, worker := range p.workers {
        l.Info().Msgf("stop worker: %d", worker.id)

这使得问题很可能是

nil
中存在
p.workers
条目。看看你如何初始化它,我们发现:

pool := &Pool{
 ...
    workers:         make([]*Worker, workerConcurrency),
...
}

并且添加项目:

func (p *Pool) AddWorker(collector chan *Job, workerID int64, workerTimeout time.Duration) (err error) {
...    l := p.logger.With().Str("method", "AddWorker").Logger()
    p.workers = append(p.workers, worker)
...

make
的语法是
make([]T, length, capacity)
。因此,您正在创建一个包含
workerConcurrency
元素的切片(将设置为默认值
nil
),然后
AddWorker
将新条目附加到切片的末尾。

通过一个基本示例可能最容易理解这一点:

func main() {
    a := 8
    x := make([]*int, 3)
    x = append(x, &a)
    fmt.Println(x) // sample output [<nil> <nil> <nil> 0xc000118000]
}

请注意,有三个

nil
条目,后面跟着添加了
append
的条目。

这样做的结果是,

worker
中的第一个
for _, worker := range p.workers {
将为零(导致访问
worker.id
时出现恐慌)。

最简单的解决方案是将

workers
初始化为
nil
(或
[]*Worker{}
,如果您愿意)。如果你真的想提前分配内存,你可以使用
make([]*Worker, 0, workerConcurrency)
(大小= 0,容量=workerConcurrency)。

注意:我并不是说这是唯一的问题(例如,它不是

Pool.Stop()
锁定互斥锁,然后在我怀疑您打算使用延迟的地方立即解锁它),但希望这能让您继续前进.

© www.soinside.com 2019 - 2024. All rights reserved.