如何在 AWS Glue 工作线程中记录消息(在地图函数内)?

问题描述 投票:0回答:3

我能够按照 https://docs.aws.amazon.com/glue/latest/dg/monitor-continuous-logging-enable.html 中的说明进行操作,并在驱动程序中记录消息。但是当我尝试像这样在地图函数中使用记录器时

sc = SparkContext()
glueContext = GlueContext(sc)
logger = glueContext.get_logger()
logger.info("starting glue job...") #successful
...
def transform(item):
    logger.info("starting transform...") #error
    ...transform logics...

Map.apply(frame = dynamicFrame, f = transform)

我收到此错误:

PicklingError:无法序列化对象:TypeError:无法pickle _thread.RLock对象

我进行了研究,该消息暗示记录器对象在传递给工作人员时无法序列化。

登录 AWS Glue 工作线程的正确方法是什么?

logging pyspark aws-glue aws-glue-spark
3个回答
1
投票

您的记录器对象无法发送到远程执行器,这就是您收到序列化错误的原因。 您必须在映射器函数内初始化记录器。

但是在转换中这样做可能会消耗大量资源。理想情况下,映射器应该快速且轻量,因为它们是在每一行上执行的。

以下是您至少可以在 Glue V3 中执行此操作的方法。日志事件最终将出现在错误日志中。

def transform(record):
    logging.basicConfig(level=logging.INFO, format="MAPPER %(asctime)s [%(levelname)s] [%(name)s] %(message)s")
    map_logger = logging.getLogger()
    map_logger.info("an info event")
    map_logger.error("an error event")

    return record

这是一个完整的示例脚本:

import logging

from awsglue.context import GlueContext
from awsglue.dynamicframe import DynamicFrame
from awsglue.transforms.dynamicframe_map import Map
from pyspark.context import SparkContext
from pyspark.sql.types import Row, IntegerType

# Configure logging for the driver
logging.basicConfig(level=logging.INFO, format='%(asctime)s [%(levelname)s] [%(name)s] %(message)s')
logger = logging.getLogger(__name__)


def main():
    logger.info("======== Job Initialisation ==========")

    sc = SparkContext()
    glue_context = GlueContext(sc)
    spark_session = glue_context.spark_session

    logger.info("======== Start of ETL ==========")

    df = spark_session.createDataFrame(range(1, 100), IntegerType())

    dynf = DynamicFrame.fromDF(df, glue_context, "dynf")

    # Apply mapper function on each row
    dynf = Map.apply(frame=dynf, f=transform)

    logger.info(f"Result: {dynf.show(10)}")

    logger.info("Done")


def transform(record):
    logging.basicConfig(level=logging.INFO, format="%(asctime)s [%(levelname)s] [%(name)s] %(message)s")
    map_logger = logging.getLogger("transformer")
    map_logger.info("an info event")
    map_logger.error("an error event")

    return record

main()

1
投票

我在 Glue 中遇到了同样的问题,甚至尝试联系 AWS,但没有运气,然后@selle 的答案帮助了我。但我发现,如果没有记录器,您可以在执行器的错误日志中看到打印。您只需深入研究错误日志即可。

这将为您提供有关胶水原木的清晰图片...... https://docs.aws.amazon.com/glue/latest/dg/reduced-start-times-spark-etl-jobs.html#reduced-start-times-logging

顺便说一句,这是我在堆栈溢出中的第一篇文章


0
投票

哇,这个问题还困扰了我几天。最后,看到@Vignxsh的帖子后解决了。

© www.soinside.com 2019 - 2024. All rights reserved.