我正在尝试启动许多(外部)进程并捕获它们的输出。由于我的命令行超过 8191 个字符,我无法使用
popen
/pclose
,而是使用 CreateProcess
Win32 API 功能,如这篇 Win32 学习文章中所述。
现在,我的大致流程如下:
- 启动若干线程(`<thread>`);
- 每个线程创建一个管道,并通过 CreateProcess 启动一个外部进程;
- 调用 WaitForSingleObject,直到外部进程结束;
- 调用 ReadFile 读取输出管道,直至读空。

我观察到的现象是:所有线程都启动了各自的外部进程,但不知为何,所有线程都会阻塞在
ReadFile
,直到每个线程都达到ReadFile
!
如果我让一些应该关闭的句柄打开等等,我会理解死锁,但我不明白为什么以及如何发生这种情况。
再说一次,如果我请求太多字节,ReadFile
会阻塞,我会理解死锁,但为什么当每个线程达到ReadFile
时它会解除阻塞?
为了演示这个问题,我创建了一个(几乎)最小的工作示例。 它有两个组件,一个模仿外部进程 (
sleepHelper.cpp
),另一个产生可变数量的线程:一个始终需要 5 seconds
才能运行,而所有其他线程都需要 500ms
。
这使我们能够观察行为:
.\MinimalExample.exe 3
Using thread count = 3
Thead 3: Waiting for object took 525ms.
Thead 2: Waiting for object took 525ms.
Thead 3: Reading output took 4489ms.
Thead 2: Reading output took 4489ms.
Thead 3: Executing process took 5022ms.
Thread 3 is exiting.
Thead 2: Executing process took 5022ms.
Thread 2 is exiting.
Thead 1: Waiting for object took 5015ms.
Thead 1: Reading output took 0ms.
Thead 1: Executing process took 5023ms.
Thread 1 is exiting.
Total runtime was 5025ms.
我的问题是:为什么
ReadFile
会阻塞在这里?为了您的方便,您可以在这里找到 MWE。
SleepHelper(需要编译为
sleepHelper.exe
):
#include <chrono>
#include <iostream>
#include <string>
#include <thread>
// Stand-in for an external process: sleeps for the number of milliseconds
// given as the first command-line argument and reports on stdout how long
// the sleep actually took (in microseconds).
// Returns -1 when no argument is supplied, 0 otherwise.
int main(int argc, char** argv) {
    using Clock = std::chrono::steady_clock;

    if (argc < 2) {
        std::cerr << "Error: Expected an argument." << std::endl;
        return -1;
    }

    // std::stoull throws if the argument is not a number; that is left
    // unhandled here, exactly as a missing-validation helper would behave.
    unsigned long long const requestedMs = std::stoull(std::string(argv[1]));
    std::cout << "Going to sleep for " << requestedMs << " ms." << std::endl;

    Clock::time_point const before = Clock::now();
    std::this_thread::sleep_for(std::chrono::duration<std::size_t, std::milli>(requestedMs));
    Clock::time_point const after = Clock::now();

    auto const elapsed = std::chrono::duration_cast<std::chrono::microseconds>(after - before);
    std::cout << "Slept for " << elapsed.count() << "us." << std::endl;
    return 0;
}
主要代码:
#include <array>
#include <atomic>
#include <chrono>
#include <filesystem>
#include <iostream>
#include <mutex>
#include <stdexcept>
#include <string>
#include <thread>
#include <tuple>
#include <vector>
#define WIN32_LEAN_AND_MEAN
#include <windows.h>
#include <processthreadsapi.h>
// Guards std::cout so that lines written by different threads do not interleave.
static std::mutex mutex;

