我不知道这个GenerateSequence是如何工作的,因为我必须每小时或每天定期从Mongo中读取值,我创建了一个读取MongoDB的ParDo,还在GlobalWindows中添加了一个触发器的窗口(触发器我会根据需求更新)。但下面的代码片段给出返回类型错误,所以你能不能帮我纠正下面的代码行?同时找到错误的快照。另外,这个Generate Sequence对我的情况有什么帮助?
PCollectionView<List<String>> list_of_vins = pipeline
.apply(GenerateSequence.from(0).withRate(1, Duration.standardMinutes(5))) // adjust polling rate
.apply(ParDo.of(new DoFn<Long, List<String>>() {
@ProcessElement
public void process(ProcessContext c) {
// Read entire DB, and output as a List<String>
final String uriString = "mongodb://$[username]:$[password]@$[hostlist]/$[database]?authSource=$[authSource]";
MongoClient mongoClient = MongoClients.create(uriString);
MongoDatabase mongoDB = mongoClient.getDatabase(options.getMongoDBHostName());
MongoCollection<Document> mongoCollection = mongoDB.getCollection(options.getMongoVinCollectionName());
c.output((List<String>) ((View) mongoCollection).asList());
}
})
.apply(Window.into(new GlobalWindows()).triggering(AfterPane.elementCountAtLeast(1))));
你需要像这样在window变换上指定类型。
.apply(Window.<List<String>>into(...));
@danielm and all,
我已经更新了我的代码,似乎它的工作,但几个问题,需要澄清,继续与此。
PCollection<String> list_of_vins_1 = pipeline
// Generate a tick every 15 seconds
.apply("Ticker", GenerateSequence.from(0).withRate(1, Duration.standardMinutes(2)))
// Just to check if individual ticks are being generated once every day
.apply("Read Data from Mongo DB",ParDo.of(new DoFn<Long, Document>() {
@ProcessElement
public void processElement(@Element Long tick, OutputReceiver<Document> out) {
// reading values from Mongo DB
out.output(mongoDocuments);
}
}
}
)).apply("Window", Window.<Document>into(new GlobalWindows()).triggering(Repeatedly.forever(AfterPane.elementCountAtLeast(1))).discardingFiredPanes())
.apply(ParDo.of(new ConvertDocuemntToStringFn()));
// convert to mongodata to list of string
PCollectionView<List<String>> list_of_data_1 = list_of_vins_1.apply(View.<String> asList());
我能够从mongo db读取值,每一个Ticker Duration提到,但我不确定这将增加我的sideinput大小。就像我把这个list_of_data_1作为一个sideinput,在管道中,它显示元素的数量增加。
假设mongo数据库有20000个集合,如果这个ticker每2分钟运行一次,那么添加的元素数量将是20000乘以ticker运行的次数,即20000+20000+20000+20000+......以此类推。
所以我的问题是 每次在Side inputs中添加元素或者Sideinput刷新的时候,Sideinput总是有20000个值或者MongoDB的什么值,它是追加还是覆盖?