Win32 ReadFile 函数在不相关的线程上阻塞

问题描述 投票:0回答:1

我正在尝试启动许多(外部)进程并捕获它们的输出。由于我的命令行超过 8191 个字符,我无法使用

popen
/
pclose
,而是使用
CreateProcess
Win32 API 函数,如这篇 Win32 学习文章中所述。

现在,我的粗略轮廓如下所示:

  1. 创建 N 个线程(C++ 的 <thread>)
  2. 每个线程都尝试获取一项工作,如果有,则使用给定的参数启动外部进程
  3. WaitForSingleObject
    直到外部流程完成
  4. ReadFile
    输出管道直至为空
  5. 关闭所有句柄
  6. 转到2),检查是否需要做更多工作。

我正在观察以下情况:所有线程都启动其外部进程,但由于某种原因,所有线程都会阻塞

ReadFile
,直到每个线程都达到
ReadFile
!如果是我让一些本应关闭的句柄保持打开之类的问题,出现死锁我可以理解,但我不明白这种情况为何以及如何发生。同样,如果是因为我请求的字节数过多,
ReadFile
会阻塞,我会理解死锁,但为什么当每个线程达到
ReadFile
时它会解除阻塞?

为了演示这个问题,我创建了一个(几乎)最小的工作示例。 它有两个组件,一个模仿外部进程 (

sleepHelper.cpp
),另一个产生可变数量的线程:其中一个线程启动的进程始终需要 5 seconds 才能运行完,而其他所有线程启动的进程都只需要 500ms。

这使我们能够观察行为:

.\MinimalExample.exe 3
Using thread count = 3
Thead 3: Waiting for object took 525ms.
Thead 2: Waiting for object took 525ms.
Thead 3: Reading output took 4489ms.
Thead 2: Reading output took 4489ms.
Thead 3: Executing process took 5022ms.
Thread 3 is exiting.
Thead 2: Executing process took 5022ms.
Thread 2 is exiting.
Thead 1: Waiting for object took 5015ms.
Thead 1: Reading output took 0ms.
Thead 1: Executing process took 5023ms.
Thread 1 is exiting.
Total runtime was 5025ms.

我的问题是:

  1. 为什么
    ReadFile
    会阻塞在这里?
  2. 为什么当所有线程都到达此点时才返回?

为了您的方便,您可以在这里找到 MWE

SleepHelper(需要编译为 sleepHelper.exe):

#include <chrono>
#include <iostream>
#include <string>
#include <thread>

// Stand-in for the "external process": sleeps for the number of milliseconds
// given as the first command-line argument and reports on stdout how long the
// sleep actually took (in microseconds). Returns -1 if no argument was given.
int main(int argc, char** argv) {
    using Clock = std::chrono::steady_clock;
    using Milliseconds = std::chrono::duration<std::size_t, std::milli>;

    if (argc < 2) {
        std::cerr << "Error: Expected an argument." << std::endl;
        return -1;
    }

    // std::stoull throws if the argument is not a valid unsigned number.
    auto const requestedMs = std::stoull(std::string(argv[1]));
    std::cout << "Going to sleep for " << requestedMs << " ms." << std::endl;

    Clock::time_point const before = Clock::now();
    std::this_thread::sleep_for(Milliseconds(requestedMs));
    Clock::time_point const after = Clock::now();

    auto const elapsedUs = std::chrono::duration_cast<std::chrono::microseconds>(after - before).count();
    std::cout << "Slept for " << elapsedUs << "us." << std::endl;

    return 0;
}

主要代码:

#include <array>
#include <atomic>
#include <chrono>
#include <filesystem>
#include <iostream>
#include <mutex>
#include <stdexcept>
#include <string>
#include <thread>
#include <tuple>
#include <utility>
#include <vector>

#define WIN32_LEAN_AND_MEAN
#include <windows.h>
#include <processthreadsapi.h>

static std::mutex mutex;

// Writes `s` plus a line break to std::cout while holding the file-level
// `mutex`, so that lines printed through the locked helpers do not interleave.
void PrintMessageLocked(std::string const& s) {
    const std::lock_guard<std::mutex> outputGuard(mutex);
    std::cout << s;
    std::cout << std::endl;
}

