我试图按如下方式建模系统:
到达根据预定义的时间表生成,并且具有由数据帧提供的已知处理时间。在模拟开始时,有一个容量等于min_daemons的服务器。到目前为止很简单,但是nxt部分变得棘手:根据以下算法,这个容量在整个模拟中的间隔[min_daemons,max_daemons]会有所不同:
如果在模拟期间的任何时间,队列长度达到或超过incr_count的值,并且对于incr_delay保持等于或高于此级别,则将额外的容量单位添加到主服务器。这可以在任何时候发生,前提是容量不会超过max_daemons。
反之亦然。如果在任何时候,队列长度小于decr_count,并且对于decr_delay保持等于或低于此级别,则移除容量单位,可能降低到min_daemons的级别。
我创建了一个轨迹,用于在满足上述服务器容量更改条件时分支出来的所有到达者。这个问题是容量的变化总是与到达事件有关。我真正想要的是一个独立于到达轨迹的过程,它始终监视队列长度并进行适当的容量变化。
我考虑通过某种虚拟到达过程来替代地实现这一点,比如模拟的每一秒,但我不确定是否可以防止虚拟到达与服务器容量的真实到来竞争。
#instantiate simulation environment
env <- simmer("queues") %>% add_resource("daemon",1) %>% add_global("incr_start",99999) %>% add_global("decr_start",99999)
run <- trajectory() %>%
branch(option = function() if (get_queue_count(env,"daemon") >= incr_count) {1}
else if (get_queue_count(env,"daemon") <= decr_count) {2}
else {3}
,
continue = c(T, T, T)
,
trajectory("increment?")
%>% branch(option = function() if (now(env) - get_global(env,"incr_start") >= incr_delay
& get_capacity(env,"daemon") < max_daemons) {1}
else if (get_global(env,"incr_start")==99999) {2}
else {3}
,
continue = c(T, T, T)
,
trajectory("increment")
%>% log_(function () {paste("Queue size is: ",get_queue_count(env,"daemon"))})
%>% log_(function ()
{paste("Queue has exceeded count for ",now(env)-get_global(env,"incr_start")," seconds.")})
%>% set_capacity(resource = "daemon", value = 1, mod="+")
,
trajectory("update incr start")
%>% set_global("incr_start",now(env))
%>% log_("Queue count is now above increment count. Starting increment timer.")
,
trajectory("do nothing")
%>% log_("Did not meet increment criteria. Doing nothing.")
)
,
trajectory("decrement?")
%>% branch(option = function() if (now(env) - get_global(env,"decr_start") >= decr_delay
& get_capacity(env,"daemon") > min_daemons) {1}
else if (get_global(env,"decr_start")==99999) {2}
else {3}
,
continue = c(T, T, T)
,
trajectory("decrement")
%>% log_(function () {paste("Queue size is: ",get_queue_count(env,"daemon"))})
%>% log_(function ()
{paste("Queue has been less than count for ",now(env)-get_global(env,"incr_start")," seconds.")})
%>% set_capacity(resource = "daemon", value = -1, mod="+")
,
trajectory("update decr start")
%>% set_global("decr_start",now(env))
%>% log_("Queue count is now below decrement count. Starting decrement timer.")
,
trajectory("do nothing")
%>% log_("Did not meet decrement criteria. Doing nothing.")
)
,
trajectory("reset_timer")
%>% log_("Did not meet criteria to increment or decrement. Resetting timers.")
%>% set_global("decr_start", values = 99999)
%>% set_global("decr_start", values = 99999)
) %>%
seize("daemon") %>%
log_("Now running") %>%
log_(function () {paste(get_queue_count(env,"daemon")," runs in the queue.")}) %>%
timeout_from_attribute("service") %>%
release("daemon") %>%
log_("Run complete")
env %>%
add_dataframe("run", run, arr,time="absolute") %>%
run(200)
我需要做一些调试来验证模拟是否按照我的意图运行,但我完全理解这个模型是错误的。我希望设计不会过多地损害它的有效性,但我也想得到关于如何设计更真实的东西的反馈。
定期检查状态会破坏具有离散事件的整个目的。我们这里有一个异步过程,因此建模的正确方法是使用信号。
我们需要问自己:队列什么时候可以......
seize()
活动。因此,在尝试抓住资源之前,我们需要检查入队的到达人数并采取相应行动:
如果它等于decr_count
,则必须发送信号以取消任何降低服务器容量的尝试。
如果它等于incr_count - 1
,则必须发送信号以请求增加服务器的容量。
别无他法。seize()
之后的下一个活动。所以在扣押资源后,我们还需要检查入队的到达人数:
如果它等于incr_count - 1
,则必须发送信号以取消任何增加服务器容量的尝试。
如果它等于decr_count
,则必须发送信号以请求降低服务器的容量。
别无他法。检查入队到达次数和决定需要什么类型信号的程序(如果有的话)可以捆绑在一个可重复使用的功能中(我们称之为check_queue
),如下所示:
library(simmer)
env <- simmer()
check_queue <- function(.trj, resource, mod, lim_queue, lim_server) {
.trj %>% branch(
function() {
if (get_queue_count(env, resource) == lim_queue[1])
return(1)
if (get_queue_count(env, resource) == lim_queue[2] &&
get_capacity(env, resource) != lim_server)
return(2)
0 # pass
},
continue = c(TRUE, TRUE),
trajectory() %>% send(paste("cancel", mod[1])),
trajectory() %>% send(mod[2])
)
}
main <- trajectory() %>%
check_queue("resource", c("-", "+"), c(decr_count, incr_count-1), max_daemons) %>%
seize("resource") %>%
check_queue("resource", c("+", "-"), c(incr_count-1, decr_count), min_daemons) %>%
timeout_from_attribute("service") %>%
release("resource")
这样,主轨迹非常简单。然后,我们需要几个进程来接收这样的信号,并在一段延迟后增加/减少容量:
change_capacity <- function(resource, mod, delay, limit) {
trajectory() %>%
untrap(paste("cancel", mod)) %>%
trap(mod) %>%
wait() %>%
# signal received
untrap(mod) %>%
trap(paste("cancel", mod),
handler = trajectory() %>%
# cancelled! start from the beginning
rollback(Inf)) %>%
timeout(delay) %>%
set_capacity(resource, as.numeric(paste0(mod, 1)), mod="+") %>%
# do we need to keep changing the capacity?
rollback(2, check=function() get_capacity(env, resource) != limit) %>%
# start from the beginning
rollback(Inf)
}
incr_capacity <- change_capacity("resource", "+", incr_delay, max_daemons)
decr_capacity <- change_capacity("resource", "-", decr_delay, min_daemons)
最后,我们将资源,流程和数据添加到模拟环境中:
env %>%
add_resource("resource", min_daemons) %>%
add_generator("incr", incr_capacity, at(0)) %>%
add_generator("decr", decr_capacity, at(0)) %>%
add_dataframe("arrival", main, data)
请注意,我没有检查此代码。它可能需要一些调整,但一般的想法是存在的。