沧歌


起风了,唯有努力生存


多线程检索文件 - 生产者消费者模型

我的CSDN博客是https://blog.csdn.net/daodao0/,但我感觉在CSDN上写限制很多,广告也很多,不甚满意。 所以github pages博客建立后,就渐渐不在CSDN上写了。一些我觉得有价值的博文我也搬运到了这里。 这篇是其中一篇。

———————— 分割线 —————————

要求

请分别开发一个单线程和多线程的程序来完成下述功能,并对其性能进行比较分析。检索1个超大文本文件(不小于1G)或多个较小的文本文件(个数不小于100, 文件尺寸不小于2K)某个关键字(单词)的出现次数。比较两程序用时。

具体实现

测试环境:ubuntu  16.04,读取一个1.1GB的文本文档

编译方法:使用g++编译器。终端输入g++ -Wall -o <目标文件> <源代码文件> -lpthread

代码说明

  • read_and_search.cpp:一次读取一行,单线程
  • read_and_search_queue_pthread.cpp:一次读取一行,双线程,使用队列作为缓冲区
  • read_and_search_pthread_mutex.c:一次读取1MB,注释USE_PTHREAD后是单线程,否则是双线程,使用互斥锁

结果分析

说明

时间(us微秒)

第1次

第2次

第3次

单线程,不使用队列,一次读一行

9034083

1478871

1473323

2线程,使用队列缓冲,一次读一行

9014760

8947088

9109844

单线程,一次读1MB

8892822

3124941

3114100

使用双线程,互斥量和信号量,一次读1MB

8903740

3051904

2998314

开始的时候写的程序是一次读一行进行处理的,双线程程序较单线程的程序首次运行速度差不多,读取并查找一个1G的文档都需要9s左右,双线程的优势并没有体现出来,猜想原因可能是内存载入磁盘数据的时候一次性读的是一整块,由于程序的空间局部性较好,操作系统的页面调度算法发挥了很大作用。

一个较为奇怪的地方是,单线程的程序运行第二次后时间明显会变短,而双线程没有明显变化。原因可能是双线程频繁向队列插入和取出元素,每个元素创建时都要分配内存并复制数据,队列开销比较大。

学习了操作系统的线程通信问题后,发现这个题目可以使用生产者——消费者问题模型解决,于是重新编写了代码。这次改成一次读取1MB的数据,速度较一次读一行要快。多线程编程时使用与条件变量相关的pthread调用,如pthread_cond_wait,pthread_cond_signal等来实现互斥访问临界区。多线程读文件平均比单线程快0.1s左右。

总的来说,多线程读文件的效果还是较单线程好一点,但是当程序空间局部性很好的时候,多线程的优势不明显,反而使编程更复杂,出错概率更大,得不偿失。

部分代码

read_and_search_pthread_mutex.c:

#include <stdio.h>
#include <string.h>
#include <stdlib.h>
#include <sys/time.h>
#include <unistd.h>
#include <pthread.h>

//#define USE_PTHREAD //注释掉该宏则是单线程

//------一次性读取的数据块----------------------------
#define MAXSIZE 1024*1024
typedef struct tag_datanode{
    char data[MAXSIZE];
    int size;
}datanode;
//生产者和消费者使用的缓冲区
datanode buff;

pthread_mutex_t the_mutex;
pthread_cond_t condc,condp;

int myglobal;//统计查找词出现的次数
int flag_reading;//表示主线程是否读完了文件


int fuzzymatch(const char *str,const char *what, int start_position, int end_position)
{//自定义函数模糊匹配
    //成功返回出现的位置
    //失败返回-1
    int base=start_position,j=0;
    while (( base+j < end_position)&&(what[j]!='\0')) {
        if (str[base+j]==what[j]) {
            j++;
        }else{
            base++;
            j=0;
        }
    }
    if (what[j]=='\0') return base;
    else return -1;
}

int num_of_word_in_cstring(char *str, char* findword, int str_len)
{//用来统计一个字符串里出现了多少次待查找的词
    int findnum=0;
    int found_place = fuzzymatch(str, findword, 0, str_len);
    int foundwordlen=strlen(findword);
    while (found_place!=-1){
    	findnum++;
    	found_place = fuzzymatch(str, findword, (found_place+foundwordlen), str_len);
    }
    return findnum;
}

void* consumer(void *arg)
{//消费数据 查找词的线程
    char findword[50];
    strcpy(findword,(char*)arg);

    datanode data_consumer;
    //printf("arg:%s  consumer start!\n",findword);
    int consumed_sum=0;
    while(flag_reading ){
        //printf("consumer try to enter critical region\n");
        pthread_mutex_lock(&the_mutex); //互斥使用缓冲区
        while(buff.size == 0) pthread_cond_wait(&condc,&the_mutex);
        //printf("consumer enter critical region\n");

        memcpy(&data_consumer,&buff,sizeof(datanode)); //从缓冲区中取出数据
        buff.size=0;
        pthread_cond_signal(&condp); //唤醒生产者
        pthread_mutex_unlock(&the_mutex); //释放缓冲区使用权

        //printf("consumer leave critical region | got data %d\n",data_consumer.size);
        consumed_sum += data_consumer.size;
        //消费数据
        myglobal += num_of_word_in_cstring(data_consumer.data,findword,data_consumer.size);
        //printf("%d ",myglobal);
    }
    if(buff.size!= 0){//生产者结束后,消费者处理剩余数据
        memcpy(&data_consumer,&buff,sizeof(datanode)); //从缓冲区中取出数据
        buff.size=0;
        //printf("consumer got the last data %d\n",data_consumer.size);
        consumed_sum += data_consumer.size;
        //消费数据
        myglobal += num_of_word_in_cstring(data_consumer.data,findword,data_consumer.size);
    }
    //printf("consumer end | consumed_sum:%d\n",consumed_sum);
    pthread_exit(0);
}