// Prints one timing line for thread `threadId` to std::cout while holding the
// file-level `mutex`.
//
// what     - label of the step that was timed.
// threadId - id shown in the line prefix.
// usTaken  - duration to print; despite the name it is emitted with an "ms"
//            suffix, and the callers in this file pass millisecond counts.
void PrintTimeLocked(char const* what, std::size_t const& threadId, std::size_t const& usTaken) {
    const std::lock_guard<std::mutex> outputGuard(mutex);
    std::cout << "Thead " << threadId << ": ";
    std::cout << what << " took " << usTaken << "ms." << std::endl;
}

std::tuple<int, std::string> ExecuteProcessAndCaptureOutput(std::string command, std::string workingDirectory, std::size_t const& myThreadId) {
    /*
        Why this whole mumbu-jumbo instead of a simple popen/pclose()?
        Because the maximum length of command line arguments is 8191 characters on Windows, but almost all invocations are longer than that.
    */
    HANDLE g_hChildStd_OUT_Rd = NULL;
    HANDLE g_hChildStd_OUT_Wr = NULL;

    SECURITY_ATTRIBUTES saAttr;

    // Set the bInheritHandle flag so pipe handles are inherited. 
    saAttr.nLength = sizeof(SECURITY_ATTRIBUTES);
    saAttr.bInheritHandle = TRUE;
    saAttr.lpSecurityDescriptor = NULL;

    // Create a pipe for the child process's STDOUT. 
    DWORD const outputBufferSize = 1024 * 1024;
    if (!CreatePipe(&g_hChildStd_OUT_Rd, &g_hChildStd_OUT_Wr, &saAttr, outputBufferSize)) {
        std::cerr << "Failed to create STDOUT pipe for child process, last error = " << GetLastError() << std::endl;
        throw;
    }

    // Ensure the read handle to the pipe for STDOUT is not inherited.
    if (!SetHandleInformation(g_hChildStd_OUT_Rd, HANDLE_FLAG_INHERIT, 0)) {
        std::cerr << "Failed to set handle information for child process, last error = " << GetLastError() << std::endl;
        throw;
    }

    STARTUPINFOA startupInfo;
    PROCESS_INFORMATION processInformation;
    ZeroMemory(&startupInfo, sizeof(startupInfo));
    startupInfo.cb = sizeof(startupInfo);
    ZeroMemory(&processInformation, sizeof(processInformation));

    startupInfo.hStdError = g_hChildStd_OUT_Wr;
    startupInfo.hStdOutput = g_hChildStd_OUT_Wr;
    startupInfo.dwFlags |= STARTF_USESTDHANDLES;

    BOOL bSuccess = CreateProcessA(NULL, command.data(), NULL, NULL, TRUE, 0, NULL, workingDirectory.data(), &startupInfo, &processInformation);
    if (!bSuccess) {
        std::cerr << "Failed to create process, last error = " << GetLastError() << std::endl;
        throw;
    }

    // Close unnecessary pipes
    CloseHandle(g_hChildStd_OUT_Wr);

    // Wait until child process exits.
    auto const waitStart = std::chrono::steady_clock::now();
    DWORD const timeoutInMs = 30000; // Can be INFINITE
    auto const waitResult = WaitForSingleObject(processInformation.hProcess, timeoutInMs);
    if (waitResult == WAIT_TIMEOUT) {
        std::cerr << "Internal Error: Execution took longer than " << timeoutInMs << "ms, abandoning!" << std::endl;

        std::cerr << "Command was: '" << command << "'" << std::endl;
        auto const terminationResult = TerminateProcess(processInformation.hProcess, -1);
        if (terminationResult == 0) {
            std::cerr << "Internal Error: Failed to terminate sub-process :(" << std::endl;
        }
    }
    auto const waitEnd = std::chrono::steady_clock::now();
    auto const waitTimeTaken = std::chrono::duration_cast<std::chrono::milliseconds>(waitEnd - waitStart).count();
    PrintTimeLocked("Waiting for object", myThreadId, waitTimeTaken);

    DWORD exitCode = 0;
    bSuccess = GetExitCodeProcess(processInformation.hProcess, &exitCode);
    if (bSuccess == 0) {
        std::cerr << "Internal Error: Failed to get exit code from process!" << std::endl;
        throw;
    }
    else if (exitCode == STILL_ACTIVE) {
        std::cerr << "Internal Error: Failed to get exit code from process, it is still running!" << std::endl;
        throw;
    }

    // Close process and thread handles. 
    CloseHandle(processInformation.hProcess);
    CloseHandle(processInformation.hThread);

    // Collect output from process
    auto const outputStart = std::chrono::steady_clock::now();
    constexpr DWORD BUFFER_SIZE = 4096;
    std::array<char, BUFFER_SIZE> buffer;
    std::string resultString;
    DWORD dwRead = 0;
    try {
        do {
            bSuccess = ReadFile(g_hChildStd_OUT_Rd, buffer.data(), BUFFER_SIZE, &dwRead, NULL);
            if (!bSuccess || dwRead == 0) break;
            resultString.append(buffer.data(), dwRead);
        } while (true);
    }
    catch (...) {
        std::cerr << "Internal Error: Encountered an exception while reading process output!" << std::endl;
        throw;
    }
    auto const outputEnd = std::chrono::steady_clock::now();
    auto const outputTimeTaken = std::chrono::duration_cast<std::chrono::milliseconds>(outputEnd - outputStart).count();
    PrintTimeLocked("Reading output", myThreadId, outputTimeTaken);

    // And close the handle to reading STDOUT
    CloseHandle(g_hChildStd_OUT_Rd);

    return std::make_tuple(static_cast<int>(exitCode), resultString);
}



