我正在用golang构建主/从类型系统。当从站(称为racer)启动时,它将对主机发出ping命令,告诉主机已启动并准备接收数据,然后开始侦听端口。用于ping和侦听的端口是相同的,但请确保在ping之后关闭连接。我的用例是使用相同的端口进行ping和侦听,但是当我使用相同的端口侦听时出现bind: address already in use
错误。有时它起作用,有时却不起作用。我做错了吗?
main.go
package main
import (
"flag"
"log"
"strconv"
"time"
"github.com/goku321/line-racer/master"
"github.com/goku321/line-racer/model"
"github.com/goku321/line-racer/racer"
)
func main() {
log.SetFlags(log.LstdFlags | log.Lshortfile)
nodeType := flag.String("nodeType", "master", "type of node: master/racer")
masterIP := flag.String("masterIP", "127.0.0.1", "ip address of master process")
racers := flag.Int("racers", 2, "number of racers")
racerID := flag.Int("racerID", 0, "unique racer id (0 <= id < number of racers")
laps := flag.Int("laps", 10, "number of lap")
ip := flag.String("ip", "127.0.0.1", "ip address of the node")
port := flag.String("port", "3000", "port to use")
flag.Parse()
_, err := strconv.ParseInt(*port, 10, 64)
if err != nil {
log.Fatalf("error parsing port number: %s", *port)
}
if *nodeType == "master" {
m := master.New(*ip, *port, *racers, *laps)
m.GenerateLaps()
go m.Listen()
m.WaitForRacers()
m.StartRace()
m.PrintLaps()
} else {
r := racer.New(*racerID, *ip, *port, *masterIP)
r.SignalMaster(&model.Message{Source: r.IPAddr + ":" + r.Port})
time.Sleep(time.Second*60)
r.ListenForNewLap()
}
}
racer.go(slave)
package racer
import (
"encoding/json"
"log"
"net"
"os"
"strconv"
"time"
"github.com/goku321/line-racer/model"
)
// Racer represents a racer
type Racer struct {
ID int
IPAddr string
Port string
Master string
Laps [][]model.Point
Status string
}
// New returns a new racer type
func New(id int, ip, port, masterIP string) *Racer {
return &Racer{
ID: id,
IPAddr: ip,
Port: port,
Status: "up",
Master: masterIP,
}
}
// func updateRacerID(r *Racer, id int) {
// r.ID = strconv.Itoa(id)
// }
// SignalMaster sends a signal to master process
// with its coordinates
func (r *Racer) SignalMaster(m *model.Message) {
laddr, err := net.ResolveTCPAddr("tcp", r.IPAddr+":"+r.Port)
if err != nil {
log.Fatalf("error resolving tcp address: %s, reason: %v", r.IPAddr+":"+r.Port, err)
}
raddr, err := net.ResolveTCPAddr("tcp", r.Master+":3000")
if err != nil {
log.Fatalf("error resolving tcp address: %v", err)
}
for {
conn, err := net.DialTCP("tcp", laddr, raddr)
if err != nil {
log.Printf("connecting to master, %v", err)
time.Sleep(time.Second * 5)
} else {
m.Type = "ready"
m.Source = strconv.Itoa(r.ID)
m.Dest = r.Master + ":3000"
err := json.NewEncoder(conn).Encode(&m)
if err != nil {
log.Fatalf("error communicating to master: %v", err)
}
if err = conn.Close(); err != nil {
log.Fatal("unable to close connection")
}
break
}
}
}
// SendPOSUpdate sends position updates to master every 50ms
func (r *Racer) SendPOSUpdate(m *model.Message) {
laddr, err := net.ResolveTCPAddr("tcp", "")
if err != nil {
log.Fatalf("error resolving tcp address: %s, reason: %v", r.IPAddr+":"+r.Port, err)
}
raddr, err := net.ResolveTCPAddr("tcp", r.Master+":3000")
if err != nil {
log.Fatalf("error resolving tcp address: %v", err)
}
for {
conn, err := net.DialTCP("tcp", laddr, raddr)
if err != nil {
log.Printf("racer %d: connecting to master, %v", r.ID, err)
time.Sleep(time.Second * 5)
} else {
if err = json.NewEncoder(conn).Encode(&m); err != nil {
log.Printf("racer %d: error communicating to master: %v", r.ID, err)
}
conn.Close()
break
}
}
}
// ListenForNewLap waits for master to get new coordinates
func (r *Racer) ListenForNewLap() {
ln, err := net.Listen("tcp", ":"+r.Port)
if err != nil {
log.Fatalf("racer %d: %v - %v", r.ID, err, time.Now())
}
log.Printf("racer %d: listening on %s:%s", r.ID, r.IPAddr, r.Port)
for {
conn, err := ln.Accept()
if err != nil {
log.Fatal(err)
}
go handleConnection(conn, r)
}
}
func handleConnection(conn net.Conn, r *Racer) {
log.Printf("racer %d: new lap from master", r.ID)
var msg model.Message
err := json.NewDecoder(conn).Decode(&msg)
if err != nil {
log.Printf("racer %d: %v", r.ID, err)
}
// close connection here as message has already been received
conn.Close()
if msg.Type == "race" {
r.Laps = append(r.Laps, msg.Coordinates)
r.race(msg.Coordinates)
} else if msg.Type == "kill" {
log.Printf("racer %d: kill signal received. racer will terminate", r.ID)
r.printLaps()
os.Exit(0)
}
}
func (r *Racer) race(l []model.Point) {
log.Printf("racer %d: racing on lap %v", r.ID, l)
// add a check for invalid lap
m, c := l[r.ID].X, l[r.ID].Y
p := getStartingPoint(l)
log.Printf("racer %d: starting race from (%d, %d)", r.ID, p.X, p.Y)
for {
time.Sleep(time.Millisecond * 50)
p.X++
p.Y = (m * p.X) + c
m := &model.Message{
Source: strconv.Itoa(r.ID),
Dest: "127.0.0.1:3000",
Type: "pos",
Coordinates: []model.Point{p},
}
r.SendPOSUpdate(m)
}
}
func (r *Racer) printLaps() {
for k, v := range r.Laps {
log.Printf("racer %d: lap %d: %v", r.ID, k+1, v)
}
}
func getStartingPoint(x []model.Point) model.Point {
m1, c1, m2, c2 := x[0].X, x[0].Y, x[1].X, x[1].Y
sX := (c1 - c2) / (m2 - m1)
sY := ((m2 * c1) - (m1 * c2)) / (m2 - m1)
return model.New(sX, sY)
}
master.go
package master
import (
"encoding/json"
"log"
"math/rand"
"net"
"strconv"
"sync"
"time"
"github.com/goku321/line-racer/model"
)
var wg sync.WaitGroup
// Master manages all the racers
type Master struct {
IPAddr string
Port string
racersCount int
racers map[string]string
posUpdates []pos // can be a queue
laps []lap
lapsCount int
racerMutex sync.Mutex
posMutex sync.Mutex
}
// lap represent a single lap
type lap struct {
number int
pos []model.Point
start time.Time
end time.Time
timeElapsed int64
}
// pos represents position of a racer
type pos struct {
id string
model.Point
}
// NewLap returns a new lap
func NewLap(number int, pos []model.Point) *lap {
return &lap{
number: number,
pos: pos,
}
}
// New returns new master
func New(ip, port string, racersCount, lapsCount int) *Master {
return &Master{
IPAddr: ip,
Port: port,
racers: map[string]string{},
racersCount: racersCount,
laps: []lap{},
lapsCount: lapsCount,
}
}
// Listen starts listening on a port
func (m *Master) Listen() {
ln, err := net.Listen("tcp", ":"+m.Port)
if err != nil {
log.Fatal(err)
}
log.Printf("master listening on %s:%s", m.IPAddr, m.Port)
for {
conn, err := ln.Accept()
if err != nil {
log.Fatal(err)
}
go handleConnection(conn, m)
}
}
func handleConnection(conn net.Conn, m *Master) {
defer conn.Close()
var msg model.Message
err := json.NewDecoder(conn).Decode(&msg)
if err != nil {
log.Print(err)
}
if msg.Type == "ready" {
id, err := strconv.Atoi(msg.Source)
if err != nil {
log.Fatal("invalid racer id")
}
if id < 0 || id >= m.racersCount {
log.Fatal("invalid racer id")
}
// register racer
m.registerRacer(msg.Source, conn.RemoteAddr().String())
log.Printf("racer %d connected - %v", id, time.Now())
} else if msg.Type == "pos" {
log.Printf("racer %s position update: (%d, %d)", msg.Source, msg.Coordinates[0].X, msg.Coordinates[0].Y)
m.updatePOS(msg.Source, msg.Coordinates[0])
}
}
// SendMessage sends a lap to racers
func (m *Master) SendMessage(racer string, msg model.Message) {
defer wg.Done()
laddr, err := net.ResolveTCPAddr("tcp", "")
if err != nil {
log.Fatalf("error resolving tcp address: %v", err)
}
raddr, err := net.ResolveTCPAddr("tcp", racer)
if err != nil {
log.Fatalf("error resolving tcp address: %s, reason: %v", racer, err)
}
for {
conn, err := net.DialTCP("tcp", laddr, raddr)
if err != nil {
log.Print("master: failed to establish connection to racer, retrying...", err)
time.Sleep(time.Second * 5)
} else {
// Send Lap
err := json.NewEncoder(conn).Encode(&msg)
if err != nil {
log.Printf("error sending lap to racer %s", racer)
}
break
}
}
}
// GenerateLaps generates 10 laps for n racers
func (m *Master) GenerateLaps() {
s := rand.NewSource(time.Now().UnixNano())
r := rand.New(s)
for i := 0; i < m.lapsCount; i++ {
l := []model.Point{}
for j := 0; j < m.racersCount; j++ {
p := model.New(r.Intn(50000), r.Intn(50000))
l = append(l, p)
}
lap := NewLap(i, l)
m.laps = append(m.laps, *lap)
}
}
func (m *Master) registerRacer(id string, r string) {
m.racers[id] = r
}
func (m *Master) updatePOS(id string, p model.Point) {
u := &pos{
id: id,
Point: p,
}
m.posMutex.Lock()
m.posUpdates = append(m.posUpdates, *u)
m.posMutex.Unlock()
}
// CalculateDistance constantly polls a slice
func (m *Master) CalculateDistance() {
for {
if len(m.posUpdates) >= 2 {
p1 := m.posUpdates[0]
p2 := m.posUpdates[1]
if p1.id == p2.id {
m.posMutex.Lock()
m.posUpdates = m.posUpdates[1:]
m.posMutex.Unlock()
} else {
m.posMutex.Lock()
m.posUpdates = m.posUpdates[2:]
m.posMutex.Unlock()
d := p1.Point.Distance(p2.Point)
if d > 10 {
// start a new lap
log.Print("distance exceeds 10 units")
break
}
}
}
}
}
// WaitForRacers waits infinitely for racers to get connected
func (m *Master) WaitForRacers() {
for {
if len(m.racers) == m.racersCount {
break
}
}
}
// StartRace inits race
func (m *Master) StartRace() {
for k, v := range m.laps {
wg.Add(m.racersCount)
start := time.Now()
for _, r := range m.racers {
lapMsg := model.NewMessage(m.IPAddr+":"+m.Port, r, v.pos)
lapMsg.Type = "race"
go m.SendMessage(r, lapMsg)
}
wg.Wait()
m.CalculateDistance()
end := time.Now()
m.updateLap(k, start, end)
// Clear update queue
}
m.SendKillMessage()
}
// SendKillMessage sends a kill message to all racers
func (m *Master) SendKillMessage() {
wg.Add(m.racersCount)
for _, r := range m.racers {
msg := model.NewMessage(m.IPAddr+":"+m.Port, r, []model.Point{})
msg.Type = "kill"
go m.SendMessage(r, msg)
}
wg.Wait()
}
// PrintLaps prints all the laps
func (m *Master) PrintLaps() {
for k, v := range m.laps {
log.Printf("%d %v %s %s %d", k+1, v.pos, v.start, v.end, v.timeElapsed)
}
}
// updateLap updates start and end time for a lap
func (m *Master) updateLap(index int, start, end time.Time) {
l := m.laps[index]
l.start = start
l.end = end
l.timeElapsed = end.Sub(start).Milliseconds()
m.laps[index] = l
}
某些细节因一个实现而异,但通常,在关闭TCP连接后,底层系统(通常是主机OS)必须将其保留一小段时间,以防出现重复的数据包。
此连接处于TIME_WAIT状态,可能会阻止该端口的进一步使用,除非您为主机操作系统提供了正确的基础设置,否则将无法创建新的侦听器。由于Go 1.5左右,因此Go在Linux上自动执行此操作:请参见issue 9929。您正在使用什么主机操作系统?