我已经将我的spark作业从spark 3.3.1升级到spark 3.5.0,我正在查询Mysql数据库并应用
UPPER(col) = UPPER(value)
在后续的sql查询中。它在 Spark 3.3.1 中按预期工作,但在 3.5.0 中不工作。
地点条件::
UPPER(vn) = 'ERICSSON' AND (upper(st) = 'OPEN' OR upper(st) = 'REOPEN' OR upper(st) = 'CLOSED')
数据库中的 st 列是 ENUM,它导致了问题。
以下是FILTER阶段的物理计划:
对于 3.3.1 :
+- Filter ((upper(vn#11) = ERICSSON) AND (((upper(st#42) = OPEN) OR (upper(st#42) = REOPEN)) OR (upper(st#42) = CLOSED)))
对于 3.5.0 :
+- Filter ((upper(vn#11) = ERICSSON) AND (((upper(staticinvoke(class org.apache.spark.sql.catalyst.util.CharVarcharCodegenUtils, StringType, readSidePadding, st#42, 13, true, false, true)) = OPEN) OR (upper(staticinvoke(class org.apache.spark.sql.catalyst.util.CharVarcharCodegenUtils, StringType, readSidePadding, st#42, 13, true, false, true)) = REOPEN)) OR (upper(staticinvoke(class org.apache.spark.sql.catalyst.util.CharVarcharCodegenUtils, StringType, readSidePadding, st#42, 13, true, false, true)) = CLOSED)))
我调试了一下,发现Spark在3.4.0版本中添加了一个属性,即spark.sql.readSideCharPadding,其默认值为true。
JIRA 链接:
https://issues.apache.org/jira/browse/SPARK-40697
在类CharVarcharCodegenUtils
中添加了一个新方法public static UTF8String readSidePadding(UTF8String inputStr, int limit) {
int numChars = inputStr.numChars();
if (numChars == limit) {
return inputStr;
} else if (numChars < limit) {
return inputStr.rpad(limit, SPACE);
} else {
return inputStr;
}
}
此方法在读取并导致问题时向 ENUM 值附加一些空白填充。
当我从 where 条件中删除 UPPER 函数时,FILTER 阶段如下所示:
+- Filter (((staticinvoke(class org.apache.spark.sql.catalyst.util.CharVarcharCodegenUtils, StringType, readSidePadding, st#42, 13, true, false, true) = OPEN ) OR (staticinvoke(class org.apache.spark.sql.catalyst.util.CharVarcharCodegenUtils, StringType, readSidePadding, st#42, 13, true, false, true) = REOPEN )) OR (staticinvoke(class org.apache.spark.sql.catalyst.util.CharVarcharCodegenUtils, StringType, readSidePadding, st#42, 13, true, false, true) = CLOSED ))
您可以看到它在值后面添加了一些空格,并且查询运行良好,给出了正确的结果。
但是使用 UPPER 函数我没有获取数据。
我还尝试在以下情况下禁用此属性spark.sql.readSideCharPadding = false:
+- Filter (((upper(st#42) = OPEN) OR (upper(st#42) = REOPEN)) OR (upper(st#42) = CLOSED))
它将过滤器推送到带有空格的Mysql,但我没有获取数据。
PushedFilters: [*IsNotNull(vn), *EqualTo(vn,ERICSSON), *Or(Or(EqualTo(st,OPEN ),EqualTo(st,REOPEN )),EqualTo(st,CLOSED ))]
我无法将此过滤器移至 JDBC 读取查询,也无法删除 where 子句中的此 UPPER 函数。
任何人都可以有任何解决方案来解决这个问题而不更改我的代码吗?