我正在尝试 Go 并编写了一个应用程序来管理由工作协程处理的 HTTP 请求队列。
并发的东西似乎工作正常,但我在发回响应时收到此错误
http: wrote more than the declared Content-Length
。
完整代码如下:
package main
import (
"log"
"net/http"
"sync"
)
// Job represents a unit of work to be processed by a worker.
type Job struct {
r *http.Request // HTTP request to be processed
w http.ResponseWriter // Response writer to send the result
}
// Queue manages a list of jobs to be processed by workers.
type Queue struct {
jobs []*Job // List of jobs in the queue
mu sync.Mutex // Mutex to synchronize access to the queue
cond *sync.Cond // Condition variable for signaling
}
var q Queue // Global instance of the queue
var WORKER_POOL_SIZE = 4 // Number of workers
// Push adds a job to the queue.
func (q *Queue) Push(j *Job) {
q.mu.Lock()
defer q.mu.Unlock()
q.jobs = append(q.jobs, j)
q.cond.Signal() // Signal a waiting worker that a job is available
log.Println("Job added to queue")
}
// Pop retrieves and removes a job from the queue.
func (q *Queue) Pop() (*Job, bool) {
q.mu.Lock()
defer q.mu.Unlock()
if len(q.jobs) == 0 {
q.cond.Wait() // If the queue is empty, wait for a signal
}
job := q.jobs[0]
q.jobs = q.jobs[1:]
log.Println("Job removed from queue")
return job, true
}
// handler adds a job to the queue.
func handler(w http.ResponseWriter, r *http.Request) {
// Create a job with the request and response.
job := &Job{r, w}
// Push the job onto the queue.
q.Push(job)
log.Println("Received request and added job to queue")
}
// init initializes the condition variable and starts worker goroutines.
func init() {
q.cond = sync.NewCond(&q.mu)
for i := 0; i < WORKER_POOL_SIZE; i++ {
go worker()
}
}
// worker processes jobs from the queue.
func worker() {
for {
job, ok := q.Pop()
if ok {
log.Println("Worker processing job")
doWork(job)
}
}
}
// doWork simulates processing a job and sends a response.
func doWork(job *Job) {
// Extract the "Name" parameter from the request query.
name := job.r.URL.Query().Get("Name")
// Check if the name is not empty.
if name != "" {
// Send the name as the response.
_, err := job.w.Write([]byte("Hello, " + name))
if err != nil {
log.Println("Error writing response:", err)
}
log.Println("Response sent: Hello,", name)
} else {
// If the "Name" parameter is missing or empty, send an error response.
http.Error(job.w, "Name parameter is missing or empty", http.StatusBadRequest)
log.Println("Error: Name parameter is missing or empty")
}
}
func main() {
http.HandleFunc("/addJob", handler)
log.Println("Server started and listening on :8080")
err := http.ListenAndServe(":8080", nil)
if err != nil {
log.Fatal("Error starting server:", err)
}
}
关于如何解决这个问题有什么想法吗?另外,由于我对 go 中的并发性很陌生,你认为它可以改进吗?谢谢!
http.Handler
文档:
返回信号表明请求完成;在 ServeHTTP 调用完成之后或同时,使用 ResponseWriter 或从 Request.Body 读取是无效的。
您的
handler
正在将请求/编写器推送到队列中,然后返回。这意味着您在处理程序返回后尝试写入ResponseWriter
,违反了上述规定(因为没有同步,写入也可能实际上发生在return
之前或同时发生)。
有很多方法可以解决这个问题;一种技术是:
// Job represents a unit of work to be processed by a worker.
type Job struct {
r *http.Request // HTTP request to be processed
w http.ResponseWriter // Response writer to send the result
done chan struct{} // Closed when write competed
}
...
func handler(w http.ResponseWriter, r *http.Request) {
// Create a job with the request and response.
job := &Job{r, w, make(chan struct{})}
// Push the job onto the queue.
q.Push(job)
log.Println("Received request and added job to queue")
<-job.done // Wait until job has been processed
}
...
// worker processes jobs from the queue.
func worker() {
for {
job, ok := q.Pop()
if ok {
log.Println("Worker processing job")
doWork(job)
close(job.done) // Notify requester that we are done
}
}
}