Error when executing the S3Hook list_keys or read_key methods


I am getting this error message:

{logging_mixin.py:112} INFO - [2020-03-22 12:34:53,672] {local_task_job.py:103} INFO - Task exited with return code -6
whenever I use the S3 hook's list_keys or read_key methods. The get_credentials method, however, works fine. I have searched around and cannot figure out why this happens.

I am using:

apache-airflow==1.10.9
boto3==1.12.21
botocore==1.15.21
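
A minimal standalone sketch, run in the same virtualenv as the Airflow worker, can help check whether list_keys fails outside of a task run as well; the connection ID aws_credentials and the bucket name below are placeholders:

# Standalone reproduction sketch (same virtualenv as the Airflow worker).
# "aws_credentials" and "my-bucket" are placeholders.
from airflow.hooks.S3_hook import S3Hook

hook = S3Hook("aws_credentials")
print(hook.get_credentials())                    # reportedly works
print(hook.list_keys(bucket_name="my-bucket"))   # the call that fails with return code -6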

Here is the code for my custom operator that uses the S3Hook:

# Imports needed for this operator (Airflow 1.10.x module paths)
from airflow.hooks.base_hook import BaseHook
from airflow.hooks.S3_hook import S3Hook
from airflow.models import BaseOperator
from airflow.utils.decorators import apply_defaults


class SASValueToRedshiftOperator(BaseOperator):
    """Custom Operator for extracting data from SAS source code.
    Attributes:
        ui_color (str): color code for task in Airflow UI.
    """
    ui_color = '#358150'

    @apply_defaults
    def __init__(self,
                 aws_credentials_id="",
                 redshift_conn_id="",
                 table="",
                 s3_bucket="",
                 s3_key="",
                 sas_value="",
                 columns="",
                 *args, **kwargs):
        """Extracts label mappings from SAS source code and store as Redshift table
        Args:
            aws_credentials_id (str): Airflow connection ID for AWS key and secret.
            redshift_conn_id (str): Airflow connection ID for redshift database.
            table (str): Name of table to load data to.
            s3_bucket (str): S3 bucket name where the SAS source code is stored.
            s3_key (str): S3 Key Name for SAS source code.
            sas_value (str): value to search for in sas file for extraction of data.
            columns (list): resulting data column names.
        Returns:
            None
        """
        super(SASValueToRedshiftOperator, self).__init__(*args, **kwargs)
        self.aws_credentials_id = aws_credentials_id
        self.redshift_conn_id = redshift_conn_id
        self.table = table
        self.s3_bucket = s3_bucket
        self.s3_key = s3_key
        self.sas_value = sas_value
        self.columns = columns

    def execute(self, context):
        """Executes task for staging to redshift.
        Args:
            context (:obj:`dict`): Dict with values to apply on content.
        Returns:
            None   
        """
        s3 = S3Hook(self.aws_credentials_id)
        redshift_conn = BaseHook.get_connection(self.redshift_conn_id)

        self.log.info(s3)
        self.log.info(s3.get_credentials())
        self.log.info(s3.list_keys(self.s3_bucket))
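
For reference, a sketch of how such an operator could be wired into a DAG; the DAG ID, connection IDs, bucket, key, and column names below are purely illustrative placeholders, not taken from the question:

# Placeholder DAG wiring for the operator above; every ID and name is illustrative.
from datetime import datetime
from airflow import DAG

dag = DAG(
    "sas_value_to_redshift_example",
    start_date=datetime(2020, 3, 1),
    schedule_interval=None,
)

extract_sas_labels = SASValueToRedshiftOperator(
    task_id="extract_sas_labels",
    dag=dag,
    aws_credentials_id="aws_credentials",  # Airflow connection of type "aws" (hypothetical ID)
    redshift_conn_id="redshift",           # Airflow connection for Redshift (hypothetical ID)
    table="sas_labels",
    s3_bucket="my-bucket",
    s3_key="sas/source.sas",
    sas_value="some_label",
    columns=["code", "label"],
)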
1 Answer:
s3 = S3Hook(self.aws_credentials_id)
s3.list_keys(bucket_name=s3_bucket, prefix=s3_path, delimiter=delimiter)
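
In context, that call would live inside the operator's execute method, roughly as sketched below; self.s3_bucket and self.s3_key come from the operator above, while s3_path and delimiter in the answer stand for whatever prefix and delimiter you need:

    # Sketch of execute inside SASValueToRedshiftOperator.
    def execute(self, context):
        s3 = S3Hook(self.aws_credentials_id)
        # Name the arguments explicitly: bucket_name is the first parameter of
        # S3Hook.list_keys in Airflow 1.10.x; prefix and delimiter are optional.
        keys = s3.list_keys(bucket_name=self.s3_bucket, prefix=self.s3_key)
        self.log.info(keys)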