我正在尝试使用 Apache Airflow 运行一些 InfluxDB 查询,但我不断收到相同的错误消息。我按照这个文档做了所有事情(https://airflow.apache.org/docs/apache-airflow-providers-influxdb/stable/index.html)并且我下载了正确的包,确保我也下载了正确的版本。 我尝试重新安装 apache-airflow-providers-influxdb 软件包并降级版本,但仍然遇到相同的错误。 我希望以前有人遇到过这个问题并可以帮助我。
我在 Windows 上工作,我在 Ubuntu WSL 上尝试过,但仍然遇到相同的错误。我正在使用 python 3.11.2。 这是我的代码
from datetime import datetime
from airflow.decorators import task
from airflow.models.dag import DAG
from airflow.providers.influxdb.hooks.influxdb import InfluxDBHook
from airflow.operators.python import PythonOperator
import socket
import telnetlib
from influxdb_client import Point
urls =
token =
organization =
@task(task_id="fetch_system_stats")
def fetch_system_stats():
influxdb_hook = InfluxDBHook()
client = influxdb_hook.get_conn()
# Fetch system stats from InfluxDB
tables = client.query('''
from(bucket: "a1hrinfra/autogen")
|> range(start: -5m)
|> filter(fn: (r) => r["_measurement"] == "system")
|> filter(fn: (r) => r["_field"] == "uptime")
|> filter(fn: (r) => r["osmajor"] == )
|> filter(fn: (r) => not (r["host"] =~ /^xlvp.*|^xlvt.*|^lx.*/))
|> group(columns: ["host"])
|> last(column: "_time")
''')
# Get IP addresses of hosts
ips = []
for row in tables:
ips.append(row['host'])
# Connect to each host with telnet to check availability
for ip in ips:
try:
tn = telnetlib.Telnet(ip, 22, timeout=5)
tn.close()
data = Point("system").tag("live host", ip).field("response", "Available")
influxdb_hook.write(bucket="bucket", org=organization, record=data)
except Exception as e:
data = Point("system").tag("live host", ip).field("response", f"Not available: {e}")
influxdb_hook.write(bucket="bucket", org=organization, record=data)
def get_host(ip):
return socket.gethostbyaddr(ip)[0]
with DAG(
dag_id="influx_server_availabilty",
schedule=None,
start_date=datetime(2021, 1, 1),
max_active_runs=1,
) as dag:
fetch_system_stats()
我必须编写一个自定义 Dockerfile 并用它运行 compose。 https://airflow.apache.org/docs/docker-stack/build.html#examples-of-image-extending