如何通过 multiprocessing.Process 传递对象的实例

问题描述 投票:0回答:0

我正在编写一个库,它本质上是一个 HTTP 响应引擎(类似于 Flask 或 DJango)。问题是我目前正在尝试调整代码,使其在自己的进程中运行,以便可以随时关闭。问题是我无法这样做,因为我需要调用的函数是一个名为

Server
的类的实例上的方法。根据我的研究,问题是它无法腌制对象,这就是我收到错误的原因。我想不出任何其他方法来传递对象或调用静态方法,因为该方法访问其他方法主要是获取路由数据。我对多处理模块很陌生,尽管我对类似的线程模块有很多经验。

这是

Server
类的当前代码。我实现了一个线程来处理日志记录,因为据我所知,不可能通过进程传递日志记录对象。所以我用队列来解决它。

import threading
import multiprocessing
import socket
import logging
from typing import Self

from . import request
from . import routes

class Server(routes.Routes):
    '''HTTP server running on a specified host and port.'''

    def __init__(self,
                 *,
                 host: str = '127.0.0.1',
                 port: int = 80,
                 logger: logging.Logger = None) -> None:
        '''Initializes the server class.'''
        
        super().__init__()
        
        self._host: str = host

        self._port: int = port

        self._logger: logging.Logger = logger

        if self._logger:

            self._logging_queue: multiprocessing.Queue = multiprocessing.Queue(-1)

        self._process: multiprocessing.Process = \
multiprocessing.Process(target = self._listen,
                        daemon = True)

    def start(self) -> None:
        '''Starts the server. Specify the host and port to listen on.
        Optionally, specify a logging.Logger object to log to.'''
        
        self._process.start()

        if self._logger:

            threading.Thread(target = self._logging_handler,
                            daemon = True).start()
        
    def wait(self) -> None:
        '''Waits for a keyboard interrupt or the enter key is pressed is recieved.'''

        try:

            input('Press Enter to continue...')

        except (KeyboardInterrupt, EOFError):

            pass

    def stop(self) -> None:
        '''Stops the server.'''

        if self._logger:

            self._logging_queue.put((logging.INFO,
                                     'Exiting server...'))

        self._process.kill()

    def _listen(self) -> None:
        '''Listens for incoming connections 
        and spawns a thread to handle each request.'''

        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:

            s.bind((self._host, self._port))

            s.listen()

            if self._logger:

                self._logging_queue.put((logging.INFO, f'Server hosted on \
{self._host}:{self._port}.'))

            while True:

                connection, address = s.accept()

                threading.Thread(target = self._handle_request,
                                kwargs = {'connection': connection,
                                        'address': address},
                                daemon = True).start()
                
    def _handle_request(self,
                        connection: socket.socket,
                        address: socket.AddressInfo) -> None:
        '''Handles a request from a client.'''

        while True:
            
            try:

                raw_request = ''

                while True:

                    raw_request += connection.recv(4096).decode(encoding = 'utf-8',
                                                                errors = 'ignore')

                    if '\r\n\r\n' in raw_request or raw_request == '':

                        break

                if not raw_request:
                        
                        if self._logger:

                            self._logging_queue.put((logging.DEBUG, f'Connection closed\
 by client {address[0]}:{address[1]}.'))

                        connection.close()

                        return None

                try:

                    parsed_request = request.Request.from_string(address = address,
                                                                 request = raw_request)

                except Exception:

                    if self._logger:

                        self._logging_queue.put((logging.ERROR, f'Invalid request from \
{address[0]}:{address[1]}, closing connection.'))

                    connection.close()

                    return None

                if content_length:=parsed_request.headers.get('Content-Length'):

                    if length_recieved:=len(raw_request.split('\r\n\r\n')[1]) < \
                    (length_to_recieve:=int(content_length)):

                        raw_request += connection.recv(length_to_recieve - \
                                                       length_recieved)\
                            .decode(encoding = 'utf-8',
                                    errors = 'ignore')

                        parsed_request = request.Request.from_string(address = address,
                                                                request = raw_request)
                        
                if self._logger:

                    self._logging_queue.put((logging.DEBUG, f'Recieved \
{parsed_request.method.title()} request from {address[0]}:{address[1]} for \
{parsed_request.path if parsed_request.path else "/"}'))

                connection.send(self._get_route(path = parsed_request.path,
                                                request = parsed_request))
                
                if self._logger:

                    self._logging_queue.put((logging.DEBUG, f'Sent \
{parsed_request.method} response to {address[0]}:{address[1]} for \
{parsed_request.path if parsed_request.path else "/"}'))

                if not parsed_request.headers.get('Connection') == 'keep-alive':

                    try:
                
                        connection.close()

                    except Exception:

                        pass

                    if self._logger:
                        
                        self._logging_queue.put((logging.DEBUG, f'Connection closed \
with {address[0]}:{address[1]} by server.'))

                    return None

            except Exception as e:

                if self._logger:

                    self._logging_queue.put((logging.ERROR, f'Error while handling \
request from {address[0]}:{address[1]}: "{e}".'))                   

                return None
            
    def _logging_handler(self) -> None:

        while self._process.is_alive():

            log_level, log_message = self._logging_queue.get()

            match log_level:

                case logging.DEBUG:

                    self._logger.debug(log_message)

                case logging.INFO:

                    self._logger.info(log_message)

                case logging.WARNING:

                    self._logger.warning(log_message)

                case logging.ERROR:

                    self._logger.error(log_message)

                case logging.CRITICAL:

                    self._logger.critical(log_message)

    def __enter__(self) -> Self:

        self.start()

        return self

    def __exit__(self, exc_type, exc_value, traceback) -> None:

        self.stop()

