我使用 R 处理 zigbee2mqtt 的日志文件,其中记录了如下条目:
head(data)
[1] "info 2024-03-11 14:08:01: MQTT publish: topic 'zigbee2mqtt/TempBadEG', payload '{\"battery\":17,\"humidity\":50.1,\"linkquality\":105,\"temperature\":17.25,\"voltage\":2500}'"
[2] "info 2024-03-11 14:08:04: MQTT publish: topic 'zigbee2mqtt/MotionBadOben', payload '{\"battery\":20,\"battery_low\":true,\"illuminance\":37914,\"illuminance_lux\":12369,\"led_control\":\"off\",\"linkquality\":69,\"occupancy\":false,\"occupancy_timeout\":60,\"tamper\":false,\"temperature\":17,\"update\":{\"installed_version\":262147,\"latest_version\":262146,\"state\":\"idle\"},\"voltage\":2600}'"
[3] "info 2024-03-11 14:08:04: MQTT publish: topic 'zigbee2mqtt/SwitchBadOben', payload '{\"linkquality\":21,\"state\":\"OFF\"}'"
[4] "info 2024-03-11 14:08:22: MQTT publish: topic 'zigbee2mqtt/MotionBadEG', payload '{\"battery\":60,\"battery_low\":false,\"illuminance\":31370,\"illuminance_lux\":1371,\"led_control\":\"fault_only\",\"linkquality\":21,\"occupancy\":false,\"occupancy_timeout\":60,\"tamper\":false,\"temperature\":17.06,\"update\":{\"installed_version\":262147,\"latest_version\":262146,\"state\":\"idle\"},\"voltage\":2800}'"
[5] "info 2024-03-11 14:08:22: MQTT publish: topic 'zigbee2mqtt/SwitchBadEG', payload '{\"linkquality\":21,\"state\":\"OFF\"}'"
[6] "info 2024-03-11 14:08:44: MQTT publish: topic 'zigbee2mqtt/TempKucheUG', payload '{\"battery\":14,\"humidity\":54.87,\"linkquality\":21,\"temperature\":14.41,\"voltage\":2500}'"
我的示例数据如下:
# Six sample zigbee2mqtt log lines used as reproducible input. Each line has
# the form
#   info <timestamp>: MQTT publish: topic '<topic>', payload '<json>'
# and the JSON fields differ from device to device.
data <- c(
  "info 2024-03-11 14:08:01: MQTT publish: topic 'zigbee2mqtt/TempBadEG', payload '{\"battery\":17,\"humidity\":50.1,\"linkquality\":105,\"temperature\":17.25,\"voltage\":2500}'",
  "info 2024-03-11 14:08:04: MQTT publish: topic 'zigbee2mqtt/MotionBadOben', payload '{\"battery\":20,\"battery_low\":true,\"illuminance\":37914,\"illuminance_lux\":12369,\"led_control\":\"off\",\"linkquality\":69,\"occupancy\":false,\"occupancy_timeout\":60,\"tamper\":false,\"temperature\":17,\"update\":{\"installed_version\":262147,\"latest_version\":262146,\"state\":\"idle\"},\"voltage\":2600}'",
  "info 2024-03-11 14:08:04: MQTT publish: topic 'zigbee2mqtt/SwitchBadOben', payload '{\"linkquality\":21,\"state\":\"OFF\"}'",
  "info 2024-03-11 14:08:22: MQTT publish: topic 'zigbee2mqtt/MotionBadEG', payload '{\"battery\":60,\"battery_low\":false,\"illuminance\":31370,\"illuminance_lux\":1371,\"led_control\":\"fault_only\",\"linkquality\":21,\"occupancy\":false,\"occupancy_timeout\":60,\"tamper\":false,\"temperature\":17.06,\"update\":{\"installed_version\":262147,\"latest_version\":262146,\"state\":\"idle\"},\"voltage\":2800}'",
  "info 2024-03-11 14:08:22: MQTT publish: topic 'zigbee2mqtt/SwitchBadEG', payload '{\"linkquality\":21,\"state\":\"OFF\"}'",
  "info 2024-03-11 14:08:44: MQTT publish: topic 'zigbee2mqtt/TempKucheUG', payload '{\"battery\":14,\"humidity\":54.87,\"linkquality\":21,\"temperature\":14.41,\"voltage\":2500}'"
)
我做了以下处理:
library(tidyverse)
library(jsonlite)
library(RColorBrewer)
library(ggthemes)
library(furrr)
# Parse zigbee2mqtt "MQTT publish" log lines (the character vector `data`) into
# a wide tibble `log_data`: timestamp and topic come from the line itself, and
# every JSON field found in a payload becomes its own column.

