Kafka Streams: how to split one object into two objects and use one as the key and the other as the value for groupBy

Question  Votes: 0  Answers: 1

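As the title says, I receive one complete object and want to split it into two objects: a header object and an item object. The classes and values look like this: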

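import java.sql.Timestamp; // assumption: the question does not say which Timestamp type is used
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

import com.fasterxml.jackson.annotation.JsonIgnore;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KGroupedStream;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Produced;

// JsonPOJOSerializer and JsonPOJODeserializer are assumed to be on the classpath,
// for example copied from the Kafka Streams example code.
public class Pipeline {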

    // @JsonIgnoreProperties({ "schema" })
    static public class Message {
        @JsonIgnore
        public Object schema;
        public initialPurchaseOrder payload;
    }

    static public class initialPurchaseOrder {
        public Timestamp CHANGED_TIMESTAMP;
        public String EBELN;
        public String AEDAT;
        public String WAERS;
        public String LASTCHANGEDATETIME;
        public String MANDT;
        public String LAND1;
        public String BATXT;
        public String EKOTX;
        public Double CALC_MENGE;
        public Double CALC_NETWR;
        public String EBELP;
        public String TXZ01;
        public String WGBEZ;
        public String MEINS;
        public String MENGE;
        public String NETWR;
        public String LOEKZ;
    }

    static public class orderHeader {
        public String EBELN;
        public String AEDAT;
        public String WAERS;
        public String EKOTX;
        public String BATXT;
        public String LAND1;
    }

    static public class orderItem {
        public String EBELP;
        public Double CALC_NETWR;
        public String TXZ01;
        public Double CALC_MENGE;
        public String MEINS;
        public String WGBEZ;
        public String LOEKZ;
    }

    static public class finalPurchaseOrder {
        public orderHeader header;
        public ArrayList<orderItem> items;
    }

    public static void main(final String[] args) {

        final Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-pipe");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());

        final Map<String, Object> serdeProps = new HashMap<>();

        final Serializer<Message> MessageSerializer = new JsonPOJOSerializer<>();
        serdeProps.put("JsonPOJOClass", Message.class);
        MessageSerializer.configure(serdeProps, false);

        final Deserializer<Message> MessageDeserializer = new JsonPOJODeserializer<>();
        serdeProps.put("JsonPOJOClass", Message.class);
        MessageDeserializer.configure(serdeProps, false);

        final StreamsBuilder builder = new StreamsBuilder();
        final Serde<String> stringSerdes = Serdes.String();

        final Serde<Message> MessageSerde = Serdes.serdeFrom(MessageSerializer, MessageDeserializer);
        final KStream<String, Message> source = builder.stream("testTopic", Consumed.with(stringSerdes, MessageSerde));

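        // Re-key every record by its purchase order number (EBELN) and keep only the payload.
        // Note: the default value serde is String, so a later aggregation on this grouped
        // stream needs an explicit serde for initialPurchaseOrder.
        final KGroupedStream<String, initialPurchaseOrder> groupedStream = source.map((k, v) -> KeyValue.pair(v.payload.EBELN, v.payload))
                .peek((k, v) -> System.out.println("After keying: " + k + ", value: " + v))
                .groupByKey();

        // The value is a Message, so pass its serde explicitly instead of the default String serde.
        source.to("output1", Produced.with(stringSerdes, MessageSerde));
    }
}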

The problem is that so far I have not even managed to split the initialPurchaseOrder object into an orderHeader and an orderItem.

After that, I should be able to map both objects together and group them.

java apache-kafka-streams
1 Answer
0 votes

You already have your KStream. Just use map() and add the corresponding constructors to class orderHeader and class orderItem:

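// Re-key by purchase order number and unwrap the payload.
final KStream<String, initialPurchaseOrder> source =
    builder.stream("testTopic", Consumed.with(stringSerdes, MessageSerde))
        .map((k, v) -> KeyValue.pair(v.payload.EBELN, v.payload));

// Split each payload into a header (new key) and an item (new value).
KStream<orderHeader, orderItem> stream =
    source.map((k, v) -> KeyValue.pair(new orderHeader(v), new orderItem(v)));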

with:

class orderHeader {
    orderHeader(initialPurchaseOrder ipo) {
      EBELN = ipo.EBELN;
      AEDAT = ipo.AEDAT;
      //...
    }
}

// similar for `orderItem`
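
Once both constructors exist, the grouping the question asks for comes down to collecting every orderItem that shares an EBELN under a single orderHeader. The following is a minimal plain-Java sketch of that step with no Kafka dependency, so the split-and-collect logic can be checked on its own. The class `PurchaseOrderSplit` and its `collect` helper are hypothetical names introduced here, the field lists are shortened, and the sample rows are made up for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, Kafka-free sketch: split flat rows into header + item and collect items per order.
class PurchaseOrderSplit {

    // Shortened version of the question's flat input record.
    static class initialPurchaseOrder {
        String EBELN; // purchase order number: header field and grouping key
        String WAERS; // header field
        String EBELP; // item field
        String TXZ01; // item field

        initialPurchaseOrder(String ebeln, String waers, String ebelp, String txz01) {
            EBELN = ebeln;
            WAERS = waers;
            EBELP = ebelp;
            TXZ01 = txz01;
        }
    }

    static class orderHeader {
        String EBELN;
        String WAERS;

        orderHeader(initialPurchaseOrder ipo) {
            EBELN = ipo.EBELN;
            WAERS = ipo.WAERS;
        }
    }

    static class orderItem {
        String EBELP;
        String TXZ01;

        orderItem(initialPurchaseOrder ipo) {
            EBELP = ipo.EBELP;
            TXZ01 = ipo.TXZ01;
        }
    }

    static class finalPurchaseOrder {
        orderHeader header;
        ArrayList<orderItem> items = new ArrayList<>();
    }

    // Groups rows by EBELN: the first row of an order supplies the header,
    // and every row of that order contributes one item.
    static Map<String, finalPurchaseOrder> collect(List<initialPurchaseOrder> rows) {
        Map<String, finalPurchaseOrder> byOrder = new LinkedHashMap<>();
        for (initialPurchaseOrder row : rows) {
            finalPurchaseOrder order = byOrder.computeIfAbsent(row.EBELN, key -> {
                finalPurchaseOrder created = new finalPurchaseOrder();
                created.header = new orderHeader(row);
                return created;
            });
            order.items.add(new orderItem(row));
        }
        return byOrder;
    }

    public static void main(String[] args) {
        // Made-up sample rows: two items for one order, one item for another.
        List<initialPurchaseOrder> rows = Arrays.asList(
                new initialPurchaseOrder("4500000001", "EUR", "00010", "Laptop"),
                new initialPurchaseOrder("4500000001", "EUR", "00020", "Monitor"),
                new initialPurchaseOrder("4500000002", "USD", "00010", "Desk"));

        collect(rows).forEach((ebeln, order) ->
                System.out.println(ebeln + " " + order.header.WAERS + " items=" + order.items.size()));
    }
}
```

In a Kafka Streams topology, the body of the loop would presumably move into the aggregator passed to `groupByKey().aggregate(...)`, which additionally needs a serde for the aggregate type; that wiring is not shown here.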