Apache Beam:使用MongoDbIO.read()刷新我从MongoDB读取的一个侧输入。

问题描述 投票:0回答:2

我不知道这个GenerateSequence是如何工作的,因为我必须每小时或每天定期从Mongo中读取值,我创建了一个读取MongoDB的ParDo,还在GlobalWindows中添加了一个触发器的窗口(触发器我会根据需求更新)。但下面的代码片段给出返回类型错误,所以你能不能帮我纠正下面的代码行?同时找到错误的快照。另外,这个Generate Sequence对我的情况有什么帮助?

enter image description here

PCollectionView<List<String>> list_of_vins = pipeline
                  .apply(GenerateSequence.from(0).withRate(1, Duration.standardMinutes(5))) // adjust polling rate
                  .apply(ParDo.of(new DoFn<Long, List<String>>() {
                    @ProcessElement
                    public void process(ProcessContext c) {
                      // Read entire DB, and output as a List<String>
                        final String uriString = "mongodb://$[username]:$[password]@$[hostlist]/$[database]?authSource=$[authSource]";
                        MongoClient mongoClient = MongoClients.create(uriString);
                        MongoDatabase mongoDB = mongoClient.getDatabase(options.getMongoDBHostName());
                        MongoCollection<Document> mongoCollection = mongoDB.getCollection(options.getMongoVinCollectionName());
                        c.output((List<String>) ((View) mongoCollection).asList());
                    }
                  })
                  .apply(Window.into(new GlobalWindows()).triggering(AfterPane.elementCountAtLeast(1))));
mongodb apache-beam dataflow apache-beam-io apache-beam-pipeline
2个回答
0
投票

你需要像这样在window变换上指定类型。

.apply(Window.<List<String>>into(...));

0
投票

@danielm and all,

我已经更新了我的代码,似乎它的工作,但几个问题,需要澄清,继续与此。

PCollection<String> list_of_vins_1 = pipeline
            // Generate a tick every 15 seconds
            .apply("Ticker", GenerateSequence.from(0).withRate(1, Duration.standardMinutes(2)))
            // Just to check if individual ticks are being generated once every day
            .apply("Read Data from Mongo DB",ParDo.of(new DoFn<Long, Document>() {
                    @ProcessElement
                    public void processElement(@Element Long tick, OutputReceiver<Document> out) {
                            // reading values from Mongo DB
                            out.output(mongoDocuments);
                        }
                    }
                }
            )).apply("Window", Window.<Document>into(new GlobalWindows()).triggering(Repeatedly.forever(AfterPane.elementCountAtLeast(1))).discardingFiredPanes())
            .apply(ParDo.of(new ConvertDocuemntToStringFn()));

// convert to mongodata to list of string
PCollectionView<List<String>> list_of_data_1 = list_of_vins_1.apply(View.<String> asList());

我能够从mongo db读取值,每一个Ticker Duration提到,但我不确定这将增加我的sideinput大小。就像我把这个list_of_data_1作为一个sideinput,在管道中,它显示元素的数量增加。

假设mongo数据库有20000个集合,如果这个ticker每2分钟运行一次,那么添加的元素数量将是20000乘以ticker运行的次数,即20000+20000+20000+20000+......以此类推。

所以我的问题是 每次在Side inputs中添加元素或者Sideinput刷新的时候,Sideinput总是有20000个值或者MongoDB的什么值,它是追加还是覆盖?

© www.soinside.com 2019 - 2024. All rights reserved.