线程池小试

本文主要介绍简单的线程池的实现和问题的解决。

主要参考http://www.cnblogs.com/li-daphne/p/5583224.html

什么是线程池

由于线程的创建和开销比较大,所以我们一开始就直接创建一系列线程对象放在线程池中。需要用线程的时候从线程池中取,用完了再放回线程池。
线程池维护一个任务队列。将任务插入到任务队列中,然后线程去取。

线程池的实现

ThreadPool.h

#ifndef THREADPOOL_H_INCLUDED
#define THREADPOOL_H_INCLUDED
#include <deque>
#include <string>
#include <pthread.h>
#include <string.h>
#include <stdlib.h>

// 使用C++98 语言规范实现的线程池: 面向对象做法,每一个job都是Task继承类的对象
namespace lz
{
    class Task
    {
    public:
        Task(char* arg = NULL, const std::string taskName = "")
            : arg_(arg)
            , taskName_(taskName)
        {
            //printf("task construct: %s\n",arg_);
        }
        virtual ~Task()
        {
        }
        void setArg(char* arg)
        {
            arg_ = arg;
        }

        virtual int run()= 0;

    protected:
        char*       arg_;
        std::string taskName_;
    };


    class ThreadPool
    {
    public:
        ThreadPool(int threadNum = 10);
        ~ThreadPool();

    public:
        size_t addTask(Task *task);
        void   stop();
        int    size();
        void   start();
        Task*  take();

    private:
        int createThreads();
        static void* threadFunc(void * threadData);

    private:
        ThreadPool& operator=(const ThreadPool&);
        ThreadPool(const ThreadPool&);

    private:
        volatile  bool              isRunning_;
        int                         threadsNum_;
        pthread_t*                  threads_;

        std::deque<Task*>           taskQueue_;
        pthread_mutex_t             mutex_;
        pthread_cond_t              condition_;
    };
}
#endif // THREADPOOL_H_INCLUDED

头文件中主要定义了任务类Task,包括成员变量 arg_, taskName_,
成员函数setArg。纯虚函数run。

ThreadPool类:

  • addTask()将任务插入队列。
  • start()调用createThreads()函数,主要是创建线程。
    • pthread_create(&threads_[i], NULL, threadFunc, this);
      • 这里的this以成员变量作为参数。
      • threadFunc就是线程运行的函数。
      • ThreadPool* pool = static_cast<ThreadPool*>(arg);
      • 另外调用pool->take()和task->run()
  • take()就是取一个任务。
  • stop()停止线程池。
  • size()线程池中运行的线程数。

ThreadPool.cpp是线程池的具体实现。

#include "stdafx.h"
#include "ThreadPool.h"
#include <stdio.h>
#include <assert.h>

namespace lz
{
    ThreadPool::ThreadPool(int threadNum)
    {
        threadsNum_ = threadNum;
        //isRunning_ = true;
    }

    void ThreadPool::start(){
        createThreads();
        isRunning_ = true;
    }

    ThreadPool::~ThreadPool()
    {
        stop();
        for(std::deque<Task*>::iterator it = taskQueue_.begin(); it != taskQueue_.end(); ++it)
        {
            delete *it;
        }
        taskQueue_.clear();
    }

    int ThreadPool::createThreads()
    {
        pthread_mutex_init(&mutex_, NULL);
        pthread_cond_init(&condition_, NULL);
        threads_ = (pthread_t*)malloc(sizeof(pthread_t) * threadsNum_);
        for (int i = 0; i < threadsNum_; i++)
        {
            pthread_create(&threads_[i], NULL, threadFunc, this);
        }
        return 0;
    }

    size_t ThreadPool::addTask(Task *task)
    {
        pthread_mutex_lock(&mutex_);
        taskQueue_.push_back(task);
        int size = taskQueue_.size();
        pthread_mutex_unlock(&mutex_);
        pthread_cond_signal(&condition_);
        return size;
    }

    void ThreadPool::stop()
    {
        if (!isRunning_)
        {
            return;
        }

        isRunning_ = false;
        pthread_cond_broadcast(&condition_);

        for (int i = 0; i < threadsNum_; i++)
        {
            pthread_join(threads_[i], NULL);
        }

        free(threads_);
        threads_ = NULL;

        pthread_mutex_destroy(&mutex_);
        pthread_cond_destroy(&condition_);
    }

    int ThreadPool::size()
    {
        pthread_mutex_lock(&mutex_);
        int size = taskQueue_.size();
        pthread_mutex_unlock(&mutex_);
        return size;
    }

    Task* ThreadPool::take()
    {
        Task* task = NULL;
        while (!task)
        {
            pthread_mutex_lock(&mutex_);
            while (taskQueue_.empty() && isRunning_)
            {
                pthread_cond_wait(&condition_, &mutex_);
            }

            if (!isRunning_)
            {
                pthread_mutex_unlock(&mutex_);

                break;
            }
            else if (taskQueue_.empty())
            {
                pthread_mutex_unlock(&mutex_);
                continue;
            }

            assert(!taskQueue_.empty());
            task = taskQueue_.front();
            taskQueue_.pop_front();
            pthread_mutex_unlock(&mutex_);
        }
        return task;
    }

    void* ThreadPool::threadFunc(void* arg)
    {
        pthread_t tid = pthread_self();
        ThreadPool* pool = static_cast<ThreadPool*>(arg);
        while (pool->isRunning_)
        {
            Task* task = pool->take();

            if (!task)
            {
                printf("thread %lu will exit\n", tid);
                break;
            }

            assert(task);
            task->run();
        }
        return 0;
    }
    //bool ThreadPool::getisRunning_(){return isRunning_;}

}

main.cpp

// pthreadPool.cpp : 定义控制台应用程序的入口点。
//

#include "stdafx.h"

#include <iostream>
#include <stdio.h>
//#include <unistd.h>
#include <windows.h>
#include <stdlib.h>
#include <pthread.h>
#include "ThreadPool.h"

using namespace std;
class Mytask : public  lz::Task{
public:
    Mytask(){	}
    virtual int run(){

        //printf("thread[%lu] :",pthread_self());
        //printf("%s\n",arg_);

        printf("%s thread[%lu] :%\n",arg_,pthread_self(),arg_);

        //printf("%s  :%s\n",arg_,arg_);
        //
        Sleep(1000);
        return 0;
    }
};


int main()
{
    cout << "begin" << endl;
    char szTmp[] = "hello world";
    Mytask taskobj;
    taskobj.setArg((char*)szTmp);
    taskobj.setInt(4);


    lz::ThreadPool threadPool(4);
    threadPool.start();

    for(int i = 0;i<8;i++){

        threadPool.addTask(&taskobj);
    }

    while(1){
        printf("there are still %d tasks need to process\n",threadPool.size());
        if(threadPool.size()==0){
            threadPool.stop();
            printf("now i will exit from main\n");
            exit(0);
        }
        Sleep(2000);
    }
    cout<<"end"<<endl;
    return 0;
}

注意一下MyTask中的run函数。
一开始运行都是下面这个结果。

[threadID]:(null)

和同学讨论了很久,怀疑是构造函数,或者是继承、run函数传递的问题。
但是当我把输出这句话写成下面这种形式:

printf("%s thread[%lu] :%\n",arg_,pthread_self(),arg_);

输出的结果是

hello world [threadID]:(null)

搞不懂为什么在一个函数前后输出同一个成员变量的结果会不同。