使用 http.server 服务流端点时如何检测客户端断开连接

问题描述 投票:0回答:1

我们有一个用 Python

http.server
编写的简单 Web 服务器。它的一个端点提供流数据——一系列事件,并在新事件发生时发送。我们已成功设置此端点,以便它按预期传送数据。但是,如果客户端断开连接,服务器不会检测到这一点,并将继续通过没有人无限期监听的连接传输数据。

相关代码:

import json
import queue
import http.server

from common.log import log

logger = log.get_logger()

class RemoteHTTPHandler(http.server.BaseHTTPRequestHandler):

    ...

    def __stream_events(self, start_after_event_id: int) -> None:
        """Stream events starting after the given ID and continuing as new events become available"""

        # Get a queue of events, which will include all existing events from the given starting point,
        # and be updated with new events as they become available
        logger.info(f"Streaming events from ID {start_after_event_id}")
        with self._events_stream_manager.stream_events(start_after_event_id) as events_queue:
            self.send_response(200)
            self.send_header("Content-type", "application/yaml; charset=utf-8")
            self.send_header("Connection", "close")
            self.end_headers()

            # If the server is shutting down, all ongoing streams should terminate
            while not self._stop_streams_event.is_set():
                try:
                    # Get the next event;
                    # if the queue is empty, will block until an event is added, up to a maximum of 1s
                    try:
                        data = events_queue.get(timeout=1)
                    except queue.Empty:
                        # Send an empty line to keep the HTTP connection from timing out
                        self.wfile.write(b"\n")
                        continue

                    # Send the encoded event plus the seperator line
                    self.wfile.write(json.dumps(data, indent=4).encode('utf-8') + b"\n\n---\n\n")
                except BrokenPipeError as ex:
                    # TODO: This does not reliably detect loss of connection
                    # Broken pipe means the connection was lost,
                    # either because the client closed it or there was a network error
                    logger.info(f"Connection closed: {type(ex).__name__}: {ex}", exc_info=True)
                    return


def serve():
    http.server.ThreadingHTTPServer(("", 8090), RemoteHTTPHandler).serve_forever()

我写这篇文章的期望是,一旦客户端关闭连接,调用

self.wfile.write()
会引发 BrokenPipeError(可能在 TCP 连接超时一段时间后)。然而,这并没有发生;服务器将继续愉快地将事件写入一个连接,至少 20 分钟内没有人在没有任何错误地监听。

检查客户是否还在听的正确方法是什么?

python http streaming http.server
1个回答
0
投票

使用

socket
模块在底层套接字上设置超时,然后使用
select
在写入数据之前检查客户端活动。

首先我们将这两个模块添加到您的代码中

import select
import socket

然后我们像这样修改我们的

__stream_events
方法:

def __stream_events(self, start_after_event_id: int) -> None:
    """Stream events starting after the given ID and continuing as new events become available"""

    logger.info(f"Streaming events from ID {start_after_event_id}")
    with self._events_stream_manager.stream_events(start_after_event_id) as events_queue:
        self.send_response(200)
        self.send_header("Content-type", "application/yaml; charset=utf-8")
        self.send_header("Connection", "close")
        self.end_headers()

        # Set a timeout on the underlying socket
        self.connection.settimeout(5)

        # If the server is shutting down, all ongoing streams should terminate
        while not self._stop_streams_event.is_set():
            try:
                # Get the next event;
                # if the queue is empty, will block until an event is added, up to a maximum of 1s
                try:
                    data = events_queue.get(timeout=1)
                except queue.Empty:
                    # Send an empty line to keep the HTTP connection from timing out
                    self.wfile.write(b"\n")
                    continue

                # Check if the client is still connected using select
                r, w, e = select.select([], [self.connection], [], 5)
                if w:
                    # Send the encoded event plus the separator line
                    self.wfile.write(json.dumps(data, indent=4).encode('utf-8') + b"\n\n---\n\n")
                else:
                    logger.info("Client disconnected")
                    return
            except (BrokenPipeError, socket.timeout) as ex:
                # Connection closed or timed out
                logger.info(f"Connection closed: {type(ex).__name__}: {ex}", exc_info=True)
                return

现在在写入数据之前会检查,如果客户端断开连接,服务器将记录断开连接并停止写入数据。

© www.soinside.com 2019 - 2024. All rights reserved.