正在获取绑定:即使在golang中关闭连接后也已在使用地址

问题描述 投票:-1回答:1

我正在用golang构建主/从类型系统。当从站(称为racer)启动时,它将对主机发出ping命令,告诉主机已启动并准备接收数据,然后开始侦听端口。用于ping和侦听的端口是相同的,但请确保在ping之后关闭连接。我的用例是使用相同的端口进行ping和侦听,但是当我使用相同的端口侦听时出现bind: address already in use错误。有时它起作用,有时却不起作用。我做错了吗?

Complete Code

main.go

package main

import (
    "flag"
    "log"
    "strconv"
    "time"

    "github.com/goku321/line-racer/master"
    "github.com/goku321/line-racer/model"
    "github.com/goku321/line-racer/racer"
)

func main() {
    log.SetFlags(log.LstdFlags | log.Lshortfile)
    nodeType := flag.String("nodeType", "master", "type of node: master/racer")
    masterIP := flag.String("masterIP", "127.0.0.1", "ip address of master process")
    racers := flag.Int("racers", 2, "number of racers")
    racerID := flag.Int("racerID", 0, "unique racer id (0 <= id < number of racers")
    laps := flag.Int("laps", 10, "number of lap")
    ip := flag.String("ip", "127.0.0.1", "ip address of the node")
    port := flag.String("port", "3000", "port to use")
    flag.Parse()

_, err := strconv.ParseInt(*port, 10, 64)
if err != nil {
    log.Fatalf("error parsing port number: %s", *port)
}

if *nodeType == "master" {
    m := master.New(*ip, *port, *racers, *laps)
    m.GenerateLaps()
    go m.Listen()
    m.WaitForRacers()
    m.StartRace()
    m.PrintLaps()
} else {
    r := racer.New(*racerID, *ip, *port, *masterIP)
    r.SignalMaster(&model.Message{Source: r.IPAddr + ":" + r.Port})
    time.Sleep(time.Second*60)
    r.ListenForNewLap()
}

}

racer.go(slave)

package racer

import (
    "encoding/json"
    "log"
    "net"
    "os"
    "strconv"
    "time"

    "github.com/goku321/line-racer/model"
)

// Racer represents a racer
type Racer struct {
    ID     int
    IPAddr string
    Port   string
    Master string
    Laps   [][]model.Point
    Status string
}

// New returns a new racer type
func New(id int, ip, port, masterIP string) *Racer {
    return &Racer{
        ID:     id,
        IPAddr: ip,
        Port:   port,
        Status: "up",
        Master: masterIP,
    }
}

// func updateRacerID(r *Racer, id int) {
//  r.ID = strconv.Itoa(id)
// }

// SignalMaster sends a signal to master process
// with its coordinates
func (r *Racer) SignalMaster(m *model.Message) {
    laddr, err := net.ResolveTCPAddr("tcp", r.IPAddr+":"+r.Port)
    if err != nil {
        log.Fatalf("error resolving tcp address: %s, reason: %v", r.IPAddr+":"+r.Port, err)
    }

    raddr, err := net.ResolveTCPAddr("tcp", r.Master+":3000")
    if err != nil {
        log.Fatalf("error resolving tcp address: %v", err)
    }

    for {
        conn, err := net.DialTCP("tcp", laddr, raddr)
        if err != nil {
            log.Printf("connecting to master, %v", err)
            time.Sleep(time.Second * 5)
        } else {
            m.Type = "ready"
            m.Source = strconv.Itoa(r.ID)
            m.Dest = r.Master + ":3000"
            err := json.NewEncoder(conn).Encode(&m)
            if err != nil {
                log.Fatalf("error communicating to master: %v", err)
            }
            if err = conn.Close(); err != nil {
                log.Fatal("unable to close connection")
            }
            break
        }
    }
}

// SendPOSUpdate sends position updates to master every 50ms
func (r *Racer) SendPOSUpdate(m *model.Message) {
    laddr, err := net.ResolveTCPAddr("tcp", "")
    if err != nil {
        log.Fatalf("error resolving tcp address: %s, reason: %v", r.IPAddr+":"+r.Port, err)
    }

    raddr, err := net.ResolveTCPAddr("tcp", r.Master+":3000")
    if err != nil {
        log.Fatalf("error resolving tcp address: %v", err)
    }

    for {
        conn, err := net.DialTCP("tcp", laddr, raddr)
        if err != nil {
            log.Printf("racer %d: connecting to master, %v", r.ID, err)
            time.Sleep(time.Second * 5)
        } else {
            if err = json.NewEncoder(conn).Encode(&m); err != nil {
                log.Printf("racer %d: error communicating to master: %v", r.ID, err)
            }
            conn.Close()
            break
        }
    }
}

