Unable to test a word count program in PyFlink


My core logic is as follows:

from pyflink.datastream.stream_execution_environment import StreamExecutionEnvironment, RuntimeExecutionMode
from pyflink.common import Types


class WordCount(object):
    def __init__(self, env):
        self.env = env
        self.env.set_runtime_mode(RuntimeExecutionMode.BATCH)
        self.env.set_parallelism(1)

    def word_count(self):
        text = self.env.from_collection(["ABC is a good boy. ABC works for an XYZ org"])

        word_count = text.flat_map(lambda x: str(x).lower().split()) \
            .map(lambda i: (i, 1), output_type=Types.TUPLE([Types.STRING(), Types.INT()])) \
            .key_by(lambda i: i[0]) \
            .reduce(lambda i, j: (i[0], i[1] + j[1]))

        # word_count.print()
        self.env.execute("some-name")

When I call this method as shown below, it works fine:

WordCount(StreamExecutionEnvironment.get_execution_environment()).word_count()  

However, my test case fails because my word_count method does not return anything.

from unittest import TestCase
from unittest.mock import Mock


class TestWordCount(TestCase):

    def test_word_count(self):
        env = Mock()
        actual = WordCount(env).word_count()
        expected = """(a,1)
                        (an,1)
                        (is,1)
                        (abc,2)
                        (for,1)
                        (org,1)
                        (xyz,1)
                        (boy.,1)
                        (good,1)
                        (works,1)"""
        self.assertEqual(actual, expected)   

Error:

Assertion error:  
(a,1)
                        (an,1)
                        (is,1)
                        (abc,2)
                        (for,1)
                        (org,1)
                        (xyz,1)
                        (boy.,1)
                        (good,1)
                        (works,1) != None   
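
The `!= None` in the message can be reproduced without Flink at all. The sketch below uses a hypothetical stand-in class, `WordCountNoReturn` (not part of the original code), that mirrors the shape of `word_count`: it calls methods on the environment and ends without a `return` statement. With a `Mock` environment, every call is merely recorded, so no job runs and the method still evaluates to `None`.

```python
from unittest.mock import Mock


class WordCountNoReturn:
    """Hypothetical stand-in: builds a pipeline on env but returns nothing."""

    def __init__(self, env):
        self.env = env

    def word_count(self):
        text = self.env.from_collection(["ABC is a good boy."])
        text.flat_map(lambda x: x.lower().split())
        self.env.execute("some-name")  # no return statement follows


env = Mock()
result = WordCountNoReturn(env).word_count()

# A Python function that ends without `return` evaluates to None.
print(result is None)  # True

# The mock only recorded the calls; no words were ever split or counted.
env.from_collection.assert_called_once()
env.execute.assert_called_once_with("some-name")

# Anything obtained from the mock is itself a Mock, not real data.
print(isinstance(env.from_collection.return_value, Mock))  # True
```

So even if the method returned `word_count`, a mocked environment would hand back another `Mock` rather than the counted tuples.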

What am I missing here? Also, what is the best way to test PyFlink programs? Any help is greatly appreciated. Thanks!
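
One possible approach, offered as a sketch rather than an established best practice, is to move the per-record logic out of the lambdas into named functions and unit test those without any Flink environment. The names `tokenize`, `to_pair`, `merge_counts`, and `count_locally` are hypothetical helpers introduced for illustration. `count_locally` only imitates the key_by/reduce step in plain Python and is not a Flink API.

```python
import unittest


def tokenize(line):
    """Same logic as the flat_map lambda: lower-case, split on whitespace."""
    return str(line).lower().split()


def to_pair(word):
    """Same logic as the map lambda."""
    return (word, 1)


def merge_counts(left, right):
    """Same logic as the reduce lambda: sum the counts for one key."""
    return (left[0], left[1] + right[1])


def count_locally(lines):
    """Plain-Python imitation of key_by + reduce, for tests only."""
    totals = {}
    for line in lines:
        for pair in map(to_pair, tokenize(line)):
            key = pair[0]
            totals[key] = merge_counts(totals[key], pair) if key in totals else pair
    return sorted(totals.values())


class TestWordCountLogic(unittest.TestCase):
    def test_tokenize(self):
        self.assertEqual(tokenize("ABC is a Good boy."),
                         ["abc", "is", "a", "good", "boy."])

    def test_merge_counts(self):
        self.assertEqual(merge_counts(("abc", 1), ("abc", 1)), ("abc", 2))

    def test_counts_for_sample_line(self):
        counts = dict(count_locally(["ABC is a good boy. ABC works for an XYZ org"]))
        self.assertEqual(counts["abc"], 2)
        self.assertEqual(counts["boy."], 1)
        self.assertEqual(len(counts), 10)
```

The pipeline could then pass these functions directly, for example `flat_map(tokenize)`, and the tests can be run with `python -m unittest`. To check the job end to end, a real `StreamExecutionEnvironment` would be needed instead of a `Mock`. PyFlink's `DataStream.execute_and_collect()` may be a way to bring results back into the test, but check the documentation for your version.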

apache-flink flink-streaming pyflink