I am running an AWS Glue job that does what it is supposed to do: it picks up records from a Kinesis stream and puts them into a data lake. But it ends in failure with the following error:
StreamingQueryException: com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.services.s3.model.AmazonS3Exception: Access Denied (Service: Amazon S3; Status Code: 403; Error Code: AccessDenied;
Logs from CloudWatch:
23/11/06 16:05:59 ERROR GlueExceptionAnalysisListener: [Glue Exception Analysis] {
"Event": "GlueETLJobExceptionEvent",
"Timestamp": 1699286759755,
"Failure Reason": "Traceback (most recent call last):\n File \"/tmp/azure-activity-to-ocsf-pyspark2.py\", line 300, in <module>\n \"checkpointLocation\": args[\"TempDir\"] + \"/\" + args[\"JOB_NAME\"] + \"/checkpoint/\",\n File \"/opt/amazon/lib/python3.6/site-packages/awsglue/context.py\", line 678, in forEachBatch\n raise e\n File \"/opt/amazon/lib/python3.6/site-packages/awsglue/context.py\", line 668, in forEachBatch\n query.start().awaitTermination()\n File \"/opt/amazon/spark/python/lib/pyspark.zip/pyspark/sql/streaming.py\", line 101, in awaitTermination\n return self._jsq.awaitTermination()\n File \"/opt/amazon/spark/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py\", line 1305, in __call__\n answer, self.gateway_client, self.target_id, self.name)\n File \"/opt/amazon/spark/python/lib/pyspark.zip/pyspark/sql/utils.py\", line 117, in deco\n raise converted from None\npyspark.sql.utils.StreamingQueryException: com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.services.s3.model.AmazonS3Exception: Access Denied (Service: Amazon S3; Status Code: 403; Error Code: AccessDenied; Request ID: KKMW0JW7VXKSRCCJ; S3 Extended Request ID: K+qiigS/kCTXbG9A7Tc/I7QtLBpsVSfUxzCApZZAtOwkLjYZgoZPFIiHi7+DlHvYwed9syWsx78=; Proxy: null), S3 Extended Request ID: K+qiigS/kCTXbG9A7Tc/I7QtLBpsVSfUxzCApZZAtOwkLjYZgoZPFIiHi7+DlHvYwed9syWsx78=\n=== Streaming Query ===\nIdentifier: [id = b3618842-c4c3-4b21-be2e-679f15677208, runId = 68cf2099-0c14-4fbd-a976-51c9e91d6506]\nCurrent Committed Offsets: {KinesisSource[securityLakeAzureActivityStream]: {\"shardId-000000000002\":{\"iteratorType\":\"TRIM_HORIZON\",\"iteratorPosition\":\"\"},\"metadata\":{\"streamName\":\"securityLakeAzureActivityStream\",\"batchId\":\"2\"},\"shardId-000000000001\":{\"iteratorType\":\"TRIM_HORIZON\",\"iteratorPosition\":\"\"},\"shardId-000000000000\":{\"iteratorType\":\"AFTER_SEQUENCE_NUMBER\",\"iteratorPosition\":\"49645809341923458802793491468334882006293674050208137218\"}}}\nCurrent Available Offsets: 
{KinesisSource[securityLakeAzureActivityStream]: {\"shardId-000000000002\":{\"iteratorType\":\"TRIM_HORIZON\",\"iteratorPosition\":\"\"},\"metadata\":{\"streamName\":\"securityLakeAzureActivityStream\",\"batchId\":\"3\"},\"shardId-000000000001\":{\"iteratorType\":\"TRIM_HORIZON\",\"iteratorPosition\":\"\"},\"shardId-000000000000\":{\"iteratorType\":\"AFTER_SEQUENCE_NUMBER\",\"iteratorPosition\":\"49645809341923458802793546272532794821692601702159482882\"}}}\n\nCurrent State: ACTIVE\nThread State: RUNNABLE\n\nLogical Plan:\nProject [cast(data#18 as string) AS $json$data_infer_schema$_temporary$#27]\n+- Project [UDF(data#5) AS data#18, streamName#6, partitionKey#7, sequenceNumber#8, approximateArrivalTimestamp#9]\n +- StreamingExecutionRelation KinesisSource[securityLakeAzureActivityStream], [data#5, streamName#6, partitionKey#7, sequenceNumber#8, approximateArrivalTimestamp#9]\n",
"Stack Trace": [
  {
    "Declaring Class": "deco",
    "Method Name": "raise converted from None",
    "File Name": "/opt/amazon/spark/python/lib/pyspark.zip/pyspark/sql/utils.py",
    "Line Number": 117
  },
  {
    "Declaring Class": "__call__",
    "Method Name": "answer, self.gateway_client, self.target_id, self.name)",
    "File Name": "/opt/amazon/spark/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py",
    "Line Number": 1305
  },
  {
    "Declaring Class": "awaitTermination",
    "Method Name": "return self._jsq.awaitTermination()",
    "File Name": "/opt/amazon/spark/python/lib/pyspark.zip/pyspark/sql/streaming.py",
    "Line Number": 101
  },
  {
    "Declaring Class": "forEachBatch",
    "Method Name": "query.start().awaitTermination()",
    "File Name": "/opt/amazon/lib/python3.6/site-packages/awsglue/context.py",
    "Line Number": 668
  },
  {
    "Declaring Class": "forEachBatch",
    "Method Name": "raise e",
    "File Name": "/opt/amazon/lib/python3.6/site-packages/awsglue/context.py",
    "Line Number": 678
  },
  {
    "Declaring Class": "<module>",
    "Method Name": "\"checkpointLocation\": args[\"TempDir\"] + \"/\" + args[\"JOB_NAME\"] + \"/checkpoint/\",",
    "File Name": "/tmp/azure-activity-to-ocsf-pyspark2.py",
    "Line Number": 300
  }
],
"Last Executed Line number": 300,
"script": "azure-activity-to-ocsf-pyspark2.py"
}
Line 300 is this:
glueContext.forEachBatch(
    frame=dataframe_KinesisStream_node1,
    batch_function=processBatch,
    options={
        "windowSize": "100 seconds",
        "checkpointLocation": args["TempDir"] + "/" + args["JOB_NAME"] + "/checkpoint/",  # line 300
    },
)
The checkpoint folder is created in S3, and I can see it. The temporary folder given in the job settings is accessed just fine; I can see the files the job creates there.
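Note that the checkpoint location is built from TempDir, so the streaming checkpoint lives under the Glue assets (temp) bucket, not under the data lake bucket. A quick pure-Python sketch of how that path resolves (the bucket name, prefix, and job name below are made up for illustration):

```python
# Illustrative only: mimic how the Glue script assembles the checkpoint
# path from the job arguments. All values here are hypothetical.
args = {
    "TempDir": "s3://securitylake-glue-assets-123456789-us-east-1/temporary",
    "JOB_NAME": "azure-activity-to-ocsf",
}

checkpoint_location = args["TempDir"] + "/" + args["JOB_NAME"] + "/checkpoint/"
print(checkpoint_location)
# → s3://securitylake-glue-assets-123456789-us-east-1/temporary/azure-activity-to-ocsf/checkpoint/
```

Since the failing call in the trace is the checkpointing step, the 403 most plausibly concerns this glue-assets bucket rather than the data lake bucket.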
Here is the code in the Glue job: GitHub link
Access role policy:
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Action": [
        "s3:GetObject",
        "s3:PutObject"
      ],
      "Resource": [
        "arn:aws:s3:::aws-security-data-lake-*/*",
        "arn:aws:s3:::securitylake-glue-assets-*/*"
      ],
      "Effect": "Allow"
    }
  ]
}
Why is this error happening, and which bucket is it about?
Another observation: if I add delete-object permissions to the policy above, the Glue job runs forever.
There are no issues with ACLs, SCPs, etc.
Bucket (Glue script / temp bucket) policy:
{
  "Version": "2008-10-17",
  "Statement": [
    {
      "Sid": "Stmt1683139153218",
      "Effect": "Allow",
      "Principal": "*",
      "Action": [
        "s3:GetObject",
        "s3:PutObject"
      ],
      "Resource": "arn:aws:s3:::securitylake-glue-assets-123456789-us-east-1/*",
      "Condition": {
        "Bool": {
          "aws:SecureTransport": "true"
        },
        "ArnEquals": {
          "aws:PrincipalArn": "arn:aws:iam::123456789:role/securityLakeGlueStreamingRole"
        }
      }
    }
  ]
}
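For what it's worth, the glue-assets bucket policy above only matches object ARNs (the `/*` resource) and only when both conditions hold. A rough pure-Python model of that Condition block (a simplification for illustration, not the real IAM evaluation engine):

```python
# Simplified model of the Condition block in the glue-assets bucket policy:
# the Allow applies only when the request uses TLS (aws:SecureTransport)
# AND the caller's principal ARN equals the Glue streaming role.
ALLOWED_PRINCIPAL = "arn:aws:iam::123456789:role/securityLakeGlueStreamingRole"

def condition_matches(principal_arn: str, secure_transport: bool) -> bool:
    return secure_transport and principal_arn == ALLOWED_PRINCIPAL

# The Glue role over TLS satisfies the condition; anything else is not
# covered by this statement's Allow.
assert condition_matches(ALLOWED_PRINCIPAL, True)
assert not condition_matches(ALLOWED_PRINCIPAL, False)
```

Note that because the statement's Resource only covers objects (`.../*`), bucket-level operations (such as listing during checkpointing) are not addressed by this policy at all.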
Security Lake bucket policy (where the Glue job writes; I can see new files here):
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Deny",
      "Principal": {
        "AWS": "*"
      },
      "Action": "s3:*",
      "Resource": [
        "arn:aws:s3:::aws-security-data-lake-us-east-1-ogt1oa9bot0jmeduqbjxzmxzvu9eij/*",
        "arn:aws:s3:::aws-security-data-lake-us-east-1-ogt1oa9bot0jmeduqbjxzmxzvu9eij"
      ],
      "Condition": {
        "Bool": {
          "aws:SecureTransport": "false"
        }
      }
    },
    {
      "Sid": "PutSecurityLakeObject",
      "Effect": "Allow",
      "Principal": {
        "Service": "securitylake.amazonaws.com"
      },
      "Action": "s3:PutObject",
      "Resource": [
        "arn:aws:s3:::aws-security-data-lake-us-east-1-ogt1oa9bot0jmeduqbjxzmxzvu9eij/*",
        "arn:aws:s3:::aws-security-data-lake-us-east-1-ogt1oa9bot0jmeduqbjxzmxzvu9eij"
      ],
      "Condition": {
        "StringEquals": {
          "aws:SourceAccount": "644107485976",
          "s3:x-amz-acl": "bucket-owner-full-control"
        },
        "ArnLike": {
          "aws:SourceArn": "arn:aws:securitylake:us-east-1:644107485976:*"
        }
      }
    }
  ]
}
In your access role policy, you need to list the S3 buckets both without and with the trailing object wildcard, as is done in the Security Lake bucket policy:
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Action": [
        "s3:GetObject",
        "s3:PutObject"
      ],
      "Resource": [
        "arn:aws:s3:::aws-security-data-lake-*/*",
        "arn:aws:s3:::aws-security-data-lake-*",
        "arn:aws:s3:::securitylake-glue-assets-*",
        "arn:aws:s3:::securitylake-glue-assets-*/*"
      ],
      "Effect": "Allow"
    }
  ]
}
Ideally I would not use wildcards in the S3 bucket names at all, but this should do for now.
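One caveat worth adding: `s3:GetObject` and `s3:PutObject` only ever apply to object ARNs, so the bucket-level ARNs only take effect if a bucket-level action is also granted. Spark streaming checkpointing typically needs `s3:ListBucket` on the checkpoint bucket, which would explain the 403. A sketch of what the amended role policy could look like, built as a Python dict for readability (the exact action list is an assumption; verify it against what your job actually does):

```python
import json

# Hypothetical sketch, not a verified minimal policy: split object-level
# and bucket-level permissions into separate statements. s3:ListBucket is
# the bucket-level action that checkpoint listing typically requires.
policy = {
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Action": ["s3:GetObject", "s3:PutObject"],
            "Resource": [
                "arn:aws:s3:::aws-security-data-lake-*/*",
                "arn:aws:s3:::securitylake-glue-assets-*/*",
            ],
        },
        {
            "Effect": "Allow",
            "Action": ["s3:ListBucket"],
            "Resource": [
                "arn:aws:s3:::aws-security-data-lake-*",
                "arn:aws:s3:::securitylake-glue-assets-*",
            ],
        },
    ],
}

print(json.dumps(policy, indent=2))
```

Note the bucket-level ARNs deliberately have no trailing slash; `arn:aws:s3:::bucket/` is not a valid bucket ARN.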