// ListenForNewLap waits for master to get new coordinates
func (r *Racer) ListenForNewLap() {
    ln, err := net.Listen("tcp", ":"+r.Port)
    if err != nil {
        log.Fatalf("racer %d: %v - %v", r.ID, err, time.Now())
    }

    log.Printf("racer %d: listening on %s:%s", r.ID, r.IPAddr, r.Port)

    for {
        conn, err := ln.Accept()
        if err != nil {
            log.Fatal(err)
        }

        go handleConnection(conn, r)
    }
}

func handleConnection(conn net.Conn, r *Racer) {
    log.Printf("racer %d: new lap from master", r.ID)

    var msg model.Message
    err := json.NewDecoder(conn).Decode(&msg)
    if err != nil {
        log.Printf("racer %d: %v", r.ID, err)
    }

    // close connection here as message has already been received
    conn.Close()

    if msg.Type == "race" {
        r.Laps = append(r.Laps, msg.Coordinates)
        r.race(msg.Coordinates)
    } else if msg.Type == "kill" {
        log.Printf("racer %d: kill signal received. racer will terminate", r.ID)
        r.printLaps()
        os.Exit(0)
    }
}

func (r *Racer) race(l []model.Point) {
    log.Printf("racer %d: racing on lap %v", r.ID, l)
    // add a check for invalid lap
    m, c := l[r.ID].X, l[r.ID].Y
    p := getStartingPoint(l)
    log.Printf("racer %d: starting race from (%d, %d)", r.ID, p.X, p.Y)

    for {
        time.Sleep(time.Millisecond * 50)
        p.X++
        p.Y = (m * p.X) + c
        m := &model.Message{
            Source:      strconv.Itoa(r.ID),
            Dest:        "127.0.0.1:3000",
            Type:        "pos",
            Coordinates: []model.Point{p},
        }
        r.SendPOSUpdate(m)
    }
}

func (r *Racer) printLaps() {
    for k, v := range r.Laps {
        log.Printf("racer %d: lap %d: %v", r.ID, k+1, v)
    }
}

func getStartingPoint(x []model.Point) model.Point {
    m1, c1, m2, c2 := x[0].X, x[0].Y, x[1].X, x[1].Y

    sX := (c1 - c2) / (m2 - m1)
    sY := ((m2 * c1) - (m1 * c2)) / (m2 - m1)

    return model.New(sX, sY)
}

master.go

package master

import (
    "encoding/json"
    "log"
    "math/rand"
    "net"
    "strconv"
    "sync"
    "time"

    "github.com/goku321/line-racer/model"
)

var wg sync.WaitGroup

// Master manages all the racers
type Master struct {
    IPAddr      string
    Port        string
    racersCount int
    racers      map[string]string
    posUpdates  []pos // can be a queue
    laps        []lap
    lapsCount   int
    racerMutex  sync.Mutex
    posMutex    sync.Mutex
}

// lap represent a single lap
type lap struct {
    number      int
    pos         []model.Point
    start       time.Time
    end         time.Time
    timeElapsed int64
}

// pos represents  position of a racer
type pos struct {
    id string
    model.Point
}

// NewLap returns a new lap
func NewLap(number int, pos []model.Point) *lap {
    return &lap{
        number: number,
        pos:    pos,
    }
}

// New returns new master
func New(ip, port string, racersCount, lapsCount int) *Master {
    return &Master{
        IPAddr:      ip,
        Port:        port,
        racers:      map[string]string{},
        racersCount: racersCount,
        laps:        []lap{},
        lapsCount:   lapsCount,
    }
}

// Listen starts listening on a port
func (m *Master) Listen() {
    ln, err := net.Listen("tcp", ":"+m.Port)
    if err != nil {
        log.Fatal(err)
    }

    log.Printf("master listening on %s:%s", m.IPAddr, m.Port)

    for {
        conn, err := ln.Accept()
        if err != nil {
            log.Fatal(err)
        }

        go handleConnection(conn, m)
    }
}

func handleConnection(conn net.Conn, m *Master) {
    defer conn.Close()

    var msg model.Message
    err := json.NewDecoder(conn).Decode(&msg)
    if err != nil {
        log.Print(err)
    }

    if msg.Type == "ready" {
        id, err := strconv.Atoi(msg.Source)
        if err != nil {
            log.Fatal("invalid racer id")
        }
        if id < 0 || id >= m.racersCount {
            log.Fatal("invalid racer id")
        }
        // register racer
        m.registerRacer(msg.Source, conn.RemoteAddr().String())

        log.Printf("racer %d connected - %v", id, time.Now())

    } else if msg.Type == "pos" {
        log.Printf("racer %s position update: (%d, %d)", msg.Source, msg.Coordinates[0].X, msg.Coordinates[0].Y)
        m.updatePOS(msg.Source, msg.Coordinates[0])
    }
}