# Leave one core free for the session, but never request fewer than one worker
# (availableCores() - 1 evaluates to 0 on a single-core machine).
plan(multisession, workers = max(1, availableCores() - 1))

# Wall-clock start, used for the elapsed-time report at the end of the block.
log_data.start.timestamp <- Sys.time()

log_data <- data %>%
  enframe(name = NULL, value = "line") %>%
  mutate(
    timestamp = str_extract(line, "\\d{4}-\\d{2}-\\d{2} \\d{2}:\\d{2}:\\d{2}"),
    topic = str_extract(line, "(?<=topic ')[^']+(?=')"),
    # Greedy match up to the last quote on the line, so an apostrophe inside a
    # JSON string value does not truncate the payload.
    payload = str_extract(line, "(?<=payload ').+(?=')")
  ) %>%
  mutate(topic = str_remove(topic, "zigbee2mqtt/")) %>%
  # Drop repeated log lines before the (expensive) JSON parsing step.
  distinct() %>%
  mutate(payload = future_map(payload, ~ {
    tryCatch(
      {
        json <- fromJSON(.x, flatten = TRUE)
        # Remove unwanted fields
        json$data <- NULL
        json$update <- NULL
        json$type <- NULL
        json$tamper <- NULL
        json$power_on_behavior <- NULL
        json$battery_low <- NULL
        json$occupancy_timeout <- NULL
        as_tibble(json)
      },
      error = function(e) {
        # Payload could not be parsed as JSON: keep the raw text in a `value`
        # column instead of failing the whole run.
        tibble(value = .x)
      }
    )
  })) %>%
  select(-line) %>%
  mutate(timestamp = as.POSIXct(timestamp, format = "%Y-%m-%d %H:%M:%S")) %>%
  # Expand the per-row tibbles; fields missing for a device are filled with NA.
  unnest(payload) %>%
  mutate(
    # as.Date() on a POSIXct converts in UTC by default, whereas `time` below
    # is formatted in the session time zone. Deriving the date from the same
    # local rendering keeps the two columns consistent around midnight.
    date = as.Date(format(timestamp, "%Y-%m-%d")),
    time = format(timestamp, "%H:%M:%S")
  ) %>%
  distinct() %>%
  arrange(timestamp)

