与Python中的套接字工作分开计算

问题描述 投票:7回答:2

我正在序列化列数据,然后通过套接字连接发送它。就像是:

import array, struct, socket

## Socket setup
s = socket.create_connection((ip, addr))

## Data container setup
ordered_col_list = ('col1', 'col2')
columns = dict.fromkeys(ordered_col_list)

for i in range(num_of_chunks):
    ## Binarize data
    columns['col1'] = array.array('i', range(10000))
    columns['col2'] = array.array('f', [float(num) for num in range(10000)])
    .
    .
    .

    ## Send away
    chunk = b''.join(columns[col_name] for col_name in ordered_col_list]
    s.sendall(chunk)
    s.recv(1000)      #get confirmation

我希望将计算与发送分开,将它们放在单独的线程或进程上,这样我就可以在数据被发送时继续进行计算。

我把二值化部分作为生成器函数,然后将生成器发送到一个单独的线程,然后通过队列产生二进制块。

我从主线程中收集了数据并将其发送出去。就像是:

import array, struct, socket
from time import sleep
try:
    import  thread
    from Queue import Queue
except:
    import _thread as thread
    from queue import Queue


## Socket and queue setup
s = socket.create_connection((ip, addr))
chunk_queue = Queue()


def binarize(num_of_chunks):
    ''' Generator function that yields chunks of binary data. In reality it wouldn't be the same data'''

    ordered_col_list = ('col1', 'col2')
    columns = dict.fromkeys(ordered_col_list)

    for i in range(num_of_chunks):
        columns['col1'] = array.array('i', range(10000)).tostring()
        columns['col2'] = array.array('f', [float(num) for num in range(10000)]).tostring()
        .
        .

        yield b''.join((columns[col_name] for col_name in ordered_col_list))


def chunk_yielder(queue):
    ''' Generate binary chunks and put them on a queue. To be used from a thread '''

    while True:   
        try:
            data_gen = queue.get_nowait()
        except:
            sleep(0.1)
            continue
        else:    
            for chunk in data_gen:
                queue.put(chunk)


## Setup thread and data generator
thread.start_new_thread(chunk_yielder, (chunk_queue,))
num_of_chunks = 100
data_gen = binarize(num_of_chunks)
queue.put(data_gen)


## Get data back and send away
while True:
   try:
        binary_chunk = queue.get_nowait()
    except:
        sleep(0.1)
        continue
    else:    
        socket.sendall(binary_chunk)
        socket.recv(1000) #Get confirmation

但是,我没有看到和性能改进 - 它没有更快的工作。

我不太了解线程/进程,我的问题是,是否有可能(在所有和Python中)从这种类型的分离中获益,以及使用线程或者线程或者什么是好的方法。处理(或任何其他方式 - 异步等)。

编辑:

据我所知 -

  1. Multirpocessing需要序列化任何发送的数据,因此我将双重发送每个计算数据。
  2. 通过socket.send()发送应释放GIL

因此,我认为(如果我弄错了,请纠正我),线程解决方案是正确的方法。但是我不确定如何正确地做到这一点。

我知道cython可以释放线程的GIL,但由于其中一个只是socket.send / recv,我的理解是它不应该是必要的。

python multithreading sockets concurrency
2个回答
3
投票

在Python中有两个并行运行的选项,可以使用multiprocessingdocs)库,也可以在cython中编写并行代码并释放GIL。一般而言,后者的工作量大得多,而且适用性较差。

Python线程受Global Interpreter Lock(GIL)的限制,我不会在这里详细介绍,因为你会在网上找到足够的信息。简而言之,顾名思义,GIL是CPython解释器中的全局锁,它确保多个线程不会同时修改在所述解释器范围内的对象。这就是为什么,例如,cython程序可以并行运行代码,因为它们可以存在于GIL之外。


至于你的代码,一个问题是你在GIL中同时运行数字运算(binarize)和socket.send,这将严格按顺序运行它们。 queue也非常奇怪,并且有一个NameError,但让我们把它们放在一边。

考虑到Jeremy Friesner已经指出的警告,我建议你按照以下方式重新构造代码:你有两个进程(不是线程)一个用于二进制数据,另一个用于发送数据。除此之外,还有启动两个子进程的父进程,以及将子进程1连接到子进程2的队列。

  • Subprocess-1执行数字运算并将生成的数据生成到队列中
  • Subprocess-2使用队列中的数据并执行socket.send

在代码中,设置看起来像

from multiprocessing import Process, Queue

work_queue = Queue()
p1 = Process(target=binarize, args=(100, work_queue))
p2 = Process(target=send_data, args=(ip, port, work_queue))
p1.start()
p2.start()
p1.join()
p2.join()

binarize可以保留在你的代码中,除了最后代替yield,你将元素添加到队列中

def binarize(num_of_chunks, q):
    ''' Generator function that yields chunks of binary data. In reality it wouldn't be the same data'''

    ordered_col_list = ('col1', 'col2')
    columns = dict.fromkeys(ordered_col_list)
    for i in range(num_of_chunks):
        columns['col1'] = array.array('i', range(10000)).tostring()
        columns['col2'] = array.array('f', [float(num) for num in range(10000)]).tostring()
        data = b''.join((columns[col_name] for col_name in ordered_col_list))
        q.put(data)

send_data应该只是代码底部的while循环,具有连接打开/关闭功能

def send_data(ip, addr, q):
     s = socket.create_connection((ip, addr))
     while True:
         try:
             binary_chunk = q.get(False)
         except:
             sleep(0.1)
             continue
         else:    
             socket.sendall(binary_chunk)
             socket.recv(1000) # Get confirmation
    # maybe remember to close the socket before killing the process

现在,您有两个(实际上是三个,如果您计算父级)独立处理数据的进程。您可以通过将队列的max_size设置为单个元素来强制这两个进程同步其操作。这两个独立进程的操作也很容易通过计算机上的进程管理器监控top(Linux),Activity Monitor(OsX),不记得在Windows下调用它的内容。


最后,Python 3提供了使用协同例程的选项,这些例程既不是进程也不是线程,而是完全不同的东西。从CS的角度来看,协同程序非常酷,但最初有点令人头疼。虽然有很多资源需要学习,比如媒体上的this帖子和David Beazley的this讲话。


更一般地说,如果您还不熟悉生产者/消费者模式,则可能需要查看生产者/消费者模式。


2
投票

如果您尝试使用并发来提高CPython的性能,我强烈建议使用多处理库而不是多线程。这是因为GIL(Global Interpreter Lock)会对执行速度产生巨大影响(在某些情况下,它可能会导致代码运行速度比单线程版本慢)。另外,如果您想了解有关此主题的更多信息,我建议您阅读this presentation by David Beazley。多处理通过为每个进程生成一个新的Python解释器实例来绕过此问题,从而使您可以充分利用多核架构。

© www.soinside.com 2019 - 2024. All rights reserved.