void* producer(void *arg)
{//生产数据  读文件线程
    FILE * in = (FILE *)arg;

    //printf("producer start!\n");
    datanode data_producer;
    int produced_sum=0;
    while(!feof(in)){
        //生产数据
        data_producer.size = fread(data_producer.data,1,MAXSIZE,in);

        //fprintf(stdout,"%*s",data_producer.size, data_producer.data);
        //printf("producer try to enter critical region\n");
        pthread_mutex_lock(&the_mutex); //互斥使用缓冲区
        while(buff.size != 0) pthread_cond_wait(&condp,&the_mutex);
        //printf("producer enter critical region | put data %d\n",data_producer.size);
        produced_sum+=data_producer.size;
        memcpy(&buff,&data_producer,sizeof(datanode));//向缓冲区中放数据
        pthread_cond_signal(&condc); //唤醒消费者
        pthread_mutex_unlock(&the_mutex); //释放缓冲区使用权
        //printf("producer leave critical region\n");
    }
    //printf("producer end | produced_sum:%d\n",produced_sum);
    flag_reading=0;
    pthread_exit(0);
}

int main(int argc, char **argv)
{
  //显示帮助信息
    if(argc < 2){
        printf("usage: read_and_search  filename findword\n");
        exit(-1);
    }

    //显示输入的参数
    printf("filename is %s\n findword is \"%s\"\n",argv[1],argv[2]);
#ifdef USE_PTHREAD
    printf("use pthread\n");
#endif // USE_PTHREAD
    printf("===============================\n");

    //创建变量,全局变量赋值
    //string filename=argv[1];
    //string findword=argv[2];
    memset((void*)&buff,0,sizeof(datanode));
    flag_reading = 1;
    myglobal=0;

#ifdef USE_PTHREAD
    pthread_t pro,con;
    pthread_mutex_init(&the_mutex,0);
    pthread_cond_init(&condc,0);
    pthread_cond_init(&condp,0);
#endif
    //--------------开始计时-------------------
    struct timeval start, end;
    gettimeofday( &start, NULL );

    //打开文件
    FILE *in;
    //ifstream in(argv[1]);
    in = fopen( argv[1] ,"r");
    if(in==NULL){
    	perror("Error opening file:");
    	exit(-2);
    }

#ifdef USE_PTHREAD
    //创建线程
    pthread_create(&con, NULL, consumer, (void*)argv[2]);
    pthread_create(&pro, NULL, producer, in);

    //等待线程结束
    pthread_join(pro, 0);
    pthread_join(con, 0);
#else
    while(!feof(in)){
        //生产数据
        buff.size = fread(buff.data,1,MAXSIZE,in);
        //消费数据
        myglobal += num_of_word_in_cstring(buff.data,argv[2],buff.size);

    }
#endif
    //-------------停止计时--------------------
    gettimeofday( &end, NULL );
    int timeuse = 1000000 * ( end.tv_sec - start.tv_sec ) + end.tv_usec -start.tv_usec;

#ifdef USE_PTHREAD
    pthread_cond_destroy(&condc);
    pthread_cond_destroy(&condp);
    pthread_mutex_destroy(&the_mutex);
#endif
    fclose(in);
    //输出统计结果
    printf("\"%s\" appears %d times.\n",argv[2],myglobal);
    //显示花费的时间
    printf("time: %d us\n\n",timeuse);

    return 0;
}

参考文献

  • [1]   《现代操作系统(原书第3版)》 Andrew S.Tanenbaum著,陈向群 马洪兵等译:2.3.6 互斥量 P74-76
  • [2]   Linux下多线程(pthread)编程实例 - 游手好弦 信步涂鸦 - 博客频道 -CSDN.NET http://blog.csdn.net/do2jiang/article/details/5527155
最近的文章

Linux下压缩和解压缩命令总结

相信不少人都曾经为在Linux下解压文件而头疼,笔者将解压文件的方法总结了一下,以备查阅。Linux下压缩和解压缩命令总结先上一张清单~ 文件扩展名 解压命令 压缩命令 .gz 法1:gunzip FileName.gz 法2:gzip -d FileName.gz gzip FileName ...…

linux继续阅读
更早的文章

Git使用笔记

我的CSDN博客是https://blog.csdn.net/daodao0/,但我感觉在CSDN上写限制很多,广告也很多,不甚满意。所以github pages博客建立后,就渐渐不在CSDN上写了。一些我觉得有价值的博文我也搬运到了这里。这篇是其中一篇。———————— 分割线 —————————要说如何学习使用Git,我自然推荐廖雪峰的git教程。还有一份Git命令备忘清单也值得参考。这...…

git, coding, linux继续阅读