优化多线程 C 程序以处理大型 TSV 文件

问题描述 投票:0回答:1

问题描述

我使用 POSIX 编写了一个多线程 C 程序来处理三个大型 TSV 输入文件。我的目标是根据这些输入生成输出文件,但我面临性能问题。

  • 文件 1:包含国家/地区密钥对(例如,非洲 0 美洲印第安人 1 ...).

  • 文件 2:一个大型 n x m 矩阵,其中单个数字值对应于文件 1 中的键。

    示例:

    0\t3\t4\t1\t2...
    
    0\t3\t4\t1\t2...
    
  • 文件 3:另一个大矩阵 (n x m+1),包含 0、1、9 列和整数值索引列。 示例:

    111111\t0\t1\t0\t0\t1...
    
    222222\t0\t1\t1\t1\t1...
    

目标:对于每个国家/地区,我需要创建一个与文件 3 结构相同的输出文件 (country.txt)。输出逻辑取决于文件 2 中的键与文件 3 中的值的匹配。

输出示例:

对于键为 1 的 Amerindian.txt,我们从文件 3(索引)复制第一个值,然后将每个值与特定国家/地区的键值进行比较。因此,尽管 File2 中的行位置为 4,但我们在每个位置都观察到 9 不匹配,因此在该位置我们从 File3 复制值,这会导致以下输出:

  111111\t9\t9\t9\t0\t9...

  222222\t9\t9\t9\t1\t9...

当前的方法

  • 我已经用 C 实现了多线程解决方案。
  • 程序可以运行,但速度非常慢。使用 24 个线程处理 50,000 行、17,000 列需要超过 8 个小时(根据 htop 中的 TIME+ 估计)。频繁进入磁盘睡眠状态的线程由 htop S 列中的
    D
    指示。
  • 我尝试过每 50,000 行分割输入文件以适应 RAM,但这并没有显着提高性能。

关键代码段




// Main function and processFile function details
int main(int argc, char *argv[]) {
//... library loading, typedef, declaration, parameter loading, read files into memory

    int currentLine = 0;
    for (int i = 0; i < numThreads; ++i) {

printf("Debug: Allocating ThreadArg for thread %d\n", i);

   threadArgs[i] = (ThreadArg *)malloc(sizeof(ThreadArg));
   int threadLineCount = linesPerThread + (i < remainingLines ? 1 : 0);
   threadArgs[i]->localOriginLines = malloc(sizeof(char *) * threadLineCount);
        threadArgs[i]->startLine = currentLine;
        threadArgs[i]->endLine = currentLine + linesPerThread;
        if (i < remainingLines) threadArgs[i]->endLine++;
        currentLine = threadArgs[i]->endLine;
        threadArgs[i]->specificOriginIndex = specificOriginIndex;
printf("Debug: Thread %d - startLine: %d, endLine: %d, specificOriginIndex: %d\n", i, threadArgs[i]->startLine, threadArgs[i]->endLine, threadArgs[i]->specificOriginIndex);
   if (pthread_create(&threads[i], NULL, processFile, threadArgs[i])) {
        fprintf(stderr, "Failed to allocate memory for thread arguments\n");
        
        // Cleanup already allocated resources for threadArgs
        for (int j = 0; j < i; ++j) {
            free(threadArgs[j]);
        }

      goto cleanup;

        return 1; // Exit due to memory allocation failure
    }
}


// Thread joining and data aggregation
for (int i = 0; i < numThreads; ++i) {
    printf("Waiting for Thread %d to finish\n", i);
    char ***threadResult;
    pthread_join(threads[i], (void **)&threadResult);

    // Aggregate data from each thread
    for (int j = threadArgs[i]->startLine; j < threadArgs[i]->endLine && j < txtLineCount; ++j) {
        for (int k = 0; k < originCount; ++k) {
            if (threadResult[k][j] != NULL) {
                originLines[k][j] = strdup(threadResult[k][j]); // Copy data safely
            }
        }
    }

    // Free memory allocated in the thread
    for (int k = 0; k < originCount; ++k) {
        for (int j = 0; j < txtLineCount; ++j) {
            free(threadResult[k][j]);
        }
        free(threadResult[k]);
    }
    free(threadResult); 
    free(threadArgs[i]); // Free the thread argument
}
   printf("Program finished\n");
        goto cleanup;
 
    return 0;
//cleanup code
}

