Dask: Where do SSHCluster workers log to? (KilledWorker exception)


I wrote a small startup script for dask. Both SSHCluster and LocalCluster work fine (see below).

However, when I let it run for a while, one worker dies with this

KilledWorker: ("('from-delayed-pandas_read_text-read-block-head-1-1-from-delayed-f0f78ffea007aeeccc7a9e04880d0195', 0)", <Worker 'tcp://192.168.56.11:34965', name: 0, memory: 0, processing: 1>)

exception, and I cannot figure out where to find the crash log / traceback of the remote worker. Is there a way to collect them centrally on the scheduler host? Or is there some log on the remote machine?

An existing answer only states that workers log to stdout - but I cannot find any logs there either...

from dask.distributed import Client, LocalCluster, SSHCluster
import time, datetime
import pandas as pd
import numpy as np
import os, sys
from collections import Counter

# Point HOME at cgi's home directory so that ~/.ssh/id_rsa resolves
# correctly when the SSH connections are opened
os.environ["HOME"] = "/home/cgi/"
os.path.expanduser("~/.ssh/id_rsa")  # resolves to /home/cgi/.ssh/id_rsa


def run_cluster(local=0, HOST = '10.8.0.1', SCHEDULER_PORT = 8711, DASHBOARD_PORT=8710,
                DASK_WORKER_PROCESSES = 16, NTHREADS=2, SILENCE_LOGS = 0):

    start_msg = "Starting a "
    if local: start_msg += "local"
    else: start_msg += "ssh"
    start_msg += " dask cluster. SCHEDULER_PORT=%s and DASHBOARD_PORT=%s." % ( SCHEDULER_PORT, DASHBOARD_PORT )
    print(start_msg)

    dashboard_address = ':%s' % DASHBOARD_PORT

    if local:
        cluster = LocalCluster(dashboard_address=dashboard_address, scheduler_port=SCHEDULER_PORT,
                               n_workers=DASK_WORKER_PROCESSES, host=HOST, silence_logs=SILENCE_LOGS)
    else:
        worker_hosts = [
            "localhost", "localhost", "localhost", "localhost",
            "192.168.56.11","192.168.56.11","192.168.56.11","192.168.56.11",
            "192.168.56.11","192.168.56.11","192.168.56.11","192.168.56.11",
            "192.168.56.11","192.168.56.11","192.168.56.11","192.168.56.11",
            "192.168.56.11","192.168.56.11","192.168.56.11","192.168.56.11",
            "192.168.56.11","192.168.56.11","192.168.56.11","192.168.56.11"
        ]
        print("Starting a DASK SSHCluster with (%s) workers on %s different hosts... "
              % (len(worker_hosts), len(set(worker_hosts))))

        cluster = SSHCluster(
            worker_hosts,
            connect_options = {"known_hosts": None},
            worker_options = {"nthreads": NTHREADS},
            scheduler_options={"port": SCHEDULER_PORT, "dashboard_address": dashboard_address}
        )
        print("SSHCLUSTER>%s" % cluster)

    print("DASK Cluster Dashboard = http://%s%s/status" % (HOST, dashboard_address))

    #cluster.scale(3)
    client = Client(cluster)
    #print(cluster)
    print(client)
    print("Press Enter to quit ...")
    input()  # block until the user presses Enter, then let the cluster shut down

if __name__ == '__main__':

    import argparse

    parser = argparse.ArgumentParser(description='Webserver which runs the dash/plotly dashboard(s). Name: BT_HISTORY')
    parser.add_argument('-l', '--local-cluster', help='1/0', required=False, default=1)
    parser.add_argument('-q', '--quiet', help='1/0', required=False, default=False)
    parser.add_argument('-dp', '--dashboard-port', help='port to run dashboard (default=%s)' % 8710, default=8710, required=False)
    parser.add_argument('-sp', '--scheduler-port', help='port to run scheduler (default=%s)' % 8711, default=8711, required=False)
    args = vars(parser.parse_args())
    print("args>%s" % args)
    LOCAL = int(args['local_cluster'])
    DASHBOARD_PORT = int(args['dashboard_port'])
    SCHEDULER_PORT = int(args['scheduler_port'])
    SILENCE_LOGS = int(args['quiet'])

    run_cluster(local=LOCAL, DASHBOARD_PORT=DASHBOARD_PORT, SCHEDULER_PORT=SCHEDULER_PORT, SILENCE_LOGS=SILENCE_LOGS)
1 Answer

You are right that Dask workers pipe their logs to stdout. Collecting those logs is usually the job of some other system.
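One common workaround (a sketch, not part of the original answer) is to attach a file handler to the `distributed.worker` logger on every worker via `client.run`, so each remote machine writes its own log file. The path `/tmp/dask-worker.log` and the function name below are arbitrary choices for illustration:

```python
import logging

def setup_worker_logging(path="/tmp/dask-worker.log"):
    """Attach a FileHandler to the 'distributed.worker' logger.

    When dispatched with client.run, this executes on each worker,
    so log records land in `path` on that worker's own machine.
    """
    logger = logging.getLogger("distributed.worker")
    handler = logging.FileHandler(path)
    handler.setFormatter(
        logging.Formatter("%(asctime)s %(name)s %(levelname)s %(message)s")
    )
    logger.addHandler(handler)
    logger.setLevel(logging.INFO)
    logger.info("file logging enabled")
    return path

# After the cluster is up:
#   client.run(setup_worker_logging)
# returns a dict mapping each worker address to the log path it uses.
```

You would then fetch or tail `/tmp/dask-worker.log` on the remote host (e.g. `192.168.56.11`) after a worker dies.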

I believe that, as of today (2020-06-12), the dask-ssh command line tool actually uses a slightly different code path, which may collect logs. You could try that.

SSHCluster may also have a logs or get_logs method, but I suspect it only works for workers that are still alive.
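For workers that are still connected, the client can pull the recent log records that the scheduler buffers in memory; `Client.get_worker_logs` and `Client.get_scheduler_logs` are the relevant methods. A minimal sketch, using a LocalCluster as a stand-in for the SSH setup (the same calls work against any running cluster):

```python
from dask.distributed import Client, LocalCluster

# Stand-in cluster; with SSHCluster you would connect the Client
# to the already-running scheduler instead.
cluster = LocalCluster(n_workers=1, processes=False)
client = Client(cluster)

# Recent buffered log records, keyed by worker address
worker_logs = client.get_worker_logs()
for address, records in worker_logs.items():
    print(address)
    for level, message in records:
        print("  %s: %s" % (level, message))

# The scheduler's own recent records
scheduler_logs = client.get_scheduler_logs()

client.close()
cluster.close()
```

Note this only helps while the worker process is reachable; once it has been killed, the remote machine's own stdout/stderr capture is the only record.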