这里是

Routes
类。

import os

from . import response_messages
from . import response_codes
from . import response
from . import request
from . import render

class Routes:
    '''Stores, sorts, and handles a collection of routes.'''

    def __init__(self) -> None:

        self.routes: dict[str: callable] = {
            '/404': lambda request: response.Response(version = 1.1,
code = response_codes.ResponseCodes.NOT_FOUND,
message = response_messages.ResponseMessages.NOT_FOUND,
headers = {'Content-Type': 'text/html',
            'Content-Length': '202'},
body = b'''<!DOCTYPE html>

<html>

<head>
<title>404 Not Found</title>
</head>

<body>

<h1 style="text-align: center;">404 Not Found</h1>

<hr>

<p style="text-align: center;">HTTP-PyServer</p>

</body>

</html>''')
    }

    def route(self,
              path: str,
              *,
              ressources: tuple[tuple[str, str]] = ()) -> callable:
        '''Adds a route to the routes dictionary.'''

        for ressource_file_path, ressource_reference_path in ressources:

            ressource_reference_path = '/' + ressource_reference_path.strip('/')

            if not os.path.exists(ressource_file_path):

                self.routes[ressource_reference_path] = \
                    lambda _: self.routes['/404'](request = request.Request())

            else:

                ressource = render.attachment(filepath = ressource_file_path)

                self.routes[ressource_reference_path] = \
                lambda _, ressource = ressource: ressource

        def callable_route(route_function):

            self.routes[path.rstrip('/')] = route_function

            return route_function
        
        return callable_route
    
    def _get_wildcard_path(self,
                           path: str,
                           *,
                           request: request.Request) -> bytes:
        '''Gets a route with wildcard values.'''
    
        for route in self.routes:

            if not len(route.split('/')) == len(path.split('/')):

                continue

            for sub_route, sub_path in zip(route.split('/'), path.split('/')):

                if (not sub_route == sub_path) \
                    and (not (sub_route.startswith('%') \
                            and sub_route.endswith('%'))):

                    break

            else:

                wildcard_values = []

                for sub_route, sub_path \
                    in zip(route.split('/'), path.split('/')):

                    if sub_route.startswith('%') and sub_route.endswith('%'):

                        wildcard_values.append(sub_path)

                message =  self.routes[route](request,
                                         *wildcard_values)
                
                if isinstance(message, str):

                    return bytes(render.text(message))
                
                elif isinstance(message, bytes):

                    return message
                
                elif isinstance(message, response.Response):

                    return bytes(message)
                
                else:

                    raise TypeError(f'Expected function for {request.path} \
to return str, bytes, or Response, got {type(message)}.')

        return self._get_route('/404', request = request)

    def _get_route(self,
                  path: str,
                  *,
                  request: request.Request = request.Request()) -> bytes:
        '''Gets a route from the routes dictionary.'''

        if path in self.routes:

            message = self.routes[path](request)

            if isinstance(message, str):

                return bytes(render.text(text = message))
            
            elif isinstance(message, bytes):

                return message
            
            elif isinstance(message, response.Response):

                return bytes(message)
            
            else:

                raise TypeError(f'Expected function for {request.path} \
to return str, bytes, or Response, got {type(message)}.')

        else:

            return self._get_wildcard_path(path = path,
                                          request = request)

这是我运行一个简单脚本时的错误:

$ py main.py
Traceback (most recent call last):
  File "C:\Users\alexj\Desktop\Text Documents\Coding\HTTP-PyServer\src\main.py", line 10, in <module>
    with Server(logger = logging.getLogger(__name__)) as s:
  File "C:\Users\alexj\Desktop\Text Documents\Coding\HTTP-PyServer\src\server\server.py", line 223, in __enter__
    self.start()
  File "C:\Users\alexj\Desktop\Text Documents\Coding\HTTP-PyServer\src\server\server.py", line 40, in start
    self._process.start()
  File "C:\Users\alexj\AppData\Local\Programs\Python\Python311\Lib\multiprocessing\process.py", line 121, in start
    self._popen = self._Popen(self)
                  ^^^^^^^^^^^^^^^^^
  File "C:\Users\alexj\AppData\Local\Programs\Python\Python311\Lib\multiprocessing\context.py", line 224, in _Popen
    return _default_context.get_context().Process._Popen(process_obj)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "C:\Users\alexj\AppData\Local\Programs\Python\Python311\Lib\multiprocessing\context.py", line 336, in _Popen
    return Popen(process_obj)
           ^^^^^^^^^^^^^^^^^^
  File "C:\Users\alexj\AppData\Local\Programs\Python\Python311\Lib\multiprocessing\popen_spawn_win32.py", line 94, in __init__
    reduction.dump(process_obj, to_child)
  File "C:\Users\alexj\AppData\Local\Programs\Python\Python311\Lib\multiprocessing\reduction.py", line 60, in dump
    ForkingPickler(file, protocol).dump(obj)
AttributeError: Can't pickle local object 'Routes.__init__.<locals>.<lambda>'
Traceback (most recent call last):
  File "<string>", line 1, in <module>
  File "C:\Users\alexj\AppData\Local\Programs\Python\Python311\Lib\multiprocessing\spawn.py", line 106, in spawn_main
    source_process = _winapi.OpenProcess(
                     ^^^^^^^^^^^^^^^^^^^^
OSError: [WinError 87] The parameter is incorrect
python pipe pickle python-multiprocessing python-multithreading
© www.soinside.com 2019 - 2024. All rights reserved.