Kafka流-KSQL-拆分消息

问题描述 投票:0回答:2

是否有一种方法可以使用KSQL将消息拆分为多条消息并发布到新主题。需要明确的是,我不是在寻找基于Java的侦听器,而是将其迭代/流化为一个新主题。我需要写一个为我做的KSQL。

例如:

我需要invoice主题中的消息分成item_inventory_delta消息

发票主题

key:saleschecknumber

消息示例:

{
    "total": 12.33,
    "salecounter": 1,
    "items": [
        {
            "itemId": 123,
            "quantity": 1
        },
        {
            "itemId": 345,
            "quantity": 5
        }
    ]
}

item_inventory_delta主题

:itemID

消息示例

1。

{
    "itemId": 123,
    "quantity": 1
}

2。

{
    "itemId": 345,
    "quantity": 5
}
apache-kafka apache-kafka-streams ksql
2个回答
1
投票

有许多方法可以使我理解,这与我们如何处理传入消息而不汇总消息有关。使用Kafka流处理器API的简单方法,可让您自定义处理逻辑。

Kafka Stream Processor API

Processor API允许开发人员定义和连接自定义处理器并与状态存储进行交互。使用Processor API,您可以定义处理一个接收到的任意流处理器一次记录,并将这些处理器与其关联的状态存储以构成代表一个定制处理逻辑

[[Note:您尚未定义将输出值的内容,因此我只是发布键和值相同,但是您可以选择定义输出键和值]

您可以如下定义Kafka流处理器API

Topology builder = new Topology();
builder.addSource("Source", "invoice")
                .addProcessor("sourceProcessor", () -> new InvoiceProcessor(), "Source")
                .addSink("sinkDeltaInvoice", "item_inventory_delta", Serdes.String().serializer(), Serdes.String().serializer(),
                        "sourceProcessor")

下面是自定义处理器方法,请注意其公正方法尚未完全实施

class InvoiceProcessor implements Processor<String, String> {
        private Gson gson = new Gson();

        //constructor
        .......
        private ProcessorContext context;

        @Override
        public void init(ProcessorContext context) {
            this.context = context;

        }

        @Override
        public void close() {
            // Any code for clean up would go here. This processor instance will not be used
            // again after this call.
        }

        @Override
        public void process(String key, String value) {
            try {

                //Create custom inventory to map JSON object  
                //List[Item] items is member object of Inventory class
                Inventory inventory = gson.fromJson(key, Inventory.class);


                //itertae item of items List[Items]
                for(Item item: inventory.getItems()){
                context.forward(gson.toJson(item), gson.toJson(item), To.child("sinkDeltaInvoice"));

                }
                //


                }


        }

    }  

0
投票

对于KStream应用程序,您可以使用flatMap,它接受接受一条记录并返回零个或多个记录的可迭代项的功能:

case class Record(total: Double, salecounter: Int, items: List[Item])
case class Item(itemId: Int, quantity: Int)

// initialize the stream 
val inputStream: KStream[String, Record] = ??? 

// split the message
inputStream.flatMap { case (key, record) => 
  record.items.map(item => (key, item) )
}
© www.soinside.com 2019 - 2024. All rights reserved.