Sparklyr streaming join state


Spark provides some excellent streaming capabilities. Recently (https://spark.rstudio.com/guides/streaming/), R gained access to them through sparklyr, built on Structured Streaming.

Structured Streaming (https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html) supports many JOIN variants (watermarked, over a window).

How can these windowing features be used from sparklyr?

Edit

I am interested in two cases:

Windowed aggregation

(Scala)

val windowedCounts = words
    .withWatermark("timestamp", "10 minutes")
    .groupBy(
        window($"timestamp", "10 minutes", "5 minutes"),
        $"word")
    .count()

(R) stream_watermark(df, column = "timestamp", threshold = "10 minutes") replaces .withWatermark("timestamp", "10 minutes"). But where can I find the equivalent of window($"timestamp", "10 minutes", "5 minutes")?
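Presumably something like the following, if window() is simply passed through to Spark SQL by sparklyr (a sketch, assuming words is a streaming tbl_spark with timestamp and word columns):

# sketch: sparklyr passes functions it does not recognize, such as
# window(), straight through to Spark SQL inside dplyr verbs
windowed_counts <- words %>%
  stream_watermark(column = "timestamp", threshold = "10 minutes") %>%
  group_by(time_window = window(timestamp, "10 minutes", "5 minutes"), word) %>%
  summarise(count = n())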

Streaming joins

How can https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#inner-joins-with-optional-watermarking be ported to sparklyr?

// Join with event-time constraints
impressionsWithWatermark.join(
  clicksWithWatermark,
  expr("""
    clickAdId = impressionAdId AND
    clickTime >= impressionTime AND
    clickTime <= impressionTime + interval 1 hour
    """)
)
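One plausible port (a sketch, not verified): join on the equality key with inner_join() and inject the event-time range condition as raw Spark SQL via dplyr::sql(), since interval arithmetic has no direct dplyr translation. Whether Catalyst folds the filter into the join condition, which matters for state cleanup, would need checking with the physical plan.

library(sparklyr)
library(dplyr)

# impressionsWithWatermark / clicksWithWatermark stand for the two
# watermarked streaming tbl_sparks from the Scala example above
joined <- impressionsWithWatermark %>%
  inner_join(clicksWithWatermark, by = c("impressionAdId" = "clickAdId")) %>%
  # range condition as raw Spark SQL
  filter(
    clickTime >= impressionTime,
    clickTime <= sql("impressionTime + interval 1 hour")
  )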

Triggers

How do I set the triggers defined in https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#triggers?

stream_trigger_interval can specify a fixed trigger interval, but what about the unspecified (default), run-once, and continuous triggers?

r apache-spark sparklyr
1 Answer

For windowed aggregation:

library(sparklyr)
library(dplyr)
library(future)

conf <- spark_config()
spark <- spark_connect(master = "local", config = conf)

# folders backing the file-based stream; start from a clean slate
source <- "streaming_source"
destination <- "streaming_destination"
if (file.exists(source)) unlink(source, TRUE)
if (file.exists(destination)) unlink(destination, TRUE)

# sample data: a timestamp, a category, and a numeric value per row
length_df <- 1000
dates <- base::sample(seq(as.Date('2018-01-01'), as.Date('2019-06-01'), by = "day"),
                      length_df, replace = TRUE)
values <- rnorm(length_df)
event_category <- base::sample(c("regular", "error", "security_alert"),
                               length_df, replace = TRUE)
sampling_df <- data.frame(values, dates, event_category) %>%
  rename(timestamp = dates, category = event_category, value = values)
head(sampling_df)

# write an initial batch of test data for the stream to pick up
stream_generate_test(df = sampling_df, iterations = 1, path = source)

# watermark on timestamp, then aggregate over a 7-day window sliding by 3 days;
# window() is passed through to Spark SQL by sparklyr
read_folder <- stream_read_csv(spark, source)
process_stream <- read_folder %>%
  stream_watermark(column = "timestamp", threshold = "5 days") %>%
  group_by(time_window = window(timestamp, "7 days", "3 days"), category) %>%
  summarise(
    mean = mean(value, na.rm = TRUE),
    count = n()
  ) %>%
  sdf_separate_column("time_window", into = c("beginning", "end")) %>%
  select(-time_window) %>%
  arrange(desc(count))

my_table <- "stream"
write_output <- stream_write_memory(process_stream, name = my_table)
##########################################
tbl(spark, my_table)  # execute repeatedly
tbl(spark, my_table)  # execute repeatedly
tbl(spark, my_table)  # execute repeatedly
##########################################

# keep feeding the source folder in the background and watch the output evolve
invisible(future(stream_generate_test(df = sampling_df, interval = 0.2,
                                      iterations = 100, path = source)))
stream_view(write_output)
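When finished, the query can be stopped and the session closed:

# tear down: stop the streaming query, then disconnect
stream_stop(write_output)
spark_disconnect(spark)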

Streaming joins

  • static-stream: works fine
  • stream-stream: inner joins work (see the sketch below); outer joins are still TODO, failing with: Stream-stream outer join between two streaming DataFrame/Datasets is not supported without a watermark in the join keys, or a watermark on the nullable side and an appropriate range condition
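For the inner case, a hedged sketch of the pattern, reusing the setup above; source2 is a hypothetical second folder, fed by stream_generate_test() the same way as source:

# hypothetical second source folder for the right-hand stream
source2 <- "streaming_source_2"
stream_generate_test(df = sampling_df, iterations = 1, path = source2)

left_stream <- stream_read_csv(spark, source) %>%
  stream_watermark(column = "timestamp", threshold = "5 days")
right_stream <- stream_read_csv(spark, source2) %>%
  stream_watermark(column = "timestamp", threshold = "5 days")

# stream-stream inner equi-join on category; overlapping columns get suffixes
joined <- left_stream %>%
  inner_join(right_stream, by = "category", suffix = c("_left", "_right"))

stream_write_memory(joined, name = "joined_stream")
tbl(spark, "joined_stream")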

Triggers

TODO: figure this out.
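A hedged partial answer, based on the sparklyr reference: the stream_write_*() functions take a trigger argument; stream_trigger_interval() gives a fixed micro-batch interval (and is the default), while stream_trigger_continuous() maps to Spark's experimental continuous processing mode. A Trigger.Once equivalent does not appear to be exposed.

# micro-batch every 5 seconds (interval is in milliseconds)
write_output <- stream_write_memory(
  process_stream,
  name = my_table,
  trigger = stream_trigger_interval(interval = 5000)
)

# continuous processing (experimental in Spark 2.3+):
# trigger = stream_trigger_continuous()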
