x/net/http2:如果 Stream ID 耗尽会发生什么以及触发 errStreamID 错误时的操作

问题描述 投票:0回答:2

相关的 RFC 章节提到了这一点:

RFC 7540
Stream identifiers cannot be reused. Long-lived connections can
result in an endpoint exhausting the available range of stream
identifiers. A client that is unable to establish a new stream
identifier can establish a new connection for new streams. A server
that is unable to establish a new stream identifier can send a GOAWAY
frame so that the client is forced to open a new connection for new
streams.

基于 http2 RFC,从客户端站点发出请求时,如果流 ID 大于 2^31,我们的 http2 库会将其视为无效的 strream id,并返回 errStreamId。但是在我们的http2库或者h2_hundle中好像没有对errStreamId做额外的处理。那么client如何才能知道streamID耗尽以便它可以请求新的连接呢?客户端或服务器无法关闭旧版连接,因为旧版流 ID 仍然可以保留。

是否有可能遇到 HTTP2 连接流 id 耗尽(超过 2^31)的问题。换句话说,errStreamID 将被触发。 像这样:

https://github.com/golang/net/blob/daac0cec0cf964a628a29bb4b82940c225b921ed/http2/frame.go#L1097

但不返回 errStreamID 给 werr,所以 werr(write err) 为 nil

https://github.com/golang/net/blob/daac0cec0cf964a628a29bb4b82940c225b921ed/http2/transport.go#L1637

所以我对如何处理 errStreamID 感到困惑,我尝试使用 http2 client/server echo 模块自己测试,其中 stream id 是 1 ,然后手动修改代码这样

func (f *Framer) WriteHeaders(p HeadersFrameParam) error {
    //if !validStreamID(p.StreamID) && !f.AllowIllegalWrites {
    //  return errStreamID
    //}
    return errStreamID

我发现客户端没有通过使用 wireshark 的 capture fream 发送任何 tcp 连接请求,并且代码在 pipe.go 中的读取功能被阻止,也许我的观察是有限的。

客户使用: https://github.com/posener/h2conn/blob/master/example/echo/client.go#L35

enter image description here

golang如何处理errStreamId,为什么http2 lib会被屏蔽?可能 golang 的 http2 库不支持这个吗?如果要实现关于流 ID 耗尽的情况。应用程序级别必须自己维护流 ID,而不是依赖 http2 库,如果流 ID 用完,应用程序应该请求一个新的连接。

http2 lib 的下一个流 id 应该传输到应用程序级别,应用程序应该维护流 id,如果 applicaiotn 知道下一个流 id 大于 2^31,它将通过新端口请求新连接,以及传统的 http2(tcp) 连接应该保留。

go http2
2个回答
0
投票

这个答案集中在客户端。

在调用

(*Framer).WriteHeaders
(或
*Framer
的其他方法)之前保证流id有效。

入口点:
(*Transport).RoundTrip

让我们从

(*Transport).RoundTrip
开始,它可以被认为是发送请求的入口点。该方法依次调用
(*Transport).RoundTripOpt
全源)。

func (t *Transport) RoundTripOpt(req *http.Request, opt RoundTripOpt) (*http.Response, error) {

    // ...unrelated code truncated...
    for retry := 0; ; retry++ {
        cc, err := t.connPool().GetClientConn(req, addr)
        
        // ...unrelated code truncated...
        res, err := cc.RoundTrip(req)
        // ...unrelated code truncated...
        return res, nil
    }
}

我们将深入挖掘

t.connPool().GetClientConn(req, addr)
res, err := cc.RoundTrip(req)
.

连接池:
(*clientConnPool).getClientConn

(*clientConnPool).GetClientConn
依次调用
(*clientConnPool).getClientConn
full source):

func (p *clientConnPool) getClientConn(req *http.Request, addr string, dialOnMiss bool) (*ClientConn, error) {
    // ...unrelated code truncated...
    for {
        p.mu.Lock()
        for _, cc := range p.conns[addr] {
            if cc.ReserveNewRequest() {
                // ...unrelated code truncated...
                p.mu.Unlock()
                return cc, nil
            }
        }
        // ...unrelated code truncated...
        call := p.getStartDialLocked(req.Context(), addr)
        p.mu.Unlock()
        <-call.done
        if shouldRetryDial(call, req) {
            continue
        }
        cc, err := call.res, call.err
        if err != nil {
            return nil, err
        }
        if cc.ReserveNewRequest() {
            return cc, nil
        }
    }
}

