我有一个在集群模式下运行的简单Spark应用程序。
val funcGSSNFilterHeader = (x: String) => {
println(!x.contains("servedMSISDN")
!x.contains("servedMSISDN")
}
val ssc = new StreamingContext(sc, Seconds(batchIntervalSeconds))
val ggsnFileLines = ssc.fileStream[LongWritable, Text, TextInputFormat]("C:\\Users\\Mbazarganigilani\\Documents\\RA\\GGSN\\Files1", filterF, false)
val ggsnArrays = ggsnFileLines
.map(x => x._2.toString()).filter(x => funcGSSNFilterHeader(x))
ggsnArrays.foreachRDD(s => {println(x.toString()})
我需要在map函数中打印!x.contains(“servedMSISDN”)以进行调试,但这不会在控制台上打印
您的代码包含驱动程序(主/主)和执行程序(在集群模式下以节点运行)。
在“map”中运行的函数在执行程序上运行
即,当您处于群集模式时,在地图功能中执行打印将导致打印到节点控制台(您将看不到)。
为了调试程序,您可以:
请注意,除了本地vs群集模式之外 - 您的代码中似乎有拼写错误:
ggsnArrays.foreachRDD(s => {println(x.toString()})
应该:
ggsnArrays.foreachRDD(s => {println(x.toString)})
两种可能性:您的日志位于工作节点上,因此您必须检查工作日志以获取这些日志消息。如前所述,您可以在本地模式下运行应用程序以检查计算机上的日志。顺便说一下,最好使用SLF4j而不仅仅是println,但我认为这只是为了学习:)
在片段中没有ssc.start()
和ssc.awaitTermination()
。你运行这些命令了吗?如果没有,foreachRDD将不会被执行。如果示例没问题,请在脚本末尾添加这些行并重试,但请检查工作节点日志:)