相关的 RFC 章节提到了这一点:
RFC 7540
Stream identifiers cannot be reused. Long-lived connections can
result in an endpoint exhausting the available range of stream
identifiers. A client that is unable to establish a new stream
identifier can establish a new connection for new streams. A server
that is unable to establish a new stream identifier can send a GOAWAY
frame so that the client is forced to open a new connection for new
streams.
基于 http2 RFC,从客户端站点发出请求时,如果流 ID 大于 2^31,我们的 http2 库会将其视为无效的 strream id,并返回 errStreamId。但是在我们的http2库或者h2_hundle中好像没有对errStreamId做额外的处理。那么client如何才能知道streamID耗尽以便它可以请求新的连接呢?客户端或服务器无法关闭旧版连接,因为旧版流 ID 仍然可以保留。
是否有可能遇到 HTTP2 连接流 id 耗尽(超过 2^31)的问题。换句话说,errStreamID 将被触发。 像这样:
https://github.com/golang/net/blob/daac0cec0cf964a628a29bb4b82940c225b921ed/http2/frame.go#L1097
但不返回 errStreamID 给 werr,所以 werr(write err) 为 nil
https://github.com/golang/net/blob/daac0cec0cf964a628a29bb4b82940c225b921ed/http2/transport.go#L1637
所以我对如何处理 errStreamID 感到困惑,我尝试使用 http2 client/server echo 模块自己测试,其中 stream id 是 1 ,然后手动修改代码这样
func (f *Framer) WriteHeaders(p HeadersFrameParam) error {
//if !validStreamID(p.StreamID) && !f.AllowIllegalWrites {
// return errStreamID
//}
return errStreamID
我发现客户端没有通过使用 wireshark 的 capture fream 发送任何 tcp 连接请求,并且代码在 pipe.go 中的读取功能被阻止,也许我的观察是有限的。
客户使用: https://github.com/posener/h2conn/blob/master/example/echo/client.go#L35
golang如何处理errStreamId,为什么http2 lib会被屏蔽?可能 golang 的 http2 库不支持这个吗?如果要实现关于流 ID 耗尽的情况。应用程序级别必须自己维护流 ID,而不是依赖 http2 库,如果流 ID 用完,应用程序应该请求一个新的连接。
http2 lib 的下一个流 id 应该传输到应用程序级别,应用程序应该维护流 id,如果 applicaiotn 知道下一个流 id 大于 2^31,它将通过新端口请求新连接,以及传统的 http2(tcp) 连接应该保留。
这个答案集中在客户端。
在调用
(*Framer).WriteHeaders
(或*Framer
的其他方法)之前保证流id有效。
(*Transport).RoundTrip
让我们从
(*Transport).RoundTrip
开始,它可以被认为是发送请求的入口点。该方法依次调用(*Transport).RoundTripOpt
(全源)。
func (t *Transport) RoundTripOpt(req *http.Request, opt RoundTripOpt) (*http.Response, error) {
// ...unrelated code truncated...
for retry := 0; ; retry++ {
cc, err := t.connPool().GetClientConn(req, addr)
// ...unrelated code truncated...
res, err := cc.RoundTrip(req)
// ...unrelated code truncated...
return res, nil
}
}
我们将深入挖掘
t.connPool().GetClientConn(req, addr)
和res, err := cc.RoundTrip(req)
.
(*clientConnPool).getClientConn
(*clientConnPool).GetClientConn
依次调用(*clientConnPool).getClientConn
(full source):
func (p *clientConnPool) getClientConn(req *http.Request, addr string, dialOnMiss bool) (*ClientConn, error) {
// ...unrelated code truncated...
for {
p.mu.Lock()
for _, cc := range p.conns[addr] {
if cc.ReserveNewRequest() {
// ...unrelated code truncated...
p.mu.Unlock()
return cc, nil
}
}
// ...unrelated code truncated...
call := p.getStartDialLocked(req.Context(), addr)
p.mu.Unlock()
<-call.done
if shouldRetryDial(call, req) {
continue
}
cc, err := call.res, call.err
if err != nil {
return nil, err
}
if cc.ReserveNewRequest() {
return cc, nil
}
}
}
如果它可以从池中找到可用于发送请求的连接,则使用它。否则,拨一个新的。这是
(*ClientConn).ReserveNewRequest
(完整来源):
func (cc *ClientConn) ReserveNewRequest() bool {
cc.mu.Lock()
defer cc.mu.Unlock()
if st := cc.idleStateLocked(); !st.canTakeNewRequest {
return false
}
cc.streamsReserved++
return true
}
和
(*ClientConn).idleStateLocked
(完整来源):
func (cc *ClientConn) idleStateLocked() (st clientConnIdleState) {
// ...unrelated code truncated...
st.canTakeNewRequest = cc.goAway == nil && !cc.closed && !cc.closing && maxConcurrentOkay &&
!cc.doNotReuse &&
int64(cc.nextStreamID)+2*int64(cc.pendingRequests) < math.MaxInt32 &&
!cc.tooIdleLocked()
return
}
在这里:
int64(cc.nextStreamID)+2*int64(cc.pendingRequests) < math.MaxInt32
。一个连接只有在它的nextStreamID
保证有效时才会被使用。
(*ClientConn).RoundTrip
现在让我们回到
(*ClientConn).RoundTrip
,看看流id是如何分配给clientStream
的。
调用顺序为:
(*ClientConn).RoundTrip
(完整来源)
func (cc *ClientConn) addStreamLocked(cs *clientStream) {
cs.flow.add(int32(cc.initialWindowSize))
cs.flow.setConnFlow(&cc.flow)
cs.inflow.init(transportDefaultStreamFlow)
cs.ID = cc.nextStreamID
// ^^^^^^^^^^^^^^^^^^^^ <==== Here it is.
cc.nextStreamID += 2
cc.streams[cs.ID] = cs
if cs.ID == 0 {
panic("assigned stream ID 0")
}
}
流id从
(ClientConn).nextStreamID
复制,(ClientConn).nextStreamID
增加2
(Streams
由客户端发起的必须使用奇数流标识符)。
关注x/net/http源码,和@Zuke Lu的回答一样,再次感谢他的追踪码。 https://stackoverflow.com/a/76209075/21847706.
我发现errStreamID的错误可能很少从http2客户端触发,因为stream id只有在获得有效连接后才会被inc。
func (t *Transport) RoundTripOpt(req *http.Request, opt RoundTripOpt) (*http.Response, error) {
// ...unrelated code truncated...
for retry := 0; ; retry++ {
cc, err := t.connPool().GetClientConn(req, addr)
// ...unrelated code truncated...
res, err := cc.RoundTrip(req)//only in this function, new stream id will be only inc.
// ...unrelated code truncated.
return res, nil
}
}
必须可以找到连接。所以进入函数 (*ClientConn).idleStateLocked.
func (cc *ClientConn) idleStateLocked() (st clientConnIdleState) {
// ...unrelated code truncated...
st.canTakeNewRequest = cc.goAway == nil && !cc.closed && !cc.closing && maxConcurrentOkay &&
!cc.doNotReuse &&
int64(cc.nextStreamID)+2*int64(cc.pendingRequests) < math.MaxInt32 &&
!cc.tooIdleLocked()
return
}
如果 st.canTakeNewRequest 在 MaxInt32 不能包含 streamid 时返回 false。然后代码将进入
func(p* clientConnPool) getStartDialLocked(ctx context.Context, addr string) * diaCall {
call := &dialCall{p, make(chan struct{}),ctx}
...
go call.dial(call.ctx, addr)
}
然后转到拨号功能
func (t *Transport) dialClientConn(addr string, singleUse bool) (*ClientConn, error) {
...
return t.newClientConn(tconn, singleUse)
}
所以调用了 newClientConnection,这个函数将创建一个新的 tcp 连接,因为将分配一个随机端口。
所以我再次感到困惑,客户端/服务器如何处理遗留的 tcp 连接。我认为应用层根据实际情况来决定如何使用 legacy tcp 连接。当发现遗留连接上没有负载时,服务器可能会关闭它。这只是一个猜测