I wrote a small startup script for dask. Both SSHCluster and LocalCluster work fine (see below).

But when I let it run for a while, one of the workers dies with a

KilledWorker: ("('from-delayed-pandas_read_text-read-block-head-1-1-from-delayed-f0f78ffea007aeeccc7a9e04880d0195', 0)", <Worker 'tcp://192.168.56.11:34965', name: 0, memory: 0, processing: 1>)

exception, and I don't know where to find the crash log / traceback of that remote worker. Is there a way to collect these centrally on the scheduler host? Or are there log files somewhere on the remote machine?

An existing answer only states that workers log to stdout - but I can't find any logs there either.
from dask.distributed import Client, LocalCluster, SSHCluster
import time, datetime
import pandas as pd
import numpy as np
import os, sys
from collections import Counter

# Add home folder of cgi so the SSH key lookup below works
os.environ["HOME"] = "/home/cgi/"
os.path.expanduser("~/.ssh/id_rsa")
#os.path.expanduser('/home/cgi/')
#os.path.expandvars('/home/cgi/')

def run_cluster(local=0, HOST='10.8.0.1', SCHEDULER_PORT=8711, DASHBOARD_PORT=8710,
                DASK_WORKER_PROCESSES=16, NTHREADS=2, SILENCE_LOGS=0):
    start_msg = "Starting a "
    start_msg += "local" if local else "ssh"
    start_msg += " dask cluster. SCHEDULER_PORT=%s and DASHBOARD_PORT=%s." % (SCHEDULER_PORT, DASHBOARD_PORT)
    print(start_msg)
    dashboard_address = ':%s' % DASHBOARD_PORT
    if local:
        cluster = LocalCluster(dashboard_address=dashboard_address, scheduler_port=SCHEDULER_PORT,
                               n_workers=DASK_WORKER_PROCESSES, host=HOST, silence_logs=SILENCE_LOGS)
    else:
        worker_hosts = [
            "localhost", "localhost", "localhost", "localhost",
            "192.168.56.11", "192.168.56.11", "192.168.56.11", "192.168.56.11",
            "192.168.56.11", "192.168.56.11", "192.168.56.11", "192.168.56.11",
            "192.168.56.11", "192.168.56.11", "192.168.56.11", "192.168.56.11",
            "192.168.56.11", "192.168.56.11", "192.168.56.11", "192.168.56.11",
            "192.168.56.11", "192.168.56.11", "192.168.56.11", "192.168.56.11"
        ]
        # Both format arguments must be passed to % as a single tuple
        print("Starting a DASK SSHCluster with (%s) workers on %s different hosts... "
              % (len(worker_hosts), len(set(worker_hosts))))
        cluster = SSHCluster(
            worker_hosts,
            connect_options={"known_hosts": None},
            worker_options={"nthreads": NTHREADS},
            scheduler_options={"port": SCHEDULER_PORT, "dashboard_address": dashboard_address}
        )
    print("SSHCLUSTER>%s" % cluster)
    print("DASK Cluster Dashboard = http://%s%s/status" % (HOST, dashboard_address))
    #cluster.scale(3)
    client = Client(cluster)
    print(client)
    print("Press Enter to quit ...")
    input()  # block until the user presses Enter, then exit and tear down the cluster

if __name__ == '__main__':
    import argparse
    parser = argparse.ArgumentParser(description='Webserver which runs the dash/plotly dashboard(s). Name: BT_HISTORY')
    parser.add_argument('-l', '--local-cluster', help='1/0', required=False, default=1)
    parser.add_argument('-q', '--quiet', help='1/0', required=False, default=0)
    parser.add_argument('-dp', '--dashboard-port', help='port to run dashboard (default=%s)' % 8710, default=8710, required=False)
    parser.add_argument('-sp', '--scheduler-port', help='port to run scheduler (default=%s)' % 8711, default=8711, required=False)
    args = vars(parser.parse_args())
    print("args>%s" % args)
    LOCAL = int(args['local_cluster'])
    DASHBOARD_PORT = int(args['dashboard_port'])
    SCHEDULER_PORT = int(args['scheduler_port'])
    SILENCE_LOGS = int(args['quiet'])
    run_cluster(local=LOCAL, DASHBOARD_PORT=DASHBOARD_PORT, SCHEDULER_PORT=SCHEDULER_PORT, SILENCE_LOGS=SILENCE_LOGS)
You're right that Dask workers pipe their logs to stdout. Collecting those logs is usually the job of other systems.
I believe that, as of today (2020-06-12), the dask-ssh command-line tool actually uses a slightly different code path and may collect logs. You could give that a try.
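For illustration, a sketch of what that might look like for the hosts in your script. The option names here (--scheduler, --nthreads, --log-directory) are from my recollection of the 2020-era tool, so verify them against `dask-ssh --help` on your installed version; `--log-directory`, where supported, writes each scheduler's and worker's output to a file on its own node instead of only to stdout:

```
# Launch a scheduler on 10.8.0.1 and workers on the listed hosts,
# writing per-process logs under /tmp/dask-logs on each node.
dask-ssh \
    --scheduler 10.8.0.1 \
    --nthreads 2 \
    --log-directory /tmp/dask-logs \
    localhost 192.168.56.11
```

You would then look for the worker's traceback in the log files under that directory on 192.168.56.11.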
SSHCluster may also have a logs or get_logs method, but I suspect that it only works for workers that are still alive.
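As a minimal sketch of that client-side route, assuming the distributed API's Client.get_worker_logs(), which asks each live worker for its recent log records. It is shown here against a throwaway in-process LocalCluster, but the same call works on a client connected to your SSHCluster; note that a worker that has already died (as in the KilledWorker above) will simply be missing from the result:

```python
from dask.distributed import Client, LocalCluster

# Throwaway local cluster just to demonstrate the call; with an
# SSHCluster you would pass that cluster object instead.
cluster = LocalCluster(n_workers=1, processes=False, dashboard_address=None)
client = Client(cluster)

# Dict mapping each *live* worker's address to its recent log records.
# Workers that were killed no longer appear here.
logs = client.get_worker_logs()
for addr, records in logs.items():
    print(addr, "->", len(records), "log lines")

client.close()
cluster.close()
```

Because of that limitation, this is more useful for watching a worker that is about to die (e.g. polling while memory climbs) than for post-mortem debugging.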