Bokeh - 更新数据库更新图表

问题描述 投票:0回答:1

编辑:我在更新函数中使用ColumnDataSource.data修复了TypeError,现在问题是这是解决问题的正确方法。

我想显示一个包含来自数据库的信息的仪表板,我是Bokeh的新手,但我知道我需要运行一个Bokeh服务器应用程序。我的想法是在一个单独的线程中更新ColumnDataSource,但我不确定它是否会起作用。 这是一个测试,我一直试图看看我是否可以得到更新的情节,但我得到TypeError: 'ColumnDataSource' object is not iterable

from bokeh.models.sources import ColumnDataSource
from bokeh.plotting import figure, curdoc
import time
from threading import Thread
from random import randint
from tornado import gen

from functools import partial

data = {'x': [1,2,3,4], 'y': [10, 20, 30, 40]}

source = ColumnDataSource(data)
p = figure(plot_width = 700, plot_height = 700, 
            title = 'Histogram of Arrival Delays by Carrier',
            x_axis_label = 'Delay (min)', y_axis_label = 'Proportion')

doc = curdoc()

@gen.coroutine
def update(x, y):
    source.data.update(ColumnDataSource({'x': x, 'y':y}).data)

def blocking_task():
    while True:
        # do some blocking computation
        time.sleep(5)
        x = [ randint(0,10) for _ in range(4)]
        y = [ randint(20, 80) for _ in range(4)]
        # but update the document from callback
        doc.add_next_tick_callback(partial(update, x=x, y=y))

p.hbar(y='x', height=0.2, right='y', source=source)
doc.add_root(p)
thread = Thread(target=blocking_task)
thread.start()

我不确定这是正确的方法,所以任何帮助将不胜感激。

python bokeh
1个回答
1
投票

是的,只要您需要线程,这是正确的方法。一个小的修正:没有必要在每次更新时创建一个新的ColumnDataSource。您可以将新数据字典分配给现有的source.data。请参阅下面的略微修改的代码(适用于Bokeh v1.0.4)。

from bokeh.models.sources import ColumnDataSource
from bokeh.plotting import figure, curdoc
from functools import partial
from threading import Thread
from random import randint
from tornado import gen
import time

data = {'x': [1, 2, 3, 4], 'y': [10, 20, 30, 40]}
source = ColumnDataSource(data)
p = figure(plot_width = 700, plot_height = 700,
            title = 'Histogram of Arrival Delays by Carrier',
            x_axis_label = 'Delay (min)', y_axis_label = 'Proportion')

doc = curdoc()

@gen.coroutine
def update(x, y):
    source.data = {'x': x, 'y': y}

def blocking_task():
    while True:
        # do some blocking computation
        time.sleep(5)
        x = [ randint(0, 10) for _ in range(4)]
        y = [ randint(20, 80) for _ in range(4)]
        # but update the document from callback
        doc.add_next_tick_callback(partial(update, x = x, y = y))

p.hbar(y = 'x', height = 0.2, right = 'y', source = source)
doc.add_root(p)
thread = Thread(target = blocking_task)
thread.start()

如果您不需要线程,则更容易使用基于定期回调的方法:

from bokeh.models.sources import ColumnDataSource
from bokeh.plotting import figure, curdoc
from random import randint
import time

data = {'x': [1, 2, 3, 4], 'y': [10, 20, 30, 40]}
source = ColumnDataSource(data)
p = figure(plot_width = 700, plot_height = 700,
            title = 'Histogram of Arrival Delays by Carrier',
            x_axis_label = 'Delay (min)', y_axis_label = 'Proportion')
p.hbar(y = 'x', height = 0.2, right = 'y', source = source)

def update():
    x = [ randint(0, 10) for _ in range(4)]
    y = [ randint(20, 80) for _ in range(4)]
    source.data = {'x': x, 'y': y}

curdoc().add_periodic_callback(update, 5000)
curdoc().add_root(p)

您使用以下命令运行代码:

bokeh serve --show app.py

结果:

enter image description here

© www.soinside.com 2019 - 2024. All rights reserved.