是否有一种方法可以使用KSQL将消息拆分为多条消息并发布到新主题。需要明确的是,我不是在寻找基于Java的侦听器,而是将其迭代/流化为一个新主题。我需要写一个为我做的KSQL。
例如:
我需要invoice
主题中的消息分成item_inventory_delta
消息
key:saleschecknumber
消息示例:
{
"total": 12.33,
"salecounter": 1,
"items": [
{
"itemId": 123,
"quantity": 1
},
{
"itemId": 345,
"quantity": 5
}
]
}
键:itemID
消息示例
{
"itemId": 123,
"quantity": 1
}
2。
{
"itemId": 345,
"quantity": 5
}
有许多方法可以使我理解,这与我们如何处理传入消息而不汇总消息有关。使用Kafka流处理器API的简单方法,可让您自定义处理逻辑。
Processor API允许开发人员定义和连接自定义处理器并与状态存储进行交互。使用Processor API,您可以定义处理一个接收到的任意流处理器一次记录,并将这些处理器与其关联的状态存储以构成代表一个定制处理逻辑
[[Note:您尚未定义将输出值的内容,因此我只是发布键和值相同,但是您可以选择定义输出键和值]
您可以如下定义Kafka流处理器API
Topology builder = new Topology();
builder.addSource("Source", "invoice")
.addProcessor("sourceProcessor", () -> new InvoiceProcessor(), "Source")
.addSink("sinkDeltaInvoice", "item_inventory_delta", Serdes.String().serializer(), Serdes.String().serializer(),
"sourceProcessor")
下面是自定义处理器方法,请注意其公正方法尚未完全实施
class InvoiceProcessor implements Processor<String, String> {
private Gson gson = new Gson();
//constructor
.......
private ProcessorContext context;
@Override
public void init(ProcessorContext context) {
this.context = context;
}
@Override
public void close() {
// Any code for clean up would go here. This processor instance will not be used
// again after this call.
}
@Override
public void process(String key, String value) {
try {
//Create custom inventory to map JSON object
//List[Item] items is member object of Inventory class
Inventory inventory = gson.fromJson(key, Inventory.class);
//itertae item of items List[Items]
for(Item item: inventory.getItems()){
context.forward(gson.toJson(item), gson.toJson(item), To.child("sinkDeltaInvoice"));
}
//
}
}
}
对于KStream
应用程序,您可以使用flatMap
,它接受接受一条记录并返回零个或多个记录的可迭代项的功能:
case class Record(total: Double, salecounter: Int, items: List[Item])
case class Item(itemId: Int, quantity: Int)
// initialize the stream
val inputStream: KStream[String, Record] = ???
// split the message
inputStream.flatMap { case (key, record) =>
record.items.map(item => (key, item) )
}