I can't get a dask-yarn cluster to run on AWS EMR

Problem description

I want to run Dask on EMR using a YarnCluster. I used the bootstrap script below, but I ran its commands manually from an SSH console rather than as a bootstrap action.

#!/bin/bash
HELP="Usage: bootstrap-dask [OPTIONS]

Example AWS EMR Bootstrap Action to install and configure Dask and Jupyter

By default it does the following things:
- Installs miniconda
- Installs dask, distributed, dask-yarn, pyarrow, and s3fs. This list can be
  extended using the --conda-packages flag below.
- Packages this environment for distribution to the workers.
- Installs and starts a jupyter notebook server running on port 8888. This can
  be disabled with the --no-jupyter flag below.

Options:
    --jupyter / --no-jupyter    Whether to also install and start a Jupyter
                                Notebook Server. Default is True.
    --password, -pw             Set the password for the Jupyter Notebook
                                Server. Default is 'dask-user'.
    --conda-packages            Extra packages to install from conda.
"

# Parse Inputs. This is specific to this script, and can be ignored
# -----------------------------------------------------------------

# -----------------------------------------------------------------------------
# 1. Check if running on the master node. If not, there's nothing do.
# -----------------------------------------------------------------------------
grep -q '"isMaster": true' /mnt/var/lib/info/instance.json \
|| { echo "Not running on master node, nothing to do" && exit 0; }


# -----------------------------------------------------------------------------
# 2. Install Miniconda
# -----------------------------------------------------------------------------
echo "Installing Miniconda"
curl https://repo.anaconda.com/miniconda/Miniconda3-latest-Linux-x86_64.sh -o /tmp/miniconda.sh
bash /tmp/miniconda.sh -b -p $HOME/miniconda
rm /tmp/miniconda.sh
echo -e '\nexport PATH=$HOME/miniconda/bin:$PATH' >> $HOME/.bashrc
source $HOME/.bashrc
conda update conda -y

# configure conda environment
#source ~/miniconda/etc/profile.d/conda.sh
#conda activate base

# -----------------------------------------------------------------------------
# 3. Install packages to use in packaged environment
#
# We install a few packages by default, and allow users to extend this list
# with a CLI flag:
#
# - dask-yarn >= 0.7.0, for deploying Dask on YARN.
# - pyarrow for working with hdfs, parquet, ORC, etc...
# - s3fs for access to s3
# - conda-pack for packaging the environment for distribution
# - ensure tornado 5, since tornado 6 doesn't work with jupyter-server-proxy
# -----------------------------------------------------------------------------
echo "Installing base packages"
conda install \
-c conda-forge \
-y \
-q \
dask-yarn \
s3fs \
conda-pack \
tornado 

pip3 install pyarrow

# -----------------------------------------------------------------------------
# 4. Package the environment to be distributed to worker nodes
# -----------------------------------------------------------------------------
echo "Packaging environment"
conda pack -q -o $HOME/environment.tar.gz


# -----------------------------------------------------------------------------
# 5. List all packages in the worker environment
# -----------------------------------------------------------------------------
echo "Packages installed in the worker environment:"
conda list


# -----------------------------------------------------------------------------
# 6. Configure Dask
#
# This isn't necessary, but for this particular bootstrap script it will make a
# few things easier:
#
# - Configure the cluster's dashboard link to show the proxied version through
#   jupyter-server-proxy. This allows access to the dashboard with only an ssh
#   tunnel to the notebook.
#
# - Specify the pre-packaged python environment, so users don't have to
#
# - Set the default deploy-mode to local, so the dashboard proxying works
#
# - Specify the location of the native libhdfs library so pyarrow can find it
#   on the workers and the client (if submitting applications).
# ------------------------------------------------------------------------------
echo "Configuring Dask"
mkdir -p $HOME/.config/dask
cat <<EOT >> $HOME/.config/dask/config.yaml
distributed:
  dashboard:
    link: "/proxy/{port}/status"

yarn:
  environment: /home/hadoop/environment.tar.gz
  deploy-mode: local

  worker:
    env:
      ARROW_LIBHDFS_DIR: /usr/lib/hadoop/lib/native/

  client:
    env:
      ARROW_LIBHDFS_DIR: /usr/lib/hadoop/lib/native/
EOT
# Also set ARROW_LIBHDFS_DIR in ~/.bashrc so it's set for the local user
echo -e '\nexport ARROW_LIBHDFS_DIR=/usr/lib/hadoop/lib/native' >> $HOME/.bashrc




# -----------------------------------------------------------------------------
# 8. Install jupyter notebook server and dependencies
#
# We do this after packaging the worker environments to keep the tar.gz as
# small as possible.
#
# We install the following packages:
#
# - notebook: the Jupyter Notebook Server
# - ipywidgets: used to provide an interactive UI for the YarnCluster objects
# - jupyter-server-proxy: used to proxy the dask dashboard through the notebook server
# -----------------------------------------------------------------------------
    echo "Installing Jupyter"
    conda install \
    -c conda-forge \
    -y \
    -q \
    notebook \
    ipywidgets \
    jupyter-server-proxy \
    jupyter


# -----------------------------------------------------------------------------
# 9. List all packages in the client environment
# -----------------------------------------------------------------------------
echo "Packages installed in the client environment:"
conda list


# -----------------------------------------------------------------------------
# 10. Configure Jupyter Notebook
# -----------------------------------------------------------------------------
echo "Configuring Jupyter"
mkdir -p $HOME/.jupyter
JUPYTER_PASSWORD="dask-user"
HASHED_PASSWORD=`python -c "from notebook.auth import passwd; print(passwd('$JUPYTER_PASSWORD'))"`
cat <<EOF >> $HOME/.jupyter/jupyter_notebook_config.py
c.NotebookApp.password = u'$HASHED_PASSWORD'
c.NotebookApp.open_browser = False
c.NotebookApp.ip = '0.0.0.0'
c.NotebookApp.port = 8888
EOF


# -----------------------------------------------------------------------------
# 11. Define an upstart service for the Jupyter Notebook Server
#
# This sets the notebook server up to properly run as a background service.
# -----------------------------------------------------------------------------
echo "Configuring Jupyter Notebook Upstart Service"
cat <<EOF > /tmp/jupyter-notebook.service
[Unit]
Description=Jupyter Notebook
[Service]
ExecStart=$HOME/miniconda/bin/jupyter-notebook --allow-root --config=$HOME/.jupyter/jupyter_notebook_config.py

Type=simple
PIDFile=/run/jupyter.pid
WorkingDirectory=$HOME
Restart=always
RestartSec=10
[Install]
WantedBy=multi-user.target
EOF
sudo mv /tmp/jupyter-notebook.service /etc/systemd/system/
sudo systemctl enable jupyter-notebook.service


# -----------------------------------------------------------------------------
# 12. Start the Jupyter Notebook Server
# -----------------------------------------------------------------------------
echo "Starting Jupyter Notebook Server"
sudo systemctl daemon-reload
sudo systemctl restart jupyter-notebook.service

#$HOME/miniconda/bin/jupyter-notebook --allow-root --config=$HOME/.jupyter/jupyter_notebook_config.py

After this, I start the Jupyter notebook with

$HOME/miniconda/bin/jupyter-notebook --allow-root --config=$HOME/.jupyter/jupyter_notebook_config.py
and the notebook starts successfully. When I run this code in the notebook:

from dask_yarn import YarnCluster
from dask.distributed import Client

# Create a cluster
cluster = YarnCluster()

# Connect to the cluster
client = Client(cluster)

it gives an error like this:

AttributeError                            Traceback (most recent call last)
Input In [3], in <cell line: 1>()
----> 1 client = Client(cluster)

File ~/miniconda/lib/python3.9/site-packages/distributed/client.py:835, in Client.__init__(self, address, loop, timeout, set_as_default, scheduler_file, security, asynchronous, name, heartbeat_interval, serializers, deserializers, extensions, direct_to_workers, connection_limit, **kwargs)
    832 elif isinstance(getattr(address, "scheduler_address", None), str):
    833     # It's a LocalCluster or LocalCluster-compatible object
    834     self.cluster = address
--> 835     status = getattr(self.cluster, "status")
    836     if status and status in [Status.closed, Status.closing]:
    837         raise RuntimeError(
    838             f"Trying to connect to an already closed or closing Cluster {self.cluster}."
    839         )

AttributeError: 'YarnCluster' object has no attribute 'status'
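
From the traceback, Client reaches the failing check because the YarnCluster object does expose a scheduler_address string, but it lacks the status attribute that newer versions of distributed expect on cluster-like objects. A workaround I am considering (only a sketch, I have not verified it on EMR) is to connect by the scheduler address instead of passing the cluster object:

from dask_yarn import YarnCluster
from dask.distributed import Client

cluster = YarnCluster()

# Pass the scheduler address string rather than the cluster object, so
# Client takes the plain-address code path and never looks up cluster.status.
client = Client(cluster.scheduler_address)

Pinning dask/distributed to releases contemporary with dask-yarn might also avoid this check, but I have not confirmed which versions are compatible.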

Also, when I use a LocalCluster instead of a YarnCluster, everything works fine. I have been stuck on this for days, please help. And how do we configure the worker nodes?
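For the worker configuration, my reading of the dask-yarn documentation is that worker resources are passed to the YarnCluster constructor and the cluster is then scaled to the desired number of containers. This is only a sketch of what I have in mind, assuming the packaged environment from the bootstrap script above:

from dask_yarn import YarnCluster

# Worker sizing goes into the constructor; the environment is the
# conda-pack archive produced by the bootstrap script.
cluster = YarnCluster(
    environment="/home/hadoop/environment.tar.gz",
    worker_vcores=2,
    worker_memory="4GiB",
)

# Request 4 worker containers from YARN.
cluster.scale(4)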

jupyter-notebook dask hadoop-yarn amazon-emr dask-distributed
1 Answer

Did you solve this? I am stuck on the same error :(
