我已经为当前正在运行的一些辅助类编写了一些单元测试。该类负责加载 blob 存储中的一些文件,分割它们的内部行来读取它们并进行处理。
我的单元测试试图断言,给出一些文件列表,该进程正在执行预期的操作。 这在测试资源管理器中有效,在执行 pytest 时也在控制台中有效,并且覆盖率 100% 方法也有效。将更改提交到存储库时,我在 azure pipelines 进程内的构建中遇到错误,我尝试了我所知道的所有方法,但没有成功。
班级:
Hanimport logging
import re
from abc import abstractmethod
from common.blob_storage import BlobStorageHandler
class BlobProcessor:
def __init__(self, logger: logging.Logger) -> None:
self.logger = logger
async def process_files(
self,
blob_storage_handler: BlobStorageHandler,
dir_path: str,
filename_filter: str,
) -> None:
# List blobs in the directory
blobs = blob_storage_handler.container_client.list_blobs(
name_starts_with=dir_path
)
# Loop through matching blobs and read their content and publish a message for each line
for blob in blobs:
if re.search(filename_filter, blob.name.split("/")[-1]):
self.logger.info(f"{blob.name}: Processing file")
# Read file contents and split by newlines
blob_content_raw = blob_storage_handler.get_blob_data(blob.name)
blob_content_decoded = blob_content_raw.decode("utf-8-sig")
lines = [line.strip() for line in blob_content_decoded.split("\n") if line]
# Process and send lines
await self.process_lines(lines)
self.logger.info(f"{blob.name}: Successfully processed {len(lines)} messages")
@abstractmethod
async def process_lines(self, lines: list[str]) -> None:
pass
这是完成的单元测试:
from unittest import IsolatedAsyncioTestCase
from unittest.mock import Mock
from common.blob_storage import BlobStorageHandler
from common.blob_processor import BlobProcessor
class TestBlobProcessor(IsolatedAsyncioTestCase):
async def test_process_files_with_matching_blob(self):
# Arrange
mock_logger = Mock()
mock_blob_storage_handler = Mock(spec=BlobStorageHandler)
mock_blob_storage_handler.container_client = Mock()
mock_blob = Mock()
mock_blob.name = "test_file.txt"
mock_blob_storage_handler.container_client.list_blobs.return_value = [mock_blob]
mock_blob_storage_handler.get_blob_data.return_value = b"Line 1\nLine 2\nLine 3\n"
data_processor = BlobProcessor(mock_logger)
# Act
await data_processor.process_files(mock_blob_storage_handler, "test_dir", ".*\\.txt")
# Assert
mock_logger.info.assert_any_call("test_file.txt: Processing file")
mock_logger.info.assert_any_call("test_file.txt: Successfully processed 3 messages")
我在天蓝色管道上遇到的错误是这样的:
E TypeError: BlobProcessor.init() 缺少 3 个必需的位置参数:'blob_storage_handler'、'dir_path' 和 'filename_filter'
测试也正在完美调试,我正确地迭代了每个,并且读取了我放入测试参数中的确切行...
通过将原始分支重新设置为我当前的开发分支来解决问题,基本上,构建活动中的管道会阻止将来的合并。
因此,合并到原始分支是相对于发布的原始代码进行的。 在该代码中,另一位开发人员在数据处理器类中批准了带有 3 个附加参数的拉取请求,因此我的代码可以正常工作,但仅限于我的本地环境。
恢复:管道使用已发布代码的最新版本进行构建,如果您的分支已过时,可能会引发这样的问题。