我有
Processor
课:
class Processor:
def __init__(self, session_provider):
self.session_provider = session_provider
def run(self):
...
self.session_provider.get_session().stop()
我不希望会话在
run
方法结束时关闭,但稍后关闭。我试着用patch
暂时改变它:
import unittest
from unittest.mock import Mock, MagicMock, patch
from src.components.processor import Processor
class TestProcessor(unittest.TestCase):
def test_run(self):
...
with('src.components.processor.self.session_provider.get_session.stop', new_callable=Mock()):
processor.run()
但是我收到错误:
AttributeError: module 'src.components.processor' has no attribute 'self'
我该如何解决这个问题?
我发现我需要修补结束调用,而不是嵌套路径:
with patch('pyspark.sql.SparkSession.stop', new_callable = Mock()):
...
processor.run()