Spark offers some excellent streaming features. Recently (https://spark.rstudio.com/guides/streaming/), R gained access to them through sparklyr's support for Structured Streaming.
Structured Streaming (https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html) supports many join variants, watermarked over some window.
How can these windowing features be used from sparklyr?
I am interested in two cases:
(Scala)
val windowedCounts = words
  .withWatermark("timestamp", "10 minutes")
  .groupBy(
    window($"timestamp", "10 minutes", "5 minutes"),
    $"word")
  .count()
(R) stream_watermark(df, column = "timestamp", threshold = "10 minutes") replaces .withWatermark("timestamp", "10 minutes").
Where can I find the equivalent of window($"timestamp", "10 minutes", "5 minutes")?
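For what it's worth, here is a sketch of the aggregation case, assuming dplyr passes the unrecognized window() call through to Spark SQL untranslated (the same pattern I try in the reproducible example further down). `words` stands in for a streaming tbl_spark with timestamp and word columns; it is an assumption, not an object from the guide.

windowed_counts <- words %>%
  stream_watermark(column = "timestamp", threshold = "10 minutes") %>%
  group_by(time_window = window(timestamp, "10 minutes", "5 minutes"), word) %>%
  summarise(count = n())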
How does https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#inner-joins-with-optional-watermarking port to sparklyr?
// Join with event-time constraints
impressionsWithWatermark.join(
  clicksWithWatermark,
  expr("""
    clickAdId = impressionAdId AND
    clickTime >= impressionTime AND
    clickTime <= impressionTime + interval 1 hour
    """)
)
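A possible port, untested: dplyr verbs on a tbl_spark translate to Spark SQL, so the equality part can go in inner_join() and the event-time constraint in a following filter(), which Catalyst should push into the join condition. The source paths, column names, and watermark thresholds below are all assumptions.

library(sparklyr)
library(dplyr)
# assumes `spark` is an existing spark_connect() connection
impressions_wm <- stream_read_csv(spark, "impressions") %>%
  stream_watermark(column = "impressionTime", threshold = "2 hours")
clicks_wm <- stream_read_csv(spark, "clicks") %>%
  stream_watermark(column = "clickTime", threshold = "3 hours")
joined <- impressions_wm %>%
  inner_join(clicks_wm, by = c("impressionAdId" = "clickAdId")) %>%
  filter(
    clickTime >= impressionTime,
    # dbplyr::sql() passes the interval arithmetic through to Spark SQL verbatim
    clickTime <= dbplyr::sql("impressionTime + interval 1 hour")
  )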
How can the triggers defined in https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#triggers be set?
stream_trigger_interval can specify a fixed trigger interval, but what about the unspecified (default), once, and continuous triggers?
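What I can find in sparklyr: the trigger argument of the stream_write_*() functions accepts stream_trigger_interval() (fixed micro-batch interval, also the default) and stream_trigger_continuous(); I see no wrapper for the run-once trigger. A sketch, with `sdf` standing in for a streaming tbl_spark:

# fixed interval: one micro-batch per second
stream_write_memory(sdf, name = "by_interval",
                    trigger = stream_trigger_interval(interval = 1000))
# continuous processing, checkpointing every 5 seconds
# (note: continuous mode does not support aggregations)
stream_write_memory(sdf, name = "continuous",
                    trigger = stream_trigger_continuous(checkpoint = 5000))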
library(sparklyr)
library(dplyr)
library(future)
conf <- spark_config()
spark <- spark_connect(master = "local", config = conf)
source <- "streaming_source"
destination <- "streaming_destination"
if (file.exists(source)) unlink(source, recursive = TRUE)
if (file.exists(destination)) unlink(destination, recursive = TRUE)
length_df <- 1000
dates <- base::sample(seq(as.Date('2018-01-01'), as.Date('2019-06-01'), by = "day"), size = length_df, replace = TRUE)
values <- rnorm(length_df)
event_category <- base::sample(c("regular", "error", "security_alert"), size = length_df, replace = TRUE)
sampling_df <- data.frame(values, dates, event_category)
sampling_df <- sampling_df %>%
  rename(timestamp = dates, category = event_category, value = values)
head(sampling_df)
stream_generate_test(df = sampling_df, iterations = 1, path = source)
read_folder <- stream_read_csv(spark, source)
process_stream <- read_folder %>%
  stream_watermark(column = "timestamp", threshold = "5 days") %>%
  group_by(time_window = window(timestamp, "7 days", "3 days"), category) %>%
  summarise(
    mean = mean(value, na.rm = TRUE),
    count = n()
  ) %>%
  sdf_separate_column("time_window", into = c("beginning", "end")) %>%
  select(-time_window) %>%
  arrange(desc(count))
my_table <- "stream"
write_output <- stream_write_memory(process_stream, name = my_table)
##########################################
tbl(spark, my_table) # execute repeatedly
tbl(spark, my_table) # execute repeatedly
tbl(spark, my_table) # execute repeatedly
##########################################
invisible(future(stream_generate_test(df = sampling_df, interval = 0.2, iterations = 100, path = source)))
stream_view(write_output)
When I tried a stream-stream outer join, it failed with:
Stream-stream outer join between two streaming DataFrame/Datasets is not supported without a watermark in the join keys, or a watermark on the nullable side and an appropriate range condition
TODO: figure this out.
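An untested idea for that error: it asks for watermarks on both inputs plus a time-range condition inside the join itself. Since left_join() cannot express a non-equi ON clause, one workaround may be to register the watermarked streams (impressions_wm and clicks_wm from the join sketch above) as temp views and write the join in raw SQL, so the range condition lands in the ON clause:

sdf_register(impressions_wm, "impressions_wm")
sdf_register(clicks_wm, "clicks_wm")
joined_outer <- tbl(spark, dbplyr::sql("
  SELECT * FROM impressions_wm AS i
  LEFT OUTER JOIN clicks_wm AS c
    ON c.clickAdId = i.impressionAdId AND
       c.clickTime >= i.impressionTime AND
       c.clickTime <= i.impressionTime + interval 1 hour"))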