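As the title says, my problem is that I receive one complete object and want to split it into two objects: a header and an item. The classes and values look like this: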
public class Pipeline {
// @JsonIgnoreProperties({ "schema" })
static public class Message {
@JsonIgnore
public Object schema;
public initialPurchaseOrder payload;
}
static public class initialPurchaseOrder {
public Timestamp CHANGED_TIMESTAMP;
public String EBELN;
public String AEDAT;
public String WAERS;
public String LASTCHANGEDATETIME;
public String MANDT;
public String LAND1;
public String BATXT;
public String EKOTX;
public Double CALC_MENGE;
public Double CALC_NETWR;
public String EBELP;
public String TXZ01;
public String WGBEZ;
public String MEINS;
public String MENGE;
public String NETWR;
public String LOEKZ;
}
static public class orderHeader {
public String EBELN;
public String AEDAT;
public String WAERS;
public String EKOTX;
public String BATXT;
public String LAND1;
}
static public class orderItem {
public String EBELP;
public Double CALC_NETWR;
public String TXZ01;
public Double CALC_MENGE;
public String MEINS;
public String WGBEZ;
public String LOEKZ;
}
static public class finalPurchaseOrder {
public orderHeader header;
public ArrayList<orderItem> items;
}
public static void main(final String[] args) {
final Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-pipe");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
final Map<String, Object> serdeProps = new HashMap<>();
final Serializer<Message> MessageSerializer = new JsonPOJOSerializer<>();
serdeProps.put("JsonPOJOClass", Message.class);
MessageSerializer.configure(serdeProps, false);
final Deserializer<Message> MessageDeserializer = new JsonPOJODeserializer<>();
serdeProps.put("JsonPOJOClass", Message.class);
MessageDeserializer.configure(serdeProps, false);
final StreamsBuilder builder = new StreamsBuilder();
final Serde<String> stringSerdes = Serdes.String();
final Serde<Message> MessageSerde = Serdes.serdeFrom(MessageSerializer, MessageDeserializer);
final KStream<String, Message> source = builder.stream("testTopic", Consumed.with(stringSerdes, MessageSerde));
final KGroupedStream<String, initialPurchaseOrder> groupedStream = source.map((k, v) -> KeyValue.pair(v.payload.EBELN, v.payload))
// Pass the values as printf arguments so a '%' inside the data is not parsed as a format specifier.
.peek((k, v) -> System.out.printf("After keying: %s, value: %s%n", k, v))
.groupByKey();
source.to("output1");
The problem is that so far I cannot even split the initialPurchaseOrder object into an orderHeader and an orderItem.
Once that works, I should be able to map both objects and group them.
You already have your KStream. Just use map() and add the corresponding constructors to class orderHeader and class orderItem:
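// Re-key each record by its purchase order number and keep only the payload.
final KStream<String, initialPurchaseOrder> source =
    builder.stream("testTopic", Consumed.with(stringSerdes, MessageSerde))
           .map((k, v) -> KeyValue.pair(v.payload.EBELN, v.payload));
// Split each payload into a header (new key) and an item (new value).
KStream<orderHeader, orderItem> stream =
    source.map((k, v) -> KeyValue.pair(new orderHeader(v), new orderItem(v)));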
with:
class orderHeader {
orderHeader(initialPurchaseOrder ipo) {
EBELN = ipo.EBELN;
AEDAT = ipo.AEDAT;
//...
}
}
// similar for `orderItem`
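
To make the split and the later grouping concrete, here is a minimal plain-Java sketch with no Kafka dependency. The names SplitSketch, InitialPurchaseOrder, OrderHeader, OrderItem, FinalPurchaseOrder and assemble are hypothetical stand-ins for the classes in the question, and only a few of the fields are carried over. It assumes every row with the same EBELN shares the same header values, so the first row seen supplies the header.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-ins for the question's classes, reduced to a few fields.
class SplitSketch {

    static class InitialPurchaseOrder {
        final String EBELN, AEDAT, WAERS;   // header-level fields
        final String EBELP, TXZ01;          // item-level fields
        final Double CALC_NETWR;

        InitialPurchaseOrder(String ebeln, String aedat, String waers,
                             String ebelp, String txz01, Double calcNetwr) {
            this.EBELN = ebeln;
            this.AEDAT = aedat;
            this.WAERS = waers;
            this.EBELP = ebelp;
            this.TXZ01 = txz01;
            this.CALC_NETWR = calcNetwr;
        }
    }

    static class OrderHeader {
        final String EBELN, AEDAT, WAERS;

        // Copy constructor: takes only the header fields from the flat row.
        OrderHeader(InitialPurchaseOrder ipo) {
            this.EBELN = ipo.EBELN;
            this.AEDAT = ipo.AEDAT;
            this.WAERS = ipo.WAERS;
        }
    }

    static class OrderItem {
        final String EBELP, TXZ01;
        final Double CALC_NETWR;

        // Copy constructor: takes only the item fields from the flat row.
        OrderItem(InitialPurchaseOrder ipo) {
            this.EBELP = ipo.EBELP;
            this.TXZ01 = ipo.TXZ01;
            this.CALC_NETWR = ipo.CALC_NETWR;
        }
    }

    static class FinalPurchaseOrder {
        OrderHeader header;
        final List<OrderItem> items = new ArrayList<>();
    }

    // Groups flat rows by EBELN: the first row of an order supplies the header,
    // and every row contributes one item. Insertion order is preserved.
    static Map<String, FinalPurchaseOrder> assemble(List<InitialPurchaseOrder> rows) {
        Map<String, FinalPurchaseOrder> byOrder = new LinkedHashMap<>();
        for (InitialPurchaseOrder row : rows) {
            FinalPurchaseOrder order = byOrder.computeIfAbsent(row.EBELN, key -> {
                FinalPurchaseOrder created = new FinalPurchaseOrder();
                created.header = new OrderHeader(row);
                return created;
            });
            order.items.add(new OrderItem(row));
        }
        return byOrder;
    }
}
```

In a Kafka Streams topology the same accumulation would probably live in an aggregate() step on the grouped stream rather than in a loop, and the result type would then need its own serde.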