# Elapsed time for the whole parsing step.
Sys.time() - log_data.start.timestamp
我得到了如下结果:
head(log_data)
# A tibble: 6 × 15
timestamp topic battery humidity linkquality temperature voltage illuminance illuminance_lux led_control occupancy state action date time
<dttm> <chr> <int> <dbl> <int> <dbl> <int> <int> <int> <chr> <lgl> <chr> <chr> <date> <chr>
1 2024-03-11 14:08:01 TempBadEG 17 50.1 105 17.2 2500 NA NA NA NA NA NA 2024-03-11 14:08:01
2 2024-03-11 14:08:04 MotionBadOben 20 NA 69 17 2600 37914 12369 off FALSE NA NA 2024-03-11 14:08:04
3 2024-03-11 14:08:04 SwitchBadOben NA NA 21 NA NA NA NA NA NA OFF NA 2024-03-11 14:08:04
4 2024-03-11 14:08:22 MotionBadEG 60 NA 21 17.1 2800 31370 1371 fault_only FALSE NA NA 2024-03-11 14:08:22
5 2024-03-11 14:08:22 SwitchBadEG NA NA 21 NA NA NA NA NA NA OFF NA 2024-03-11 14:08:22
6 2024-03-11 14:08:44 TempKucheUG 14 54.9 21 14.4 2500 NA NA NA NA NA NA 2024-03-11 14:08:44
从那里我可以进行分析。但是,对于 5 天的日志记录(5 MB 输入),尽管使用了 `furrr`,这一步仍需要 20 秒才能完成。我怀疑是有效负载处理和 JSON 扁平化导致了处理时间的增加。
我无法想象如果我得到一年的数据,这将如何扩大规模。由于不同的传感器报告不同的有效负载,因此我需要一种通用方法来展开 json 有效负载,而无需预先指定变量。这还允许添加或更改可能报告不同变量的传感器。
有没有办法让它更高效、更快?
我没有对它进行基准测试,但我猜想所有并行处理开销以及将每个(相对较小的)JSON 移入和移出工作程序的成本都高于您从中获得的收益。如果您需要并行化,也许可以考虑更大的块,即所有每日或至少所有每小时记录。
以下只是一个测试:先将所有 JSON 字符串合并为单个 ND-JSON 字符串,再交给 `jsonlite::stream_in()`(使用默认处理程序)处理,看看执行情况如何。
生成了 25000 行(磁盘上大约 5.3MB 作为文本)用于测试,处理时间不到 6 秒。
library(dplyr)
library(tidyr)
# Synthetic benchmark input: the six sample log lines recycled until the vector
# holds 25000 entries.
data_raw <- rep_len(
  c(
    "info 2024-03-11 14:08:01: MQTT publish: topic 'zigbee2mqtt/TempBadEG', payload '{\"battery\":17,\"humidity\":50.1,\"linkquality\":105,\"temperature\":17.25,\"voltage\":2500}'",
    "info 2024-03-11 14:08:04: MQTT publish: topic 'zigbee2mqtt/MotionBadOben', payload '{\"battery\":20,\"battery_low\":true,\"illuminance\":37914,\"illuminance_lux\":12369,\"led_control\":\"off\",\"linkquality\":69,\"occupancy\":false,\"occupancy_timeout\":60,\"tamper\":false,\"temperature\":17,\"update\":{\"installed_version\":262147,\"latest_version\":262146,\"state\":\"idle\"},\"voltage\":2600}'",
    "info 2024-03-11 14:08:04: MQTT publish: topic 'zigbee2mqtt/SwitchBadOben', payload '{\"linkquality\":21,\"state\":\"OFF\"}'",
    "info 2024-03-11 14:08:22: MQTT publish: topic 'zigbee2mqtt/MotionBadEG', payload '{\"battery\":60,\"battery_low\":false,\"illuminance\":31370,\"illuminance_lux\":1371,\"led_control\":\"fault_only\",\"linkquality\":21,\"occupancy\":false,\"occupancy_timeout\":60,\"tamper\":false,\"temperature\":17.06,\"update\":{\"installed_version\":262147,\"latest_version\":262146,\"state\":\"idle\"},\"voltage\":2800}'",
    "info 2024-03-11 14:08:22: MQTT publish: topic 'zigbee2mqtt/SwitchBadEG', payload '{\"linkquality\":21,\"state\":\"OFF\"}'",
    "info 2024-03-11 14:08:44: MQTT publish: topic 'zigbee2mqtt/TempKucheUG', payload '{\"battery\":14,\"humidity\":54.87,\"linkquality\":21,\"temperature\":14.41,\"voltage\":2500}'"
  ),
  length.out = 25000
)
# Quick look at the generated input vector.
glimpse(data_raw)
#> chr [1:25000] "info 2024-03-11 14:08:01: MQTT publish: topic 'zigbee2mqtt/TempBadEG', payload '{\"battery\":17,\"humidity\":5"| __truncated__ ...

# Start the timer; the matching toc() follows the JSON parsing step.
tictoc::tic()

# Split every log line into timestamp, topic and raw JSON payload with a single
# regex. Unnamed pattern pieces must match but produce no column.
data <-
  data_raw |>
  tibble(line = _) |>
  separate_wider_regex(
    line,
    patterns = c(
      "^info\\s+",
      ts = ".*", ": MQTT publish: topic '",
      topic = "[^']+", "',[^']+'",
      # Greedy up to the closing quote at end of line: a payload whose JSON
      # contains an apostrophe would otherwise fail to match and abort the
      # whole split.
      payload = ".+", "'$"
    )
  ) |>
  mutate(ts = lubridate::ymd_hms(ts))