// SendMessage sends a lap to racers
func (m *Master) SendMessage(racer string, msg model.Message) {
    defer wg.Done()
    laddr, err := net.ResolveTCPAddr("tcp", "")
    if err != nil {
        log.Fatalf("error resolving tcp address: %v", err)
    }

    raddr, err := net.ResolveTCPAddr("tcp", racer)
    if err != nil {
        log.Fatalf("error resolving tcp address: %s, reason: %v", racer, err)
    }

    for {
        conn, err := net.DialTCP("tcp", laddr, raddr)
        if err != nil {
            log.Print("master: failed to establish connection to racer, retrying...", err)
            time.Sleep(time.Second * 5)
        } else {
            // Send Lap
            err := json.NewEncoder(conn).Encode(&msg)
            if err != nil {
                log.Printf("error sending lap to racer %s", racer)
            }
            break
        }
    }
}

// GenerateLaps generates 10 laps for n racers
func (m *Master) GenerateLaps() {
    s := rand.NewSource(time.Now().UnixNano())
    r := rand.New(s)

    for i := 0; i < m.lapsCount; i++ {
        l := []model.Point{}
        for j := 0; j < m.racersCount; j++ {
            p := model.New(r.Intn(50000), r.Intn(50000))
            l = append(l, p)
        }
        lap := NewLap(i, l)
        m.laps = append(m.laps, *lap)
    }
}

func (m *Master) registerRacer(id string, r string) {
    m.racers[id] = r
}

func (m *Master) updatePOS(id string, p model.Point) {
    u := &pos{
        id:    id,
        Point: p,
    }
    m.posMutex.Lock()
    m.posUpdates = append(m.posUpdates, *u)
    m.posMutex.Unlock()
}

// CalculateDistance constantly polls a slice
func (m *Master) CalculateDistance() {
    for {
        if len(m.posUpdates) >= 2 {
            p1 := m.posUpdates[0]
            p2 := m.posUpdates[1]
            if p1.id == p2.id {
                m.posMutex.Lock()
                m.posUpdates = m.posUpdates[1:]
                m.posMutex.Unlock()
            } else {
                m.posMutex.Lock()
                m.posUpdates = m.posUpdates[2:]
                m.posMutex.Unlock()
                d := p1.Point.Distance(p2.Point)

                if d > 10 {
                    // start a new lap
                    log.Print("distance exceeds 10 units")
                    break
                }
            }
        }
    }
}

// WaitForRacers waits infinitely for racers to get connected
func (m *Master) WaitForRacers() {
    for {
        if len(m.racers) == m.racersCount {
            break
        }
    }
}

// StartRace inits race
func (m *Master) StartRace() {
    for k, v := range m.laps {
        wg.Add(m.racersCount)

        start := time.Now()
        for _, r := range m.racers {
            lapMsg := model.NewMessage(m.IPAddr+":"+m.Port, r, v.pos)
            lapMsg.Type = "race"
            go m.SendMessage(r, lapMsg)
        }
        wg.Wait()
        m.CalculateDistance()
        end := time.Now()

        m.updateLap(k, start, end)
        // Clear update queue
    }
    m.SendKillMessage()
}

// SendKillMessage sends a kill message to all racers
func (m *Master) SendKillMessage() {
    wg.Add(m.racersCount)
    for _, r := range m.racers {
        msg := model.NewMessage(m.IPAddr+":"+m.Port, r, []model.Point{})
        msg.Type = "kill"
        go m.SendMessage(r, msg)
    }
    wg.Wait()
}

// PrintLaps prints all the laps
func (m *Master) PrintLaps() {
    for k, v := range m.laps {
        log.Printf("%d %v %s %s %d", k+1, v.pos, v.start, v.end, v.timeElapsed)
    }
}

// updateLap updates start and end time for a lap
func (m *Master) updateLap(index int, start, end time.Time) {
    l := m.laps[index]
    l.start = start
    l.end = end
    l.timeElapsed = end.Sub(start).Milliseconds()
    m.laps[index] = l
}

go tcp goroutine
1个回答
1
投票

某些细节因一个实现而异,但通常,在关闭TCP连接后,底层系统(通常是主机OS)必须将其保留一小段时间,以防出现重复的数据包。

此连接处于TIME_WAIT状态,可能会阻止该端口的进一步使用,除非您为主机操作系统提供了正确的基础设置,否则将无法创建新的侦听器。由于Go 1.5左右,因此Go在Linux上自动执行此操作:请参见issue 9929。您正在使用什么主机操作系统?

© www.soinside.com 2019 - 2024. All rights reserved.