如何在Python Web应用程序中为状态更改通知生成服务器发送的事件?

问题描述 投票:0回答:1

我有一个用CherryPy编写的Web应用程序:用户上传文件,然后开始冗长的操作,经历了多个阶段。我希望将这些阶段的通知推送到所有连接的客户端。但是我不知道如何在流程之间进行通信。我想我将不得不在一个单独的过程中启动冗长的操作,但是然后我不知道如何将“高级N阶段”消息传递给“服务器发送功能”。

从概念上讲,应该是这样的:

SSEtest.py:

from pathlib import Path
from time import sleep
import cherrypy


def lengthy_operation(name, stream):
    for stage in range(10):
        print(f'stage {stage}... ', end='')
        sleep(2)
        print('done')
    print('finished')


class SSETest():

    @cherrypy.expose
    def index(self):
        return Path('SSEtest.html').read_text()

    @cherrypy.expose
    def upload(self, file):
        name = file.filename.encode('iso-8859-1').decode('utf-8')
        lengthy_operation(name, file.file)
        return 'OK'

    @cherrypy.expose
    def stage(self):
        cherrypy.response.headers['Content-Type'] = 'text/event-stream;charset=utf-8'

        def lengthy_operation():
            for stage in range(5):
                yield f'data: stage {stage}... \n\n'
                sleep(2)
                yield 'data: done\n\n'
            yield 'data: finished\n\n'

        return lengthy_operation()

    stage._cp_config = {'response.stream': True, 'tools.encode.encoding': 'utf-8'}


cherrypy.quickstart(SSETest())

SSEtest.html:

<!DOCTYPE html>
<html lang="en">
<head>
    <meta charset="utf-8">
    <title>SSE Test</title>
</head>
<body>
<h1>SSE Test</h1>
<div>
    <form id="load_file_form" action="" enctype="multipart/form-data">
        <label for="load_file">Load a file: </label>
        <input type="file" id="load_file" name="load_file">
        <progress max="100" value="0" id="progress_bar"></progress>
    </form>
</div>

<div id="status_messages">
<h3>Stages:</h3>
</div>

<script>
    const load_file = document.getElementById('load_file');
    const progress_bar = document.getElementById('progress_bar');

    function update_progress_bar(event) {
        if (event.lengthComputable) {
            progress_bar.value = Math.round((event.loaded/event.total)*100);
        }
    }

    load_file.onchange = function (event) {
        let the_file = load_file.files[0];
        let formData = new FormData();
        let connection = new XMLHttpRequest();

        formData.append('file', the_file, the_file.name);

        connection.open('POST', 'upload', true);
        connection.upload.onprogress = update_progress_bar;
        connection.onload = function (event) {
            if (connection.status != 200) {
                alert('Error! ' + event);
            }
        };

        connection.send(formData);
    };

    const status_messages = document.getElementById("status_messages");
    const sse = new EventSource("stage");

    sse.onopen = function (event) {
        let new_message = document.createElement("p");
        new_message.innerHTML = "Connection established: " + event.type;
        status_messages.appendChild(new_message);
    };

    sse.onmessage = function (event) {
        let new_message = document.createElement("p");
        new_message.innerHTML = event.data;
        status_messages.appendChild(new_message);
    };

    sse.onerror = function(event) {
        let new_message = document.createElement("p");
        if (event.readyState == EventSource.CLOSED) {
            new_message.innerHTML = "Connections closed";
        } else {
            new_message.innerHTML = "Error: " + event.type;
        }
        status_messages.appendChild(new_message);
    };

</script>
</body>
</html>

上传文件时,我只需要调用一次lengthy_operation()。并将其生成的消息发送给所有客户端。现在,它可以与本地功能一起使用,这不是我想要的。如何使用外部函数并将其消息传递给stage()方法?

python cherrypy server-sent-events
1个回答
0
投票

我希望将这些阶段的通知推送到所有连接的客户端。

我怀疑您最终将需要比这更多的控制权,但是我会回答您所提出的问题。稍后,您可能希望以下面的示例为基础,并根据用户的会话,特定的开始时间戳或其他相关概念来过滤广播的通知。

每个“连接的客户端”实际上挂在对/stage的长期运行请求上,服务器将使用该请求将events流式传输到客户端。在您的示例中,每个客户端将立即开始该请求,并将其保持打开状态,直到服务器终止流。您也可以使用close()上的close()从客户端关闭流。

您问过如何让EventSource处理程序广播其事件或将其事件镜像到所有当前连接的客户端。可以通过多种方法来完成此操作,但总而言之,您希望/stage函数将事件发布到所有lengthy_operation处理程序读取器,或者发布到所有/stage处理程序读取的持久共享位置。我将展示一种封装上述第一个想法的方法。

考虑序列化为/stage的通用流事件类:

data: <some message>

以及与文件相关的流事件的更具体派生案例:

class StreamEvent:
    def __init__(self, message: str) -> bytes:
        self.message = message

    def serialize(self) -> str:
        return f'data: {self.message}\n\n'.encode('utf-8')

您可以创建一种非常原始的发布/订阅类型的容器,其中class FileStreamEvent(StreamEvent): def __init__(self, message: str, name: str): super().__init__(message) self.name = name def serialize(self) -> bytes: return f'data: file: {self.name}: {self.message}\n\n'.encode('utf-8') 然后可以订阅侦听器,/stage可以将lengthy_operation()实例发布给所有侦听器:

StreamEvent

class StreamSource: def __init__(self): self.listeners: List[Queue] = [] def put(self, event: StreamEvent): for listener in self.listeners: listener.put_nowait(event) def get(self): listener = Queue() self.listeners.append(listener) try: while True: event = listener.get() yield event.serialize() finally: self.listeners.remove(listener) 中,您可能想创建一个结束案例(例如,检查“关闭”或“完成”事件)以从通用StreamSource.get()退出,并且您可能想在阻塞while True上设置超时]通话。但是为了这个示例,我将所有内容保持基本。

现在,Queue.get()只需要引用一个lengthy_operation()

StreamSource

[def lengthy_operation(events: StreamSource, name: str, stream: BinaryIO): for stage in range(10): events.put(FileStreamEvent(f'stage {stage}: begin', name)) sleep(2) events.put(FileStreamEvent(f'stage {stage}: end', name)) events.put(FileStreamEvent('finished', name)) 然后可以为每个SSETest调用提供StreamSource的共享实例,并且lengthy_operation()可以使用SSETest.stage()在此共享实例上注册侦听器:

StreamSource.get()

这是一个完整的[1]示例,说明了如何解决您的紧迫问题,但是当您更接近最终用户体验时,您很可能希望对它进行调整。

[1]:为便于阅读,我省略了导入,因此这里是:

class SSETest:
    _stream_source: StreamSource = StreamSource()

    @cherrypy.expose
    def index(self):
        return Path('SSETest.html').read_text()

    @cherrypy.expose
    def upload(self, file):
        name = file.filename.encode('iso-8859-1').decode('utf-8')
        lengthy_operation(self._stream_source, name, file.file)
        return 'OK'

    @cherrypy.expose
    def stage(self):
        cherrypy.response.headers['Cache-Control'] = 'no-cache'
        cherrypy.response.headers['Content-Type'] = 'text/event-stream'
        def stream():
            yield from self._stream_source.get()
        return stream()

    stage._cp_config = {'response.stream': True}
© www.soinside.com 2019 - 2024. All rights reserved.