我目前正在使用
socket.socketpair()
和 signal.set_wakeup_fd()
作为解决方案,使 socket.recv()
与 Windows 上的 KeyboardInterrupt
兼容 — 特别是在使用 UDP 时 — 请参阅下面的示例。
我还知道其他流行的解决方案,即使用
timeout=
配置一个忙循环(busy loop),定期检查程序是否被中断。
但这两种解决方案看起来都像是相当丑陋的权宜之计(hack)。有没有“正确”的方法来做到这一点?
import select
import signal
import socket
def _udp_listen(address, family=socket.AF_INET, flags=0, sockopts=frozenset()):
    """Yield UDP datagrams received on *address* while staying interruptible.

    A socket pair is installed as the interpreter's signal wakeup fd (via
    ``_wakeup_fd_ctx``) so that an incoming signal makes ``select.select()``
    return; Python then gets the chance to run its signal handlers and raise
    ``KeyboardInterrupt`` instead of staying blocked.

    Args:
        address: Address handed to ``socket.bind()``.
        family: Address family of the listening socket.
        flags: Flags handed to ``socket.recv()``.
        sockopts: Iterable of argument tuples; each one is unpacked into
            ``socket.setsockopt()`` before the socket is bound.

    Yields:
        The payload of each received datagram, read with a buffer of
        ``getpagesize()`` bytes.
    """
    bufsize = getpagesize()
    with socket.socket(family, socket.SOCK_DGRAM) as sock:
        for sockopt in sockopts:
            sock.setsockopt(*sockopt)
        sock.bind(address)
        _coalmine, _canary = socket.socketpair()
        with _canary, _coalmine:
            # signal.set_wakeup_fd() requires a non-blocking descriptor; a
            # blocking one is rejected with ValueError on POSIX.
            _coalmine.setblocking(False)
            with _wakeup_fd_ctx(_coalmine.fileno(), strict=True, warn_on_full_buffer=False):
                while True:
                    ready = select.select((sock, _canary), (), ())[0]
                    if _canary in ready:
                        # There's no need to raise any error ourselves,
                        # since Python itself will raise KeyboardInterrupt
                        # once its signal handler runs. The pending bytes
                        # must be drained, though: otherwise a signal whose
                        # handler does not raise leaves the canary readable
                        # forever and select() degenerates into a busy loop.
                        _canary.recv(bufsize)
                    if sock in ready:
                        yield sock.recv(bufsize, flags)
# -----
from contextlib import contextmanager
try:
    from resource import getpagesize
except ImportError:
    # `resource` could not be imported; fall back to the page size that
    # `mmap` exposes as a constant.
    import mmap

    def getpagesize():
        """Return the memory page size reported by ``mmap.PAGESIZE``."""
        page_size = mmap.PAGESIZE
        return page_size
@contextmanager
def _wakeup_fd_ctx(fd, strict=True, **k):
    """Temporarily install *fd* as the signal wakeup fd.

    Args:
        fd: Descriptor to register with ``signal.set_wakeup_fd()``.
        strict: When a wakeup fd was already registered, raise if true;
            otherwise put the previous fd back and yield that one instead.
        **k: Extra keyword arguments forwarded to ``signal.set_wakeup_fd()``.

    Yields:
        The wakeup fd that is in effect inside the ``with`` body: *fd* when
        none was registered before, else the previously registered fd.

    Raises:
        RuntimeError: A wakeup fd was already registered and *strict* is true.
    """
    previous_fd = signal.set_wakeup_fd(fd, **k)
    restore_on_exit = True
    try:
        if previous_fd != -1:
            # Installing ours just displaced somebody else's wakeup fd.
            if strict:
                raise RuntimeError(f'wakeup fd already occupied ({previous_fd}). Not sure what to do about that.')
            # Lenient mode: hand the slot back right away and let the caller
            # work with the fd that was already there.
            signal.set_wakeup_fd(previous_fd)
            restore_on_exit = False
            yield previous_fd
        else:
            yield fd
    finally:
        if restore_on_exit:
            signal.set_wakeup_fd(previous_fd)
以下内容有帮助吗?基本思路是:将需要读取的套接字放入一个列表中,对该列表调用 select,然后检查哪个套接字有可用数据,再对这些数据进行处理。
#!/usr/bin/python3
import time
import socket
import select
import threading
# Control channel: a UDP socket bound to port 0 on localhost, plus a second
# socket connected to whatever port the first one actually received. It is
# used to deliver the b"QUIT" message to the select loop.
glob_control_read_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
glob_control_read_socket.bind(('localhost', 0))
glob_control_write_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
glob_control_write_socket.connect(
    ('localhost', glob_control_read_socket.getsockname()[1])
)

# The UDP socket that carries the real traffic.
glob_server_read_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
glob_server_read_socket.bind(('localhost', 12000))
print(f"Server running on port {glob_server_read_socket.getsockname()[1]}")
def sockets_shutdown():
    """Close the control and server sockets, then report the shutdown."""
    for sock in (
        glob_control_write_socket,
        glob_control_read_socket,
        glob_server_read_socket,
    ):
        sock.close()
    print("Graceful shutdown")
def handle_server_read():
    """Read one datagram (up to 100 bytes) from the server socket and print it."""
    payload = glob_server_read_socket.recv(100)
    text = payload.decode("utf-8")
    print(f"Server got: {text.rstrip()}")
def sockets_thread_main():
    """Run the select loop until b"QUIT" arrives on the control socket.

    Waits for the control or the server socket to become readable. Server
    data is passed to ``handle_server_read()``; a control message equal to
    b"QUIT" closes all sockets through ``sockets_shutdown()`` and ends the
    loop. Any exception raised during an iteration is printed and the loop
    keeps going.
    """
    watched = [glob_control_read_socket, glob_server_read_socket]
    while True:
        try:
            readable = select.select(watched, [], [])[0]
            for sock in readable:
                if sock is glob_control_read_socket:
                    if glob_control_read_socket.recv(32) == b"QUIT":
                        sockets_shutdown()
                        return
                elif sock is glob_server_read_socket:
                    handle_server_read()
        except Exception as exc:
            print(exc)
# The select loop runs in a worker thread; the main thread stays free to
# receive KeyboardInterrupt and then asks the worker to stop through the
# control socket.
sockets_thread = threading.Thread(target=sockets_thread_main)
try:
    sockets_thread.start()
    while True:
        time.sleep(100000)  # Placeholder, do something real here
except KeyboardInterrupt:
    glob_control_write_socket.send(b"QUIT")
    sockets_thread.join()
已在较新的 Windows 10 上使用 ncat 进行了测试(有趣的是,如果我用 ncat 连接 localhost,它无法建立连接,并报告“现有连接被远程主机强制关闭”;我需要改用 127.0.0.1)。