data
#> # A tibble: 25,000 × 3
#> ts topic payload
#> <dttm> <chr> <chr>
#> 1 2024-03-11 14:08:01 zigbee2mqtt/TempBadEG "{\"battery\":17,\"humidity\":…
#> 2 2024-03-11 14:08:04 zigbee2mqtt/MotionBadOben "{\"battery\":20,\"battery_low…
#> 3 2024-03-11 14:08:04 zigbee2mqtt/SwitchBadOben "{\"linkquality\":21,\"state\"…
#> 4 2024-03-11 14:08:22 zigbee2mqtt/MotionBadEG "{\"battery\":60,\"battery_low…
#> 5 2024-03-11 14:08:22 zigbee2mqtt/SwitchBadEG "{\"linkquality\":21,\"state\"…
#> 6 2024-03-11 14:08:44 zigbee2mqtt/TempKucheUG "{\"battery\":14,\"humidity\":…
#> 7 2024-03-11 14:08:01 zigbee2mqtt/TempBadEG "{\"battery\":17,\"humidity\":…
#> 8 2024-03-11 14:08:04 zigbee2mqtt/MotionBadOben "{\"battery\":20,\"battery_low…
#> 9 2024-03-11 14:08:04 zigbee2mqtt/SwitchBadOben "{\"linkquality\":21,\"state\"…
#> 10 2024-03-11 14:08:22 zigbee2mqtt/MotionBadEG "{\"battery\":60,\"battery_low…
#> # ℹ 24,990 more rows
# Parse all payloads in one pass: join them into newline-delimited JSON, read
# that through a text connection with stream_in(), then widen the nested
# `update` object into `update.*` columns.
payload_con <- textConnection(paste0(data$payload, collapse = "\n"))
payload_cols <-
  tryCatch(
    jsonlite::stream_in(payload_con),
    # stream_in() only closes connections it opened itself; a textConnection is
    # already open, so close it here (even on error) to avoid leaking it.
    finally = close(payload_con)
  ) |>
  unnest_wider(update, names_sep = ".")

# Rows come back in input order, so the parsed columns can be bound
# positionally to the timestamp/topic columns.
bind_cols(select(data, -payload), payload_cols)
#> Found 500 records... Found 1000 records... Found 1500 records... Found 2000 records... Found 2500 records... Found 3000 records... Found 3500 records... Found 4000 records... Found 4500 records... Found 5000 records... Found 5500 records... Found 6000 records... Found 6500 records... Found 7000 records... Found 7500 records... Found 8000 records... Found 8500 records... Found 9000 records... Found 9500 records... Found 10000 records... Found 10500 records... Found 11000 records... Found 11500 records... Found 12000 records... Found 12500 records... Found 13000 records... Found 13500 records... Found 14000 records... Found 14500 records... Found 15000 records... Found 15500 records... Found 16000 records... Found 16500 records... Found 17000 records... Found 17500 records... Found 18000 records... Found 18500 records... Found 19000 records... Found 19500 records... Found 20000 records... Found 20500 records... Found 21000 records... Found 21500 records... Found 22000 records... Found 22500 records... Found 23000 records... Found 23500 records... Found 24000 records... Found 24500 records... Found 25000 records... Imported 25000 records. Simplifying...
#> # A tibble: 25,000 × 18
#> ts topic battery humidity linkquality temperature voltage
#> <dttm> <chr> <int> <dbl> <int> <dbl> <int>
#> 1 2024-03-11 14:08:01 zigbee2… 17 50.1 105 17.2 2500
#> 2 2024-03-11 14:08:04 zigbee2… 20 NA 69 17 2600
#> 3 2024-03-11 14:08:04 zigbee2… NA NA 21 NA NA
#> 4 2024-03-11 14:08:22 zigbee2… 60 NA 21 17.1 2800
#> 5 2024-03-11 14:08:22 zigbee2… NA NA 21 NA NA
#> 6 2024-03-11 14:08:44 zigbee2… 14 54.9 21 14.4 2500
#> 7 2024-03-11 14:08:01 zigbee2… 17 50.1 105 17.2 2500
#> 8 2024-03-11 14:08:04 zigbee2… 20 NA 69 17 2600
#> 9 2024-03-11 14:08:04 zigbee2… NA NA 21 NA NA
#> 10 2024-03-11 14:08:22 zigbee2… 60 NA 21 17.1 2800
#> # ℹ 24,990 more rows
#> # ℹ 11 more variables: battery_low <lgl>, illuminance <int>,
#> # illuminance_lux <int>, led_control <chr>, occupancy <lgl>,
#> # occupancy_timeout <int>, tamper <lgl>, update.installed_version <int>,
#> # update.latest_version <int>, update.state <chr>, state <chr>
tictoc::toc()
#> 5.88 sec elapsed
创建于 2024-03-16,使用 reprex v2.1.0