我目前有一个流程,但需要重新制定它以提高效率。我将描述新流程,我想知道新流程是否最优并遵循正确的编码实践。
本质上,我只有一个数据源:Teradata。我使用 SQL 从 Teradata 提取数据,这些 SQL 查询嵌入在 R 中。此 R 脚本将在工作日每天运行,但周末不运行。 R 脚本应该提取最近 14 天的数据,但是由于它每天运行,因此在第一次数据提取之后,我只需要从那时起提取最新一天的数据。
我正在使用来自 Teradata 的 7 个单独的表,这些表也是每天加载的。我应该有某种触发器来检查每个表以查看它是否已加载新数据,如果已加载,则提取新数据。如果没有,请闲置 5 分钟,然后再次检查。
此外,周二、周三、周四、周五,我只需要查询最新一天的新数据。然而,在周一,由于该过程不在周末运行,我将需要最近 3 天的数据(周日、周六、周五)。
这是我的一些非常粗略的伪代码大纲:
1. Define functions:
~~~
2. Pull in the base data for the last 14 days:
A <- sql_query_last_14_days
## this assignment only needs to be ran once, thus will comment it out after the
## first go around
B <- saved RData file
## this will not be used in the first run of the process, but will be used for the
## second run and every subsequent run
3. Loop to check for table loading and query data:
while True:
for each table in tables_list:
if table_is_loaded(table):
if is_monday():
query_and_append_data_for_monday(table)
else:
query_and_append_data(table)
break
else:
wait_for_table_loading()
4. Data Manipulation
5. Export the updated dataset to CSV.
6. Save the dataset to an .RData file.
使用
now <- Sys.time()
和prevtime <- now - 14*86400
(两周前)初始化一个流程,查询这两个时间之间有时间戳的所有数据。 (实际上,如果您将其作为脚本运行,则定义 now
和查询命中之间的时间间隔将是毫秒,因此它实际上应该是“所有较新的数据”。获取此数据后,保存 (saveRDS(list(dat = querydat, time = now), format(now, format="%Y%m%d.rds"))
或类似的。
第二天启动时,拉取前一天的
.rds
文件(例如 yest <- readRDS("20240402.rds")
),使用 prevtime <- yest$time
和 now <- Sys.time()
并再次运行查询。当这种情况发生在周一时,它自然会查询整个周末,因为最后保存的 .rds
文件应该来自上周五。 (此外,如果您由于假期或假期或“哎呀”或其他原因而碰巧跳过一天,下一个查询将自动包含您上次离开的位置。)
利用这些数据,按照您需要的方式处理数据。根据数据的大小和计算的复杂性,我通常更喜欢保存“原始”(查询后立即)数据;我这样做是因为如果我的处理过程发生任何变化,我不需要重新下载原始数据就能够重新开始计算。如果计算“昂贵”并且大小不是过高,请考虑将原始数据和计算后数据存储到
.rds
文件(或单独的 .rds
文件)中,以减少“明天”所需的工作量。
文件格式主要是偏好。我更喜欢
.rds
而不是 .rda
,因为我不喜欢 load(.)
在我的环境中创建多个对象,而且我真的很喜欢将原始数据及其相关时间戳保留在同一个地方。如果您需要导出为CSV(例如,对于某些外部非R进程)那么没关系,否则我倾向于知道当我很匆忙时使用CSV,数据不大,并且我希望需要手动和/或使用 Excel 查看原始数据。如果我正在为某些 Web 服务/端点/闪亮应用程序“部署”数据数据,那么我经常使用 .parquet
文件来实现表格延迟访问、压缩、效率和接近 .rds
的速度,比 CSV 快得多以及更多“大数据”友好。