Flink流媒体示例,生成自己的数据。

问题描述 投票:0回答:1

之前我问过一个简单的 你好世界范例 为Flink。这给我提供了一些很好的例子!但是我想问一下更 "流 "的例子,即我们每秒钟产生一个输入值。

然而,我想问一个更 "流 "的例子,我们每秒钟产生一个输入值。最好是随机的,但即使每次都是相同的值也可以。

我们的目标是得到一个流,在最小的外部触摸下 "移动"。

因此我的问题是:如何显示Flink实际的数据流?

如何在没有外部依赖的情况下 让Flink显示出实际的数据流?

我发现如何通过在外部生成数据并写入Kafka,或者监听一个公共源来显示这个问题,然而我试图用最小的依赖性来解决这个问题(比如从Nifi中的GenerateFlowFile开始)。

apache-flink flink-streaming
1个回答
2
投票

这里有一个例子。这是一个如何使你的源和汇可插拔的例子。我们的想法是,在开发过程中,你可能会使用一个随机的源并打印结果,在测试中,你可能会使用一个硬连接的输入事件列表,并在列表中收集结果,而在生产中,你会使用真正的源和汇。

这就是工作。

/*
 * Example showing how to make sources and sinks pluggable in your application code so
 * you can inject special test sources and test sinks in your tests.
 */

public class TestableStreamingJob {
    private SourceFunction<Long> source;
    private SinkFunction<Long> sink;

    public TestableStreamingJob(SourceFunction<Long> source, SinkFunction<Long> sink) {
        this.source = source;
        this.sink = sink;
    }

    public void execute() throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStream<Long> LongStream =
                env.addSource(source)
                        .returns(TypeInformation.of(Long.class));

        LongStream
                .map(new IncrementMapFunction())
                .addSink(sink);

        env.execute();
    }

    public static void main(String[] args) throws Exception {
        TestableStreamingJob job = new TestableStreamingJob(new RandomLongSource(), new PrintSinkFunction<>());
        job.execute();
    }

    // While it's tempting for something this simple, avoid using anonymous classes or lambdas
    // for any business logic you might want to unit test.
    public class IncrementMapFunction implements MapFunction<Long, Long> {

        @Override
        public Long map(Long record) throws Exception {
            return record + 1 ;
        }
    }

}

这里是... RandomLongSource:

public class RandomLongSource extends RichParallelSourceFunction<Long> {

    private volatile boolean cancelled = false;
    private Random random;

    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        random = new Random();
    }

    @Override
    public void run(SourceContext<Long> ctx) throws Exception {
        while (!cancelled) {
            Long nextLong = random.nextLong();
            synchronized (ctx.getCheckpointLock()) {
                ctx.collect(nextLong);
            }
        }
    }

    @Override
    public void cancel() {
        cancelled = true;
    }
}
© www.soinside.com 2019 - 2024. All rights reserved.