// Modified processFile function for threading
void *processFile(void *arg) {
    ThreadArg *threadArg = (ThreadArg *)arg;
    // Allocate memory for localOriginLines
    threadArg->localOriginLines = malloc(originCount * sizeof(char **));
    for (int i = 0; i < originCount; ++i) {
        threadArg->localOriginLines[i] = malloc(txtLineCount * sizeof(char *));
        for (int j = 0; j < txtLineCount; ++j) {
            threadArg->localOriginLines[i][j] = NULL; // Corrected type to char*
        }
    }

    for (int lineIndex = threadArg->startLine; lineIndex < threadArg->endLine && lineIndex < txtLineCount; ++lineIndex) {
        char *tsvLine = strdup(tsvLines[lineIndex]);
        if (!tsvLine) {
            perror("Failed to duplicate TSV line for tokenization");
            continue;
        }

        char **tsvTokens = NULL;
        int tsvTokenCount = 0;
        char *saveptr;
        char *token = strtok_r(tsvLine, "\t", &saveptr);
        while (token) {
            char **temp = realloc(tsvTokens, (tsvTokenCount + 1) * sizeof(char *));
            if (temp == NULL) {
                perror("Failed to reallocate memory for tsvTokens");
                break;
            }
            tsvTokens = temp;
            tsvTokens[tsvTokenCount++] = strdup(token);
            token = strtok_r(NULL, "\t", &saveptr);
        }

        for (int originIndex = 0; originIndex < originCount; ++originIndex) {
            const char *txtLineContent = txtLines[lineIndex];
            size_t newLineSize = strlen(txtLineContent) + tsvTokenCount * 2; // Example size calculation
            threadArg->newline = calloc(newLineSize, sizeof(char));
            if (!threadArg->newline) {
                perror("Failed to allocate memory for newLine");
                break; // Skip this line on allocation failure
            }
            threadArg->currentPtr = threadArg->newline;
            
            // Copy initial ID part from txtLineContent, if present
            const char *firstTab = strchr(txtLineContent, '\t');
            size_t idLength = firstTab ? (firstTab - txtLineContent) : 0;
            if (idLength > 0) {
                strncpy(threadArg->currentPtr, txtLineContent, idLength);
                threadArg->currentPtr += idLength;
                *threadArg->currentPtr++ = '\t';
            }

            txtLineContent = firstTab ? firstTab + 1 : txtLineContent;
            for (int columnIndex = 0; columnIndex < tsvTokenCount; ++columnIndex) {
                int tokenValue = atoi(tsvTokens[columnIndex]);
                *threadArg->currentPtr++ = (tokenValue == origins[originIndex].code && columnIndex < strlen(txtLineContent)) ? txtLineContent[columnIndex] : '9';
                if (columnIndex < tsvTokenCount - 1 && columnIndex < strlen(txtLineContent) - 1) {
                    *threadArg->currentPtr++ = '\t';
                }
            }
            *threadArg->currentPtr = '\0';
            threadArg->localOriginLines[originIndex][lineIndex] = strdup(threadArg->newline);
        }

        for (int i = 0; i < tsvTokenCount; ++i) {
            free(tsvTokens[i]);
        }
        free(tsvTokens);
        free(tsvLine);
    }

    // Return the localOriginLines for further processing outside the thread
    return threadArg->localOriginLines;
}

问题: 如何优化我的 C 程序以提高其处理这些大文件的性能?我的多线程方法中是否有特定的技术或更改可以帮助减少处理时间并避免磁盘睡眠问题?

c multithreading posix
1个回答
0
投票

每个文件有 50000 行和 17000 列,所以每个文件大约有 2 GB。如果您同时在多个线程上复制每一行和每行上的标记,我预计会有大量非常小的内存分配,最终您将耗尽 RAM 并开始交换到磁盘。

我认为您不需要所有的内存分配或复制。

文件 1 并不是特别有趣。您将其读入内存,并将其转换为某种键值列表,然后开始为每个线程生成一次。既然你说键是个位数的值,我怀疑这是你的瓶颈。

由于您需要多次读取文件2和3(如果我理解正确的话,每个国家一次),我认为在启动时将这两个文件读取到RAM是有意义的,因此您不需要多次打开相同的文件.

现在,实际解析文件2和3:

没有分配,没有复制,没有

strtok
strlen
。执行简单的字符搜索来查找下一个位置,并使用刚刚获得的偏移量来使用已加载的文件 2 和 3 中的数据。例如:

  1. 找到文件 3 的第一个制表符分隔符。现在您有了行开始的偏移量,即索引开始和索引结束。只需将其与选项卡一起直接写入输出文件即可。分隔符。

  2. 在文件 2 中查找第一个选项卡,在文件 3 中查找下一个选项卡。现在您拥有第一个键和第一个值的位置。只需比较文件 2 密钥的 ascii 值(密钥是个位数),无需进行数字转换。根据比较结果,直接将结果写入输出;值 9 或文件 3 中的值。

  3. 继续重复步骤 2,直到到达行尾,在这种情况下,请再次从步骤 1 开始。

重要的一点是,在写入文件时,您只移动数据(没有无用的复制或分配),并且您只需要访问数据最少的时间(例如,不需要

strlen
,因为我们可以根据偏移量计算字符串长度.)

© www.soinside.com 2019 - 2024. All rights reserved.