我正在使用docker-compose来设置可扩展的气流群集。我的解决方案是基于这个Dockerfile https://hub.docker.com/r/puckel/docker-airflow/
我的问题是将日志设置为从s3写入/读取。当一个dag完成后,我得到这样的错误
*** Log file isn't local.
*** Fetching here: http://ea43d4d49f35:8793/log/xxxxxxx/2017-06-26T11:00:00
*** Failed to fetch log file from worker.
*** Reading remote logs...
Could not read logs from s3://buckets/xxxxxxx/airflow/logs/xxxxxxx/2017-06-
26T11:00:00
我在airflow.cfg
文件中设置了一个新的部分
[MyS3Conn]
aws_access_key_id = xxxxxxx
aws_secret_access_key = xxxxxxx
aws_default_region = xxxxxxx
然后在airflow.cfg
的远程日志部分中指定s3路径
remote_base_log_folder = s3://buckets/xxxx/airflow/logs
remote_log_conn_id = MyS3Conn
我是否正确设置了这个并且有错误?这里有成功的秘诀吗?
- 更新
我尝试以URI和JSON格式导出,似乎都不起作用。然后我导出了aws_access_key_id和aws_secret_access_key,然后气流开始捡起它。现在我在工作日志中得到了他的错误
6/30/2017 6:05:59 PMINFO:root:Using connection to: s3
6/30/2017 6:06:00 PMERROR:root:Could not read logs from s3://buckets/xxxxxx/airflow/logs/xxxxx/2017-06-30T23:45:00
6/30/2017 6:06:00 PMERROR:root:Could not write logs to s3://buckets/xxxxxx/airflow/logs/xxxxx/2017-06-30T23:45:00
6/30/2017 6:06:00 PMLogging into: /usr/local/airflow/logs/xxxxx/2017-06-30T23:45:00
- 更新
我发现这个链接以及https://www.mail-archive.com/[email protected]/msg00462.html
然后我进入我的一个工作机器(与Web服务器和调度程序分开)并在python中运行这段代码
import airflow
s3 = airflow.hooks.S3Hook('s3_conn')
s3.load_string('test', airflow.conf.get('core', 'remote_base_log_folder'))
我收到此错误。
boto.exception.S3ResponseError: S3ResponseError: 403 Forbidden
我尝试导出几种不同类型的AIRFLOW_CONN_
envs,如https://airflow.incubator.apache.org/concepts.html连接部分和此问题的其他答案所述。
s3://<AWS_ACCESS_KEY_ID>:<AWS_SECRET_ACCESS_KEY>@S3
{"aws_account_id":"<xxxxx>","role_arn":"arn:aws:iam::<xxxx>:role/<xxxxx>"}
{"aws_access_key_id":"<xxxxx>","aws_secret_access_key":"<xxxxx>"}
我还导出了AWS_ACCESS_KEY_ID和AWS_SECRET_ACCESS_KEY但没有成功。
这些凭据存储在数据库中,因此一旦我在UI中添加它们,它们应该由工作人员接收,但由于某些原因他们无法写入/读取日志。
您需要通过气流UI设置s3连接。为此,您需要转到气流UI上的Admin - > Connections选项卡,并为S3连接创建一个新行。
一个示例配置是:
Conn Id:my_conn_S3
Conn类型:S3
额外:{“aws_access_key_id”:“your_aws_key_id”,“aws_secret_access_key”:“your_aws_secret_key”}
更新气流1.10使记录a lot easier.
对于s3日志记录,按照the above answer设置连接挂钩
然后只需将以下内容添加到airflow.cfg中
[core]
# Airflow can store logs remotely in AWS S3. Users must supply a remote
# location URL (starting with either 's3://...') and an Airflow connection
# id that provides access to the storage location.
remote_base_log_folder = s3://my-bucket/path/to/logs
remote_log_conn_id = MyS3Conn
# Use server-side encryption for logs stored in S3
encrypt_s3_logs = False
对于gcs日志记录,
[core]
# Airflow can store logs remotely in AWS S3. Users must supply a remote
# location URL (starting with either 's3://...') and an Airflow connection
# id that provides access to the storage location.
remote_logging = True
remote_base_log_folder = gs://my-bucket/path/to/logs
remote_log_conn_id = MyGCSConn
注意:从Airflow 1.9开始,远程日志记录一直是significantly altered。如果您使用的是1.9,请继续阅读。
参考here
完整说明:
#Add this variable to the top of the file. Note the trailing slash.
S3_LOG_FOLDER = 's3://<bucket where logs should be persisted>/'
Rename DEFAULT_LOGGING_CONFIG to LOGGING CONFIG
LOGGING_CONFIG = ...
Add a S3TaskHandler to the 'handlers' block of the LOGGING_CONFIG variable
's3.task': {
'class': 'airflow.utils.log.s3_task_handler.S3TaskHandler',
'formatter': 'airflow.task',
'base_log_folder': os.path.expanduser(BASE_LOG_FOLDER),
's3_log_folder': S3_LOG_FOLDER,
'filename_template': FILENAME_TEMPLATE,
},
Update the airflow.task and airflow.task_runner blocks to be 's3.task' instead >of 'file.task'.
'loggers': {
'airflow.task': {
'handlers': ['s3.task'],
...
},
'airflow.task_runner': {
'handlers': ['s3.task'],
...
},
'airflow': {
'handlers': ['console'],
...
},
}
task_log_reader = s3.task
logging_config_class = log_config.LOGGING_CONFIG
remote_log_conn_id = <name of the s3 platform hook>
*** Reading remote log from gs://<bucket where logs should be persisted>/example_bash_operator/run_this_last/2017-10-03T00:00:00/16.log.
[2017-10-03 21:57:50,056] {cli.py:377} INFO - Running on host chrisr-00532
[2017-10-03 21:57:50,093] {base_task_runner.py:115} INFO - Running: ['bash', '-c', u'airflow run example_bash_operator run_this_last 2017-10-03T00:00:00 --job_id 47 --raw -sd DAGS_FOLDER/example_dags/example_bash_operator.py']
[2017-10-03 21:57:51,264] {base_task_runner.py:98} INFO - Subtask: [2017-10-03 21:57:51,263] {__init__.py:45} INFO - Using executor SequentialExecutor
[2017-10-03 21:57:51,306] {base_task_runner.py:98} INFO - Subtask: [2017-10-03 21:57:51,306] {models.py:186} INFO - Filling up the DagBag from /airflow/dags/example_dags/example_bash_operator.py
(自Airflow 1.10.2起更新)
如果您不使用管理界面,这是一个解决方案。
我的Airflow不能在持久性服务器上运行...(它每天都会在Heroku上的Docker容器中重新启动。)我知道我错过了很多很棒的功能,但在我的最小设置中,我永远不要触摸管理界面或cfg文件。相反,我必须在bash脚本中设置特定于Airflow的环境变量,该脚本会覆盖.cfg文件。
Apache的气流[S3]
首先,您需要安装s3
子包才能将Airflow日志写入S3。 (boto3
适用于DAG中的Python作业,但S3Hook
取决于s3子包。)
还有一个注意事项:conda安装doesn't handle this yet,所以我必须做pip install apache-airflow[s3]
。
环境变量
在bash脚本中,我设置了这些core
变量。从these instructions开始,但使用命名约定AIRFLOW__{SECTION}__{KEY}
作为环境变量,我做:
export AIRFLOW__CORE__REMOTE_LOGGING=True
export AIRFLOW__CORE__REMOTE_BASE_LOG_FOLDER=s3://bucket/key
export AIRFLOW__CORE__REMOTE_LOG_CONN_ID=s3_uri
export AIRFLOW__CORE__ENCRYPT_S3_LOGS=False
S3连接ID
上面的s3_uri
是我编写的连接ID。在Airflow中,它对应于另一个环境变量AIRFLOW_CONN_S3_URI
。它的值是您的S3路径,必须采用URI形式。那是
s3://access_key:secret_key@bucket/key
存储此,但您处理其他敏感的环境变量。
通过此配置,Airflow将能够将您的日志写入S3。他们将沿着s3://bucket/key/dag/task_id/timestamp/1.log
的道路前进。
有关从Airflow 1.8升级到Airflow 1.10的附录
我最近将我的生产管道从Airflow 1.8升级到1.9,然后升级到1.10。好消息是变化很小;其余的工作只是找出包安装的细微差别(与原始问题有关S3日志无关)。
(1)首先,我需要使用Airflow 1.9升级到Python 3.6。
(2)包裹名称从airflow
变为apache-airflow
1.9。你也可能在你的this遇到pip install
。
(3)包装psutil
必须在Airflow的特定版本范围内。当你做pip install apache-airflow
时,你可能会遇到这个。
(4)Airflow 1.9+需要python3-dev头文件。
(5)以下是实质性的变化:现在需要export AIRFLOW__CORE__REMOTE_LOGGING=True
。和
(6)S3中的日志路径略有不同,我在答案中更新了:s3://bucket/key/dag/task_id/timestamp/1.log
。
但就是这样!日志在1.9中不起作用,所以我建议直接使用1.10,现在可以使用了。
要使用最近的Airflow更新完成Arne的答案,您不需要将task_log_reader
设置为另一个值而不是默认值:task
好像你按照默认的日志记录模板airflow/config_templates/airflow_local_settings.py,你可以看到since this commit(注意处理程序的名称改为to's3': {'task'...
而不是s3.task
),远程文件夹(REMOTE_BASE_LOG_FOLDER
)上的值将用正确的文件夹替换处理程序:
REMOTE_LOGGING = conf.get('core', 'remote_logging')
if REMOTE_LOGGING and REMOTE_BASE_LOG_FOLDER.startswith('s3://'):
DEFAULT_LOGGING_CONFIG['handlers'].update(REMOTE_HANDLERS['s3'])
elif REMOTE_LOGGING and REMOTE_BASE_LOG_FOLDER.startswith('gs://'):
DEFAULT_LOGGING_CONFIG['handlers'].update(REMOTE_HANDLERS['gcs'])
elif REMOTE_LOGGING and REMOTE_BASE_LOG_FOLDER.startswith('wasb'):
DEFAULT_LOGGING_CONFIG['handlers'].update(REMOTE_HANDLERS['wasb'])
elif REMOTE_LOGGING and ELASTICSEARCH_HOST:
DEFAULT_LOGGING_CONFIG['handlers'].update(REMOTE_HANDLERS['elasticsearch'])
有关如何登录/读取S3的更多详细信息:https://github.com/apache/incubator-airflow/blob/master/docs/howto/write-logs.rst#writing-logs-to-amazon-s3
对于遵循the above answer中非常有用的说明的任何人,只需注意:如果您偶然发现此问题:“ModuleNotFoundError:没有名为'airflow.utils.log.logging_mixin.RedirectStdHandler'的模块”as referenced here(使用airflow 1.9时会发生),修复很简单 - 使用这个基本模板:https://github.com/apache/incubator-airflow/blob/v1-9-stable/airflow/config_templates/airflow_local_settings.py(并遵循the above answer中的所有其他说明)
主分支中存在的当前模板incubator-airflow/airflow/config_templates/airflow_local_settings.py包含对类“airflow.utils.log.s3_task_handler.S3TaskHandler”的引用,该类在apache-airflow == 1.9.0 python包中不存在。希望这可以帮助!
让它与Kube中的Airflow 10配合使用。我有以下env var集:
AIRFLOW_CONN_LOGS_S3=s3://id:secret_uri_encoded@S3
AIRFLOW__CORE__REMOTE_LOGGING=True
AIRFLOW__CORE__REMOTE_BASE_LOG_FOLDER=s3://xxxx/logs
AIRFLOW__CORE__REMOTE_LOG_CONN_ID=logs_s3