我的核心逻辑如下:
from pyflink.datastream.stream_execution_environment import StreamExecutionEnvironment, RuntimeExecutionMode
from pyflink.common import Types
class WordCount(object):
def __init__(self, env):
self.env = env
self.env.set_runtime_mode(RuntimeExecutionMode.BATCH)
self.env.set_parallelism(1)
def word_count(self):
text = self.env.from_collection(["ABC is a good boy. ABC works for an XYZ org"])
word_count = text.flat_map(lambda x: str(x).lower().split()) \
.map(lambda i: (i, 1), output_type=Types.TUPLE([Types.STRING(), Types.INT()])) \
.key_by(lambda i: i[0]) \
.reduce(lambda i, j: (i[0], i[1] + j[1]))
#word_count.print()
self.env.execute("some-name")
当我尝试通过类似下面的方式调用此方法时,它工作正常:
WordCount(StreamExecutionEnvironment.get_execution_environment()).word_count()
但是我的测试用例失败了,因为我的
word_count
方法没有返回任何东西。
class TestWordCount(TestCase):
def test_word_count(self):
env = Mock()
actual = WordCount(env).word_count()
expected = """(a,1)
(an,1)
(is,1)
(abc,2)
(for,1)
(org,1)
(xyz,1)
(boy.,1)
(good,1)
(works,1)"""
self.assertEqual(actual, expected)
错误:
Assertion error:
(a,1)
(an,1)
(is,1)
(abc,2)
(for,1)
(org,1)
(xyz,1)
(boy.,1)
(good,1)
(works,1) != None
我在这里错过了什么?另外,测试 Pyflink 程序的最佳方法是什么?任何帮助深表感谢。谢谢!