
本文共 5252 字,大约阅读时间需要 17 分钟。
一、状态封装头文件
#ifndef CONDITION_H
#define CONDITION_H
#include <pthread.h>
//封装一个互斥量和条件变量作为状态
typedef struct condition
{
pthread_cond_t pcond;
pthread_mutex_t pmutex;
}condition_t;
int condition_init(condition_t* cond);
int condition_lock(condition_t *cond);
int condition_unlock(condition_t *cond);
int condition_wait(condition_t cond);
int condition_timedwait(condition_t cond,const struct timespec *abstime);
int condition_signal(condition_t *cond);
int condition_broadcast(condition_t *cond);
int condition_destroy(condition_t *cond);
#endif // CONDITION_H
二、状态封装实现
#include “condition.h”
int condition_init(condition_t *cond)
{
int status;
if(status=pthread_mutex_init(&cond->pmutex,NULL))
return status;
if(status=pthread_cond_init(&cond->pcond,NULL))
return status;
return 0;
}
int condition_lock(condition_t *cond)
{
return pthread_mutex_lock(&cond->pmutex);
}
int condition_unlock(condition_t *cond)
{
return pthread_mutex_unlock(&cond->pmutex);
}
int condition_wait(condition_t *cond)
{
return pthread_cond_wait(&cond->pcond,&cond->pmutex);
}
int condition_timedwait(condition_t *cond, const timespec *abstime)
{
return pthread_cond_timedwait(&cond->pcond,&cond->pmutex,abstime);
}
int condition_signal(condition_t *cond)
{
return pthread_cond_signal(&cond->pcond);
}
int condition_broadcast(condition_t *cond)
{
return pthread_cond_broadcast(&cond->pcond);
}
int condition_destroy(condition_t *cond)
{
int status;
if(status=pthread_mutex_destroy(&cond->pmutex))
return status;
if(status=pthread_cond_destroy(&cond->pcond))
return status;
return 0;
}
三、线程池头文件
#ifndef THREADPOOL_H
#define THREADPOOL_H
#include “condition.h”
typedef struct task
{
void* (*run)(void args);
void arg;
struct task *next;
}task_t;
typedef struct threadpool
{
condition_t ready;
task_t *taskfirst;
task_t *tasklast;
int counter;
int idle;
int max_threads;
int quit;
}threadpool_t;
void threadpool_init(threadpool_t *pool,int threads);
void threadpool_add_task(threadpool_t pool,void(*run)(void *args),void *arg);
void threadpool_destroy(threadpool_t *pool);
#endif // THREADPOOL_H
四、线程池实现
#include “threadpool.h”
#include <stdlib.h>
#include <stdio.h>
#include
#include <errno.h>
#include <time.h>
//创建线程的执行 pthread_create调用执行
void *thread_routine(void *arg)
{
struct itimerspec abstime;
int timeout;
printf(“thread %d is starting\n”,(int)pthread_self());
threadpool_t pool =(threadpool_t)arg;
while(1)
{
timeout=0;
//加锁 condition_lock(&pool->ready); pool->idle++; //没有任务,等待 while (pool->taskfirst==NULL&&!pool->quit) { printf("thread %d is waiting\n",(int)pthread_self()); //获取当前时间 clock_gettime(CLOCK_REALTIME,&abstime.it_value); abstime.it_value.tv_sec+=2; int status; status = condition_timedwait(&pool->ready,&abstime.it_value); if(status==ETIMEDOUT) { printf("thread %d is timeout\n",(int)pthread_self()); timeout =1; break; } } pool->idle--; if(pool->taskfirst!=NULL) { //取出任务 task_t *t = pool->taskfirst; pool->taskfirst=t->next; //解锁任务队列 condition_unlock(&pool->ready); //执行任务 t->run(t->arg); //释放内存 free(t); //重新加锁 condition_lock(&pool->ready); } //退出线程池,销毁线程 if(pool->quit&&pool->taskfirst==NULL) { pool->counter--; if(pool->counter==0) { condition_signal(&pool->ready); } condition_unlock(&pool->ready); break; } //超时销毁线程 if(timeout==1) { pool->counter--; condition_unlock(&pool->ready); break; } condition_unlock(&pool->ready);}printf("thread %d is exiting\n",(int)pthread_self());return NULL;
}
void threadpool_init(threadpool_t *pool, int threads)
{
pool->idle=0;
pool->quit=0;
pool->counter=0;
pool->tasklast=NULL;
pool->taskfirst=NULL;
pool->max_threads=threads;
condition_init(&pool->ready);
}
void threadpool_add_task(threadpool_t *pool, void *(*run)(void *), void *arg)
{
task_t newtask = (task_t)malloc(sizeof (task_t));
newtask->next=NULL;
newtask->run=run;
newtask->arg=arg;
//加锁,添加新任务condition_lock(&pool->ready);if(pool->taskfirst==NULL){ pool->taskfirst=newtask;}else{ pool->tasklast->next=newtask;}pool->tasklast=newtask;//如果有空闲线程,唤醒线程做任务if(pool->idle>0){ condition_signal(&pool->ready);}//没有空闲的线程,创建一个线程else if(pool->counter<pool->max_threads){ pthread_t tid; pthread_create(&tid,NULL,thread_routine,pool); pool->counter++;}//解锁共享资源condition_unlock(&pool->ready);
}
void threadpool_destroy(threadpool_t *pool)
{
if(pool->quit)
{
return;
}
//加锁
condition_lock(&pool->ready);
pool->quit=1;
if(pool->idle>0)
{
condition_broadcast(&pool->ready);
}
while(pool->counter)
{
condition_wait(&pool->ready);
}
//解锁
condition_unlock(&pool->ready);
//释放锁资源
condition_destroy(&pool->ready);
}
五、测试线程池
#include
#include “threadpool.h”
#include <unistd.h>
using namespace std;
void* mytask(void arg)
{
printf(“thread %d is working on task %d\n”,(int)pthread_self(),(int*)arg);
sleep(1);
free(arg);
return NULL;
}
int main()
{
threadpool_t pool;
threadpool_init(&pool,10);
for(int i=0;i<100;i++)
{
int arg = (int) malloc(sizeof(int));
*arg = i;
threadpool_add_task(&pool,mytask,arg);
}
threadpool_destroy(&pool);
return 0;
}
六、测试结果
发表评论
最新留言
关于作者