如果它可以从池中找到可用于发送请求的连接,则使用它。否则,拨一个新的。这是

(*ClientConn).ReserveNewRequest
完整来源):

func (cc *ClientConn) ReserveNewRequest() bool {
    cc.mu.Lock()
    defer cc.mu.Unlock()
    if st := cc.idleStateLocked(); !st.canTakeNewRequest {
        return false
    }
    cc.streamsReserved++
    return true
}

(*ClientConn).idleStateLocked
完整来源):

func (cc *ClientConn) idleStateLocked() (st clientConnIdleState) {
    // ...unrelated code truncated...
    st.canTakeNewRequest = cc.goAway == nil && !cc.closed && !cc.closing && maxConcurrentOkay &&
        !cc.doNotReuse &&
        int64(cc.nextStreamID)+2*int64(cc.pendingRequests) < math.MaxInt32 &&
        !cc.tooIdleLocked()
    return
}

在这里

int64(cc.nextStreamID)+2*int64(cc.pendingRequests) < math.MaxInt32
。一个连接只有在它的
nextStreamID
保证有效时才会被使用。

分配流ID:
(*ClientConn).RoundTrip

现在让我们回到

(*ClientConn).RoundTrip
,看看流id是如何分配给
clientStream
的。

调用顺序为:

func (cc *ClientConn) addStreamLocked(cs *clientStream) {
    cs.flow.add(int32(cc.initialWindowSize))
    cs.flow.setConnFlow(&cc.flow)
    cs.inflow.init(transportDefaultStreamFlow)
    cs.ID = cc.nextStreamID
    // ^^^^^^^^^^^^^^^^^^^^ <==== Here it is.
    cc.nextStreamID += 2
    cc.streams[cs.ID] = cs
    if cs.ID == 0 {
        panic("assigned stream ID 0")
    }
}

流id从

(ClientConn).nextStreamID
复制,
(ClientConn).nextStreamID
增加
2
(Streams 由客户端发起的必须使用奇数流标识符)。


0
投票

关注x/net/http源码,和@Zuke Lu的回答一样,再次感谢他的追踪码。 https://stackoverflow.com/a/76209075/21847706.

我发现errStreamID的错误可能很少从http2客户端触发,因为stream id只有在获得有效连接后才会被inc。

func (t *Transport) RoundTripOpt(req *http.Request, opt RoundTripOpt) (*http.Response, error) {

    // ...unrelated code truncated...
    for retry := 0; ; retry++ {
        cc, err := t.connPool().GetClientConn(req, addr)
        
        // ...unrelated code truncated...
        res, err := cc.RoundTrip(req)//only in this function, new stream id will be only inc.
        // ...unrelated code truncated.
        
        return res, nil
    }
}

必须可以找到连接。所以进入函数 (*ClientConn).idleStateLocked.

func (cc *ClientConn) idleStateLocked() (st clientConnIdleState) {
    // ...unrelated code truncated...
    st.canTakeNewRequest = cc.goAway == nil && !cc.closed && !cc.closing && maxConcurrentOkay &&
        !cc.doNotReuse &&
        int64(cc.nextStreamID)+2*int64(cc.pendingRequests) < math.MaxInt32 &&
        !cc.tooIdleLocked()
    return
}

如果 st.canTakeNewRequest 在 MaxInt32 不能包含 streamid 时返回 false。然后代码将进入

func(p* clientConnPool) getStartDialLocked(ctx context.Context, addr string) * diaCall {
  call := &dialCall{p, make(chan struct{}),ctx}
  ...
  go call.dial(call.ctx, addr)
}

然后转到拨号功能

func (t *Transport) dialClientConn(addr string, singleUse bool) (*ClientConn, error) {
    ...
    return t.newClientConn(tconn, singleUse)
}

所以调用了 newClientConnection,这个函数将创建一个新的 tcp 连接,因为将分配一个随机端口。

所以我再次感到困惑,客户端/服务器如何处理遗留的 tcp 连接。我认为应用层根据实际情况来决定如何使用 legacy tcp 连接。当发现遗留连接上没有负载时,服务器可能会关闭它。这只是一个猜测

© www.soinside.com 2019 - 2024. All rights reserved.