// Worker body for one thread: repeatedly claims the next unclaimed index into
// `times`, runs sleepHelper.exe with that entry as its argument via
// ExecuteProcessAndCaptureOutput, and prints how long the whole call took.
// Returns (after announcing its exit) once all entries have been claimed.
//
// times      - sleep durations handed to sleepHelper.exe, one per work item.
// myThreadId - id used to label this thread's output lines.
void ThreadFunction(std::vector<std::size_t> const& times, std::size_t const myThreadId) {
    std::size_t const jobCount = times.size();
    // One counter shared by every thread running this function; each
    // post-increment hands out a distinct work-item index.
    static std::atomic<std::size_t> workCounter = 0;

    for (std::size_t jobIndex = workCounter++; jobIndex < jobCount; jobIndex = workCounter++) {
        std::size_t const& sleepArgument = times.at(jobIndex);

        auto const startedAt = std::chrono::steady_clock::now();
        auto const [exitStatus, capturedOutput] = ExecuteProcessAndCaptureOutput(
            ".\\sleepHelper.exe " + std::to_string(sleepArgument),
            std::filesystem::current_path().string(),
            myThreadId);
        auto const finishedAt = std::chrono::steady_clock::now();

        auto const elapsed = std::chrono::duration_cast<std::chrono::milliseconds>(finishedAt - startedAt);
        PrintTimeLocked("Executing process", myThreadId, elapsed.count());
    }

    std::lock_guard<std::mutex> outputGuard(mutex);
    std::cout << "Thread " << myThreadId << " is exiting." << std::endl;
}

// Demo driver. Starts `threadCount` worker threads (first command-line
// argument, default 3, allowed range 1..64) that together process one
// 5000 ms job and threadCount-1 jobs of 500 ms, then prints the total runtime.
int main(int argc, char** argv) {
    std::size_t threadCount = (argc > 1) ? std::stoull(argv[1]) : 3;
    if (threadCount == 0 || threadCount > 64) {
        std::cerr << "Thread count out of bounds. Use 1 <= thread count <= 64." << std::endl;
        return -1;
    }
    std::cout << "Using thread count = " << threadCount << std::endl;

    // One long-running job up front, every remaining job is short.
    std::vector<std::size_t> times(threadCount, 500);
    times.front() = 5000;

    auto timeStart = std::chrono::steady_clock::now();
    std::vector<std::thread> threads;
    for (std::size_t workerId = 1; workerId <= threadCount; ++workerId) {
        // std::thread stores its own copy of `times` for each worker.
        threads.emplace_back(&ThreadFunction, times, workerId);
    }
    for (std::thread& worker : threads) {
        worker.join();
    }
    auto timeEnd = std::chrono::steady_clock::now();

    auto const totalMs = std::chrono::duration_cast<std::chrono::milliseconds>(timeEnd - timeStart).count();
    std::cout << "Total runtime was " << totalMs << "ms." << std::endl;

    return 0;
}

