KSQL窗口式聚合流

问题描述 投票:0回答:1

我试图通过事件的一个属性对其进行分组,并随着时间的推移使用 KSQL窗口式聚合,特别是 会话窗口.

我有一个 STREAM 由Kafka主题与 TIMESTAMP 属性指定好。

当我尝试创建一个 STREAM 与会话窗口化的查询,如。

CREATE STREAM SESSION_STREAM AS
SELECT ...
  FROM EVENT_STREAM
WINDOW SESSION (5 MINUTES)
   GROUP BY ...;

我总是得到这样的错误信息:

你的SELECT查询产生了一个TABLE。请使用CREATE TABLE AS SELECT语句代替。

是否可以创建一个 STREAM 与窗口式聚合?


当我尝试按照建议创建一个 TABLE 然后是 STREAM 包含了所有的会话启动事件,查询方式如下。

CREATE STREAM SESSION_START_STREAM AS
SELECT *
  FROM SESSION_TABLE
 WHERE WINDOWSTART=WINDOWEND;

KSQL通知我:

KSQL不支持在窗口表上进行持久化查询。

如何创建一个 STREAM 的事件在KSQL中启动会话窗口?

apache-kafka kafka-consumer-api apache-kafka-streams ksqldb
1个回答
3
投票

你的创建流语句,如果换成创建表语句,会创建一个不断更新的表。沉没主题 SESSION_STREAM 将包含表的变化流,即它的changelog.ksqlDB将其建模为TABLE,因为它具有TABLE语义,即表中只能存在任何特定键的单行。

ksqlDB将其建模为一个TABLE,因为它具有TABLE语义,即表中只能存在一个具有任何特定键的单行。 然而,changelog将包含已经应用到表中的变化的string。

如果你想要的是一个包含所有会话的主题,那么像这样的东西将创建一个主题。

-- create a stream with a new 'data' topic:
CREATE STREAM DATA (USER_ID INT) 
    WITH (kafka_topic='data', value_format='json');

-- create a table that tracks user interactions per session:
CREATE TABLE SESSION AS
SELECT USER_ID, COUNT(USER_ID) AS COUNT
  FROM DATA
WINDOW SESSION (5 SECONDS)
   GROUP BY USER_ID;

这将创建一个 SESSIONS 主题,其中包含对 SESSIONS 表:即它的变更日志。

如果你想把这个转换为会话启动事件的流,那么不幸的是,ksqlDB并没有 允许您直接从表中更改创建流,但您可以通过表的更改日志创建流。

-- Create a stream over the existing `SESSIONS` topic.
-- Note it states the window_type is 'Session'.
CREATE STREAM SESSION_STREAM (ROWKEY INT KEY, COUNT BIGINT) 
   WITH (kafka_topic='SESSIONS', value_format='JSON', window_type='Session');

-- Create a stream of window start events:
CREATE STREAM SESSION_STARTS AS 
    SELECT * FROM SESSION_STREAM 
    WHERE WINDOWSTART = WINDOWEND;

注意,在即将到来的0. 10版本中,你将能够命名关键列在 SESSION_STREAM 正确。

CREATE STREAM SESSION_STREAM (USER_ID INT KEY, COUNT BIGINT) 
   WITH (kafka_topic='SESSIONS', value_format='JSON', window_type='Session');
© www.soinside.com 2019 - 2024. All rights reserved.