// Writes `s` as one line to std::cout and flushes the stream, holding the
// global output mutex for the whole write.
void PrintMessageLocked(std::string const& s) {
    std::scoped_lock const outputGuard{mutex};
    std::cout << s << '\n' << std::flush;
}
// Prints one timing line of the form
//   "Thead <threadId>: <what> took <usTaken>ms."
// to std::cout (flushed), holding the global output mutex.
// Despite its name, `usTaken` is printed with an "ms" suffix; the callers in
// this file pass millisecond counts.
void PrintTimeLocked(char const* what, std::size_t const& threadId, std::size_t const& usTaken) {
    std::scoped_lock const outputGuard{mutex};
    std::cout << "Thead " << threadId << ": " << what;
    std::cout << " took " << usTaken << "ms." << '\n' << std::flush;
}
std::tuple<int, std::string> ExecuteProcessAndCaptureOutput(std::string command, std::string workingDirectory, std::size_t const& myThreadId) {
/*
Why this whole mumbu-jumbo instead of a simple popen/pclose()?
Because the maximum length of command line arguments is 8191 characters on Windows, but almost all invocations are longer than that.
*/
HANDLE g_hChildStd_OUT_Rd = NULL;
HANDLE g_hChildStd_OUT_Wr = NULL;
SECURITY_ATTRIBUTES saAttr;
// Set the bInheritHandle flag so pipe handles are inherited.
saAttr.nLength = sizeof(SECURITY_ATTRIBUTES);
saAttr.bInheritHandle = TRUE;
saAttr.lpSecurityDescriptor = NULL;
// Create a pipe for the child process's STDOUT.
DWORD const outputBufferSize = 1024 * 1024;
if (!CreatePipe(&g_hChildStd_OUT_Rd, &g_hChildStd_OUT_Wr, &saAttr, outputBufferSize)) {
std::cerr << "Failed to create STDOUT pipe for child process, last error = " << GetLastError() << std::endl;
throw;
}
// Ensure the read handle to the pipe for STDOUT is not inherited.
if (!SetHandleInformation(g_hChildStd_OUT_Rd, HANDLE_FLAG_INHERIT, 0)) {
std::cerr << "Failed to set handle information for child process, last error = " << GetLastError() << std::endl;
throw;
}
STARTUPINFOA startupInfo;
PROCESS_INFORMATION processInformation;
ZeroMemory(&startupInfo, sizeof(startupInfo));
startupInfo.cb = sizeof(startupInfo);
ZeroMemory(&processInformation, sizeof(processInformation));
startupInfo.hStdError = g_hChildStd_OUT_Wr;
startupInfo.hStdOutput = g_hChildStd_OUT_Wr;
startupInfo.dwFlags |= STARTF_USESTDHANDLES;
BOOL bSuccess = CreateProcessA(NULL, command.data(), NULL, NULL, TRUE, 0, NULL, workingDirectory.data(), &startupInfo, &processInformation);
if (!bSuccess) {
std::cerr << "Failed to create process, last error = " << GetLastError() << std::endl;
throw;
}
// Close unnecessary pipes
CloseHandle(g_hChildStd_OUT_Wr);
// Wait until child process exits.
auto const waitStart = std::chrono::steady_clock::now();
DWORD const timeoutInMs = 30000; // Can be INFINITE
auto const waitResult = WaitForSingleObject(processInformation.hProcess, timeoutInMs);
if (waitResult == WAIT_TIMEOUT) {
std::cerr << "Internal Error: Execution took longer than " << timeoutInMs << "ms, abandoning!" << std::endl;
std::cerr << "Command was: '" << command << "'" << std::endl;
auto const terminationResult = TerminateProcess(processInformation.hProcess, -1);
if (terminationResult == 0) {
std::cerr << "Internal Error: Failed to terminate sub-process :(" << std::endl;
}
}
auto const waitEnd = std::chrono::steady_clock::now();
auto const waitTimeTaken = std::chrono::duration_cast<std::chrono::milliseconds>(waitEnd - waitStart).count();
PrintTimeLocked("Waiting for object", myThreadId, waitTimeTaken);
DWORD exitCode = 0;
bSuccess = GetExitCodeProcess(processInformation.hProcess, &exitCode);
if (bSuccess == 0) {
std::cerr << "Internal Error: Failed to get exit code from process!" << std::endl;
throw;
}
else if (exitCode == STILL_ACTIVE) {
std::cerr << "Internal Error: Failed to get exit code from process, it is still running!" << std::endl;
throw;
}
// Close process and thread handles.
CloseHandle(processInformation.hProcess);
CloseHandle(processInformation.hThread);
// Collect output from process
auto const outputStart = std::chrono::steady_clock::now();
constexpr DWORD BUFFER_SIZE = 4096;
std::array<char, BUFFER_SIZE> buffer;
std::string resultString;
DWORD dwRead = 0;
try {
do {
bSuccess = ReadFile(g_hChildStd_OUT_Rd, buffer.data(), BUFFER_SIZE, &dwRead, NULL);
if (!bSuccess || dwRead == 0) break;
resultString.append(buffer.data(), dwRead);
} while (true);
}
catch (...) {
std::cerr << "Internal Error: Encountered an exception while reading process output!" << std::endl;
throw;
}
auto const outputEnd = std::chrono::steady_clock::now();
auto const outputTimeTaken = std::chrono::duration_cast<std::chrono::milliseconds>(outputEnd - outputStart).count();
PrintTimeLocked("Reading output", myThreadId, outputTimeTaken);
// And close the handle to reading STDOUT
CloseHandle(g_hChildStd_OUT_Rd);
return std::make_tuple(static_cast<int>(exitCode), resultString);
}
// Worker entry point. Repeatedly claims the next unprocessed entry of `times`
// (a sleep duration in milliseconds), runs ".\sleepHelper.exe <duration>" for
// it and prints how long the whole invocation took.
//
// Parameters:
//   times      - sleep durations to hand to the helper, one per job.
//   myThreadId - label used in the printed messages.
//
// An exception from a job is logged and ends this worker instead of escaping
// the thread entry point (which would call std::terminate for the whole
// process). Remaining jobs can still be picked up by the other workers.
void ThreadFunction(std::vector<std::size_t> const& times, std::size_t const myThreadId) {
    // Shared by all threads running this function: each post-increment hands
    // out one job index exactly once.
    static std::atomic<std::size_t> workCounter{0};
    std::size_t const maxTimes = times.size();

    try {
        // The working directory is the same for every job, so query it once.
        std::string const workingDirectory = std::filesystem::current_path().string();

        while (true) {
            std::size_t const currentTimeIndex = workCounter++;
            if (currentTimeIndex >= maxTimes) {
                break;
            }
            std::size_t const myTime = times[currentTimeIndex];

            auto const procStart = std::chrono::steady_clock::now();
            // Exit code and captured output are not needed by this demo.
            ExecuteProcessAndCaptureOutput(".\\sleepHelper.exe " + std::to_string(myTime), workingDirectory, myThreadId);
            auto const procEnd = std::chrono::steady_clock::now();

            auto const procTimeTaken = std::chrono::duration_cast<std::chrono::milliseconds>(procEnd - procStart).count();
            PrintTimeLocked("Executing process", myThreadId, procTimeTaken);
        }
    }
    catch (std::exception const& error) {
        std::lock_guard<std::mutex> lock(mutex);
        std::cerr << "Thread " << myThreadId << " stopped after an error: " << error.what() << std::endl;
    }

    {
        std::lock_guard<std::mutex> lock(mutex);
        std::cout << "Thread " << myThreadId << " is exiting." << std::endl;
    }
}
// Demo driver. Usage: MinimalExample.exe [threadCount]   (default 3, range 1..64)
//
// Creates one job that sleeps 5000 ms and (threadCount - 1) jobs that sleep
// 500 ms, runs them on `threadCount` worker threads and prints the total
// wall-clock time. Returns -1 for an invalid thread count, 0 otherwise.
int main(int argc, char** argv) {
    unsigned long long requestedThreads = 3;
    if (argc > 1) {
        std::string const argument(argv[1]);
        bool parsed = false;
        try {
            std::size_t consumed = 0;
            unsigned long long const value = std::stoull(argument, &consumed);
            // Reject inputs such as "3abc" that std::stoull would silently truncate.
            if (consumed == argument.size()) {
                requestedThreads = value;
                parsed = true;
            }
        }
        catch (std::exception const&) {
            // Not a number or out of range for unsigned long long; reported below.
        }
        if (!parsed) {
            std::cerr << "Invalid thread count '" << argument << "'. Use 1 <= thread count <= 64." << std::endl;
            return -1;
        }
    }
    if (requestedThreads < 1 || requestedThreads > 64) {
        std::cerr << "Thread count out of bounds. Use 1 <= thread count <= 64." << std::endl;
        return -1;
    }
    std::size_t const threadCount = static_cast<std::size_t>(requestedThreads);
    std::cout << "Using thread count = " << threadCount << std::endl;

    // One long job first, all remaining jobs short.
    std::vector<std::size_t> times(threadCount, 500);
    times.front() = 5000;

    auto const timeStart = std::chrono::steady_clock::now();
    std::vector<std::thread> threads;
    threads.reserve(threadCount);
    for (std::size_t i = 0; i < threadCount; ++i) {
        // std::thread stores its own copy of `times` for each worker.
        threads.emplace_back(&ThreadFunction, times, i + 1);
    }
    for (auto& t : threads) {
        t.join();
    }
    auto const timeEnd = std::chrono::steady_clock::now();
    std::cout << "Total runtime was " << std::chrono::duration_cast<std::chrono::milliseconds>(timeEnd - timeStart).count() << "ms." << std::endl;
    return 0;
}
备注:
ReadFile
执行一次成功读取,读取 49 字节,然后挂起,最后以读取的零字节返回。好吧,并不是您的线程“到达”了 ReadFile 调用,而是最后一个子进程退出了。
在我看来,关键在于:您虽然阻止了管道的读取句柄被继承,但写入句柄仍然是可继承的(这是必须的,因为子进程要向它写入)。不幸的是,每个子进程都会继承它被创建时父进程中所有可继承的句柄,因此运行时间最长的那个子进程同时持有其他管道的写入句柄;只要它还没有退出,这些写入句柄就一直保持打开状态,ReadFile 也就一直阻塞。
因此,我们需要一种有选择地继承句柄的方法:每个子进程应该只知道它“自己的”管道末端。
这里有一个相当老、但应该相关的问题:“选择子进程继承哪些句柄”。该问题链接到一篇更早的 Raymond Chen 的文章。为了简单起见,我直接引用那篇文章。
出于演示目的,您可以包含他的
CreateProcessWithExplicitHandles
函数并将对 CreateProcessA 的调用替换为:
BOOL bSuccess = CreateProcessWithExplicitHandles(NULL, command.data(), NULL, NULL, TRUE, 0, NULL, workingDirectory.data(), &startupInfo, &processInformation,
1, &g_hChildStd_OUT_Wr);
最后两个参数指定有一个要继承的句柄(使用数组作为练习留给读者;-)) - 这是线程本地管道写入句柄。
现在,至少对我来说,你的最小示例按预期工作。
作为参考,我将包含
CreateProcessWithExplicitHandles
的来源:
// Wrapper around CreateProcess that attaches a PROC_THREAD_ATTRIBUTE_HANDLE_LIST
// attribute built from `rgHandlesToInherit`, so the handle list to inherit is
// given explicitly by the caller.
//
// The first ten parameters are forwarded to CreateProcess unchanged, except
// that EXTENDED_STARTUPINFO_PRESENT is OR-ed into dwCreationFlags and the
// startup info is copied into a STARTUPINFOEX.
//   cHandlesToInherit  - number of entries in rgHandlesToInherit.
//   rgHandlesToInherit - handles placed in the handle-list attribute.
// Returns FALSE (with ERROR_INVALID_PARAMETER set) if the handle count is too
// large or lpStartupInfo->cb does not match sizeof(*lpStartupInfo); FALSE if
// any setup step fails; otherwise the result of CreateProcess.
BOOL CreateProcessWithExplicitHandles(
__in_opt LPCTSTR lpApplicationName,
__inout_opt LPTSTR lpCommandLine,
__in_opt LPSECURITY_ATTRIBUTES lpProcessAttributes,
__in_opt LPSECURITY_ATTRIBUTES lpThreadAttributes,
__in BOOL bInheritHandles,
__in DWORD dwCreationFlags,
__in_opt LPVOID lpEnvironment,
__in_opt LPCTSTR lpCurrentDirectory,
__in LPSTARTUPINFO lpStartupInfo,
__out LPPROCESS_INFORMATION lpProcessInformation,
// here is the new stuff
__in DWORD cHandlesToInherit,
__in_ecount(cHandlesToInherit) HANDLE *rgHandlesToInherit)
{
    // Guard against overflow of the byte count passed to UpdateProcThreadAttribute
    // and against a caller that already passes an extended startup structure.
    bool const argumentsValid =
        cHandlesToInherit < 0xFFFFFFFF / sizeof(HANDLE) &&
        lpStartupInfo->cb == sizeof(*lpStartupInfo);
    if (!argumentsValid) {
        SetLastError(ERROR_INVALID_PARAMETER);
        return FALSE;
    }

    // Sizing call: failing with ERROR_INSUFFICIENT_BUFFER is the expected outcome.
    SIZE_T bytesNeeded = 0;
    if (!InitializeProcThreadAttributeList(NULL, 1, 0, &bytesNeeded) &&
        GetLastError() != ERROR_INSUFFICIENT_BUFFER) {
        return FALSE;
    }

    HANDLE const processHeap = GetProcessHeap();
    LPPROC_THREAD_ATTRIBUTE_LIST const attributes =
        reinterpret_cast<LPPROC_THREAD_ATTRIBUTE_LIST>(HeapAlloc(processHeap, 0, bytesNeeded));
    if (attributes == NULL) {
        return FALSE;
    }

    BOOL result = InitializeProcThreadAttributeList(attributes, 1, 0, &bytesNeeded);
    if (result) {
        result = UpdateProcThreadAttribute(attributes,
            0, PROC_THREAD_ATTRIBUTE_HANDLE_LIST,
            rgHandlesToInherit,
            cHandlesToInherit * sizeof(HANDLE), NULL, NULL);

        if (result) {
            // Wrap the caller's STARTUPINFO in the extended structure carrying the list.
            STARTUPINFOEX extendedInfo;
            ZeroMemory(&extendedInfo, sizeof(extendedInfo));
            extendedInfo.StartupInfo = *lpStartupInfo;
            extendedInfo.StartupInfo.cb = sizeof(extendedInfo);
            extendedInfo.lpAttributeList = attributes;

            result = CreateProcess(lpApplicationName,
                lpCommandLine,
                lpProcessAttributes,
                lpThreadAttributes,
                bInheritHandles,
                dwCreationFlags | EXTENDED_STARTUPINFO_PRESENT,
                lpEnvironment,
                lpCurrentDirectory,
                &extendedInfo.StartupInfo,
                lpProcessInformation);
        }

        // Only a successfully initialized list may be deleted.
        DeleteProcThreadAttributeList(attributes);
    }

    HeapFree(processHeap, 0, attributes);
    return result;
}