备注:

  • 我知道我可以先查看(peek)管道,只读取可用的字节,但由于那篇学习文章只使用了 ReadFile,而且一旦所有线程都到达该点它就能正常工作,我很好奇为什么会这样。
  • 从技术上讲,所有线程都使用
    ReadFile
    执行一次成功读取,读取 49 字节,然后挂起,最后以读取的零字节返回。
c++ windows multithreading winapi
1个回答
0
投票

好吧,并不是您的线程“到达”了 ReadFile 调用,而是最后一个子进程退出了。

依我看,原因是这样的:虽然您阻止了管道读取句柄的继承,但写入句柄仍会被继承(当然,子进程本来就需要向它写入)。不幸的是,最后一个仍在运行的子进程继承了所有管道的写入句柄,使它们一直保持打开状态,从而让 ReadFile 一直阻塞到该子进程退出为止。

因此,我们需要一种有选择地继承句柄的方法:每个子进程应该只知道它“自己的”管道末端。

这里有一个相当老的相关问题:“选择子进程继承哪些句柄”。这个问题又链接到一篇更早的 Raymond Chen 的文章。为了简单起见,我直接参考那篇文章。

出于演示目的,您可以包含他的

CreateProcessWithExplicitHandles
函数并将对 CreateProcessA 的调用替换为:

 // Same arguments as the CreateProcessA call it replaces, followed by the
 // explicit list of handles the child is allowed to inherit.
 BOOL bSuccess = CreateProcessWithExplicitHandles(
    NULL,                     // lpApplicationName
    command.data(),           // lpCommandLine
    NULL,                     // lpProcessAttributes
    NULL,                     // lpThreadAttributes
    TRUE,                     // bInheritHandles
    0,                        // dwCreationFlags
    NULL,                     // lpEnvironment
    workingDirectory.data(),  // lpCurrentDirectory
    &startupInfo,             // lpStartupInfo
    &processInformation,      // lpProcessInformation
    1,                        // cHandlesToInherit: exactly one handle
    &g_hChildStd_OUT_Wr);     // rgHandlesToInherit: this thread's pipe write end

最后两个参数指定有一个要继承的句柄(使用数组作为练习留给读者;-)) - 这是线程本地管道写入句柄。

现在,至少对我来说,你的最小示例按预期工作。

作为参考,我将包含

CreateProcessWithExplicitHandles
的来源:

