ModuleNotFoundError:没有名为“airflow.providers.influxdb”的模块

问题描述 投票:0回答:1

我正在尝试使用 Apache Airflow 运行一些 InfluxDB 查询,但我不断收到相同的错误消息。我按照这个文档做了所有事情(https://airflow.apache.org/docs/apache-airflow-providers-influxdb/stable/index.html)并且我下载了正确的包,确保我也下载了正确的版本。 我尝试重新安装 apache-airflow-providers-influxdb 软件包并降级版本,但仍然遇到相同的错误。 我希望以前有人遇到过这个问题并可以帮助我。

我在 Windows 上工作,我在 Ubuntu WSL 上尝试过,但仍然遇到相同的错误。我正在使用 python 3.11.2。 这是我的代码

from datetime import datetime
from airflow.decorators import task
from airflow.models.dag import DAG
from airflow.providers.influxdb.hooks.influxdb import InfluxDBHook
from airflow.operators.python import PythonOperator
import socket
import telnetlib

from influxdb_client import Point


urls = 
token =
organization = 


@task(task_id="fetch_system_stats")
def fetch_system_stats():

    influxdb_hook = InfluxDBHook()
    client = influxdb_hook.get_conn()

    # Fetch system stats from InfluxDB
    tables = client.query('''
        from(bucket: "a1hrinfra/autogen")
        |> range(start: -5m)
        |> filter(fn: (r) => r["_measurement"] == "system")
        |> filter(fn: (r) => r["_field"] == "uptime")
        |> filter(fn: (r) => r["osmajor"] == )
        |> filter(fn: (r) => not (r["host"] =~ /^xlvp.*|^xlvt.*|^lx.*/))
        |> group(columns: ["host"])
        |> last(column: "_time")
    ''')

    # Get IP addresses of hosts
    ips = []
    for row in tables:
        ips.append(row['host'])

    # Connect to each host with telnet to check availability
    for ip in ips:
        try:
            tn = telnetlib.Telnet(ip, 22, timeout=5)
            tn.close()
            data = Point("system").tag("live host", ip).field("response", "Available")
            influxdb_hook.write(bucket="bucket", org=organization, record=data)
        except Exception as e:
            data = Point("system").tag("live host", ip).field("response", f"Not available: {e}")
            influxdb_hook.write(bucket="bucket", org=organization, record=data)

def get_host(ip):
    return socket.gethostbyaddr(ip)[0]

with DAG(
    dag_id="influx_server_availabilty",
    schedule=None,
    start_date=datetime(2021, 1, 1),
    max_active_runs=1,
) as dag:
    fetch_system_stats()

python pip module airflow influxdb
1个回答
0
投票

我必须编写一个自定义 Dockerfile 并用它运行 compose。 https://airflow.apache.org/docs/docker-stack/build.html#examples-of-image-extending

© www.soinside.com 2019 - 2024. All rights reserved.