我有一个程序打算接收一个文件(WarandPeace.txt),将其读入给定数量的线程,并将其处理成单词列表和每个单词的计数。我在每个线程中分别打开文件,用
pread()
将其中的一部分读入一个字符串,然后用 strtok_r()
将字符串分解为标记。我将标记添加到列表中,在我发现相同标记重复的地方递增,然后最终将每个线程的列表组合成一个列表。
我的问题是,据我所知,
strtok_r
由于某种原因未能准确计算各种单词。例如,我知道 princess
这个词在文件中出现了 937 次,但在我的代码中 strtok_r
只出现了大约 300 次。无论我使用多少线程运行程序,错误仍然存在。将标记添加到列表似乎不是问题,因为当我在处理字符串时执行printf("%s,%i", token, count)
时,我得到相同的减少计数。
编辑:
这是重现问题的最小可编译片段
#include <sys/types.h>
#include <sys/stat.h>
#include <fcntl.h>
#include <stdio.h>
#include <stdlib.h>
#include <time.h>
#include <unistd.h>
#include <pthread.h>
#include <string.h>
char *delim = "\"\'.“”‘’?:;-,—*($%)! \t\n\x0A\r";
int main(int argc, char *argv[]) {
char* fileName = "WarAndPeace.txt";
int fileID = open(fileName, O_RDONLY);
off_t fileSize = lseek(fileID, 0, SEEK_END);
printf("\nfilesize: %ld\n", fileSize);
char readFromFile[fileSize + 1];
readFromFile[fileSize] = '\0';
ssize_t readResult = pread(fileID, readFromFile, fileSize, 0);
char *token;
char *rest = readFromFile;
char *remaining;
printf("\nsize of string being fed to strtok_r(): %ld", strlen(rest));
token = strtok_r(rest, delim, &remaining);
int counter = 0;
if (strcmp(token, "princess") == 0) {
counter++;
printf("\ntoken - %s, count - %i\n", token, counter);
}
while ((token = strtok_r(NULL, delim, &remaining))) {
if (strcmp(token, "princess") == 0) {
counter++;
printf("\ntoken - %s, count - %i\n", token, counter);
}
}
}
这应该算公主937次,但只算了330次。
注意:如果你想自己测试这个,你需要下载战争与和平并重命名它WarAndPeace.txt.
这是我的完整程序:
#include <sys/types.h>
#include <sys/stat.h>
#include <fcntl.h>
#include <stdio.h>
#include <stdlib.h>
#include <time.h>
#include <unistd.h>
#include <pthread.h>
#include <string.h>
pthread_mutex_t mutex1 = PTHREAD_MUTEX_INITIALIZER;
// You may find this Useful
char * delim = "\"\'.“”‘’?:;-,—*($%)! \t\n\x0A\r";
// This struct holds a word from the file and its count, as well as a pointer
// to another word, in order to create a singly-linked list of words
struct Word {
char * word;
int count;
struct Word * next;
};
typedef struct Word Word;
// This struct holds the information passed to each thread as well as a pointer
// to the final shared list to be created, one of these will be created
// for each thread
struct ThreadRecord {
int totalThreads;
int thisThread;
Word ** listHead;
char * fileName;
};
typedef struct ThreadRecord ThreadRecord;
// This is the function passed into each thread that does the main work of
// the program
void * readFile( void * threadRecordVoid );
// this function runs after the threads have returned and sorts the completed
// list by size of count
void sortListBySize( Word ** listHead );
// This function runs as part of readFile and combines the multiple thread's
// list of words into a single combined list of words
void combineList (Word ** combinedList, Word ** transferList);
int main (int argc, char *argv[])
{
//***TO DO*** Look at arguments, open file, divide by threads
// Allocate and Initialize and storage structures
char * fileName = argv[1];
char * ptr;
int threadCount = (int)strtol(argv[2], &ptr, 10);
// The list pointer we pass to every thread to make our combined list
Word * listHead = NULL;
// A threadRecord created for every thread passing in that thread's number
// the total number of threads and the filename, which will allow each
// thread to access the file at the appropriate place.
ThreadRecord * threadRecords[threadCount];
for(int i = 0; i < threadCount; i++){
threadRecords[i] = malloc(sizeof(ThreadRecord));
threadRecords[i]->totalThreads = threadCount;
threadRecords[i]->thisThread = i;
threadRecords[i]->fileName = fileName;
threadRecords[i]->listHead = &listHead;
}
//**************************************************************
// DO NOT CHANGE THIS BLOCK
//Time stamp start
struct timespec startTime;
struct timespec endTime;
clock_gettime(CLOCK_REALTIME, &startTime);
//**************************************************************
// *** TO DO *** start your thread processing
// wait for the threads to finish
pthread_t threads[threadCount];
int threadReturn[threadCount];
// We run our threads, calling readFile
for (int i = 0; i < threadCount; i++){
threadReturn[i] = pthread_create( &threads[i], NULL, readFile, (void*) threadRecords[i]);
}
// we wait on our threads
for (int i = 0; i < threadCount; i++){
pthread_join(threads[i], NULL);
}
// If a thread returns an error, we say so and exit
for (int i = 0; i < threadCount; i++){
if (threadReturn[i] > 0) {
printf("Thread %d returned an error, quitting", i);
return 2;
}
}
// ***TO DO *** Process TOP 10 and display
// we sort the master list of threads
sortListBySize(threadRecords[0]->listHead);
// We print the top 10 threads
printf("\nThe top 10 words - %d threads\n", threadCount);
Word * printList = *threadRecords[0]->listHead;
for (int i = 0; i < 10 && printList != NULL; i++){
printf("\nWord - %s, count - %d\n", printList->word, printList->count);
printList = printList->next;
}
//**************************************************************
// DO NOT CHANGE THIS BLOCK
//Clock output
clock_gettime(CLOCK_REALTIME, &endTime);
time_t sec = endTime.tv_sec - startTime.tv_sec;
long n_sec = endTime.tv_nsec - startTime.tv_nsec;
if (endTime.tv_nsec < startTime.tv_nsec)
{
--sec;
n_sec = n_sec + 1000000000L;
}
printf("Total Time was %ld.%09ld seconds\n", sec, n_sec);
//**************************************************************
// ***TO DO *** cleanup
// here we free the list memory
Word * deleteList = *threadRecords[0]->listHead;
while(deleteList){
Word * tempWord = deleteList;
deleteList = deleteList->next;
free(tempWord->word);
tempWord->word = NULL;
free(tempWord);
tempWord = NULL;
}
// here we free the threadRecord memory
for (int i = 0; i < threadCount; i++){
free(threadRecords[i]);
threadRecords[i] = NULL;
}
}
// This is the function that runs for every thread, it divides up the file into a chunk for each
// thread, sorts that chunk of the file into words and totals of occurrences (sorting the words
// alphabetically), then it merges every thread's list into a combined list using mutex locks
// to ensure data safety. The merged list is also sorted, this way the sort step in which the
// threads can run independently is bumped from n to nlogn but the merge step in which the
// threads are limited by the lock is reduced from n^2 to nlogn, resulting in code that gets
// more efficient with more threads
void * readFile (void * threadRecordVoid){
// we recover the voided threadRecord that contains our passthrough data
ThreadRecord * threadRecord = (ThreadRecord *)threadRecordVoid;
// we open the file
int fileID = open(threadRecord->fileName, O_RDONLY);
// we get the filesize
off_t fileSize = lseek(fileID, 0, SEEK_END);
printf("\nfilesize: %ld\n", fileSize);
// we calculate how big each thread's chunk of the filesize will be
long fileChunkSize = fileSize / threadRecord->totalThreads;
printf("\nchunk size: %ld\n", fileChunkSize);
// we create the array we'll use to read from the file and end it
// with a null terminator
char readFromFile[fileChunkSize + 1];
readFromFile[fileChunkSize] = '\0';
// we read from the file
ssize_t readResult = pread(fileID,
readFromFile,
fileChunkSize,
fileChunkSize * threadRecord->thisThread);
/*
char sample[1000];
sample[999] = '\0';
strncpy(sample, readFromFile, 999);
printf("\nSample Text of War and Peace:\n%s\n", sample);
*/
// Here we begin breaking up the file and storing it as individual words
// with counts. We use strtok_r (because it's threadsafe) to read a word
// from the string we created from our thread's chunk of the file. We
// handle that word in various ways depending on how it sorts into the list.
// We move on to the next word until we're out of words. Then we're done and we
// move on ot the merge step.
// We start will a null node.
Word * threadListHead = NULL;
char* token;
char* rest = readFromFile;
// We get a token from strtok_r and check if it's null
int counter = 0;
while ((token = strtok_r(rest, delim, &rest)) ){
// We only do something with the token if it's at least six chars
if (strcmp(token, "princess") == 0) {
counter++;
printf("\ntoken - %s, count - %i\n", token, counter);
}
if (strlen(token) >= 6){
// This handles the very first node created in the list
if (threadListHead == NULL){
threadListHead = malloc(sizeof(Word));
threadListHead->word = malloc(sizeof(token));
strcpy(threadListHead->word,token);
threadListHead->count = 1;
threadListHead->next = NULL;
}
// This handles count increases at the head node
else if (strcmp(threadListHead->word, token) == 0) {
threadListHead->count++;
}
// This handles the case of whether a word is lower in the
// list than the head, in which case we create a new head
else if (strcmp(threadListHead->word, token) > 0) {
Word * newWord = malloc(sizeof(Word));
newWord->word = malloc(sizeof(token));
strcpy(newWord->word, token);
newWord->count = 1;
newWord->next = threadListHead;
threadListHead = newWord;
}
// here we handle traversing the list after the head
else {
Word * currWord = threadListHead;
// We traverse the list while the next word is not null and the next word
// is smaller than the current token
while(currWord->next != NULL && strcmp(currWord->next->word, token) < 0) {
currWord = currWord->next;
}
// If we get to null, we add a new node on the end of the list
if (currWord->next == NULL){
currWord->next = malloc(sizeof(Word));
currWord = currWord->next;
currWord->next = NULL;
currWord->word = malloc(sizeof(token));
strcpy(currWord->word, token);
currWord->count = 1;
if(strcmp(currWord->word, "princess") == 0){
printf("\nAdding at end\n%s - count %d",
currWord->word, currWord->count);
}
}
// if we have a matching word, we increase the count
else if (strcmp(currWord->next->word, token) == 0) {
currWord->next->count++;
/*
if(strcmp(currWord->next->word, "princess") == 0){
printf("\nincrementing count\n%s - count %d",
currWord->next->word, currWord->next->count);
}
*/
}
// if we have a node that matches a particular position in the list, we
// insert a new node into that position
else if (strcmp(currWord->next->word, token) > 0) {
Word * newWord = malloc(sizeof(Word));
newWord->word = malloc(sizeof(token));
strcpy(newWord->word, token);
newWord->count = 1;
newWord->next = currWord->next;
currWord->next = newWord;
if(strcmp(currWord->next->word, "princess") == 0){
printf("\ninserting into list\n%s - count %d",
currWord->next->word, currWord->next->count);
}
}
else {
printf("\nerror storing token - %s\n", token);
}
}
}
}
// Here we call the function that combines the sorted lists into a single list
combineList(threadRecord->listHead, &threadListHead);
/*
Word * transferHead = threadListHead;
while(transferHead) {
pthread_mutex_lock( &mutex1 );
if ((*threadRecord->listHead) == NULL){
Word * tempWord = transferHead;
transferHead = transferHead->next;
tempWord->next = NULL;
*threadRecord->listHead = tempWord;
}
else if (strcmp((*threadRecord->listHead)->word, transferHead->word) == 0){
Word * tempWord = transferHead;
transferHead = transferHead->next;
(*threadRecord->listHead)->count += tempWord->count;
free(tempWord->word);
tempWord->word = NULL;
free(tempWord);
tempWord = NULL;
}
else if (strcmp((*threadRecord->listHead)->word, transferHead->word) > 0){
Word * tempWord = transferHead;
transferHead = transferHead->next;
tempWord->next = *threadRecord->listHead;
*threadRecord->listHead = tempWord;
}
else {
Word * currWord = *threadRecord->listHead;
while(currWord->next != NULL && strcmp(currWord->next->word, transferHead->word) < 0){
currWord = currWord->next;
}
if (currWord->next == NULL){
//printf("segfault? currWord->next == NULL");
Word * tempWord = transferHead;
transferHead = transferHead->next;
tempWord->next = NULL;
currWord->next = tempWord;
}
else if (strcmp(currWord->next->word, transferHead->word) == 0){
//printf("segfault? strcmp(currWord->next->word, transferHead->word) == 0");
Word * tempWord = transferHead;
transferHead = transferHead->next;
currWord->next->count = currWord->next->count + tempWord->count;
free(tempWord->word);
tempWord->word = NULL;
free(tempWord);
tempWord = NULL;
}
else if (strcmp(currWord->next->word, transferHead->word) > 0)
{
//printf("segfault? strcmp(currWord->next->word, transferHead->word) > 0");
Word * tempWord = transferHead;
transferHead = transferHead->next;
tempWord->next = currWord->next;
currWord->next = tempWord;
}
else{
printf("\nError transferring word - %s\n", transferHead->word);
}
}
pthread_mutex_unlock( &mutex1 );
}
*/
/*
printf("\nThread %d - first 10 of threadRecord\n", threadRecord->thisThread);
printList = *threadRecord->listHead;
for(int i = 0; i < 10 && printList != NULL; i++){
printf("\nword - %s - count - %d\n", printList->word, printList->count);
printList = printList->next;
}
*/
/*
while(threadListHead){
Word * toDelete = threadListHead;
threadListHead = threadListHead->next;
free(toDelete);
toDelete = NULL;
}
*/
}
// This is the function that readFile runs to combine the sorted lists from each
// thread into a single combined list. It pulls nodes directly out of the
// existing lists and enters them into the new list, deleting duplicate entries
// after adding their counts to the existing entry's count
void combineList (Word ** combinedList, Word ** transferList){
Word * transferHead = *transferList;
while(transferHead) {
// We use a mutex lock here to control access to the combined list since
// writes to it need to be thread safe
pthread_mutex_lock( &mutex1 );
// Here we handle creating the first node in the previously empty list
if ((*combinedList) == NULL){
Word * tempWord = transferHead;
transferHead = transferHead->next;
tempWord->next = NULL;
*combinedList = tempWord;
}
// next we handle nodes that match the head, deleting the nodes after we've
// added in the count
else if (strcmp((*combinedList)->word, transferHead->word) == 0){
Word * tempWord = transferHead;
transferHead = transferHead->next;
(*combinedList)->count += tempWord->count;
free(tempWord->word);
tempWord->word = NULL;
free(tempWord);
tempWord = NULL;
}
// Next we handle words that go before the head by creating a new head and
// tagging the rest of the list onto it
else if (strcmp((*combinedList)->word, transferHead->word) > 0){
Word * tempWord = transferHead;
transferHead = transferHead->next;
tempWord->next = *combinedList;
*combinedList = tempWord;
}
// Next we handle traversal of the list beyond the head
else {
Word * currWord = *combinedList;
// We search the list while the next pointer is not null and is smaller than the
// current search term
while(currWord->next != NULL
&& strcmp(
currWord->next->word,
transferHead->word) < 0){
currWord = currWord->next;
}
// if we reach null, we insert the node at the end and make its next pointer the new
// NULL
if (currWord->next == NULL){
Word * tempWord = transferHead;
transferHead = transferHead->next;
tempWord->next = NULL;
currWord->next = tempWord;
if(strcmp("princess", tempWord->word) == 0){
printf("\ninserting at end of list\n%s - count %d",
tempWord->word,
tempWord->count);
}
}
// if the word is a match, we add the count to the list and delete the transfer node
else if (strcmp(currWord->next->word, transferHead->word) == 0){
Word * tempWord = transferHead;
transferHead = transferHead->next;
currWord->next->count = currWord->next->count + tempWord->count;
if(strcmp("princess", tempWord->word) == 0){
printf("\nadding to existing node\n%s - count %d",
currWord->next->word,
currWord->next->count);
}
free(tempWord->word);
tempWord->word = NULL;
free(tempWord);
tempWord = NULL;
}
// if the word slots in earlier in the list, we add the node in that location
else if (strcmp(currWord->next->word, transferHead->word) > 0)
{
Word * tempWord = transferHead;
transferHead = transferHead->next;
tempWord->next = currWord->next;
currWord->next = tempWord;
if(strcmp("princess", tempWord->word) == 0){
printf("\ninserting into list\n%s - count %d",
tempWord->word,
tempWord->count);
}
}
else{
printf("\nError transferring word - %s\n", transferHead->word);
}
}
pthread_mutex_unlock( &mutex1 );
}
}
// This function sorts the completed list by the size of the count using an insertion
// sort. Note: May come back to this later to sort by something with better time
// complexity to improve program performance
void sortListBySize ( Word ** listHead ){
Word * sortedList = NULL;
Word * currListHead = *listHead;
while ( currListHead) {
// Here we handle the first node in the sorted list, taking it from the
// unsorted list
if (sortedList == NULL){
sortedList = currListHead;
currListHead = currListHead->next;
sortedList->next = NULL;
}
// Here we handle nodes that sort before the list head, making them
// the new list head and tacking the list on after them
else if (sortedList->count <= currListHead->count){
Word * tempWord = currListHead;
currListHead = currListHead -> next;
tempWord->next = sortedList;
sortedList = tempWord;
}
// Here we handle sorted list traversal beyond the head
else {
Word * currWord = sortedList;
// We advance in the sorted list while the next pointer doesn't equal null
// and its count is greater than the unsorted list node's count
while(currWord->next != NULL && currWord->next->count > currListHead->count){
currWord = currWord->next;
}
// If we reach null, we insert the pointer at the end and set its next
// pointer to null
if (currWord->next == NULL) {
Word * tempWord = currListHead;
currListHead = currListHead->next;
tempWord->next = NULL;
currWord->next = tempWord;
}
// If we reach a position in the list earlier than null, we insert the node
// at that position
else if (currWord->next->count <= currListHead->count){
Word * tempWord = currListHead;
currListHead = currListHead->next;
tempWord->next = currWord->next;
currWord->next = tempWord;
}
}
}
// We set the listHead to the sortedList pointer, setting the list to the
// sorted list
*listHead = sortedList;
}
前10个线程的正确结果应该是:
Number 1 is Pierre with a count of 1963
Number 2 is Prince with a count of 1928
Number 3 is Natásha with a count of 1212
Number 4 is Andrew with a count of 1143
Number 5 is himself with a count of 1020
Number 6 is princess with a count of 916
Number 7 is French with a count of 881
Number 8 is before with a count of 833
Number 9 is Rostóv with a count of 776
Number 10 is thought with a count of 767
我的结果如下:
Word - Pierre, count - 1963
Word - Prince, count - 1577
Word - Natásha, count - 1213
Word - Andrew, count - 1143
Word - himself, count - 1017
Word - French, count - 881
Word - before, count - 779
Word - Rostóv, count - 776
Word - thought, count - 766
Word - CHAPTER, count - 730
有谁知道错误的来源是什么?
你称之为“最小可编译片段”的单线程代码是正确的。
错的是你的期望
你说你期待“公主”937次。我不知道你是怎么得到这个数字的,但我猜你是在像 Chrome 这样的浏览器中打开文本文件并搜索“公主”。当我这样做时,我确实看到了 937.
但是...
这包括
princess
Princess
princesses
Princesses
因为浏览器搜索 1) 不区分大小写且 2) 不适用于整个单词。
使用
grep
(在Linux上)我得到
grep princess -cw WarAndPeace.txt
330
就像你的程序一样。
所以我做了一些修改(注意:这不是在真实代码中做的方式):
token = strtok_r(rest, delim, &remaining);
int counter = 0;
if (token[0] == 'P') token[0] = 'p'; // HACK !!
if (strncmp(token, "princess", 8) == 0){ // HACK !!
counter++;
printf("\ntoken - %s, count - %i\n", token, counter);
}
while ((token = strtok_r(NULL, delim, &remaining)) ){
if (token[0] == 'P') token[0] = 'p'; // HACK !!
if (strncmp(token, "princess", 8) == 0) { // HACK !!
counter++;
printf("\ntoken - %s, count - %i\n", token, counter);
}
}
现在我得到 937
对于你的多线程代码这一行:
long fileChunkSize = fileSize / threadRecord->totalThreads;
不好。你做了一个整数除法,所以结果被截断了。如果文件是 209 字节并且你有 10 个线程,你会得到 209/10 = 20。所以最后 9 个字节没有被处理。
此外,您可能/将会发生这样的情况,即您拆分一个词,以便一个线程获得“prin”而下一个线程获得“cess”。而且您将无法正确计数。您需要一种更高级的文件分割方式。
ssize_t readResult = pread(fileID,
readFromFile,
fileChunkSize,
fileChunkSize * threadRecord->thisThread);
man:pread() 从文件描述符 fd 中读取“最多”count 个字节
检查您的退货。你可能只读了一小段书。
看起来你试图用“公主”调试并短路你的逻辑,发现问题在
strtok_r
。通常我会抱怨将你的代码减少到一个更小的例子,但我很确定你手动验证 counter
有错误的值,即使现在看的代码丢失了。除了简短的阅读之外,这并没有留下多少东西。
基于@Support Ukraine 提供的代码示例,我们可以减少代码的重复性并使用 POSIX 的
strncasecmp()
进行不区分大小写的比较。 (是的,strncasecmp()
不在 C 标准中,但我们在这里使用 POSIX 线程,所以我怀疑这会是一个问题)。
strcasecmp() 函数执行逐字节比较 字符串 s1 和 s2,忽略字符的大小写。
strncasecmp() 函数类似,除了它比较没有 超过 n 个字节的 s1 和 s2.
if (!strncasecmp (token, "princess", 8)) {
count++;
}
并且可以使用
strtok_r()
循环消除对 for
的重复调用:
for (char *token = rest; token = strtok_r (token, delim, &remaining); token = 0) {
if (!strncasecmp (token, "princess", 8) {
count++;
}
}
— @Fe203