// Starts a process like CreateProcess does, but attaches a
// PROC_THREAD_ATTRIBUTE_HANDLE_LIST attribute built from rgHandlesToInherit,
// i.e. an explicit list of handles for the new process to inherit.
//
// The first ten parameters are forwarded to CreateProcess unchanged (with
// EXTENDED_STARTUPINFO_PRESENT added to dwCreationFlags and lpStartupInfo
// copied into a STARTUPINFOEX).
//   cHandlesToInherit  - number of entries in rgHandlesToInherit.
//   rgHandlesToInherit - handles placed in the handle-list attribute.
//
// Returns the success flag of the last step that ran. On failed argument
// validation the last error is set to ERROR_INVALID_PARAMETER.
//
// NOTE(review): the handle list presumably only takes effect when
// bInheritHandles is TRUE and the listed handles are themselves inheritable --
// confirm against the UpdateProcThreadAttribute documentation.
// NOTE(review): the signature uses the generic-text types (LPCTSTR/LPTSTR/
// LPSTARTUPINFO) and the CreateProcess name, so callers passing char buffers
// and STARTUPINFOA presumably require a non-UNICODE build -- confirm.
BOOL CreateProcessWithExplicitHandles(
  __in_opt     LPCTSTR lpApplicationName,
  __inout_opt  LPTSTR lpCommandLine,
  __in_opt     LPSECURITY_ATTRIBUTES lpProcessAttributes,
  __in_opt     LPSECURITY_ATTRIBUTES lpThreadAttributes,
  __in         BOOL bInheritHandles,
  __in         DWORD dwCreationFlags,
  __in_opt     LPVOID lpEnvironment,
  __in_opt     LPCTSTR lpCurrentDirectory,
  __in         LPSTARTUPINFO lpStartupInfo,
  __out        LPPROCESS_INFORMATION lpProcessInformation,
    // Parameters beyond those of CreateProcess: the explicit handle list.
  __in         DWORD cHandlesToInherit,
  __in_ecount(cHandlesToInherit) HANDLE *rgHandlesToInherit)
{
 BOOL fSuccess;
 // Tracks whether the attribute list was initialized and therefore needs
 // DeleteProcThreadAttributeList during cleanup.
 BOOL fInitialized = FALSE;
 SIZE_T size = 0;
 LPPROC_THREAD_ATTRIBUTE_LIST lpAttributeList = NULL;
 // Validate arguments: the byte size computed below as
 // cHandlesToInherit * sizeof(HANDLE) must stay below 0xFFFFFFFF, and the
 // caller's STARTUPINFO must report exactly the plain structure size.
 fSuccess = cHandlesToInherit < 0xFFFFFFFF / sizeof(HANDLE) &&
            lpStartupInfo->cb == sizeof(*lpStartupInfo);
 if (!fSuccess) {
  SetLastError(ERROR_INVALID_PARAMETER);
 }
 // Every following step runs only if all previous steps succeeded; the
 // cleanup at the end runs unconditionally.
 if (fSuccess) {
  // Size query: called with a NULL list to obtain the required size in
  // `size`; failing with ERROR_INSUFFICIENT_BUFFER is accepted as success.
  fSuccess = InitializeProcThreadAttributeList(NULL, 1, 0, &size) ||
             GetLastError() == ERROR_INSUFFICIENT_BUFFER;
 }
 if (fSuccess) {
  // Allocate storage for the attribute list from the process heap.
  lpAttributeList = reinterpret_cast<LPPROC_THREAD_ATTRIBUTE_LIST>
                                (HeapAlloc(GetProcessHeap(), 0, size));
  fSuccess = lpAttributeList != NULL;
 }
 if (fSuccess) {
  // Second call initializes the allocated list with room for one attribute.
  fSuccess = InitializeProcThreadAttributeList(lpAttributeList,
                    1, 0, &size);
 }
 if (fSuccess) {
  fInitialized = TRUE;
  // Store the caller's handle array as the handle-list attribute.
  fSuccess = UpdateProcThreadAttribute(lpAttributeList,
                    0, PROC_THREAD_ATTRIBUTE_HANDLE_LIST,
                    rgHandlesToInherit,
                    cHandlesToInherit * sizeof(HANDLE), NULL, NULL);
 }
 if (fSuccess) {
  // Wrap the caller's STARTUPINFO in a STARTUPINFOEX that carries the
  // attribute list; cb is updated to the size of the extended structure and
  // EXTENDED_STARTUPINFO_PRESENT is added to the creation flags.
  STARTUPINFOEX info;
  ZeroMemory(&info, sizeof(info));
  info.StartupInfo = *lpStartupInfo;
  info.StartupInfo.cb = sizeof(info);
  info.lpAttributeList = lpAttributeList;
  fSuccess = CreateProcess(lpApplicationName,
                           lpCommandLine,
                           lpProcessAttributes,
                           lpThreadAttributes,
                           bInheritHandles,
                           dwCreationFlags | EXTENDED_STARTUPINFO_PRESENT,
                           lpEnvironment,
                           lpCurrentDirectory,
                           &info.StartupInfo,
                           lpProcessInformation);
 }
 // Cleanup: delete the attribute list if it was initialized, then free its
 // heap storage if it was allocated.
 if (fInitialized) DeleteProcThreadAttributeList(lpAttributeList);
 if (lpAttributeList) HeapFree(GetProcessHeap(), 0, lpAttributeList);
 return fSuccess;
}
© www.soinside.com 2019 - 2024. All rights reserved.