Dask:连续提交,处理所有提交的数据

问题描述 投票:4回答:1

[具有500个,并持续增长DataFrames,我想将对每个DataFrame独立数据)的操作提交到dask。我的主要问题是:dask是否可以保存连续提交的数据,因此我可以对所有提交的数据submit进行操作-不仅是新提交的?

但是让我们在一个示例中进行解释:

创建dask_server.py

from dask.distributed import Client, LocalCluster
HOST = '127.0.0.1'
SCHEDULER_PORT = 8711
DASHBOARD_PORT = ':8710'

def run_cluster():
    cluster = LocalCluster(dashboard_address=DASHBOARD_PORT, scheduler_port=SCHEDULER_PORT, n_workers=8)
    print("DASK Cluster Dashboard = http://%s%s/status" % (HOST, DASHBOARD_PORT))
    client = Client(cluster)
    print(client)
    print("Press Enter to quit ...")
    input()

if __name__ == '__main__':
    run_cluster()

现在我可以从我的my_stream.py连接并开始连接到submitgather数据:

DASK_CLIENT_IP = '127.0.0.1'
dask_con_string = 'tcp://%s:%s' % (DASK_CLIENT_IP, DASK_CLIENT_PORT)
dask_client = Client(self.dask_con_string)

def my_dask_function(lines):
    return lines['a'].mean() + lines['b'].mean

def async_stream_redis_to_d(max_chunk_size = 1000):
    while 1:

        # This is a redis queue, but can be any queueing/file-stream/syslog or whatever
        lines = self.queue_IN.get(block=True, max_chunk_size=max_chunk_size)

        futures = []
        df = pd.DataFrame(data=lines, columns=['a','b','c'])
        futures.append(dask_client.submit(my_dask_function, df))

        result = self.dask_client.gather(futures)
        print(result)

        time sleep(0.1)

if __name__ == '__main__':
    max_chunk_size = 1000
    thread_stream_data_from_redis = threading.Thread(target=streamer.async_stream_redis_to_d, args=[max_chunk_size])
    #thread_stream_data_from_redis.setDaemon(True)
    thread_stream_data_from_redis.start()
    # Lets go

这可以按预期工作,而且很快!!!

但是接下来,我想首先实际appendlines之前进行计算-想知道这是否可能吗?因此,在这里的示例中,我想计算已提交的all行上的mean,而不仅仅是最后提交的行。

问题/方法:

  1. 此累积计算可能吗?
  2. 错误的替代方法1:我本地缓存所有行,并将submit all数据缓存到群集每次出现新行。这就像是指数开销。尝试了一下,它可以工作,但是很慢!
  3. [黄金选项:Python程序1推送数据。比有可能与另一个客户端(来自另一个python程序)到该累积数据并将分析逻辑从插入逻辑移开。我认为应该使用Published DataSets,但是这种高速附件是否适用?

可能相关:Distributed VariablesActors Worker

python-3.x dask dask-distributed streamz
1个回答
0
投票

将期货列表分配给已发布的数据集对我来说似乎很理想。这相对便宜(一切都是元数据),您会在几毫秒内保持最新状态

client.datasets["x"] = list_of_futures

def worker_function(...):
    futures = get_client().datasets["x"]
    data = get_client.gather(futures)
    ... work with data

正如您提到的,还有其他系统,例如PubSub或Actors。从您所说的内容来看,我怀疑Futures + Published数据集更简单,更实用。

© www.soinside.com 2019 - 2024. All rights reserved.