有没有使用 Spring Cloud Stream(Kafka Streams 绑定器)创建 GlobalKTable 的示例?

问题描述 投票:0回答:1

如果有使用 Spring Cloud Stream Kafka Streams 创建多个 GlobalKTable 并与之进行连接(join)的示例,请分享。

spring-cloud-stream spring-kafka
1个回答
2
投票

下面的代码来自 Confluent 的 GlobalKTable 示例,其中 KStream 与 GlobalKTable 之间进行了两次连接(join)。注意第一个流是通过标准的“输入”...

@Component
public class TableStreamListener {

    /**
     * Stream processor, bound through {@link DataGen}, that enriches every order
     * with its customer and its product.
     */
    @EnableBinding(DataGen.class)
    public class DataAnalyticsProcessorApplication {

        /**
         * Enriches the incoming orders by joining them against the customer table
         * and then against the product table.
         *
         * <p>DevNotes (from the original author): compilation fails unless this
         * method returns a KStream.
         *
         * @param ordersStream orders received on the "input" binding
         * @param customers    global table received on the "customers" binding,
         *                     looked up with the order's customer id
         * @param products     global table received on the "products" binding,
         *                     looked up with the order's product id
         * @return stream of enriched orders, sent to the "output" binding
         */
        @StreamListener
        @SendTo("output")
        public KStream<Object, EnrichedOrder> process(@Input("input") KStream<Object, Order> ordersStream,
                @Input("customers") GlobalKTable<Object, Customer> customers,
                @Input("products") GlobalKTable<Object, Product> products) {

            // First join: orders -> customers. Per the original author's note, a
            // global table allows a non-key based join without repartitioning the
            // input stream; the lookup key is taken from the order itself.
            KStream<Object, CustomerOrder> ordersWithCustomer = ordersStream.join(
                    customers,
                    (orderKey, order) -> order.getCustomerId(),
                    (order, customer) -> new CustomerOrder(customer, order));

            // Second join: customer orders -> products, again keyed by a field of
            // the stream value (the product id of the wrapped order).
            return ordersWithCustomer.join(
                    products,
                    (orderKey, pair) -> pair.getOrder().getProductId(),
                    (pair, product) -> new EnrichedOrder(product, pair.getCustomer(), pair.getOrder()));
        }

    }

    /**
     * Binding contract for the processor: declares the two global-table inputs
     * on top of whatever {@code KafkaStreamsProcessor} already declares.
     * NOTE(review): the "input" and "output" bindings used by the processor are
     * presumably inherited from {@code KafkaStreamsProcessor} - confirm.
     */
    interface DataGen extends KafkaStreamsProcessor {

        /** Inbound global table registered under the binding name "customers". */
        @Input("customers")
        GlobalKTable<?, ?> customers();

        /** Inbound global table registered under the binding name "products". */
        @Input("products")
        GlobalKTable<?, ?> products();

    }
}
© www.soinside.com 2019 - 2024. All rights reserved.