[具有500个,并持续增长DataFrames
,我想将对每个DataFrame独立数据)的操作提交到dask
。我的主要问题是:dask
是否可以保存连续提交的数据,因此我可以对所有提交的数据submit
进行操作-不仅是新提交的?
但是让我们在一个示例中进行解释:
创建dask_server.py
:
from dask.distributed import Client, LocalCluster
HOST = '127.0.0.1'
SCHEDULER_PORT = 8711
DASHBOARD_PORT = ':8710'
def run_cluster():
cluster = LocalCluster(dashboard_address=DASHBOARD_PORT, scheduler_port=SCHEDULER_PORT, n_workers=8)
print("DASK Cluster Dashboard = http://%s%s/status" % (HOST, DASHBOARD_PORT))
client = Client(cluster)
print(client)
print("Press Enter to quit ...")
input()
if __name__ == '__main__':
run_cluster()
现在我可以从我的my_stream.py
连接并开始连接到submit
和gather
数据:
DASK_CLIENT_IP = '127.0.0.1'
dask_con_string = 'tcp://%s:%s' % (DASK_CLIENT_IP, DASK_CLIENT_PORT)
dask_client = Client(self.dask_con_string)
def my_dask_function(lines):
return lines['a'].mean() + lines['b'].mean
def async_stream_redis_to_d(max_chunk_size = 1000):
while 1:
# This is a redis queue, but can be any queueing/file-stream/syslog or whatever
lines = self.queue_IN.get(block=True, max_chunk_size=max_chunk_size)
futures = []
df = pd.DataFrame(data=lines, columns=['a','b','c'])
futures.append(dask_client.submit(my_dask_function, df))
result = self.dask_client.gather(futures)
print(result)
time sleep(0.1)
if __name__ == '__main__':
max_chunk_size = 1000
thread_stream_data_from_redis = threading.Thread(target=streamer.async_stream_redis_to_d, args=[max_chunk_size])
#thread_stream_data_from_redis.setDaemon(True)
thread_stream_data_from_redis.start()
# Lets go
这可以按预期工作,而且很快!!!
但是接下来,我想首先实际append
lines
之前进行计算-想知道这是否可能吗?因此,在这里的示例中,我想计算已提交的all行上的mean
,而不仅仅是最后提交的行。
问题/方法:
submit
all数据缓存到群集每次出现新行。这就像是指数开销。尝试了一下,它可以工作,但是很慢! 将期货列表分配给已发布的数据集对我来说似乎很理想。这相对便宜(一切都是元数据),您会在几毫秒内保持最新状态
client.datasets["x"] = list_of_futures
def worker_function(...):
futures = get_client().datasets["x"]
data = get_client.gather(futures)
... work with data
正如您提到的,还有其他系统,例如PubSub或Actors。从您所说的内容来看,我怀疑Futures + Published数据集更简单,更实用。