我试图通过事件的一个属性对其进行分组,并随着时间的推移使用 KSQL窗口式聚合,特别是 会话窗口.
我有一个 STREAM
由Kafka主题与 TIMESTAMP
属性指定好。
当我尝试创建一个 STREAM
与会话窗口化的查询,如。
CREATE STREAM SESSION_STREAM AS
SELECT ...
FROM EVENT_STREAM
WINDOW SESSION (5 MINUTES)
GROUP BY ...;
我总是得到这样的错误信息:
你的SELECT查询产生了一个TABLE。请使用CREATE TABLE AS SELECT语句代替。
是否可以创建一个 STREAM
与窗口式聚合?
当我尝试按照建议创建一个 TABLE
然后是 STREAM
包含了所有的会话启动事件,查询方式如下。
CREATE STREAM SESSION_START_STREAM AS
SELECT *
FROM SESSION_TABLE
WHERE WINDOWSTART=WINDOWEND;
KSQL通知我:
KSQL不支持在窗口表上进行持久化查询。
如何创建一个 STREAM
的事件在KSQL中启动会话窗口?
你的创建流语句,如果换成创建表语句,会创建一个不断更新的表。沉没主题 SESSION_STREAM
将包含表的变化流,即它的changelog.ksqlDB将其建模为TABLE,因为它具有TABLE语义,即表中只能存在任何特定键的单行。
ksqlDB将其建模为一个TABLE,因为它具有TABLE语义,即表中只能存在一个具有任何特定键的单行。 然而,changelog将包含已经应用到表中的变化的string。
如果你想要的是一个包含所有会话的主题,那么像这样的东西将创建一个主题。
-- create a stream with a new 'data' topic:
CREATE STREAM DATA (USER_ID INT)
WITH (kafka_topic='data', value_format='json');
-- create a table that tracks user interactions per session:
CREATE TABLE SESSION AS
SELECT USER_ID, COUNT(USER_ID) AS COUNT
FROM DATA
WINDOW SESSION (5 SECONDS)
GROUP BY USER_ID;
这将创建一个 SESSIONS
主题,其中包含对 SESSIONS
表:即它的变更日志。
如果你想把这个转换为会话启动事件的流,那么不幸的是,ksqlDB并没有 还 允许您直接从表中更改创建流,但您可以通过表的更改日志创建流。
-- Create a stream over the existing `SESSIONS` topic.
-- Note it states the window_type is 'Session'.
CREATE STREAM SESSION_STREAM (ROWKEY INT KEY, COUNT BIGINT)
WITH (kafka_topic='SESSIONS', value_format='JSON', window_type='Session');
-- Create a stream of window start events:
CREATE STREAM SESSION_STARTS AS
SELECT * FROM SESSION_STREAM
WHERE WINDOWSTART = WINDOWEND;
注意,在即将到来的0. 10版本中,你将能够命名关键列在 SESSION_STREAM
正确。
CREATE STREAM SESSION_STREAM (USER_ID INT KEY, COUNT BIGINT)
WITH (kafka_topic='SESSIONS', value_format='JSON', window_type='Session');