线程池小试
本文主要介绍简单的线程池的实现和问题的解决。
主要参考http://www.cnblogs.com/li-daphne/p/5583224.html
什么是线程池
由于线程的创建和开销比较大,所以我们一开始就直接创建一系列线程对象放在线程池中。需要用线程的时候从线程池中取,用完了再放回线程池。
线程池维护一个任务队列。将任务插入到任务队列中,然后线程去取。
线程池的实现
ThreadPool.h
#ifndef THREADPOOL_H_INCLUDED
#define THREADPOOL_H_INCLUDED
#include <deque>
#include <string>
#include <pthread.h>
#include <string.h>
#include <stdlib.h>
// 使用C++98 语言规范实现的线程池: 面向对象做法,每一个job都是Task继承类的对象
namespace lz
{
class Task
{
public:
Task(char* arg = NULL, const std::string taskName = "")
: arg_(arg)
, taskName_(taskName)
{
//printf("task construct: %s\n",arg_);
}
virtual ~Task()
{
}
void setArg(char* arg)
{
arg_ = arg;
}
virtual int run()= 0;
protected:
char* arg_;
std::string taskName_;
};
class ThreadPool
{
public:
ThreadPool(int threadNum = 10);
~ThreadPool();
public:
size_t addTask(Task *task);
void stop();
int size();
void start();
Task* take();
private:
int createThreads();
static void* threadFunc(void * threadData);
private:
ThreadPool& operator=(const ThreadPool&);
ThreadPool(const ThreadPool&);
private:
volatile bool isRunning_;
int threadsNum_;
pthread_t* threads_;
std::deque<Task*> taskQueue_;
pthread_mutex_t mutex_;
pthread_cond_t condition_;
};
}
#endif // THREADPOOL_H_INCLUDED
头文件中主要定义了任务类Task,包括成员变量 arg_, taskName_,
成员函数setArg。纯虚函数run。
ThreadPool类:
- addTask()将任务插入队列。
- start()调用createThreads()函数,主要是创建线程。
- pthread_create(&threads_[i], NULL, threadFunc, this);
- 这里的this以成员变量作为参数。
- threadFunc就是线程运行的函数。
- ThreadPool* pool = static_cast<ThreadPool*>(arg);
- 另外调用pool->take()和task->run()
- pthread_create(&threads_[i], NULL, threadFunc, this);
- take()就是取一个任务。
- stop()停止线程池。
- size()线程池中运行的线程数。
ThreadPool.cpp是线程池的具体实现。
#include "stdafx.h"
#include "ThreadPool.h"
#include <stdio.h>
#include <assert.h>
namespace lz
{
ThreadPool::ThreadPool(int threadNum)
{
threadsNum_ = threadNum;
//isRunning_ = true;
}
void ThreadPool::start(){
createThreads();
isRunning_ = true;
}
ThreadPool::~ThreadPool()
{
stop();
for(std::deque<Task*>::iterator it = taskQueue_.begin(); it != taskQueue_.end(); ++it)
{
delete *it;
}
taskQueue_.clear();
}
int ThreadPool::createThreads()
{
pthread_mutex_init(&mutex_, NULL);
pthread_cond_init(&condition_, NULL);
threads_ = (pthread_t*)malloc(sizeof(pthread_t) * threadsNum_);
for (int i = 0; i < threadsNum_; i++)
{
pthread_create(&threads_[i], NULL, threadFunc, this);
}
return 0;
}
size_t ThreadPool::addTask(Task *task)
{
pthread_mutex_lock(&mutex_);
taskQueue_.push_back(task);
int size = taskQueue_.size();
pthread_mutex_unlock(&mutex_);
pthread_cond_signal(&condition_);
return size;
}
void ThreadPool::stop()
{
if (!isRunning_)
{
return;
}
isRunning_ = false;
pthread_cond_broadcast(&condition_);
for (int i = 0; i < threadsNum_; i++)
{
pthread_join(threads_[i], NULL);
}
free(threads_);
threads_ = NULL;
pthread_mutex_destroy(&mutex_);
pthread_cond_destroy(&condition_);
}
int ThreadPool::size()
{
pthread_mutex_lock(&mutex_);
int size = taskQueue_.size();
pthread_mutex_unlock(&mutex_);
return size;
}
Task* ThreadPool::take()
{
Task* task = NULL;
while (!task)
{
pthread_mutex_lock(&mutex_);
while (taskQueue_.empty() && isRunning_)
{
pthread_cond_wait(&condition_, &mutex_);
}
if (!isRunning_)
{
pthread_mutex_unlock(&mutex_);
break;
}
else if (taskQueue_.empty())
{
pthread_mutex_unlock(&mutex_);
continue;
}
assert(!taskQueue_.empty());
task = taskQueue_.front();
taskQueue_.pop_front();
pthread_mutex_unlock(&mutex_);
}
return task;
}
void* ThreadPool::threadFunc(void* arg)
{
pthread_t tid = pthread_self();
ThreadPool* pool = static_cast<ThreadPool*>(arg);
while (pool->isRunning_)
{
Task* task = pool->take();
if (!task)
{
printf("thread %lu will exit\n", tid);
break;
}
assert(task);
task->run();
}
return 0;
}
//bool ThreadPool::getisRunning_(){return isRunning_;}
}
main.cpp
// pthreadPool.cpp : 定义控制台应用程序的入口点。
//
#include "stdafx.h"
#include <iostream>
#include <stdio.h>
//#include <unistd.h>
#include <windows.h>
#include <stdlib.h>
#include <pthread.h>
#include "ThreadPool.h"
using namespace std;
class Mytask : public lz::Task{
public:
Mytask(){ }
virtual int run(){
//printf("thread[%lu] :",pthread_self());
//printf("%s\n",arg_);
printf("%s thread[%lu] :%\n",arg_,pthread_self(),arg_);
//printf("%s :%s\n",arg_,arg_);
//
Sleep(1000);
return 0;
}
};
int main()
{
cout << "begin" << endl;
char szTmp[] = "hello world";
Mytask taskobj;
taskobj.setArg((char*)szTmp);
taskobj.setInt(4);
lz::ThreadPool threadPool(4);
threadPool.start();
for(int i = 0;i<8;i++){
threadPool.addTask(&taskobj);
}
while(1){
printf("there are still %d tasks need to process\n",threadPool.size());
if(threadPool.size()==0){
threadPool.stop();
printf("now i will exit from main\n");
exit(0);
}
Sleep(2000);
}
cout<<"end"<<endl;
return 0;
}
注意一下MyTask中的run函数。
一开始运行都是下面这个结果。
[threadID]:(null)
和同学讨论了很久,怀疑是构造函数,或者是继承、run函数传递的问题。
但是当我把输出这句话写成下面这种形式:
printf("%s thread[%lu] :%\n",arg_,pthread_self(),arg_);
输出的结果是
hello world [threadID]:(null)
搞不懂为什么在一个函数前后输出同一个成员变量的结果会不同。