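We have a simple web server written with Python's http.server. One of its endpoints serves streaming data: a series of events, sent as new events occur. We have successfully set up this endpoint so that it delivers data as expected. However, if the client disconnects, the server does not detect this and keeps transmitting data indefinitely over a connection that nobody is listening to.
The relevant code: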
import json
import queue
import http.server

from common.log import log

logger = log.get_logger()


class RemoteHTTPHandler(http.server.BaseHTTPRequestHandler):
    ...

    def __stream_events(self, start_after_event_id: int) -> None:
        """Stream events starting after the given ID and continuing as new events become available"""
        # Get a queue of events, which will include all existing events from the given starting point,
        # and be updated with new events as they become available
        logger.info(f"Streaming events from ID {start_after_event_id}")
        with self._events_stream_manager.stream_events(start_after_event_id) as events_queue:
            self.send_response(200)
            self.send_header("Content-type", "application/yaml; charset=utf-8")
            self.send_header("Connection", "close")
            self.end_headers()

            # If the server is shutting down, all ongoing streams should terminate
            while not self._stop_streams_event.is_set():
                try:
                    # Get the next event;
                    # if the queue is empty, will block until an event is added, up to a maximum of 1s
                    try:
                        data = events_queue.get(timeout=1)
                    except queue.Empty:
                        # Send an empty line to keep the HTTP connection from timing out
                        self.wfile.write(b"\n")
                        continue

                    # Send the encoded event plus the separator line
                    self.wfile.write(json.dumps(data, indent=4).encode('utf-8') + b"\n\n---\n\n")
                except BrokenPipeError as ex:
                    # TODO: This does not reliably detect loss of connection
                    # Broken pipe means the connection was lost,
                    # either because the client closed it or there was a network error
                    logger.info(f"Connection closed: {type(ex).__name__}: {ex}", exc_info=True)
                    return


def serve():
    http.server.ThreadingHTTPServer(("", 8090), RemoteHTTPHandler).serve_forever()
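My expectation when writing this was that, once the client closed the connection, a call to self.wfile.write() would raise BrokenPipeError (possibly after some delay while the TCP connection timed out). That does not happen, however: the server happily keeps writing events, without any error, to a connection that nobody has been listening on for at least 20 minutes.
What is the correct way to check whether the client is still listening?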
Use the socket module to set a timeout on the underlying socket, then use select to check that the socket is still writable before writing data.
First, add these two modules to your code:
import select
import socket
Then modify the __stream_events method like this:
    def __stream_events(self, start_after_event_id: int) -> None:
        """Stream events starting after the given ID and continuing as new events become available"""
        logger.info(f"Streaming events from ID {start_after_event_id}")
        with self._events_stream_manager.stream_events(start_after_event_id) as events_queue:
            self.send_response(200)
            self.send_header("Content-type", "application/yaml; charset=utf-8")
            self.send_header("Connection", "close")
            self.end_headers()

            # Set a timeout on the underlying socket so a blocked write raises socket.timeout
            self.connection.settimeout(5)

            # If the server is shutting down, all ongoing streams should terminate
            while not self._stop_streams_event.is_set():
                try:
                    # Get the next event;
                    # if the queue is empty, will block until an event is added, up to a maximum of 1s
                    try:
                        data = events_queue.get(timeout=1)
                    except queue.Empty:
                        # Send an empty line to keep the HTTP connection from timing out
                        self.wfile.write(b"\n")
                        continue

                    # Wait up to 5s for the socket to become writable
                    r, w, e = select.select([], [self.connection], [], 5)
                    if w:
                        # Send the encoded event plus the separator line
                        self.wfile.write(json.dumps(data, indent=4).encode('utf-8') + b"\n\n---\n\n")
                    else:
                        # Not writable within the timeout: treat the client as gone
                        logger.info("Client disconnected")
                        return
                except (BrokenPipeError, ConnectionResetError, socket.timeout) as ex:
                    # Connection closed, reset, or timed out
                    logger.info(f"Connection closed: {type(ex).__name__}: {ex}", exc_info=True)
                    return
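The server now checks that the socket is writable before sending each event. If the socket does not become writable within 5 seconds, or a write fails with a connection error or times out, the server logs the disconnect and stops writing data.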
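A writability check mainly notices a stalled connection once the send buffer fills up, so it may be slow to notice a client that closed cleanly. As a complementary sketch (not part of http.server), the hypothetical helper peer_has_closed below polls the read side instead: when the peer has closed, the socket becomes readable and a peek returns no bytes. It assumes the client sends nothing more after its request; if unread request data is still pending on the socket, the helper reports the connection as open.

```python
import select
import socket


def peer_has_closed(sock: socket.socket) -> bool:
    """Return True if the peer has closed or reset the connection.

    Hypothetical helper for illustration. Assumes the peer sends no
    further data, so readability means either stray bytes or end-of-stream.
    """
    # Zero timeout: poll the socket without blocking
    readable, _, _ = select.select([sock], [], [], 0)
    if not readable:
        # Nothing pending on the read side; the connection looks alive
        return False
    try:
        # MSG_PEEK leaves any pending bytes in the receive buffer;
        # an empty result means the peer performed an orderly close
        return sock.recv(1, socket.MSG_PEEK) == b""
    except ConnectionError:
        # A reset or aborted connection also counts as closed
        return True
```

Inside the streaming loop, this could be called as peer_has_closed(self.connection) before each write, returning from the method when it reports True. A client that vanishes without closing the connection (for example, after a network failure) would still only be caught by the write timeout.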