我正在使用 Spark Streaming 2.1。我想定期刷新一些缓存表(由 spark 提供的 DataSource 加载,如 parquet、MySQL 或用户定义的数据源)。
如何刷新表格?
假设我有一些表
spark.read.format("").load().createTempView("my_table")
它也被缓存
spark.sql("cache table my_table")
以下代码是否足以刷新表格,以及何时 接下来加载表,它会自动缓存
spark.sql("refresh table my_table")
或者我必须手动执行
spark.table("my_table").unpersist
spark.read.format("").load().createOrReplaceTempView("my_table")
spark.sql("cache table my_table")
并发刷新表安全吗?
并发我的意思是使用
ScheduledThreadPoolExecutor
在主线程之外进行刷新工作。
当我在表上调用刷新时,如果 Spark 使用缓存表会发生什么情况?
在 Spark 2.2.0 中,如果表被 hive 或一些外部工具更新,他们引入了刷新表元数据的功能。
可以通过API实现,
spark.catalog.refreshTable("my_table")
此 API 将更新该表的元数据以保持一致。
我在使用 SparkSession 从 hive 读取表时遇到问题,特别是方法表,即
spark.table(table_name)
。每次写完表格并尝试阅读它
我收到这个错误:
java.IO.FileNotFoundException ...底层文件可能已更新。您可以通过在 SQL 中运行“REFRESH TABLE tableName”命令或通过重新创建所涉及的数据集/数据帧来显式使 Spark 中的缓存无效。
我尝试使用
spark.catalog.refreshTable(table_name)
刷新表也 sqlContext 都没有用。
我的解决方案写在表格中并阅读后使用:
val usersDF = spark.read.load(s"/path/table_name")
工作正常。
这是个问题吗?也许 hdfs 上的数据还没有更新?
我刷新了表 但是现在我的选择查询没有返回结果。 我在 s3 中有数百个分区。 我如何为所有这些分区重新创建元数据和缓存